package org.apache.bookkeeper.client;

import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/client/TestDelayEnsembleChange.class */
public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
    private static final Logger logger = LoggerFactory.getLogger(TestDelayEnsembleChange.class);
    final BookKeeper.DigestType digestType;
    final byte[] testPasswd;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bookkeeper/client/TestDelayEnsembleChange$VerificationCallback.class */
    public static class VerificationCallback implements BookkeeperInternalCallbacks.ReadEntryCallback {
        final CountDownLatch latch;
        final AtomicLong numSuccess = new AtomicLong(0);
        final AtomicLong numMissing = new AtomicLong(0);
        final AtomicLong numFailure = new AtomicLong(0);

        VerificationCallback(int i) {
            this.latch = new CountDownLatch(i);
        }

        public void readEntryComplete(int i, long j, long j2, ByteBuf byteBuf, Object obj) {
            if (i == 0) {
                this.numSuccess.incrementAndGet();
            } else if (i == -13 || i == -7) {
                TestDelayEnsembleChange.logger.error("Missed entry({}, {}) from host {}.", new Object[]{Long.valueOf(j), Long.valueOf(j2), obj});
                this.numMissing.incrementAndGet();
            } else {
                TestDelayEnsembleChange.logger.error("Failed to get entry({}, {}) from host {} : {}", new Object[]{Long.valueOf(j), Long.valueOf(j2), obj, Integer.valueOf(i)});
                this.numFailure.incrementAndGet();
            }
            this.latch.countDown();
        }
    }

    public TestDelayEnsembleChange() {
        super(5);
        this.testPasswd = "".getBytes();
        this.digestType = BookKeeper.DigestType.CRC32;
    }

    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    @Before
    public void setUp() throws Exception {
        this.baseClientConf.setDelayEnsembleChange(true);
        super.setUp();
    }

    private void verifyEntries(LedgerHandle ledgerHandle, long j, long j2, long j3, long j4) throws Exception {
        LedgerMetadata ledgerMetadata = ledgerHandle.getLedgerMetadata();
        long j5 = j;
        while (true) {
            long j6 = j5;
            if (j6 >= j2) {
                return;
            }
            List<BookieId> ensembleAt = ledgerMetadata.getEnsembleAt(j6);
            VerificationCallback verificationCallback = new VerificationCallback(ensembleAt.size());
            for (BookieId bookieId : ensembleAt) {
                this.bkc.getBookieClient().readEntry(bookieId, ledgerHandle.getId(), j6, verificationCallback, bookieId, 0, (byte[]) null);
            }
            verificationCallback.latch.await();
            Assert.assertEquals(j3, verificationCallback.numSuccess.get());
            Assert.assertEquals(j4, verificationCallback.numMissing.get());
            Assert.assertEquals(0L, verificationCallback.numFailure.get());
            j5 = j6 + 1;
        }
    }

    private void verifyEntriesRange(LedgerHandle ledgerHandle, long j, long j2, long j3, long j4) throws Exception {
        LedgerMetadata ledgerMetadata = ledgerHandle.getLedgerMetadata();
        long j5 = j;
        while (true) {
            long j6 = j5;
            if (j6 >= j2) {
                return;
            }
            List<BookieId> ensembleAt = ledgerMetadata.getEnsembleAt(j6);
            VerificationCallback verificationCallback = new VerificationCallback(ensembleAt.size());
            for (BookieId bookieId : ensembleAt) {
                this.bkc.getBookieClient().readEntry(bookieId, ledgerHandle.getId(), j6, verificationCallback, bookieId, 0, (byte[]) null);
            }
            verificationCallback.latch.await();
            Assert.assertTrue(j3 >= verificationCallback.numSuccess.get());
            Assert.assertTrue(j4 <= verificationCallback.numMissing.get());
            Assert.assertEquals(0L, verificationCallback.numFailure.get());
            j5 = j6 + 1;
        }
    }

    @Test
    public void testNotChangeEnsembleIfNotBrokenAckQuorum() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(5, 5, 3, this.digestType, this.testPasswd);
        byte[] bytes = "foobar".getBytes();
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(bytes);
        }
        ServerConfiguration killBookie = killBookie((BookieId) createLedger.getCurrentEnsemble().get(0));
        ServerConfiguration killBookie2 = killBookie((BookieId) createLedger.getCurrentEnsemble().get(1));
        for (int i2 = 10; i2 < 2 * 10; i2++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.", 1L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        this.bsConfs.add(killBookie);
        this.bs.add(startBookie(killBookie));
        this.bsConfs.add(killBookie2);
        this.bs.add(startBookie(killBookie2));
        for (int i3 = 2 * 10; i3 < 3 * 10; i3++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.", 1L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        verifyEntries(createLedger, 0L, 10, 5L, 0L);
        verifyEntries(createLedger, 10, 2 * 10, 3L, 2L);
        verifyEntries(createLedger, 2 * 10, 3 * 10, 5L, 0L);
    }

    @Test
    public void testChangeEnsembleIfBrokenAckQuorum() throws Exception {
        startNewBookie();
        startNewBookie();
        startNewBookie();
        this.bkc.getTestStatsProvider().clear();
        LedgerHandle createLedger = this.bkc.createLedger(5, 5, 3, this.digestType, this.testPasswd);
        byte[] bytes = "foobar".getBytes();
        for (int i = 0; i < 5; i++) {
            createLedger.addEntry(bytes);
        }
        for (BookieId bookieId : (List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)) {
            Assert.assertTrue("LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION should be > 0 for " + bookieId, this.bkc.getTestStatsProvider().getCounter(new StringBuilder().append("bookkeeper_client.LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION-").append(bookieId).toString()).get().longValue() > 0);
        }
        Assert.assertTrue("Stats should have captured a new ensemble", this.bkc.getTestStatsProvider().getOpStatsLogger("bookkeeper_client.bookie_watcher.NEW_ENSEMBLE_TIME").getSuccessCount() > 0);
        Assert.assertTrue("Stats should not have captured an ensemble change", this.bkc.getTestStatsProvider().getOpStatsLogger("bookkeeper_client.bookie_watcher.REPLACE_BOOKIE_TIME").getSuccessCount() == 0);
        logger.info("Kill bookie 0 and write {} entries.", 5);
        ServerConfiguration killBookie = killBookie((BookieId) createLedger.getCurrentEnsemble().get(0));
        for (int i2 = 5; i2 < 2 * 5; i2++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.", 1L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertTrue("Stats should not have captured an ensemble change", this.bkc.getTestStatsProvider().getOpStatsLogger("bookkeeper_client.bookie_watcher.REPLACE_BOOKIE_TIME").getSuccessCount() == 0);
        logger.info("Kill bookie 1 and write another {} entries.", 5);
        ServerConfiguration killBookie2 = killBookie((BookieId) createLedger.getCurrentEnsemble().get(1));
        for (int i3 = 2 * 5; i3 < 3 * 5; i3++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.", 1L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        logger.info("Kill bookie 2 and write another {} entries.", 5);
        ServerConfiguration killBookie3 = killBookie((BookieId) createLedger.getCurrentEnsemble().get(2));
        for (int i4 = 3 * 5; i4 < 4 * 5; i4++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should be ensemble change if ack quorum couldn't be formed.", 2L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertTrue("Stats should have captured an ensemble change", this.bkc.getTestStatsProvider().getOpStatsLogger("bookkeeper_client.bookie_watcher.REPLACE_BOOKIE_TIME").getSuccessCount() > 0);
        List ensembleAt = createLedger.getLedgerMetadata().getEnsembleAt(0L);
        List ensembleAt2 = createLedger.getLedgerMetadata().getEnsembleAt(3 * 5);
        Assert.assertFalse(((BookieId) ensembleAt.get(0)).equals(ensembleAt2.get(0)));
        Assert.assertFalse(((BookieId) ensembleAt.get(1)).equals(ensembleAt2.get(1)));
        Assert.assertFalse(((BookieId) ensembleAt.get(2)).equals(ensembleAt2.get(2)));
        Assert.assertEquals(ensembleAt.get(3), ensembleAt2.get(3));
        Assert.assertEquals(ensembleAt.get(4), ensembleAt2.get(4));
        this.bsConfs.add(killBookie);
        this.bs.add(startBookie(killBookie));
        this.bsConfs.add(killBookie2);
        this.bs.add(startBookie(killBookie2));
        this.bsConfs.add(killBookie3);
        this.bs.add(startBookie(killBookie3));
        for (int i5 = 4 * 5; i5 < 5 * 5; i5++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.", 2L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        verifyEntries(createLedger, 0L, 5, 5L, 0L);
        verifyEntries(createLedger, 5, 2 * 5, 4L, 1L);
        verifyEntries(createLedger, 2 * 5, 3 * 5, 3L, 2L);
        verifyEntries(createLedger, 3 * 5, 4 * 5, 5L, 0L);
        verifyEntries(createLedger, 4 * 5, 5 * 5, 5L, 0L);
    }

    @Test
    public void testEnsembleChangeWithNotEnoughBookies() throws Exception {
        startNewBookie();
        LedgerHandle createLedger = this.bkc.createLedger(5, 5, 3, this.digestType, this.testPasswd);
        byte[] bytes = "foobar".getBytes();
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(bytes);
        }
        logger.info("Killed 3 bookies and add {} more entries : {}", 10, createLedger.getLedgerMetadata());
        ServerConfiguration killBookie = killBookie((BookieId) createLedger.getCurrentEnsemble().get(0));
        ServerConfiguration killBookie2 = killBookie((BookieId) createLedger.getCurrentEnsemble().get(1));
        ServerConfiguration killBookie3 = killBookie((BookieId) createLedger.getCurrentEnsemble().get(2));
        for (int i2 = 10; i2 < 2 * 10; i2++) {
            createLedger.addEntry(bytes);
        }
        logger.info("Ledger metadata after killed bookies : {}", createLedger.getLedgerMetadata());
        Assert.assertEquals("There should be ensemble change if ack quorum is broken.", 2L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        this.bsConfs.add(killBookie);
        this.bs.add(startBookie(killBookie));
        this.bsConfs.add(killBookie2);
        this.bs.add(startBookie(killBookie2));
        this.bsConfs.add(killBookie3);
        this.bs.add(startBookie(killBookie3));
        for (int i3 = 2 * 10; i3 < 3 * 10; i3++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should be no ensemble change after adding failed bookies back.", 2L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        verifyEntries(createLedger, 0L, 10, 5L, 0L);
        verifyEntries(createLedger, 10, 2 * 10, 3L, 2L);
        verifyEntries(createLedger, 2 * 10, 3 * 10, 5L, 0L);
    }

    @Test
    public void testEnsembleChangeWithMoreBookieFailures() throws Exception {
        for (int i = 0; i < 5; i++) {
            startNewBookie();
        }
        LedgerHandle createLedger = this.bkc.createLedger(5, 5, 3, this.digestType, this.testPasswd);
        byte[] bytes = "foobar".getBytes();
        for (int i2 = 0; i2 < 10; i2++) {
            logger.info("Add entry {}", Integer.valueOf(i2));
            createLedger.addEntry(bytes);
        }
        logger.info("Killed 5 bookies and add {} more entries : {}", 10, createLedger.getLedgerMetadata());
        ArrayList<ServerConfiguration> arrayList = new ArrayList(5);
        for (int i3 = 0; i3 < 5; i3++) {
            arrayList.add(killBookie((BookieId) createLedger.getCurrentEnsemble().get(i3)));
        }
        for (int i4 = 10; i4 < 2 * 10; i4++) {
            logger.info("Add entry {}", Integer.valueOf(i4));
            createLedger.addEntry(bytes);
        }
        logger.info("Ledger metadata after killed bookies : {}", createLedger.getLedgerMetadata());
        Assert.assertEquals("There should be ensemble change if breaking ack quorum.", 2L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        for (ServerConfiguration serverConfiguration : arrayList) {
            this.bsConfs.add(serverConfiguration);
            this.bs.add(startBookie(serverConfiguration));
        }
        for (int i5 = 2 * 10; i5 < 3 * 10; i5++) {
            logger.info("Add entry {}", Integer.valueOf(i5));
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should not be ensemble changed if delaying ensemble change is enabled.", 2L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        verifyEntries(createLedger, 0L, 10, 5L, 0L);
        verifyEntriesRange(createLedger, 10, 2 * 10, 5L, 0L);
        verifyEntries(createLedger, 2 * 10, 3 * 10, 5L, 0L);
    }

    @Test
    public void testChangeEnsembleIfBookieReadOnly() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, 2, this.digestType, this.testPasswd);
        byte[] bytes = "foobar".getBytes();
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(bytes);
        }
        setBookieToReadOnly((BookieId) createLedger.getCurrentEnsemble().get(0));
        for (int i2 = 10; i2 < 2 * 10; i2++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("The ensemble should change when a bookie is readonly even if we delay ensemble change.", 2L, createLedger.getLedgerMetadata().getAllEnsembles().size());
    }

    @Test
    public void testChangeEnsembleSecondBookieReadOnly() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, 2, this.digestType, this.testPasswd);
        byte[] bytes = "foobar".getBytes();
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(bytes);
        }
        BookieId bookieId = (BookieId) createLedger.getCurrentEnsemble().get(0);
        BookieId bookieId2 = (BookieId) createLedger.getCurrentEnsemble().get(1);
        killBookie(bookieId);
        for (int i2 = 0; i2 < 10; i2++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("There should be ensemble change if delaying ensemble change is enabled.", 1L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        setBookieToReadOnly(bookieId2);
        for (int i3 = 0; i3 < 10; i3++) {
            createLedger.addEntry(bytes);
        }
        Assert.assertEquals("The ensemble should change when a bookie is readonly even if we delay ensemble change.", 2L, createLedger.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertEquals(3L, createLedger.getCurrentEnsemble().size());
        Assert.assertFalse(createLedger.getCurrentEnsemble().contains(bookieId));
        Assert.assertFalse(createLedger.getCurrentEnsemble().contains(bookieId2));
    }
}
