package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.cnc.events.config.IndividualGlobalConfigRefreshFailedEvent;
import com.couchbase.client.core.config.ConfigRefreshFailure;
import com.couchbase.client.core.config.ConfigVersion;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.GlobalConfig;
import com.couchbase.client.core.config.PortInfo;
import com.couchbase.client.core.config.ProposedGlobalConfigContext;
import com.couchbase.client.core.msg.kv.CarrierGlobalConfigRequest;
import com.couchbase.client.core.retry.FailFastRetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.NanoTimestamp;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/couchbase/client/core/config/refresher/GlobalRefresher.class */
public class GlobalRefresher {
    private final ConfigurationProvider provider;
    private final Core core;
    private final Duration configPollInterval;
    private final Duration configRequestTimeout;
    private final Disposable pollRegistration;
    private final AtomicLong nodeOffset = new AtomicLong(0);
    private volatile boolean started = false;
    private volatile NanoTimestamp lastPoll = NanoTimestamp.never();
    private final AtomicInteger numFailedRefreshes = new AtomicInteger(0);

    public GlobalRefresher(ConfigurationProvider configurationProvider, Core core) {
        this.provider = configurationProvider;
        this.core = core;
        this.configPollInterval = core.context().environment().ioConfig().configPollInterval();
        this.configRequestTimeout = KeyValueBucketRefresher.clampConfigRequestTimeout(this.configPollInterval);
        Flux flatMap = Flux.merge(new Publisher[]{Flux.interval(pollerInterval(), core.context().environment().scheduler()), configurationProvider.configChangeNotifications()}).onBackpressureDrop().filter(l -> {
            return this.started;
        }).filter(l2 -> {
            return l2.longValue() == -1 || this.lastPoll.hasElapsed(this.configPollInterval);
        }).flatMap(l3 -> {
            List<PortInfo> filterEligibleNodes = filterEligibleNodes();
            if (this.numFailedRefreshes.get() >= filterEligibleNodes.size()) {
                configurationProvider.signalConfigRefreshFailed(ConfigRefreshFailure.ALL_NODES_TRIED_ONCE_WITHOUT_SUCCESS);
                this.numFailedRefreshes.set(0);
            }
            return attemptUpdateGlobalConfig(Flux.fromIterable(filterEligibleNodes).take(1L));
        });
        Objects.requireNonNull(configurationProvider);
        this.pollRegistration = flatMap.subscribe(configurationProvider::proposeGlobalConfig);
    }

    protected Duration pollerInterval() {
        return KeyValueBucketRefresher.POLLER_INTERVAL;
    }

    private Flux<ProposedGlobalConfigContext> attemptUpdateGlobalConfig(Flux<PortInfo> flux) {
        return flux.flatMap(portInfo -> {
            NanoTimestamp now = NanoTimestamp.now();
            CarrierGlobalConfigRequest carrierGlobalConfigRequest = new CarrierGlobalConfigRequest(this.configRequestTimeout, this.core.context(), FailFastRetryStrategy.INSTANCE, portInfo.identifier(), currentVersion());
            this.core.send(carrierGlobalConfigRequest);
            return Reactor.wrap(carrierGlobalConfigRequest, carrierGlobalConfigRequest.response(), true).filter(carrierGlobalConfigResponse -> {
                if (carrierGlobalConfigResponse.status().success()) {
                    return true;
                }
                this.numFailedRefreshes.incrementAndGet();
                this.core.context().environment().eventBus().publish(new IndividualGlobalConfigRefreshFailedEvent(now.elapsed(), this.core.context(), null, portInfo.hostname(), carrierGlobalConfigResponse));
                return false;
            }).map(carrierGlobalConfigResponse2 -> {
                return new ProposedGlobalConfigContext(new String(carrierGlobalConfigResponse2.content(), StandardCharsets.UTF_8), portInfo.hostname());
            }).doOnSuccess(proposedGlobalConfigContext -> {
                this.numFailedRefreshes.set(0);
                this.lastPoll = NanoTimestamp.now();
            }).onErrorResume(th -> {
                this.numFailedRefreshes.incrementAndGet();
                this.core.context().environment().eventBus().publish(new IndividualGlobalConfigRefreshFailedEvent(now.elapsed(), this.core.context(), th, portInfo.hostname(), null));
                return Mono.empty();
            });
        });
    }

    private ConfigVersion currentVersion() {
        GlobalConfig globalConfig = this.provider.config().globalConfig();
        return globalConfig == null ? ConfigVersion.ZERO : globalConfig.version();
    }

    private List<PortInfo> filterEligibleNodes() {
        GlobalConfig globalConfig = this.provider.config().globalConfig();
        if (globalConfig == null) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(globalConfig.portInfos());
        shiftNodeList(arrayList);
        return (List) arrayList.stream().filter(portInfo -> {
            return portInfo.ports().containsKey(ServiceType.KV) || portInfo.sslPorts().containsKey(ServiceType.KV);
        }).collect(Collectors.toList());
    }

    private <T> void shiftNodeList(List<T> list) {
        Collections.rotate(list, -((int) (this.nodeOffset.getAndIncrement() % list.size())));
    }

    public Mono<Void> start() {
        return Mono.defer(() -> {
            this.started = true;
            this.numFailedRefreshes.set(0);
            return Mono.empty();
        });
    }

    public Mono<Void> stop() {
        return Mono.defer(() -> {
            this.started = false;
            this.numFailedRefreshes.set(0);
            return Mono.empty();
        });
    }

    public Mono<Void> shutdown() {
        return stop().then(Mono.defer(() -> {
            if (!this.pollRegistration.isDisposed()) {
                this.pollRegistration.dispose();
            }
            return Mono.empty();
        }));
    }
}
