package org.apache.bookkeeper.client;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/client/SlowBookieTest.class */
public class SlowBookieTest extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(SlowBookieTest.class);
    final byte[] entry;

    public SlowBookieTest() {
        super(4);
        this.entry = "Test Entry".getBytes();
        this.baseConf.setNumAddWorkerThreads(0);
        this.baseConf.setNumReadWorkerThreads(0);
    }

    @Test
    public void testSlowBookie() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setReadTimeout(360).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        LedgerHandle createLedger = new BookKeeper(clientConfiguration).createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, new byte[0]);
        byte[] bytes = "Test Entry".getBytes();
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(bytes);
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        List currentEnsemble = createLedger.getCurrentEnsemble();
        try {
            sleepBookie((BookieId) currentEnsemble.get(0), countDownLatch);
            for (int i2 = 0; i2 < 10; i2++) {
                createLedger.addEntry(bytes);
            }
            sleepBookie((BookieId) currentEnsemble.get(2), countDownLatch2);
            final AtomicInteger atomicInteger = new AtomicInteger(-559038737);
            createLedger.asyncAddEntry(bytes, new AsyncCallback.AddCallback() { // from class: org.apache.bookkeeper.client.SlowBookieTest.1
                public void addComplete(int i3, LedgerHandle ledgerHandle, long j, Object obj) {
                    atomicInteger.set(i3);
                    countDownLatch3.countDown();
                }
            }, (Object) null);
            Thread.sleep(3000L);
            Assert.assertEquals("Successfully added entry!", -559038737L, atomicInteger.get());
            countDownLatch.countDown();
            countDownLatch2.countDown();
            countDownLatch3.await(4000L, TimeUnit.MILLISECONDS);
            Assert.assertEquals("Failed to add entry!", 0L, atomicInteger.get());
            countDownLatch.countDown();
            countDownLatch2.countDown();
            countDownLatch3.countDown();
        } catch (Throwable th) {
            countDownLatch.countDown();
            countDownLatch2.countDown();
            countDownLatch3.countDown();
            throw th;
        }
    }

    @Test
    public void testBookieFailureWithSlowBookie() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setReadTimeout(5).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bookKeeper = new BookKeeper(clientConfiguration);
        byte[] bArr = new byte[0];
        final LedgerHandle createLedger = bookKeeper.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, bArr);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.bookkeeper.client.SlowBookieTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!atomicBoolean.get()) {
                    try {
                        createLedger.addEntry(SlowBookieTest.this.entry);
                    } catch (Exception e) {
                        SlowBookieTest.LOG.error("Exception in add entry thread", e);
                        atomicBoolean2.set(true);
                        return;
                    }
                }
            }
        };
        thread.start();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        startNewBookie();
        sleepBookie(getBookie(0), countDownLatch);
        Thread.sleep(10000L);
        countDownLatch.countDown();
        atomicBoolean.set(true);
        thread.join();
        Assert.assertFalse(atomicBoolean2.get());
        createLedger.close();
        LedgerHandle openLedger = bookKeeper.openLedger(createLedger.getId(), BookKeeper.DigestType.CRC32, bArr);
        LedgerChecker ledgerChecker = new LedgerChecker(bookKeeper);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger(-1);
        ledgerChecker.checkLedger(openLedger, new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>>() { // from class: org.apache.bookkeeper.client.SlowBookieTest.3
            public void operationComplete(int i, Set<LedgerFragment> set) {
                SlowBookieTest.LOG.debug("Checked ledgers returned {} {}", Integer.valueOf(i), set);
                if (i == 0) {
                    atomicInteger.set(set.size());
                }
                countDownLatch2.countDown();
            }
        });
        countDownLatch2.await();
        Assert.assertEquals("There should be no missing fragments", 0L, atomicInteger.get());
    }

    @Test
    public void testSlowBookieAndBackpressureOn() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setReadTimeout(5).setAddEntryTimeout(1).setAddEntryQuorumTimeout(1).setNumChannelsPerBookie(1).setZkServers(this.zkUtil.getZooKeeperConnectString()).setClientWriteBufferLowWaterMark(1).setClientWriteBufferHighWaterMark(this.entry.length - 1).setWaitTimeoutOnBackpressureMillis(5000L);
        Assert.assertTrue(doBackpressureTest(this.entry, clientConfiguration, false, false, 2000L).readLastConfirmed() < 5);
    }

    @Test
    public void testSlowBookieAndFastFailOn() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setReadTimeout(5).setAddEntryTimeout(1).setAddEntryQuorumTimeout(1).setNumChannelsPerBookie(1).setZkServers(this.zkUtil.getZooKeeperConnectString()).setClientWriteBufferLowWaterMark(1).setClientWriteBufferHighWaterMark(2).setWaitTimeoutOnBackpressureMillis(0L);
        Assert.assertTrue(doBackpressureTest(this.entry, clientConfiguration, true, false, 1000L).readLastConfirmed() < 5);
    }

    @Test
    public void testSlowBookieAndNoBackpressure() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setReadTimeout(5).setAddEntryTimeout(1).setAddEntryQuorumTimeout(1).setNumChannelsPerBookie(1).setZkServers(this.zkUtil.getZooKeeperConnectString()).setClientWriteBufferLowWaterMark(1).setClientWriteBufferHighWaterMark(this.entry.length - 1).setWaitTimeoutOnBackpressureMillis(-1L);
        Assert.assertTrue(doBackpressureTest(this.entry, clientConfiguration, false, false, 4000L).readLastConfirmed() > 90);
    }

    private LedgerHandle doBackpressureTest(byte[] bArr, ClientConfiguration clientConfiguration, boolean z, boolean z2, long j) throws Exception {
        BookKeeper bookKeeper = new BookKeeper(clientConfiguration);
        byte[] bArr2 = new byte[0];
        LedgerHandle createLedger = bookKeeper.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, bArr2);
        createLedger.addEntry(bArr);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        AtomicBoolean atomicBoolean3 = new AtomicBoolean(false);
        Thread thread = new Thread(() -> {
            int i = 0;
            while (!atomicBoolean.get()) {
                try {
                    createLedger.asyncAddEntry(bArr, (i2, ledgerHandle, j2, obj) -> {
                        if (i2 != 0) {
                            atomicBoolean.set(true);
                            atomicBoolean3.set(true);
                        }
                    }, (Object) null);
                    i++;
                    if (i > 100) {
                        atomicBoolean.set(true);
                    }
                } catch (Exception e) {
                    LOG.error("Exception in add entry thread", e);
                    atomicBoolean2.set(true);
                    return;
                }
            }
        });
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        sleepBookie(getBookie(0), countDownLatch);
        sleepBookie(getBookie(1), countDownLatch2);
        setTargetChannelState(bookKeeper, getBookie(0), 0L, false);
        setTargetChannelState(bookKeeper, getBookie(1), 0L, false);
        thread.start();
        Thread.sleep(j);
        atomicBoolean.set(true);
        countDownLatch.countDown();
        countDownLatch2.countDown();
        setTargetChannelState(bookKeeper, getBookie(0), 0L, true);
        setTargetChannelState(bookKeeper, getBookie(1), 0L, true);
        thread.join();
        Assert.assertEquals("write error", Boolean.valueOf(z), Boolean.valueOf(atomicBoolean3.get()));
        Assert.assertEquals("test failure", Boolean.valueOf(z2), Boolean.valueOf(atomicBoolean2.get()));
        createLedger.close();
        LedgerHandle openLedger = bookKeeper.openLedger(createLedger.getId(), BookKeeper.DigestType.CRC32, bArr2);
        LedgerChecker ledgerChecker = new LedgerChecker(bookKeeper);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger(-1);
        ledgerChecker.checkLedger(openLedger, (i, set) -> {
            LOG.debug("Checked ledgers returned {} {}", Integer.valueOf(i), set);
            if (i == 0) {
                atomicInteger.set(set.size());
                LOG.error("Checked ledgers returned {} {}", Integer.valueOf(i), set);
            }
            countDownLatch3.countDown();
        });
        countDownLatch3.await();
        Assert.assertEquals("There should be no missing fragments", 0L, atomicInteger.get());
        return openLedger;
    }

    private void setTargetChannelState(BookKeeper bookKeeper, BookieId bookieId, long j, boolean z) throws Exception {
        bookKeeper.getBookieClient().lookupClient(bookieId).obtain((i, perChannelBookieClient) -> {
            perChannelBookieClient.setWritable(z);
        }, j);
    }

    @Test
    public void testWritesetWriteableCheck() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bookKeeper = new BookKeeper(clientConfiguration);
        LedgerHandle createLedger = bookKeeper.createLedger(4, 2, 2, BookKeeper.DigestType.CRC32, new byte[0]);
        try {
            createLedger.addEntry(this.entry);
            DistributionSchedule.WriteSet writeSet = new RoundRobinDistributionSchedule(2, 2, 4).getWriteSet(createLedger.addEntry(this.entry) + 1);
            setTargetChannelState(bookKeeper, (BookieId) createLedger.getCurrentEnsemble().get(writeSet.get(ThreadLocalRandom.current().nextInt(writeSet.size()))), 0L, false);
            Assert.assertFalse("We should check b2,b3 both are writeable", createLedger.waitForWritable(writeSet, 0, 1000L));
            createLedger.close();
        } catch (Throwable th) {
            createLedger.close();
            throw th;
        }
    }

    @Test
    public void testManyBookieFailureWithSlowBookies() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setReadTimeout(5).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bookKeeper = new BookKeeper(clientConfiguration);
        byte[] bArr = new byte[0];
        final LedgerHandle createLedger = bookKeeper.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, bArr);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.bookkeeper.client.SlowBookieTest.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!atomicBoolean.get()) {
                    try {
                        createLedger.addEntry(SlowBookieTest.this.entry);
                        Thread.sleep(1L);
                    } catch (Exception e) {
                        SlowBookieTest.LOG.error("Exception in add entry thread", e);
                        atomicBoolean2.set(true);
                        return;
                    }
                }
            }
        };
        thread.start();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        startNewBookie();
        startNewBookie();
        sleepBookie(getBookie(0), countDownLatch);
        sleepBookie(getBookie(1), countDownLatch2);
        Thread.sleep(10000L);
        countDownLatch.countDown();
        countDownLatch2.countDown();
        atomicBoolean.set(true);
        thread.join();
        Assert.assertFalse(atomicBoolean2.get());
        createLedger.close();
        LedgerHandle openLedger = bookKeeper.openLedger(createLedger.getId(), BookKeeper.DigestType.CRC32, bArr);
        LedgerChecker ledgerChecker = new LedgerChecker(bookKeeper);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger(-1);
        ledgerChecker.checkLedger(openLedger, new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>>() { // from class: org.apache.bookkeeper.client.SlowBookieTest.5
            public void operationComplete(int i, Set<LedgerFragment> set) {
                SlowBookieTest.LOG.debug("Checked ledgers returned {} {}", Integer.valueOf(i), set);
                if (i == 0) {
                    atomicInteger.set(set.size());
                }
                countDownLatch3.countDown();
            }
        });
        countDownLatch3.await();
        Assert.assertEquals("There should be no missing fragments", 0L, atomicInteger.get());
    }
}
