package org.apache.bookkeeper.client;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/client/TestParallelRead.class */
public class TestParallelRead extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TestParallelRead.class);
    final BookKeeper.DigestType digestType;
    final byte[] passwd;

    public TestParallelRead() {
        super(6);
        this.passwd = "parallel-read".getBytes();
        this.digestType = BookKeeper.DigestType.CRC32;
    }

    long getLedgerToRead(int i, int i2, int i3, int i4) throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(i, i2, i3, this.digestType, this.passwd);
        for (int i5 = 0; i5 < i4; i5++) {
            createLedger.addEntry(("" + i5).getBytes());
        }
        createLedger.close();
        return createLedger.getId();
    }

    @Test
    public void testNormalParallelRead() throws Exception {
        LedgerHandle openLedger = this.bkc.openLedger(getLedgerToRead(5, 2, 2, 10), this.digestType, this.passwd);
        for (int i = 0; i < 10; i++) {
            PendingReadOp pendingReadOp = new PendingReadOp(openLedger, openLedger.bk.scheduler, i, i);
            pendingReadOp.parallelRead(true).submit();
            Iterator it = ((LedgerEntries) pendingReadOp.future().get()).iterator();
            Assert.assertTrue(it.hasNext());
            LedgerEntry ledgerEntry = (LedgerEntry) it.next();
            Assert.assertNotNull(ledgerEntry);
            Assert.assertEquals(i, Integer.parseInt(new String(ledgerEntry.getEntryBytes())));
            ledgerEntry.close();
            Assert.assertFalse(it.hasNext());
        }
        PendingReadOp pendingReadOp2 = new PendingReadOp(openLedger, openLedger.bk.scheduler, 0L, 10 - 1);
        pendingReadOp2.parallelRead(true).submit();
        int i2 = 0;
        for (LedgerEntry ledgerEntry2 : (LedgerEntries) pendingReadOp2.future().get()) {
            Assert.assertNotNull(ledgerEntry2);
            Assert.assertEquals(i2, Integer.parseInt(new String(ledgerEntry2.getEntryBytes())));
            ledgerEntry2.close();
            i2++;
        }
        Assert.assertEquals(10, i2);
        openLedger.close();
    }

    private static <T> void expectFail(CompletableFuture<T> completableFuture, int i) {
        try {
            FutureUtils.result(completableFuture);
            Assert.fail("Expect to fail");
        } catch (Exception e) {
            Assert.assertTrue(e instanceof BKException);
            Assert.assertEquals(i, e.getCode());
        }
    }

    @Test
    public void testParallelReadMissingEntries() throws Exception {
        LedgerHandle openLedger = this.bkc.openLedger(getLedgerToRead(5, 2, 2, 10), this.digestType, this.passwd);
        PendingReadOp pendingReadOp = new PendingReadOp(openLedger, openLedger.bk.scheduler, 11L, 11L);
        pendingReadOp.parallelRead(true).submit();
        expectFail(pendingReadOp.future(), -13);
        PendingReadOp pendingReadOp2 = new PendingReadOp(openLedger, openLedger.bk.scheduler, 8L, 11L);
        pendingReadOp2.parallelRead(true).submit();
        expectFail(pendingReadOp2.future(), -13);
        openLedger.close();
    }

    @Test
    public void testFailParallelRecoveryReadMissingEntryImmediately() throws Exception {
        long ledgerToRead = getLedgerToRead(5, 5, 3, 1);
        ClientConfiguration readEntryTimeout = new ClientConfiguration().setReadEntryTimeout(30000);
        readEntryTimeout.setZkServers(this.zkUtil.getZooKeeperConnectString());
        BookKeeper bookKeeper = new BookKeeper(readEntryTimeout);
        LedgerHandle openLedger = this.bkc.openLedger(ledgerToRead, this.digestType, this.passwd);
        ArrayList ensemble = openLedger.getLedgerMetadata().getEnsemble(10L);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        sleepBookie((BookieSocketAddress) ensemble.get(0), countDownLatch);
        sleepBookie((BookieSocketAddress) ensemble.get(1), countDownLatch2);
        PendingReadOp pendingReadOp = new PendingReadOp(openLedger, openLedger.bk.scheduler, 10L, 10L, true);
        pendingReadOp.parallelRead(true).submit();
        expectFail(pendingReadOp.future(), -13);
        countDownLatch.countDown();
        countDownLatch2.countDown();
        openLedger.close();
        bookKeeper.close();
    }

    @Test
    public void testParallelReadWithFailedBookies() throws Exception {
        long ledgerToRead = getLedgerToRead(5, 3, 3, 10);
        ClientConfiguration readEntryTimeout = new ClientConfiguration().setReadEntryTimeout(30000);
        readEntryTimeout.setZkServers(this.zkUtil.getZooKeeperConnectString());
        BookKeeper bookKeeper = new BookKeeper(readEntryTimeout);
        LedgerHandle openLedger = this.bkc.openLedger(ledgerToRead, this.digestType, this.passwd);
        ArrayList ensemble = openLedger.getLedgerMetadata().getEnsemble(5L);
        killBookie((BookieSocketAddress) ensemble.get(0));
        killBookie((BookieSocketAddress) ensemble.get(1));
        PendingReadOp pendingReadOp = new PendingReadOp(openLedger, openLedger.bk.scheduler, 0L, 10 - 1);
        pendingReadOp.parallelRead(true).submit();
        Iterator it = ((LedgerEntries) pendingReadOp.future().get()).iterator();
        int i = 0;
        while (it.hasNext()) {
            Assert.assertNotNull((LedgerEntry) it.next());
            Assert.assertEquals(i, Integer.parseInt(new String(r0.getEntryBytes())));
            i++;
        }
        Assert.assertEquals(10, i);
        openLedger.close();
        bookKeeper.close();
    }

    @Test
    public void testParallelReadFailureWithFailedBookies() throws Exception {
        long ledgerToRead = getLedgerToRead(5, 3, 3, 10);
        ClientConfiguration readEntryTimeout = new ClientConfiguration().setReadEntryTimeout(30000);
        readEntryTimeout.setZkServers(this.zkUtil.getZooKeeperConnectString());
        BookKeeper bookKeeper = new BookKeeper(readEntryTimeout);
        LedgerHandle openLedger = this.bkc.openLedger(ledgerToRead, this.digestType, this.passwd);
        ArrayList ensemble = openLedger.getLedgerMetadata().getEnsemble(5L);
        killBookie((BookieSocketAddress) ensemble.get(0));
        killBookie((BookieSocketAddress) ensemble.get(1));
        killBookie((BookieSocketAddress) ensemble.get(2));
        PendingReadOp pendingReadOp = new PendingReadOp(openLedger, openLedger.bk.scheduler, 0L, 10 - 1);
        pendingReadOp.parallelRead(true).submit();
        expectFail(pendingReadOp.future(), -8);
        openLedger.close();
        bookKeeper.close();
    }
}
