package org.apache.bookkeeper.stream.storage.impl.cluster;

import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.class */
public class ClusterControllerLeaderImplTest {
    private static final Logger log = LoggerFactory.getLogger(ClusterControllerLeaderImplTest.class);
    private static final int NUM_STORAGE_CONTAINERS = 32;
    private ClusterMetadataStore metadataStore;
    private ClusterControllerLeaderImpl clusterController;
    private ExecutorService leaderExecutor;
    private final Semaphore coordSem = new Semaphore(0);
    private final AtomicReference<RegistrationClient.RegistrationListener> regListenerRef = new AtomicReference<>(null);
    private final CompletableFuture<Void> watchFuture = new CompletableFuture<>();

    @Before
    public void setup() {
        this.metadataStore = (ClusterMetadataStore) Mockito.spy(new InMemClusterMetadataStore(NUM_STORAGE_CONTAINERS));
        this.metadataStore.initializeCluster(NUM_STORAGE_CONTAINERS);
        final ClusterMetadataStore clusterMetadataStore = this.metadataStore;
        this.metadataStore = new ClusterMetadataStore() { // from class: org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerLeaderImplTest.1
            public boolean initializeCluster(int i, Optional<String> optional) {
                return clusterMetadataStore.initializeCluster(i);
            }

            public ClusterAssignmentData getClusterAssignmentData() {
                return clusterMetadataStore.getClusterAssignmentData();
            }

            public void updateClusterAssignmentData(ClusterAssignmentData clusterAssignmentData) {
                clusterMetadataStore.updateClusterAssignmentData(clusterAssignmentData);
                ClusterControllerLeaderImplTest.this.coordSem.release();
            }

            public void watchClusterAssignmentData(Consumer<Void> consumer, Executor executor) {
                clusterMetadataStore.watchClusterAssignmentData(consumer, executor);
            }

            public void unwatchClusterAssignmentData(Consumer<Void> consumer) {
                clusterMetadataStore.unwatchClusterAssignmentData(consumer);
            }

            public ClusterMetadata getClusterMetadata() {
                return clusterMetadataStore.getClusterMetadata();
            }

            public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
                clusterMetadataStore.updateClusterMetadata(clusterMetadata);
            }

            public void close() {
                clusterMetadataStore.close();
            }
        };
        StorageContainerController storageContainerController = (StorageContainerController) Mockito.spy(new DefaultStorageContainerController());
        RegistrationClient registrationClient = (RegistrationClient) Mockito.mock(RegistrationClient.class);
        Mockito.when(registrationClient.watchWritableBookies((RegistrationClient.RegistrationListener) ArgumentMatchers.any(RegistrationClient.RegistrationListener.class))).thenAnswer(invocationOnMock -> {
            this.regListenerRef.set((RegistrationClient.RegistrationListener) invocationOnMock.getArgument(0));
            return this.watchFuture;
        });
        ((RegistrationClient) Mockito.doAnswer(invocationOnMock2 -> {
            this.regListenerRef.compareAndSet((RegistrationClient.RegistrationListener) invocationOnMock2.getArgument(0), null);
            return null;
        }).when(registrationClient)).unwatchWritableBookies((RegistrationClient.RegistrationListener) ArgumentMatchers.any(RegistrationClient.RegistrationListener.class));
        this.clusterController = new ClusterControllerLeaderImpl(this.metadataStore, storageContainerController, registrationClient, Duration.ofMillis(10L));
        this.leaderExecutor = Executors.newSingleThreadExecutor();
    }

    @After
    public void teardown() {
        if (null != this.metadataStore) {
            this.metadataStore.close();
        }
        if (null != this.leaderExecutor) {
            this.leaderExecutor.shutdown();
        }
    }

    @Test
    public void testProcessAsLeader() throws Exception {
        this.clusterController.suspend();
        Assert.assertTrue(this.clusterController.isSuspended());
        this.leaderExecutor.submit(() -> {
            try {
                this.clusterController.processAsLeader();
            } catch (Exception e) {
                log.info("Encountered exception when cluster controller processes as a leader", e);
            }
        });
        this.clusterController.resume();
        Assert.assertFalse(this.clusterController.isSuspended());
        FutureUtils.complete(this.watchFuture, (Object) null);
        Assert.assertNotNull(this.regListenerRef);
        Assert.assertFalse(this.coordSem.tryAcquire(1L, TimeUnit.SECONDS));
        Assert.assertTrue(this.clusterController.getLastSuccessfulAssigmentAt() < 0);
        Set newSet = Sets.newSet(new BookieId[]{BookieId.parse("127.0.0.1:4181")});
        LongVersion longVersion = new LongVersion(0L);
        this.regListenerRef.get().onBookiesChanged(new Versioned(newSet, longVersion));
        this.coordSem.acquire();
        Assert.assertTrue(this.clusterController.getLastSuccessfulAssigmentAt() > 0);
        long lastSuccessfulAssigmentAt = this.clusterController.getLastSuccessfulAssigmentAt();
        this.regListenerRef.get().onBookiesChanged(new Versioned(newSet, longVersion));
        Assert.assertFalse(this.coordSem.tryAcquire(200L, TimeUnit.MILLISECONDS));
        Assert.assertEquals(lastSuccessfulAssigmentAt, this.clusterController.getLastSuccessfulAssigmentAt());
        newSet.add(BookieId.parse("127.0.0.1:4182"));
        newSet.add(BookieId.parse("127.0.0.1:4183"));
        newSet.add(BookieId.parse("127.0.0.1:4184"));
        newSet.add(BookieId.parse("127.0.0.1:4185"));
        this.regListenerRef.get().onBookiesChanged(new Versioned(newSet, new LongVersion(1L)));
        this.coordSem.acquire();
        Assert.assertTrue(this.clusterController.getLastSuccessfulAssigmentAt() > lastSuccessfulAssigmentAt);
        long lastSuccessfulAssigmentAt2 = this.clusterController.getLastSuccessfulAssigmentAt();
        this.regListenerRef.get().onBookiesChanged(new Versioned(Collections.emptySet(), new LongVersion(2L)));
        Assert.assertFalse(this.coordSem.tryAcquire(1L, TimeUnit.SECONDS));
        Assert.assertEquals(lastSuccessfulAssigmentAt2, this.clusterController.getLastSuccessfulAssigmentAt());
    }
}
