package org.apache.bookkeeper.stream.storage.impl.sc;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.class */
/**
 * Storage container manager that keeps the storage containers running on this server in line with
 * the cluster assignment data obtained from a {@link ClusterMetadataStore}.
 *
 * <p>On start the manager registers itself as a watcher of the cluster assignment data and schedules
 * a periodic probe on its own single-threaded executor. Every probe re-reads the assignment data,
 * updates the container-to-endpoint routing table, and asks the {@link StorageContainerRegistry} to
 * start or stop containers so that the live set converges on the containers assigned to this
 * server's endpoint.
 *
 * <p>Container starts and stops are asynchronous; while one is in flight its container id is kept in
 * a "pending" set so that later probes do not issue a duplicate start/stop for the same container.
 */
public class ZkStorageContainerManager extends AbstractLifecycleComponent<StorageConfiguration> implements StorageContainerManager, Consumer<Void> {
    private static final Logger log = LoggerFactory.getLogger(ZkStorageContainerManager.class);

    /** Endpoint of this server; used as the key to look up its own assignment. */
    private final Endpoint endpoint;
    /** Source of the cluster assignment data. */
    private final ClusterMetadataStore metadataStore;
    /** Performs the actual start/stop of storage containers. */
    private final StorageContainerRegistry registry;
    /** Single-threaded scheduler that runs the periodic probe and the probes triggered by {@link #accept(Void)}. */
    private final ScheduledExecutorService executor;

    /** Never assigned within this class; retained as declared. */
    private volatile ClusterAssignmentData clusterAssignmentData;
    /** Latest per-server assignment, keyed by server endpoint; replaced wholesale on every successful refresh. */
    private volatile Map<Endpoint, ServerAssignmentData> clusterAssignmentMap;
    /** Assignment of this server taken from the latest refresh; {@code null} when this server has none. */
    private volatile ServerAssignmentData myAssignmentData;
    /** Routing table: storage container id to the endpoint it is assigned to. */
    private volatile ConcurrentLongHashMap<Endpoint> containerAssignmentMap;

    /** Handle of the periodic probe; {@code null} until {@link #doStart()} has scheduled it. */
    private ScheduledFuture<?> containerProbeTask;
    /** Period of the probe: half of the configured cluster controller schedule interval. */
    private final Duration probeInterval;
    /** Containers that were started successfully and not yet stopped, keyed by container id. */
    private final Map<Long, StorageContainer> liveContainers;
    /** Ids of containers with a start or stop currently in flight. */
    private final Set<Long> pendingStartStopContainers;

    /**
     * Creates the manager. Nothing is watched or scheduled until the component is started.
     *
     * @param endpoint the endpoint of this server
     * @param storageConfiguration configuration supplying the cluster controller schedule interval
     * @param clusterMetadataStore store from which the cluster assignment data is read and watched
     * @param storageContainerRegistry registry used to start and stop storage containers
     * @param statsLogger stats logger handed to the base component
     */
    public ZkStorageContainerManager(Endpoint endpoint, StorageConfiguration storageConfiguration, ClusterMetadataStore clusterMetadataStore, StorageContainerRegistry storageContainerRegistry, StatsLogger statsLogger) {
        super("zk-storage-container-manager", storageConfiguration, statsLogger);
        this.endpoint = endpoint;
        this.metadataStore = clusterMetadataStore;
        this.registry = storageContainerRegistry;
        this.executor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("zk-storage-container-manager").build());
        // The synchronized wrappers around the concurrent collections are kept exactly as before so the
        // objects handed out by the package-private getters behave the same.
        this.liveContainers = Collections.synchronizedMap(Maps.<Long, StorageContainer>newConcurrentMap());
        this.pendingStartStopContainers = Collections.synchronizedSet(Sets.<Long>newConcurrentHashSet());
        this.containerAssignmentMap = new ConcurrentLongHashMap<>();
        this.clusterAssignmentMap = Maps.newHashMap();
        this.probeInterval = Duration.ofMillis(storageConfiguration.getClusterControllerScheduleIntervalMs() / 2);
    }

    /**
     * Registers this manager as a watcher of the cluster assignment data and schedules the periodic
     * container probe, starting immediately.
     */
    protected void doStart() {
        this.metadataStore.watchClusterAssignmentData(this, this.executor);
        log.info("Watched cluster assignment data.");
        final long periodMs = this.probeInterval.toMillis();
        // A periodic task that throws once is never run again by the scheduler, so the scheduled
        // entry point is the exception-guarded variant of the probe.
        this.containerProbeTask = this.executor.scheduleAtFixedRate(
            this::probeContainersSafely, 0L, periodMs, TimeUnit.MILLISECONDS);
        log.info("Scheduled storage container probe task at every {} ms", periodMs);
    }

    /**
     * Unregisters the watcher, cancels the periodic probe and requests a stop of every live container.
     * The container stops are asynchronous and are not awaited here.
     */
    protected void doStop() {
        this.metadataStore.unwatchClusterAssignmentData(this);
        final ScheduledFuture<?> probeTask = this.containerProbeTask;
        // The task is null if the probe was never scheduled; there is nothing to cancel then.
        if (probeTask != null && !probeTask.cancel(true)) {
            log.warn("Failed to cancel the container probe task.");
        }
        stopContainers();
    }

    /**
     * Closes the container registry and shuts down the probe executor.
     *
     * @throws IOException declared by the component contract
     */
    protected void doClose() throws IOException {
        this.registry.close();
        this.executor.shutdown();
    }

    /**
     * Looks up the endpoint a storage container is currently assigned to.
     *
     * @param j the storage container id
     * @return the endpoint recorded in the routing table for that container, or whatever the
     *     underlying map returns for an unknown key (presumably {@code null} - confirm against
     *     {@code ConcurrentLongHashMap})
     */
    public Endpoint getStorageContainer(long j) {
        return this.containerAssignmentMap.get(j);
    }

    /**
     * Runs one probe: refreshes the assignment and, if the refresh succeeded, either stops all live
     * containers (this server has no assignment) or reconciles the live set with the assignment.
     */
    void probeContainers() {
        if (!refreshMyAssignment()) {
            return;
        }
        // Read the volatile field once so the null check and the use see the same value.
        final ServerAssignmentData assignment = this.myAssignmentData;
        if (assignment == null) {
            stopContainers();
        } else {
            processMyAssignment(assignment);
        }
    }

    /** Runs {@link #probeContainers()} and logs, rather than propagates, any runtime failure. */
    private void probeContainersSafely() {
        try {
            probeContainers();
        } catch (RuntimeException e) {
            log.warn("Failed to probe storage containers, will retry on the next probe", e);
        }
    }

    /**
     * Re-reads the cluster assignment data and applies the differences to the routing table.
     *
     * @return {@code false} when the store returned no assignment data (nothing is changed),
     *     {@code true} once the routing table and this server's assignment have been updated
     */
    private boolean refreshMyAssignment() {
        final ClusterAssignmentData latest = this.metadataStore.getClusterAssignmentData();
        if (null == latest) {
            log.info("Cluster assignment data is empty, so skip refreshing");
            return false;
        }
        final Map<Endpoint, ServerAssignmentData> newAssignmentMap = latest.getServersMap().entrySet().stream()
            .collect(Collectors.toMap(
                entry -> NetUtils.parseEndpoint(entry.getKey()),
                entry -> entry.getValue()));
        final Map<Endpoint, ServerAssignmentData> oldAssignmentMap = this.clusterAssignmentMap;

        final Set<Endpoint> oldServers = oldAssignmentMap.keySet();
        final Set<Endpoint> newServers = newAssignmentMap.keySet();
        final Set<Endpoint> serversJoined = Sets.difference(newServers, oldServers).immutableCopy();
        final Set<Endpoint> serversLeft = Sets.difference(oldServers, newServers).immutableCopy();
        final Set<Endpoint> serversKept = Sets.intersection(newServers, oldServers).immutableCopy();

        // Order matters: routes of departed servers are dropped before routes of new servers are added.
        processServersLeft(serversLeft, oldAssignmentMap);
        processServersJoined(serversJoined, newAssignmentMap);
        processServersAssignmentChanged(serversKept, oldAssignmentMap, newAssignmentMap);

        this.clusterAssignmentMap = newAssignmentMap;
        this.myAssignmentData = newAssignmentMap.get(this.endpoint);
        return true;
    }

    /**
     * Adds routes for every container assigned to the servers that newly appeared.
     *
     * @param set endpoints present in the new assignment but not in the old one
     * @param map the new per-server assignment
     */
    private void processServersJoined(Set<Endpoint> set, Map<Endpoint, ServerAssignmentData> map) {
        if (!set.isEmpty()) {
            log.info("Servers joined : {}", set);
        }
        final ConcurrentLongHashMap<Endpoint> routes = this.containerAssignmentMap;
        for (Endpoint server : set) {
            final ServerAssignmentData assignment = map.get(server);
            if (null == assignment) {
                continue;
            }
            for (Long scId : assignment.getContainersList()) {
                routes.put(scId, server);
            }
        }
    }

    /**
     * Removes the routes of every container that was assigned to servers that disappeared. A route is
     * only removed if it still points at the departed server.
     *
     * @param set endpoints present in the old assignment but not in the new one
     * @param map the old per-server assignment
     */
    private void processServersLeft(Set<Endpoint> set, Map<Endpoint, ServerAssignmentData> map) {
        if (!set.isEmpty()) {
            log.info("Servers left : {}", set);
        }
        final ConcurrentLongHashMap<Endpoint> routes = this.containerAssignmentMap;
        for (Endpoint server : set) {
            final ServerAssignmentData assignment = map.get(server);
            if (null == assignment) {
                continue;
            }
            for (Long scId : assignment.getContainersList()) {
                routes.remove(scId, server);
            }
        }
    }

    /**
     * For servers present in both assignments, replaces the routes of those whose assignment changed.
     *
     * @param set endpoints present in both the old and the new assignment
     * @param map the old per-server assignment
     * @param map2 the new per-server assignment
     */
    private void processServersAssignmentChanged(Set<Endpoint> set, Map<Endpoint, ServerAssignmentData> map, Map<Endpoint, ServerAssignmentData> map2) {
        final ConcurrentLongHashMap<Endpoint> routes = this.containerAssignmentMap;
        for (Endpoint server : set) {
            final ServerAssignmentData oldAssignment = map.getOrDefault(server, ServerAssignmentData.getDefaultInstance());
            final ServerAssignmentData newAssignment = map2.getOrDefault(server, ServerAssignmentData.getDefaultInstance());
            if (oldAssignment.equals(newAssignment)) {
                continue;
            }
            log.info("Server assignment is change for {}:\nold assignment: {}\nnew assignment: {}",
                NetUtils.endpointToString(server), oldAssignment, newAssignment);
            // Conditional remove: a route that already points at another server is left alone.
            for (Long scId : oldAssignment.getContainersList()) {
                routes.remove(scId, server);
            }
            for (Long scId : newAssignment.getContainersList()) {
                routes.put(scId, server);
            }
        }
    }

    /** Requests a stop of every container in a snapshot of the live set. */
    private void stopContainers() {
        for (Long scId : ImmutableSet.copyOf(this.liveContainers.keySet())) {
            stopStorageContainer(scId);
        }
    }

    /**
     * Reconciles the live containers with the given assignment: starts the assigned containers that
     * are not live and stops the live containers that are no longer assigned, skipping any container
     * that already has a start/stop in flight.
     *
     * @param serverAssignmentData the assignment of this server
     */
    private void processMyAssignment(ServerAssignmentData serverAssignmentData) {
        final Set<Long> ideal = new HashSet<>(serverAssignmentData.getContainersList());
        final Set<Long> live = Sets.newHashSet(this.liveContainers.keySet());
        // Both results are views; the pending check is evaluated while iterating.
        final Set<Long> toStart = Sets.filter(Sets.difference(ideal, live), this::isNotPending);
        final Set<Long> toStop = Sets.filter(Sets.difference(live, ideal), this::isNotPending);
        if (!toStart.isEmpty() || !toStop.isEmpty()) {
            log.info("Process container changes:\n\tIdeal = {}\n\tLive = {}\n\tPending = {}\n\tToStart = {}\n\tToStop = {}",
                ideal, live, this.pendingStartStopContainers, toStart, toStop);
        }
        for (Long scId : toStart) {
            startStorageContainer(scId);
        }
        for (Long scId : toStop) {
            stopStorageContainer(scId);
        }
    }

    /** Tells whether no start/stop is currently in flight for the given container id. */
    private boolean isNotPending(Long scId) {
        return !this.pendingStartStopContainers.contains(scId);
    }

    /**
     * Asynchronously starts a storage container unless it is already live.
     *
     * @param scId the storage container id
     * @return a future of the live container; already completed when the container was live, otherwise
     *     the registry's start future with the bookkeeping stage attached
     */
    private CompletableFuture<StorageContainer> startStorageContainer(long scId) {
        final Long key = scId;
        log.info("Starting storage container ({})", key);
        final StorageContainer existing = this.liveContainers.get(key);
        if (null != existing) {
            log.warn("Storage container ({}) is already started", key);
            return FutureUtils.value(existing);
        }
        this.pendingStartStopContainers.add(key);
        return this.registry.startStorageContainer(scId).whenComplete((container, cause) -> {
            try {
                if (null != cause) {
                    log.warn("Failed to start storage container ({})", key, cause);
                } else {
                    log.info("Successfully started storage container ({})", key);
                    addStorageContainer(scId, container);
                }
            } finally {
                // Always clear the pending marker, even if the bookkeeping above throws.
                this.pendingStartStopContainers.remove(key);
            }
        });
    }

    /**
     * Asynchronously stops a storage container if it is live.
     *
     * @param scId the storage container id
     * @return a completed future when the container is not live, otherwise the registry's stop future
     *     with the bookkeeping stage attached
     */
    private CompletableFuture<Void> stopStorageContainer(long scId) {
        final Long key = scId;
        log.info("Stopping storage container ({})", key);
        final StorageContainer container = this.liveContainers.get(key);
        if (null == container) {
            log.warn("Storage container ({}) is not alive anymore", key);
            return FutureUtils.Void();
        }
        this.pendingStartStopContainers.add(key);
        return this.registry.stopStorageContainer(scId, container).whenComplete((ignored, cause) -> {
            try {
                if (cause != null) {
                    log.warn("Failed to stop storage container ({})", key, cause);
                } else {
                    log.info("Successfully stopped storage container ({})", key);
                    removeStorageContainer(scId, container);
                }
            } finally {
                // Always clear the pending marker, even if the bookkeeping above throws.
                this.pendingStartStopContainers.remove(key);
            }
        });
    }

    /**
     * Records a started container in the live set. If another container is already recorded under the
     * same id, the new one is stopped and the recorded one wins.
     *
     * @param scId the storage container id
     * @param storageContainer the container that was just started
     * @return the container that is in the live set for {@code scId} after the call
     */
    private StorageContainer addStorageContainer(long scId, StorageContainer storageContainer) {
        final StorageContainer previous = this.liveContainers.putIfAbsent(scId, storageContainer);
        if (null == previous) {
            log.info("Storage container ({}) is added to live set.", storageContainer);
            return storageContainer;
        }
        log.warn("Storage container ({}) has already been added to live set", storageContainer);
        storageContainer.stop();
        return previous;
    }

    /**
     * Removes a container from the live set, but only if the live set still maps the id to that very
     * container.
     *
     * @param scId the storage container id
     * @param storageContainer the container expected to be recorded under {@code scId}
     */
    private void removeStorageContainer(long scId, StorageContainer storageContainer) {
        final Long key = scId;
        if (this.liveContainers.remove(key, storageContainer)) {
            log.info("Storage container ({}) is removed from live set.", key);
        } else {
            log.warn("Storage container ({}) can't be removed from live set.", key);
        }
    }

    /**
     * Watcher callback: queues one probe on the manager's executor.
     *
     * @param r4 unused
     */
    @Override // java.util.function.Consumer
    public void accept(Void r4) {
        // Failures of a submitted task would otherwise vanish into the discarded Future, so the
        // logging variant of the probe is submitted.
        this.executor.submit(() -> {
            probeContainersSafely();
        });
    }

    /** Returns the internal live-container map itself, not a copy. */
    Map<Long, StorageContainer> getLiveContainers() {
        return this.liveContainers;
    }

    /** Returns the internal pending start/stop set itself, not a copy. */
    Set<Long> getPendingStartStopContainers() {
        return this.pendingStartStopContainers;
    }
}
