package org.apache.bookkeeper.proto;

import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/bookkeeper/proto/ReadEntryProcessorTest.class */
public class ReadEntryProcessorTest {
    private Channel channel;
    private BookieRequestProcessor requestProcessor;
    private Bookie bookie;

    @Before
    public void setup() throws IOException, BookieException {
        this.channel = (Channel) Mockito.mock(Channel.class);
        this.bookie = (Bookie) Mockito.mock(Bookie.class);
        this.requestProcessor = (BookieRequestProcessor) Mockito.mock(BookieRequestProcessor.class);
        Mockito.when(this.requestProcessor.getBookie()).thenReturn(this.bookie);
        Mockito.when(Long.valueOf(this.requestProcessor.getWaitTimeoutOnBackpressureMillis())).thenReturn(-1L);
        Mockito.when(this.requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
        Mockito.when(this.channel.voidPromise()).thenReturn(Mockito.mock(ChannelPromise.class));
        Mockito.when(this.channel.writeAndFlush(ArgumentMatchers.any())).thenReturn(Mockito.mock(ChannelPromise.class));
        EventLoop eventLoop = (EventLoop) Mockito.mock(EventLoop.class);
        Mockito.when(Boolean.valueOf(eventLoop.inEventLoop())).thenReturn(true);
        Mockito.when(this.channel.eventLoop()).thenReturn(eventLoop);
    }

    @Test
    public void testSuccessfulAsynchronousFenceRequest() throws Exception {
        testAsynchronousRequest(true, 0);
    }

    @Test
    public void testFailedAsynchronousFenceRequest() throws Exception {
        testAsynchronousRequest(false, 101);
    }

    private void testAsynchronousRequest(boolean z, int i) throws Exception {
        SettableFuture create = SettableFuture.create();
        Mockito.when(this.bookie.fenceLedger(ArgumentMatchers.anyLong(), (byte[]) ArgumentMatchers.any())).thenReturn(create);
        DefaultChannelPromise defaultChannelPromise = new DefaultChannelPromise(this.channel);
        AtomicReference atomicReference = new AtomicReference();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ((Channel) Mockito.doAnswer(invocationOnMock -> {
            atomicReference.set(invocationOnMock.getArgument(0));
            defaultChannelPromise.setSuccess();
            countDownLatch.countDown();
            return defaultChannelPromise;
        }).when(this.channel)).writeAndFlush(ArgumentMatchers.any(BookieProtocol.Response.class));
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        long currentTimeMillis = System.currentTimeMillis();
        ReadEntryProcessor.create(new BookieProtocol.ReadRequest((byte) 2, currentTimeMillis, 1L, (short) 1, new byte[0]), this.channel, this.requestProcessor, newCachedThreadPool, true).run();
        create.set(Boolean.valueOf(z));
        countDownLatch.await();
        ((Channel) Mockito.verify(this.channel, Mockito.times(1))).writeAndFlush(ArgumentMatchers.any(BookieProtocol.Response.class));
        Assert.assertTrue(atomicReference.get() instanceof BookieProtocol.Response);
        BookieProtocol.Response response = (BookieProtocol.Response) atomicReference.get();
        Assert.assertEquals(1L, response.getEntryId());
        Assert.assertEquals(currentTimeMillis, response.getLedgerId());
        Assert.assertEquals(2L, response.getOpCode());
        Assert.assertEquals(i, response.getErrorCode());
        newCachedThreadPool.shutdown();
    }

    @Test
    public void testSuccessfulSynchronousFenceRequest() throws Exception {
        testSynchronousRequest(true, 0);
    }

    @Test
    public void testFailedSynchronousFenceRequest() throws Exception {
        testSynchronousRequest(false, 101);
    }

    private void testSynchronousRequest(boolean z, int i) throws Exception {
        SettableFuture create = SettableFuture.create();
        Mockito.when(this.bookie.fenceLedger(ArgumentMatchers.anyLong(), (byte[]) ArgumentMatchers.any())).thenReturn(create);
        DefaultChannelPromise defaultChannelPromise = new DefaultChannelPromise(this.channel);
        AtomicReference atomicReference = new AtomicReference();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ((Channel) Mockito.doAnswer(invocationOnMock -> {
            atomicReference.set(invocationOnMock.getArgument(0));
            defaultChannelPromise.setSuccess();
            countDownLatch.countDown();
            return defaultChannelPromise;
        }).when(this.channel)).writeAndFlush(ArgumentMatchers.any(BookieProtocol.Response.class));
        long currentTimeMillis = System.currentTimeMillis();
        ReadEntryProcessor create2 = ReadEntryProcessor.create(new BookieProtocol.ReadRequest((byte) 2, currentTimeMillis, 1L, (short) 1, new byte[0]), this.channel, this.requestProcessor, (ExecutorService) null, true);
        create.set(Boolean.valueOf(z));
        create2.run();
        countDownLatch.await();
        ((Channel) Mockito.verify(this.channel, Mockito.times(1))).writeAndFlush(ArgumentMatchers.any(BookieProtocol.Response.class));
        Assert.assertTrue(atomicReference.get() instanceof BookieProtocol.Response);
        BookieProtocol.Response response = (BookieProtocol.Response) atomicReference.get();
        Assert.assertEquals(1L, response.getEntryId());
        Assert.assertEquals(currentTimeMillis, response.getLedgerId());
        Assert.assertEquals(2L, response.getOpCode());
        Assert.assertEquals(i, response.getErrorCode());
    }

    @Test
    public void testNonFenceRequest() throws Exception {
        DefaultChannelPromise defaultChannelPromise = new DefaultChannelPromise(this.channel);
        AtomicReference atomicReference = new AtomicReference();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ((Channel) Mockito.doAnswer(invocationOnMock -> {
            atomicReference.set(invocationOnMock.getArgument(0));
            defaultChannelPromise.setSuccess();
            countDownLatch.countDown();
            return defaultChannelPromise;
        }).when(this.channel)).writeAndFlush(ArgumentMatchers.any(BookieProtocol.Response.class));
        long currentTimeMillis = System.currentTimeMillis();
        ReadEntryProcessor.create(new BookieProtocol.ReadRequest((byte) 2, currentTimeMillis, 1L, (short) 0, new byte[0]), this.channel, this.requestProcessor, (ExecutorService) null, true).run();
        countDownLatch.await();
        ((Channel) Mockito.verify(this.channel, Mockito.times(1))).writeAndFlush(ArgumentMatchers.any(BookieProtocol.Response.class));
        Assert.assertTrue(atomicReference.get() instanceof BookieProtocol.Response);
        BookieProtocol.Response response = (BookieProtocol.Response) atomicReference.get();
        Assert.assertEquals(1L, response.getEntryId());
        Assert.assertEquals(currentTimeMillis, response.getLedgerId());
        Assert.assertEquals(2L, response.getOpCode());
        Assert.assertEquals(0L, response.getErrorCode());
    }
}
