package org.apache.bookkeeper.clients.utils;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.class */
public class ListenableFutureRpcProcessorTest {
    private ListenableFutureRpcProcessor<String, String, String> processor;
    private StorageContainerChannel scChannel;
    private ScheduledExecutorService executor;

    @Before
    public void setup() {
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.scChannel = (StorageContainerChannel) Mockito.mock(StorageContainerChannel.class);
        this.processor = (ListenableFutureRpcProcessor) Mockito.spy(new ListenableFutureRpcProcessor<String, String, String>(this.scChannel, this.executor, ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY) { // from class: org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessorTest.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: createRequest, reason: merged with bridge method [inline-methods] */
            public String m7createRequest() {
                return null;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            public ListenableFuture<String> sendRPC(StorageServerChannel storageServerChannel, String str) {
                return null;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            public String processResponse(String str) throws Exception {
                return null;
            }
        });
    }

    @Test
    public void testFailToConnect() {
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(this.scChannel.getStorageContainerChannelFuture()).thenReturn(completableFuture);
        CompletableFuture process = this.processor.process();
        ((StorageContainerChannel) Mockito.verify(this.scChannel, Mockito.times(1))).getStorageContainerChannelFuture();
        Exception exc = new Exception("test-exception");
        completableFuture.completeExceptionally(exc);
        try {
            FutureUtils.result(process);
            Assert.fail("Should fail the process if failed to connect to storage server");
        } catch (Exception e) {
            Assert.assertSame(exc, e);
        }
    }

    @Test
    public void testProcessSuccessfully() throws Exception {
        StorageServerChannel storageServerChannel = (StorageServerChannel) Mockito.mock(StorageServerChannel.class);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(this.scChannel.getStorageContainerChannelFuture()).thenReturn(completableFuture);
        SettableFuture create = SettableFuture.create();
        Mockito.when((String) this.processor.createRequest()).thenReturn("request");
        Mockito.when(this.processor.sendRPC((StorageServerChannel) ArgumentMatchers.same(storageServerChannel), (String) ArgumentMatchers.eq("request"))).thenReturn(create);
        Mockito.when((String) this.processor.processResponse((String) ArgumentMatchers.eq("response"))).thenReturn("result");
        CompletableFuture process = this.processor.process();
        ((StorageContainerChannel) Mockito.verify(this.scChannel, Mockito.times(1))).getStorageContainerChannelFuture();
        FutureUtils.complete(completableFuture, storageServerChannel);
        create.set("response");
        Assert.assertEquals("result", process.get());
    }

    @Test
    public void testProcessResponseException() throws Exception {
        StorageServerChannel storageServerChannel = (StorageServerChannel) Mockito.mock(StorageServerChannel.class);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(this.scChannel.getStorageContainerChannelFuture()).thenReturn(completableFuture);
        SettableFuture create = SettableFuture.create();
        Exception exc = new Exception("test-exception");
        Mockito.when((String) this.processor.createRequest()).thenReturn("request");
        Mockito.when(this.processor.sendRPC((StorageServerChannel) ArgumentMatchers.same(storageServerChannel), (String) ArgumentMatchers.eq("request"))).thenReturn(create);
        Mockito.when((String) this.processor.processResponse((String) ArgumentMatchers.eq("response"))).thenThrow(new Throwable[]{exc});
        CompletableFuture process = this.processor.process();
        ((StorageContainerChannel) Mockito.verify(this.scChannel, Mockito.times(1))).getStorageContainerChannelFuture();
        FutureUtils.complete(completableFuture, storageServerChannel);
        create.set("response");
        try {
            FutureUtils.result(process);
            Assert.fail("Should throw exception on processing result");
        } catch (Exception e) {
            Assert.assertSame(exc, e);
        }
    }

    @Test
    public void testProcessRpcException() throws Exception {
        StorageServerChannel storageServerChannel = (StorageServerChannel) Mockito.mock(StorageServerChannel.class);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(this.scChannel.getStorageContainerChannelFuture()).thenReturn(completableFuture);
        SettableFuture create = SettableFuture.create();
        Mockito.when((String) this.processor.createRequest()).thenReturn("request");
        Mockito.when(this.processor.sendRPC((StorageServerChannel) ArgumentMatchers.same(storageServerChannel), (String) ArgumentMatchers.eq("request"))).thenReturn(create);
        Mockito.when((String) this.processor.processResponse((String) ArgumentMatchers.eq("response"))).thenReturn("result");
        CompletableFuture process = this.processor.process();
        ((StorageContainerChannel) Mockito.verify(this.scChannel, Mockito.times(1))).getStorageContainerChannelFuture();
        FutureUtils.complete(completableFuture, storageServerChannel);
        create.setException(new StatusRuntimeException(Status.INTERNAL));
        try {
            FutureUtils.result(process);
            Assert.fail("Should throw fail immediately if rpc request failed");
        } catch (Exception e) {
            Assert.assertTrue(e instanceof StatusRuntimeException);
            Assert.assertEquals(Status.INTERNAL, e.getStatus());
        }
    }

    @Test
    public void testProcessRetryNotFoundRpcException() throws Exception {
        String str = "response";
        StorageServerChannel storageServerChannel = (StorageServerChannel) Mockito.mock(StorageServerChannel.class);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(this.scChannel.getStorageContainerChannelFuture()).thenReturn(completableFuture);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Mockito.when((String) this.processor.createRequest()).thenReturn("request");
        Mockito.when((String) this.processor.processResponse((String) ArgumentMatchers.eq("response"))).thenReturn("result");
        Mockito.when(this.processor.sendRPC((StorageServerChannel) ArgumentMatchers.same(storageServerChannel), (String) ArgumentMatchers.eq("request"))).thenAnswer(invocationOnMock -> {
            SettableFuture create = SettableFuture.create();
            if (atomicInteger.getAndIncrement() > 2) {
                create.set(str);
            } else {
                create.setException(new StatusRuntimeException(Status.NOT_FOUND));
            }
            return create;
        });
        CompletableFuture process = this.processor.process();
        FutureUtils.complete(completableFuture, storageServerChannel);
        Assert.assertEquals("result", FutureUtils.result(process));
        ((StorageContainerChannel) Mockito.verify(this.scChannel, Mockito.times(4))).getStorageContainerChannelFuture();
    }
}
