package org.apache.bookkeeper.clients.impl.container;

import com.google.common.collect.Lists;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.clients.exceptions.ClientException;
import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannelManager;
import org.apache.bookkeeper.clients.impl.internal.api.LocationClient;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.exceptions.ObjectClosedException;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.class */
public class TestStorageContainerChannel extends GrpcClientTestBase {
    private OrderedScheduler scheduler;
    private final LocationClient locationClient = (LocationClient) Mockito.mock(LocationClient.class);
    private StorageServerChannel mockChannel = newMockServerChannel();
    private StorageServerChannel mockChannel2 = newMockServerChannel();
    private StorageServerChannel mockChannel3 = newMockServerChannel();
    private final Endpoint endpoint = Endpoint.newBuilder().setHostname("127.0.0.1").setPort(8181).build();
    private final Endpoint endpoint2 = Endpoint.newBuilder().setHostname("127.0.0.2").setPort(8282).build();
    private final Endpoint endpoint3 = Endpoint.newBuilder().setHostname("127.0.0.3").setPort(8383).build();
    private final StorageServerChannelManager channelManager = new StorageServerChannelManager(endpoint -> {
        return this.endpoint2 == endpoint ? this.mockChannel2 : this.endpoint3 == endpoint ? this.mockChannel3 : this.mockChannel;
    });
    private StorageContainerChannel scClient;

    @Override // org.apache.bookkeeper.clients.grpc.GrpcClientTestBase
    protected void doSetup() throws Exception {
        this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test-range-server-manager").build();
        this.scClient = new StorageContainerChannel(0L, this.channelManager, this.locationClient, this.scheduler.chooseThread(0L));
    }

    @Override // org.apache.bookkeeper.clients.grpc.GrpcClientTestBase
    protected void doTeardown() throws Exception {
        if (null != this.scheduler) {
            this.scheduler.shutdown();
        }
    }

    private StorageServerChannel newMockServerChannel() {
        StorageServerChannel storageServerChannel = (StorageServerChannel) Mockito.mock(StorageServerChannel.class);
        Mockito.when(storageServerChannel.intercept(ArgumentMatchers.anyLong())).thenReturn(storageServerChannel);
        return storageServerChannel;
    }

    private void ensureCallbackExecuted() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.scheduler.submit(() -> {
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }

    @Test
    public void testGetRootRangeServiceSuccess() throws Exception {
        CompletableFuture createFuture = FutureUtils.createFuture();
        Mockito.when(this.locationClient.locateStorageContainers(ArgumentMatchers.anyList())).thenReturn(createFuture);
        Assert.assertNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        CompletableFuture storageContainerChannelFuture = this.scClient.getStorageContainerChannelFuture();
        Assert.assertNotNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        Assert.assertTrue(storageContainerChannelFuture == this.scClient.getStorageContainerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        createFuture.complete(Lists.newArrayList(new OneStorageContainerEndpointResponse[]{OneStorageContainerEndpointResponse.newBuilder().setStatusCode(StatusCode.SUCCESS).setEndpoint(StorageContainerEndpoint.newBuilder().setStorageContainerId(0L).setRevision(1000L).setRwEndpoint(this.endpoint).addRoEndpoint(this.endpoint).build()).build()}));
        Assert.assertTrue(((StorageServerChannel) storageContainerChannelFuture.get()) == this.mockChannel);
        StorageContainerInfo storageContainerInfo = this.scClient.getStorageContainerInfo();
        Assert.assertEquals(0L, storageContainerInfo.getGroupId());
        Assert.assertEquals(1000L, storageContainerInfo.getRevision());
        Assert.assertEquals(this.endpoint, storageContainerInfo.getWriteEndpoint());
        Assert.assertEquals(Lists.newArrayList(new Endpoint[]{this.endpoint, this.endpoint}), storageContainerInfo.getReadEndpoints());
        Assert.assertEquals(this.mockChannel, this.channelManager.getChannel(this.endpoint));
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
    }

    @Test
    public void testGetRootRangeServiceFailureWhenClosingChannelManager() throws Exception {
        CompletableFuture createFuture = FutureUtils.createFuture();
        Mockito.when(this.locationClient.locateStorageContainers(ArgumentMatchers.anyList())).thenReturn(createFuture);
        Assert.assertNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        CompletableFuture storageContainerChannelFuture = this.scClient.getStorageContainerChannelFuture();
        Assert.assertNotNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        Assert.assertTrue(storageContainerChannelFuture == this.scClient.getStorageContainerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        this.channelManager.close();
        createFuture.complete(Lists.newArrayList(new OneStorageContainerEndpointResponse[]{OneStorageContainerEndpointResponse.newBuilder().setStatusCode(StatusCode.SUCCESS).setEndpoint(StorageContainerEndpoint.newBuilder().setStorageContainerId(0L).setRevision(1000L).setRwEndpoint(this.endpoint).addRoEndpoint(this.endpoint).build()).build()}));
        try {
            storageContainerChannelFuture.get();
            Assert.fail("Should fail get root range service if channel manager is shutting down.");
        } catch (ExecutionException e) {
            Assert.assertNotNull(e.getCause());
            Assert.assertTrue(e.getCause() instanceof ObjectClosedException);
        }
        StorageContainerInfo storageContainerInfo = this.scClient.getStorageContainerInfo();
        Assert.assertEquals(0L, storageContainerInfo.getGroupId());
        Assert.assertEquals(1000L, storageContainerInfo.getRevision());
        Assert.assertEquals(this.endpoint, storageContainerInfo.getWriteEndpoint());
        Assert.assertEquals(Lists.newArrayList(new Endpoint[]{this.endpoint, this.endpoint}), storageContainerInfo.getReadEndpoints());
        Assert.assertNull(this.channelManager.getChannel(this.endpoint));
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
    }

    @Test
    public void testGetRootRangeServiceFailureOnStaleGroupInfo() throws Exception {
        CompletableFuture createFuture = FutureUtils.createFuture();
        CompletableFuture createFuture2 = FutureUtils.createFuture();
        CompletableFuture createFuture3 = FutureUtils.createFuture();
        Mockito.when(this.locationClient.locateStorageContainers(ArgumentMatchers.anyList())).thenReturn(createFuture).thenReturn(createFuture3);
        Assert.assertNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        CompletableFuture storageContainerChannelFuture = this.scClient.getStorageContainerChannelFuture();
        Assert.assertNotNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        Assert.assertTrue(storageContainerChannelFuture == this.scClient.getStorageContainerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        createFuture.complete(Lists.newArrayList(new OneStorageContainerEndpointResponse[]{OneStorageContainerEndpointResponse.newBuilder().setStatusCode(StatusCode.SUCCESS).setEndpoint(StorageContainerEndpoint.newBuilder().setStorageContainerId(0L).setRevision(1000L).setRwEndpoint(this.endpoint).addRoEndpoint(this.endpoint).build()).build()}));
        Assert.assertTrue(((StorageServerChannel) storageContainerChannelFuture.get()) == this.mockChannel);
        StorageContainerInfo storageContainerInfo = this.scClient.getStorageContainerInfo();
        Assert.assertEquals(0L, storageContainerInfo.getGroupId());
        Assert.assertEquals(1000L, storageContainerInfo.getRevision());
        Assert.assertEquals(this.endpoint, storageContainerInfo.getWriteEndpoint());
        Assert.assertEquals(Lists.newArrayList(new Endpoint[]{this.endpoint, this.endpoint}), storageContainerInfo.getReadEndpoints());
        Assert.assertEquals(this.mockChannel, this.channelManager.getChannel(this.endpoint));
        this.scClient.resetStorageServerChannelFuture();
        CompletableFuture storageContainerChannelFuture2 = this.scClient.getStorageContainerChannelFuture();
        createFuture2.complete(Lists.newArrayList(new OneStorageContainerEndpointResponse[]{OneStorageContainerEndpointResponse.newBuilder().setStatusCode(StatusCode.SUCCESS).setEndpoint(StorageContainerEndpoint.newBuilder().setStorageContainerId(0L).setRevision(999L).setRwEndpoint(this.endpoint2).addRoEndpoint(this.endpoint2).build()).build()}));
        ensureCallbackExecuted();
        StorageContainerInfo storageContainerInfo2 = this.scClient.getStorageContainerInfo();
        Assert.assertEquals(0L, storageContainerInfo2.getGroupId());
        Assert.assertEquals(1000L, storageContainerInfo2.getRevision());
        Assert.assertEquals(this.endpoint, storageContainerInfo2.getWriteEndpoint());
        Assert.assertEquals(Lists.newArrayList(new Endpoint[]{this.endpoint, this.endpoint}), storageContainerInfo2.getReadEndpoints());
        Assert.assertFalse(storageContainerChannelFuture2.isDone());
        this.scClient.resetStorageServerChannelFuture();
        CompletableFuture storageContainerChannelFuture3 = this.scClient.getStorageContainerChannelFuture();
        createFuture3.complete(Lists.newArrayList(new OneStorageContainerEndpointResponse[]{OneStorageContainerEndpointResponse.newBuilder().setStatusCode(StatusCode.SUCCESS).setEndpoint(StorageContainerEndpoint.newBuilder().setStorageContainerId(0L).setRevision(1001L).setRwEndpoint(this.endpoint3).addRoEndpoint(this.endpoint3).build()).build()}));
        ensureCallbackExecuted();
        Assert.assertTrue(((StorageServerChannel) storageContainerChannelFuture3.get()) == this.mockChannel3);
        StorageContainerInfo storageContainerInfo3 = this.scClient.getStorageContainerInfo();
        Assert.assertEquals(0L, storageContainerInfo3.getGroupId());
        Assert.assertEquals(1001L, storageContainerInfo3.getRevision());
        Assert.assertEquals(this.endpoint3, storageContainerInfo3.getWriteEndpoint());
        Assert.assertEquals(Lists.newArrayList(new Endpoint[]{this.endpoint3, this.endpoint3}), storageContainerInfo3.getReadEndpoints());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(3))).locateStorageContainers(ArgumentMatchers.anyList());
    }

    @Test
    public void testGetRootRangeServiceUnexpectedException() throws Exception {
        CompletableFuture createFuture = FutureUtils.createFuture();
        CompletableFuture createFuture2 = FutureUtils.createFuture();
        Mockito.when(this.locationClient.locateStorageContainers(ArgumentMatchers.anyList())).thenReturn(createFuture).thenReturn(createFuture2);
        Assert.assertNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        CompletableFuture storageContainerChannelFuture = this.scClient.getStorageContainerChannelFuture();
        Assert.assertNotNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        Assert.assertTrue(storageContainerChannelFuture == this.scClient.getStorageContainerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        OneStorageContainerEndpointResponse build = OneStorageContainerEndpointResponse.newBuilder().setStatusCode(StatusCode.SUCCESS).setEndpoint(StorageContainerEndpoint.newBuilder().setStorageContainerId(0L).setRevision(1000L).setRwEndpoint(this.endpoint).addRoEndpoint(this.endpoint).build()).build();
        createFuture.complete(Lists.newArrayList(new OneStorageContainerEndpointResponse[]{build, build}));
        ensureCallbackExecuted();
        Assert.assertNull(this.channelManager.getChannel(this.endpoint));
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        createFuture2.complete(Lists.newArrayList(new OneStorageContainerEndpointResponse[]{build}));
        Assert.assertTrue(((StorageServerChannel) storageContainerChannelFuture.get()) == this.mockChannel);
        StorageContainerInfo storageContainerInfo = this.scClient.getStorageContainerInfo();
        Assert.assertEquals(0L, storageContainerInfo.getGroupId());
        Assert.assertEquals(1000L, storageContainerInfo.getRevision());
        Assert.assertEquals(this.endpoint, storageContainerInfo.getWriteEndpoint());
        Assert.assertEquals(Lists.newArrayList(new Endpoint[]{this.endpoint, this.endpoint}), storageContainerInfo.getReadEndpoints());
        Assert.assertEquals(this.mockChannel, this.channelManager.getChannel(this.endpoint));
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(2))).locateStorageContainers(ArgumentMatchers.anyList());
    }

    @Test
    public void testGetRootRangeServiceExceptionally() throws Exception {
        CompletableFuture createFuture = FutureUtils.createFuture();
        CompletableFuture createFuture2 = FutureUtils.createFuture();
        Mockito.when(this.locationClient.locateStorageContainers(ArgumentMatchers.anyList())).thenReturn(createFuture).thenReturn(createFuture2);
        Assert.assertNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        CompletableFuture storageContainerChannelFuture = this.scClient.getStorageContainerChannelFuture();
        Assert.assertNotNull(this.scClient.getStorageServerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        Assert.assertTrue(storageContainerChannelFuture == this.scClient.getStorageContainerChannelFuture());
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(1))).locateStorageContainers(ArgumentMatchers.anyList());
        OneStorageContainerEndpointResponse build = OneStorageContainerEndpointResponse.newBuilder().setStatusCode(StatusCode.SUCCESS).setEndpoint(StorageContainerEndpoint.newBuilder().setStorageContainerId(0L).setRevision(1000L).setRwEndpoint(this.endpoint).addRoEndpoint(this.endpoint).build()).build();
        createFuture.completeExceptionally(new ClientException("test-exception"));
        ensureCallbackExecuted();
        Assert.assertNull(this.channelManager.getChannel(this.endpoint));
        Assert.assertNull(this.scClient.getStorageContainerInfo());
        createFuture2.complete(Lists.newArrayList(new OneStorageContainerEndpointResponse[]{build}));
        Assert.assertTrue(((StorageServerChannel) storageContainerChannelFuture.get()) == this.mockChannel);
        StorageContainerInfo storageContainerInfo = this.scClient.getStorageContainerInfo();
        Assert.assertEquals(0L, storageContainerInfo.getGroupId());
        Assert.assertEquals(1000L, storageContainerInfo.getRevision());
        Assert.assertEquals(this.endpoint, storageContainerInfo.getWriteEndpoint());
        Assert.assertEquals(Lists.newArrayList(new Endpoint[]{this.endpoint, this.endpoint}), storageContainerInfo.getReadEndpoints());
        Assert.assertEquals(this.mockChannel, this.channelManager.getChannel(this.endpoint));
        ((LocationClient) Mockito.verify(this.locationClient, Mockito.times(2))).locateStorageContainers(ArgumentMatchers.anyList());
    }
}
