package org.apache.bookkeeper.client;

import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BatchedReadOp;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ReadOpBase;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/client/TestSpeculativeBatchRead.class */
public class TestSpeculativeBatchRead extends BookKeeperClusterTestCase {
    /** Logger shared by the test methods and by {@link LatchCallback}. */
    private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeBatchRead.class);
    /** Digest type used for every ledger created or opened by these tests; assigned in the constructor. */
    private final BookKeeper.DigestType digestType;
    /** Password used for every ledger created or opened by these tests; assigned in the constructor. */
    byte[] passwd;

    /* loaded from: input_file:org/apache/bookkeeper/client/TestSpeculativeBatchRead$LatchCallback.class */
    class LatchCallback implements AsyncCallback.ReadCallback {
        CountDownLatch l = new CountDownLatch(1);
        boolean success = false;
        long startMillis = System.currentTimeMillis();
        long endMillis = Long.MAX_VALUE;

        LatchCallback() {
        }

        public void readComplete(int i, LedgerHandle ledgerHandle, Enumeration<LedgerEntry> enumeration, Object obj) {
            this.endMillis = System.currentTimeMillis();
            if (TestSpeculativeBatchRead.LOG.isDebugEnabled()) {
                TestSpeculativeBatchRead.LOG.debug("Got response {} {}", Integer.valueOf(i), Long.valueOf(getDuration()));
            }
            this.success = i == 0;
            this.l.countDown();
        }

        long getDuration() {
            return this.endMillis - this.startMillis;
        }

        void expectSuccess(int i) throws Exception {
            System.out.println(this.l.await(i, TimeUnit.MILLISECONDS));
        }

        void expectFail(int i) throws Exception {
            Assert.assertTrue(this.l.await(i, TimeUnit.MILLISECONDS));
            Assert.assertFalse(this.success);
        }

        void expectTimeout(int i) throws Exception {
            Assert.assertFalse(this.l.await(i, TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Creates the test case, passing 10 to the cluster test base class (presumably the number
     * of bookies to start — confirm against {@code BookKeeperClusterTestCase}), and fixes the
     * ledger password and digest type used by every test.
     */
    public TestSpeculativeBatchRead() {
        super(10);
        // Explicit charset so the password bytes do not depend on the platform default.
        this.passwd = "specPW".getBytes(StandardCharsets.UTF_8);
        this.digestType = BookKeeper.DigestType.CRC32;
    }

    /**
     * Creates a ledger, writes ten identical entries to it, closes it and returns its id.
     *
     * @param ensembleSize first size argument forwarded to {@code createLedger}
     * @param quorumSize second size argument forwarded to {@code createLedger}
     * @return id of the closed ledger
     * @throws Exception if creating, writing to or closing the ledger fails
     */
    long getLedgerToRead(int ensembleSize, int quorumSize) throws Exception {
        final int numEntries = 10;
        // Explicit charset so the payload does not depend on the platform default.
        final byte[] payload = "Data for test".getBytes(StandardCharsets.UTF_8);
        final LedgerHandle ledger = this.bkc.createLedger(ensembleSize, quorumSize, this.digestType, this.passwd);
        try {
            for (int entry = 0; entry < numEntries; entry++) {
                ledger.addEntry(payload);
            }
        } finally {
            // Close even when a write fails so the handle is not leaked.
            ledger.close();
        }
        return ledger.getId();
    }

    /**
     * Builds a test client against this test's metadata service with the given speculative read
     * timeout, a 30000 read timeout, the V2 wire protocol, read-sequence reordering and slow-bookie
     * tracking in the placement policy all enabled.
     *
     * @param i speculative read timeout to configure
     * @return a new client backed by a fresh {@link TestStatsProvider}
     * @throws Exception if the client cannot be created
     */
    BookKeeperTestClient createClient(int i) throws Exception {
        ClientConfiguration clientConf = new ClientConfiguration()
                .setSpeculativeReadTimeout(i)
                .setReadTimeout(30000)
                .setUseV2WireProtocol(true)
                .setReorderReadSequenceEnabled(true)
                .setEnsemblePlacementPolicySlowBookies(true)
                .setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        return new BookKeeperTestClient(clientConf, new TestStatsProvider());
    }

    /**
     * Reads the same ledger through two clients, one created with a speculative read timeout of
     * 0 and one with 2000, while the second bookie of the first ensemble is asleep. Entry 0 must
     * be readable through both clients; entry 1 must be readable only through the client with
     * the non-zero timeout, and only that client may count speculative reads or record a slow
     * bookie.
     */
    @Test
    public void testSpeculativeRead() throws Exception {
        final long ledgerId = getLedgerToRead(3, 2);
        final BookKeeperTestClient nonSpecClient = createClient(0);
        final BookKeeperTestClient specClient = createClient(2000);
        final LedgerHandle nonSpecLedger = nonSpecClient.openLedger(ledgerId, this.digestType, this.passwd);
        final LedgerHandle specLedger = specClient.openLedger(ledgerId, this.digestType, this.passwd);

        // The sleeping bookie stays asleep until this latch is released in the finally block.
        final CountDownLatch sleepLatch = new CountDownLatch(1);
        final List<BookieId> ensemble = nonSpecLedger.getLedgerMetadata().getAllEnsembles().get(0L);
        sleepBookie(ensemble.get(1), sleepLatch);
        try {
            // Entry 0: both clients are expected to succeed.
            LatchCallback nonSpecEntry0 = new LatchCallback();
            LatchCallback specEntry0 = new LatchCallback();
            nonSpecLedger.asyncBatchReadEntries(0L, 1, 1024L, nonSpecEntry0, null);
            specLedger.asyncBatchReadEntries(0L, 1, 1024L, specEntry0, null);
            nonSpecEntry0.expectSuccess(2000);
            specEntry0.expectSuccess(2000);

            // Entry 1: only the client with a speculative timeout is expected to finish.
            LatchCallback nonSpecEntry1 = new LatchCallback();
            LatchCallback specEntry1 = new LatchCallback();
            nonSpecLedger.asyncReadEntries(1L, 1L, nonSpecEntry1, null);
            specLedger.asyncReadEntries(1L, 1L, specEntry1, null);
            specEntry1.expectSuccess(4000);
            nonSpecEntry1.expectTimeout(4000);

            Assert.assertEquals(1, specClient.getPlacementPolicy().slowBookies.asMap().size());
            Assert.assertEquals("Stats should not reflect speculative reads if disabled", 0L,
                    nonSpecClient.getTestStatsProvider().getCounter("bookkeeper_client.SPECULATIVE_READ_COUNT").get().longValue());
            Assert.assertTrue("Stats should reflect speculative reads",
                    specClient.getTestStatsProvider().getCounter("bookkeeper_client.SPECULATIVE_READ_COUNT").get().longValue() > 0);
        } finally {
            // Single cleanup path: wake the bookie, then release handles and clients.
            sleepLatch.countDown();
            specLedger.close();
            nonSpecLedger.close();
            specClient.close();
            nonSpecClient.close();
        }
    }

    /**
     * Puts bookies 1, 2 and 4 of a five-bookie ensemble to sleep and checks, entry by entry,
     * how long batch reads take relative to the configured speculative read timeout, and which
     * bookies end up recorded as slow by the placement policy.
     */
    @Test
    public void testSpeculativeReadMultipleReplicasDown() throws Exception {
        final int timeout = 5000;
        final long ledgerId = getLedgerToRead(5, 5);
        final BookKeeperTestClient specClient = createClient(timeout);
        final LedgerHandle ledger = specClient.openLedger(ledgerId, this.digestType, this.passwd);

        // All three bookies stay asleep until this latch is released in the finally block.
        final CountDownLatch sleepLatch = new CountDownLatch(1);
        final List<BookieId> ensemble = ledger.getLedgerMetadata().getAllEnsembles().get(0L);
        sleepBookie(ensemble.get(1), sleepLatch);
        sleepBookie(ensemble.get(2), sleepLatch);
        sleepBookie(ensemble.get(4), sleepLatch);
        try {
            // Entry 0 is expected to complete without waiting for a speculative timeout.
            LatchCallback latch0 = new LatchCallback();
            ledger.asyncBatchReadEntries(0L, 1, 1024L, latch0, null);
            latch0.expectSuccess(timeout / 2);

            // Entry 1 is expected to need more than two timeouts but fewer than three.
            LatchCallback latch1 = new LatchCallback();
            ledger.asyncBatchReadEntries(1L, 1, 1024L, latch1, null);
            latch1.expectTimeout(timeout);
            latch1.expectSuccess(timeout * 2);
            LOG.info("Timeout {} latch1 duration {}", timeout, latch1.getDuration());
            Assert.assertTrue("should have taken longer than two timeouts, but less than 3",
                    latch1.getDuration() >= timeout * 2L && latch1.getDuration() < timeout * 3L);

            // After that read, bookies 1 and 2 must be the ones recorded as slow.
            HashSet<Object> expectedSlowBookies = new HashSet<>();
            expectedSlowBookies.add(ensemble.get(1));
            expectedSlowBookies.add(ensemble.get(2));
            Assert.assertEquals(expectedSlowBookies, specClient.getPlacementPolicy().slowBookies.asMap().keySet());

            LatchCallback latch2 = new LatchCallback();
            ledger.asyncBatchReadEntries(2L, 1, 1024L, latch2, null);
            latch2.expectSuccess(timeout);

            LatchCallback latch3 = new LatchCallback();
            ledger.asyncBatchReadEntries(3L, 1, 1024L, latch3, null);
            latch3.expectSuccess(timeout / 2);

            // Entry 4 is expected to need more than one timeout but fewer than two.
            LatchCallback latch4 = new LatchCallback();
            ledger.asyncBatchReadEntries(4L, 1, 1024L, latch4, null);
            latch4.expectTimeout(timeout / 2);
            latch4.expectSuccess(timeout);
            LOG.info("Timeout {} latch4 duration {}", timeout, latch4.getDuration());
            Assert.assertTrue("should have taken longer than one timeout, but less than 2",
                    latch4.getDuration() >= timeout && latch4.getDuration() < timeout * 2L);
        } finally {
            // Single cleanup path: wake the bookies, then release the handle and client.
            sleepLatch.countDown();
            ledger.close();
            specClient.close();
        }
    }

    /**
     * Puts both bookies of a two-bookie ensemble to sleep, verifies that a batch read does not
     * complete while they are asleep, then wakes the first bookie and expects the pending read
     * to succeed. A second read, issued after the second bookie is also awake, must succeed too.
     */
    @Test
    public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception {
        final int timeout = 1000;
        final long ledgerId = getLedgerToRead(2, 2);
        final BookKeeperTestClient specClient = createClient(timeout);
        final LedgerHandle ledger = specClient.openLedger(ledgerId, this.digestType, this.passwd);

        // One latch per bookie so that each can be woken independently.
        final CountDownLatch firstBookieLatch = new CountDownLatch(1);
        final CountDownLatch secondBookieLatch = new CountDownLatch(1);
        final List<BookieId> ensemble = ledger.getLedgerMetadata().getAllEnsembles().get(0L);
        sleepBookie(ensemble.get(0), firstBookieLatch);
        sleepBookie(ensemble.get(1), secondBookieLatch);
        try {
            LatchCallback latch0 = new LatchCallback();
            ledger.asyncBatchReadEntries(0L, 1, 1024L, latch0, null);
            latch0.expectTimeout(timeout);

            // Wake the first bookie: the read that was pending must now complete.
            firstBookieLatch.countDown();
            latch0.expectSuccess(timeout / 2);

            // Wake the second bookie before issuing the next read.
            secondBookieLatch.countDown();
            LatchCallback latch1 = new LatchCallback();
            ledger.asyncBatchReadEntries(1L, 1, 1024L, latch1, null);
            latch1.expectSuccess(timeout / 2);
        } finally {
            // Single cleanup path; counting down an already-released latch is harmless.
            firstBookieLatch.countDown();
            secondBookieLatch.countDown();
            ledger.close();
            specClient.close();
        }
    }

    /**
     * Runs a batched read to completion and checks that no speculative task is left on the
     * operation afterwards, whether the read succeeded or threw.
     */
    @Test
    public void testSpeculativeReadScheduledTaskCancel() throws Exception {
        final long ledgerId = getLedgerToRead(3, 2);
        final BookKeeperTestClient specClient = createClient(1000);
        try {
            final LedgerHandle ledger = specClient.openLedger(ledgerId, this.digestType, this.passwd);
            try {
                // Built before the inner try so the finally block never sees a null operation.
                final BatchedReadOp op = new BatchedReadOp(ledger, specClient.getClientCtx(), 0L, 5, 5120L, false);
                try {
                    op.initiate();
                    op.future().get();
                } finally {
                    Assert.assertNull("Speculative Read tasks must be null", op.getSpeculativeTask());
                }
            } finally {
                ledger.close();
            }
        } finally {
            specClient.close();
        }
    }

    /**
     * Drives {@code maybeSendSpeculativeRead} directly on three sequence read requests (entries
     * 0, 2 and 4) with different bit sets and checks which bookie, if any, each call reports.
     * Every request that was created must be complete before the test returns.
     */
    @Test
    public void testSpeculativeReadScheduling() throws Exception {
        final long ledgerId = getLedgerToRead(3, 2);
        final BookKeeperTestClient specClient = createClient(1000);
        final LedgerHandle ledger = specClient.openLedger(ledgerId, this.digestType, this.passwd);
        final List<BookieId> ensemble = ledger.getLedgerMetadata().getAllEnsembles().get(0L);

        // Three bit sets over the ensemble: every bit set, no bit set, only bit 1 set.
        final BitSet allHosts = new BitSet(ensemble.size());
        for (int i = 0; i < ensemble.size(); i++) {
            allHosts.set(i, true);
        }
        final BitSet noHost = new BitSet(ensemble.size());
        final BitSet secondHostOnly = new BitSet(ensemble.size());
        secondHostOnly.set(1, true);

        // Declared outside the try so the finally block can wait on whichever ones were created.
        ReadOpBase.LedgerEntryRequest req0 = null;
        ReadOpBase.LedgerEntryRequest req2 = null;
        ReadOpBase.LedgerEntryRequest req4 = null;
        try {
            final BatchedReadOp op = new BatchedReadOp(ledger, specClient.getClientCtx(), 0L, 5, 5120L, false);

            req0 = op.new SequenceReadRequest(ensemble, ledger.getId(), 0L, 1, 1024L);
            Assert.assertTrue("Should have sent to first", req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0)));
            Assert.assertNull("Should not have sent another", req0.maybeSendSpeculativeRead(allHosts));

            req2 = op.new SequenceReadRequest(ensemble, ledger.getId(), 2L, 1, 1024L);
            Assert.assertTrue("Should have sent to third", req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2)));
            Assert.assertTrue("Should have sent to first", req2.maybeSendSpeculativeRead(secondHostOnly).equals(ensemble.get(0)));

            req4 = op.new SequenceReadRequest(ensemble, ledger.getId(), 4L, 1, 1024L);
            Assert.assertTrue("Should have sent to second", req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1)));
            Assert.assertNull("Should not have sent another", req4.maybeSendSpeculativeRead(secondHostOnly));
        } finally {
            try {
                awaitRequestsComplete(req0, req2, req4);
            } finally {
                // Release the handle and client even when the completion check above fails.
                ledger.close();
                specClient.close();
            }
        }
    }

    /**
     * Polls each non-null request once per second, for at most eleven sleeps, and asserts that
     * it is complete.
     */
    private static void awaitRequestsComplete(ReadOpBase.LedgerEntryRequest... requests) throws InterruptedException {
        for (ReadOpBase.LedgerEntryRequest request : requests) {
            if (request == null) {
                continue;
            }
            for (int attempt = 0; !request.isComplete() && attempt <= 10; attempt++) {
                Thread.sleep(1000L);
            }
            Assert.assertTrue("Request should be done", request.isComplete());
        }
    }

    /**
     * Creates a single-bookie ledger through a client configured with
     * {@link LocalBookieEnsemblePlacementPolicy} and checks that a sequence read request built
     * for entry 0 has a non-null write set.
     */
    @Test
    public void testSequenceReadLocalEnsemble() throws Exception {
        final ClientConfiguration conf = new ClientConfiguration()
                .setSpeculativeReadTimeout(1000)
                .setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class)
                .setReorderReadSequenceEnabled(true)
                .setEnsemblePlacementPolicySlowBookies(true)
                .setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        final BookKeeperTestClient client = new BookKeeperTestClient(conf, new TestStatsProvider());
        try {
            final LedgerHandle ledger = client.createLedger(1, 1, this.digestType, this.passwd);
            try {
                final List<BookieId> ensemble = ledger.getLedgerMetadata().getAllEnsembles().get(0L);
                final BatchedReadOp op = new BatchedReadOp(ledger, client.getClientCtx(), 0L, 5, 5120L, false);
                // Held as the base type, through which the original accessed writeSet.
                final ReadOpBase.LedgerEntryRequest request =
                        op.new SequenceReadRequest(ensemble, ledger.getId(), 0L, 1, 1024L);
                Assert.assertNotNull(request.writeSet);
            } finally {
                ledger.close();
            }
        } finally {
            client.close();
        }
    }
}
