package org.apache.bookkeeper.proto;

import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.SlowBufferedChannel;
import org.apache.bookkeeper.bookie.SlowInterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.SlowSortedLedgerStorage;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/proto/BookieBackpressureTest.class */
public class BookieBackpressureTest extends BookKeeperClusterTestCase implements AsyncCallback.AddCallback, AsyncCallback.ReadCallback, AsyncCallback.ReadLastConfirmedCallback {
    private static final Logger LOG = LoggerFactory.getLogger(BookieBackpressureTest.class);
    // Password passed to createLedger for every ledger in this test; assigned in the constructor.
    byte[] ledgerPassword;
    // Payload submitted for every entry; allocated in the constructor as an 8192-byte array.
    final byte[] data;
    // Number of entries the doWrites* helpers submit to each ledger.
    static final int NUM_ENTRIES_TO_WRITE = 200;
    // Entry count used to derive the skip-list size limit in the sorted-storage tests.
    static final int ENTRIES_IN_MEMTABLE = 2;
    // Limit configured in the "with backpressure" tests and the threshold every in-progress assertion compares against.
    static final int MAX_PENDING = 5;
    // Number of ledgers each helper creates and writes to.
    static final int NUM_OF_LEDGERS = 10;
    // Digest type passed to createLedger; assigned in the constructor.
    BookKeeper.DigestType digestType;
    // Delays forwarded to mockJournal, which applies them to SlowBufferedChannel; reset to 0 in setUp.
    // NOTE(review): the time unit is not visible in this file — presumably milliseconds, confirm.
    long getDelay;
    long addDelay;
    long flushDelay;

    /* loaded from: input_file:org/apache/bookkeeper/proto/BookieBackpressureTest$SyncObj.class */
    /**
     * Mutable result holder that the callback methods of the enclosing test cast their
     * context object to. Callbacks record the return code, read results and counters here
     * and then call {@code notify()} on the instance.
     */
    class SyncObj {
        /** First non-zero return code reported by a callback; stays 0 otherwise. */
        AtomicInteger rc = new AtomicInteger();
        /** Entries delivered by the most recent read callback, or null if none yet. */
        Enumeration<LedgerEntry> ls;
        /** Number of completed adds; incremented under the monitor of this object. */
        volatile int counter;
        /** Last confirmed entry id reported by a callback; -1 until one arrives. */
        long lastConfirmed = -1L;
        /** Set to true once a read callback has completed. */
        boolean value;

        public SyncObj() {
        }

        /** Stores {@code code} only if no non-zero code was stored before. */
        void setReturnCode(int code) {
            rc.compareAndSet(0, code);
        }

        /** Remembers the entries handed to a read callback. */
        void setLedgerEntries(Enumeration<LedgerEntry> entries) {
            ls = entries;
        }
    }

    public BookieBackpressureTest() {
        super(1);
        this.ledgerPassword = "aaa".getBytes();
        this.data = new byte[8192];
        this.digestType = BookKeeper.DigestType.CRC32;
        this.baseClientConf.setAddEntryTimeout(100);
        this.baseClientConf.setAddEntryQuorumTimeout(100);
        this.baseClientConf.setReadEntryTimeout(100);
    }

    /**
     * Runs the base-class set-up and then clears all three injected journal delays so
     * each test starts without any slowdown configured.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        getDelay = addDelay = flushDelay = 0L;
    }

    /**
     * Replaces every journal of the given bookie with a Mockito spy whose buffered-channel
     * builder produces {@link SlowBufferedChannel} instances carrying the requested delays.
     * Does nothing when none of the delays is positive.
     *
     * @param bookie bookie whose journals are replaced in place
     * @param j      delay applied through {@code setGetDelay}
     * @param j2     delay applied through {@code setAddDelay}
     * @param j3     delay applied through {@code setFlushDelay}
     * @throws Exception if the journal list cannot be obtained reflectively
     */
    private void mockJournal(Bookie bookie, long j, long j2, long j3) throws Exception {
        // No delay requested: leave the real journals untouched.
        if (j <= 0 && j2 <= 0 && j3 <= 0) {
            return;
        }
        List<Journal> journals = getJournals(bookie);
        for (int idx = 0; idx < journals.size(); idx++) {
            Journal spied = Mockito.spy(journals.get(idx));
            Mockito.when(spied.getBufferedChannelBuilder()).thenReturn((channel, channelArg) -> {
                SlowBufferedChannel slow =
                        new SlowBufferedChannel(UnpooledByteBufAllocator.DEFAULT, channel, channelArg);
                slow.setGetDelay(j);
                slow.setAddDelay(j2);
                slow.setFlushDelay(j3);
                return slow;
            });
            // Swap the spy into the bookie's own list so the bookie uses it from now on.
            journals.set(idx, spied);
        }
    }

    /**
     * Reads the field named {@code journals} declared on the runtime class of the given
     * bookie via reflection and returns it as a list of journals.
     *
     * @param bookie bookie instance to inspect
     * @return the live list stored in the bookie's {@code journals} field
     * @throws NoSuchFieldException   if the runtime class declares no such field
     * @throws IllegalAccessException if the field cannot be read
     */
    @SuppressWarnings("unchecked") // reflection only yields Object; the caller relies on List<Journal>
    private List<Journal> getJournals(Bookie bookie) throws NoSuchFieldException, IllegalAccessException {
        Field journalsField = bookie.getClass().getDeclaredField("journals");
        journalsField.setAccessible(true);
        Object fieldValue = journalsField.get(bookie);
        return (List<Journal>) fieldValue;
    }

    /**
     * Sets the adds-in-progress limit of bookie 0 to 0, injects an add delay of 1 into the
     * journal channel and runs the no-backpressure write scenario.
     */
    @Test
    public void testWriteNoBackpressureSlowJournal() throws Exception {
        addDelay = 1L;
        bsConfs.get(0).setMaxAddsInProgressLimit(0);
        doWritesNoBackpressure(0);
    }

    /**
     * Sets the adds-in-progress limit of bookie 0 to 0, turns adaptive journal group writes
     * off, injects a flush delay of 1 into the journal channel and runs the no-backpressure
     * write scenario.
     */
    @Test
    public void testWriteNoBackpressureSlowJournalFlush() throws Exception {
        flushDelay = 1L;
        bsConfs.get(0).setMaxAddsInProgressLimit(0);
        bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
        doWritesNoBackpressure(0);
    }

    /**
     * Limits adds in progress on bookie 0 to MAX_PENDING, injects a flush delay of 1 into
     * the journal channel and runs the with-backpressure write scenario.
     */
    @Test
    public void testWriteWithBackpressureSlowJournal() throws Exception {
        // NOTE(review): the no-backpressure counterpart of this test sets addDelay rather than
        // flushDelay — confirm that using flushDelay here is intentional.
        flushDelay = 1L;
        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
        doWritesWithBackpressure(0);
    }

    /**
     * Limits adds in progress on bookie 0 to MAX_PENDING, turns adaptive journal group
     * writes off, injects a flush delay of 1 into the journal channel and runs the
     * with-backpressure write scenario.
     */
    @Test
    public void testWriteWithBackpressureSlowJournalFlush() throws Exception {
        flushDelay = 1L;
        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
        bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
        doWritesWithBackpressure(0);
    }

    /**
     * Configures bookie 0 with SlowInterleavedLedgerStorage, a write buffer of one entry
     * payload, a storage add delay of "1" and an adds-in-progress limit of 0, then runs
     * the no-backpressure write scenario.
     */
    @Test
    public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception {
        bsConfs.get(0).setMaxAddsInProgressLimit(0);

        // Storage set-up: slow interleaved storage with a delayed add path.
        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        bsConfs.get(0).setWriteBufferBytes(data.length);
        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");

        doWritesNoBackpressure(0);
    }

    /**
     * Configures bookie 0 with SlowInterleavedLedgerStorage, a write buffer of one entry
     * payload, a storage add delay of "1" and an adds-in-progress limit of MAX_PENDING,
     * then runs the with-backpressure write scenario.
     */
    @Test
    public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception {
        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);

        // Storage set-up: slow interleaved storage with a delayed add path.
        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        bsConfs.get(0).setWriteBufferBytes(data.length);
        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");

        doWritesWithBackpressure(0);
    }

    /**
     * Configures bookie 0 with SlowInterleavedLedgerStorage, a write buffer of one entry
     * payload, a storage flush delay of "10" and an adds-in-progress limit of 0, then runs
     * the no-backpressure write scenario.
     */
    @Test
    public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exception {
        bsConfs.get(0).setMaxAddsInProgressLimit(0);

        // Storage set-up: slow interleaved storage with a delayed flush path.
        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        bsConfs.get(0).setWriteBufferBytes(data.length);
        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");

        doWritesNoBackpressure(0);
    }

    /**
     * Configures bookie 0 with SlowInterleavedLedgerStorage, a write buffer of one entry
     * payload, a storage flush delay of "10" and an adds-in-progress limit of MAX_PENDING,
     * then runs the with-backpressure write scenario.
     */
    @Test
    public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Exception {
        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);

        // Storage set-up: slow interleaved storage with a delayed flush path.
        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        bsConfs.get(0).setWriteBufferBytes(data.length);
        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");

        doWritesWithBackpressure(0);
    }

    /**
     * Configures bookie 0 with SlowSortedLedgerStorage, a write buffer of one entry payload,
     * a skip-list size limit just below ENTRIES_IN_MEMTABLE payloads, storage add and flush
     * delays, and an adds-in-progress limit of 0, then runs the no-backpressure write
     * scenario.
     */
    @Test
    public void testWriteNoBackpressureSortedStorage() throws Exception {
        bsConfs.get(0).setMaxAddsInProgressLimit(0);
        bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
        bsConfs.get(0).setWriteBufferBytes(data.length);

        // Sanity check on the test constants. The previous form asserted the literal `true`
        // and therefore could never fail.
        // NOTE(review): the intended condition may be stricter (e.g. leaving room for a
        // memtable that is being flushed) — confirm against the upstream test.
        Assert.assertTrue("for the test, memtable should not keep more entries than allowed",
                ENTRIES_IN_MEMTABLE <= MAX_PENDING);
        // Limit sits one byte below ENTRIES_IN_MEMTABLE full payloads.
        bsConfs.get(0).setSkipListSizeLimit((data.length * ENTRIES_IN_MEMTABLE) - 1);

        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
        doWritesNoBackpressure(0);
    }

    /**
     * Configures bookie 0 with SlowSortedLedgerStorage, a write buffer of one entry payload,
     * a skip-list size limit just below ENTRIES_IN_MEMTABLE payloads, storage add and flush
     * delays, and an adds-in-progress limit of MAX_PENDING, then runs the with-backpressure
     * write scenario.
     */
    @Test
    public void testWriteWithBackpressureSortedStorage() throws Exception {
        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
        bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
        bsConfs.get(0).setWriteBufferBytes(data.length);

        // Sanity check on the test constants. The previous form asserted the literal `true`
        // and therefore could never fail.
        // NOTE(review): the intended condition may be stricter (e.g. leaving room for a
        // memtable that is being flushed) — confirm against the upstream test.
        Assert.assertTrue("for the test, memtable should not keep more entries than allowed",
                ENTRIES_IN_MEMTABLE <= MAX_PENDING);
        // Limit sits one byte below ENTRIES_IN_MEMTABLE full payloads.
        bsConfs.get(0).setSkipListSizeLimit((data.length * ENTRIES_IN_MEMTABLE) - 1);

        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
        doWritesWithBackpressure(0);
    }

    /**
     * Configures bookie 0 with SlowInterleavedLedgerStorage, a storage get delay of "1" and
     * a reads-in-progress limit of 0, generates data, reads it back and asserts that the
     * maximum number of reads in progress was greater than MAX_PENDING.
     */
    @Test
    public void testReadsNoBackpressure() throws Exception {
        bsConfs.get(0).setMaxReadsInProgressLimit(0);
        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        bsConfs.get(0).setWriteBufferBytes(data.length);
        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");

        BookieRequestProcessor processor = generateDataAndDoReads(0);
        Integer maxReadsInProgress = Integer.valueOf(processor.maxReadsInProgressCount());
        Assert.assertThat("reads in progress should exceed MAX_PENDING",
                maxReadsInProgress,
                Matchers.greaterThan(Integer.valueOf(MAX_PENDING)));
    }

    /**
     * Configures bookie 0 with SlowInterleavedLedgerStorage, a storage get delay of "1" and
     * a reads-in-progress limit of MAX_PENDING, generates data, reads it back and asserts
     * that the maximum number of reads in progress never went above MAX_PENDING.
     */
    @Test
    public void testReadsWithBackpressure() throws Exception {
        bsConfs.get(0).setMaxReadsInProgressLimit(MAX_PENDING);
        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
        bsConfs.get(0).setWriteBufferBytes(data.length);
        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");

        BookieRequestProcessor processor = generateDataAndDoReads(0);
        Integer maxReadsInProgress = Integer.valueOf(processor.maxReadsInProgressCount());
        Assert.assertThat("reads in progress should NOT exceed MAX_PENDING ",
                maxReadsInProgress,
                Matchers.lessThanOrEqualTo(Integer.valueOf(MAX_PENDING)));
    }

    /**
     * Restarts the bookie at the given index with the current configuration and journal
     * delays, creates NUM_OF_LEDGERS ledgers, writes a fixed number of entries to each,
     * waits for all writes, then issues one single-entry async read per written entry and
     * waits for all reads to complete.
     *
     * <p>The latch sizes are derived from the loop bounds, so changing NUM_OF_LEDGERS can
     * no longer leave a latch that never reaches zero or that opens too early.
     *
     * @param i index of the bookie in {@code bs} / {@code bsConfs} to restart and exercise
     * @return the request processor of the restarted bookie, for inspecting its counters
     * @throws Exception if restarting the bookie, creating ledgers or waiting fails
     */
    private BookieRequestProcessor generateDataAndDoReads(int i) throws Exception {
        // Entries written to, and later read back from, each ledger.
        final int numEntriesForReads = 10;
        final int totalOperations = numEntriesForReads * NUM_OF_LEDGERS;

        // Replace the running bookie with a fresh one that uses the (possibly slowed) journal.
        bs.get(i).shutdown();
        BookieServer server = new BookieServer(bsConfs.get(i));
        mockJournal(server.getBookie(), getDelay, addDelay, flushDelay);
        server.start();
        bs.set(i, server);

        LOG.info("creating ledgers");
        LedgerHandle[] handles = new LedgerHandle[NUM_OF_LEDGERS];
        for (int n = 0; n < NUM_OF_LEDGERS; n++) {
            handles[n] = bkc.createLedger(1, 1, digestType, ledgerPassword);
            LOG.info("created ledger ID: {}", handles[n].getId());
        }

        LOG.info("generating data for reads");
        final CountDownLatch writesComplete = new CountDownLatch(totalOperations);
        for (int entry = 0; entry < numEntriesForReads; entry++) {
            for (LedgerHandle handle : handles) {
                // Return codes are not inspected here; the latch only tracks completion.
                handle.asyncAddEntry(data, (rc, lh, entryId, ctx) -> writesComplete.countDown(), null);
            }
        }
        writesComplete.await();

        LOG.info("issue bunch of async reads");
        final CountDownLatch readsComplete = new CountDownLatch(totalOperations);
        for (int entry = 0; entry < numEntriesForReads; entry++) {
            for (LedgerHandle handle : handles) {
                // Read exactly one entry (first == last) per request.
                handle.asyncReadEntries(entry, entry, (rc, lh, entries, ctx) -> readsComplete.countDown(), null);
            }
        }
        readsComplete.await();
        LOG.info("reads finished");

        return server.getBookieRequestProcessor();
    }

    /**
     * Restarts the bookie at the given index with the current configuration and journal
     * delays, submits NUM_ENTRIES_TO_WRITE async writes to each of NUM_OF_LEDGERS ledgers
     * and asserts that, before all writes completed, the bookie's maximum adds-in-progress
     * count went above MAX_PENDING. All ledgers are closed afterwards.
     *
     * <p>The completion latch is sized from the loop bounds rather than a literal, so the
     * constants can be changed without desynchronising the latch.
     *
     * @param i index of the bookie in {@code bs} / {@code bsConfs} to restart and exercise
     * @throws Exception if restarting the bookie, creating or closing ledgers, or waiting fails
     */
    private void doWritesNoBackpressure(int i) throws Exception {
        final int totalWrites = NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS;

        // Replace the running bookie with a fresh one that uses the (possibly slowed) journal.
        bs.get(i).shutdown();
        BookieServer server = new BookieServer(bsConfs.get(i));
        mockJournal(server.getBookie(), getDelay, addDelay, flushDelay);
        server.start();
        bs.set(i, server);

        LOG.info("Creating ledgers");
        LedgerHandle[] handles = new LedgerHandle[NUM_OF_LEDGERS];
        for (int n = 0; n < NUM_OF_LEDGERS; n++) {
            handles[n] = bkc.createLedger(1, 1, digestType, ledgerPassword);
            LOG.info("created ledger ID: {}", handles[n].getId());
        }

        final CountDownLatch writesComplete = new CountDownLatch(totalWrites);
        LOG.info("submitting writes");
        for (int entry = 0; entry < NUM_ENTRIES_TO_WRITE; entry++) {
            for (LedgerHandle handle : handles) {
                handle.asyncAddEntry(data, (rc, lh, entryId, ctx) -> writesComplete.countDown(), null);
            }
        }

        // Poll until either every write completed or the in-progress count passed MAX_PENDING.
        boolean exceededLimit = false;
        BookieRequestProcessor processor = server.getBookieRequestProcessor();
        while (!writesComplete.await(1L, TimeUnit.MILLISECONDS)) {
            int inProgress = processor.maxAddsInProgressCount();
            if (inProgress > MAX_PENDING) {
                exceededLimit = true;
                break;
            }
            LOG.info("Waiting until all writes succeeded or maxAddsInProgressCount {} > MAX_PENDING {}",
                    inProgress, MAX_PENDING);
        }
        Assert.assertTrue("expected to exceed number of pending writes", exceededLimit);

        for (LedgerHandle handle : handles) {
            handle.close();
        }
    }

    /**
     * Restarts the bookie at the given index with the current configuration and journal
     * delays, submits NUM_ENTRIES_TO_WRITE async writes to each of NUM_OF_LEDGERS ledgers
     * and, while waiting for them, asserts on every poll that the bookie's maximum
     * adds-in-progress count stays at or below MAX_PENDING. After all writes completed, the
     * first non-zero return code (if any) is rethrown as a {@link BKException}; otherwise
     * all ledgers are closed.
     *
     * <p>The completion latch and the progress log use a total computed from the loop
     * bounds rather than a literal, so they stay correct when the constants change.
     *
     * @param i index of the bookie in {@code bs} / {@code bsConfs} to restart and exercise
     * @throws Exception if restarting the bookie, any write, closing ledgers, or waiting fails
     */
    private void doWritesWithBackpressure(int i) throws Exception {
        final int totalWrites = NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS;

        // Replace the running bookie with a fresh one that uses the (possibly slowed) journal.
        bs.get(i).shutdown();
        BookieServer server = new BookieServer(bsConfs.get(i));
        mockJournal(server.getBookie(), getDelay, addDelay, flushDelay);
        server.start();
        bs.set(i, server);

        LOG.info("Creating ledgers");
        LedgerHandle[] handles = new LedgerHandle[NUM_OF_LEDGERS];
        for (int n = 0; n < NUM_OF_LEDGERS; n++) {
            handles[n] = bkc.createLedger(1, 1, digestType, ledgerPassword);
            LOG.info("created ledger ID: {}", handles[n].getId());
        }

        final CountDownLatch writesComplete = new CountDownLatch(totalWrites);
        // Holds the first non-zero return code seen by any add callback.
        final AtomicInteger firstFailure = new AtomicInteger(0);
        LOG.info("submitting writes");
        for (int entry = 0; entry < NUM_ENTRIES_TO_WRITE; entry++) {
            for (LedgerHandle handle : handles) {
                handle.asyncAddEntry(data, (rc, lh, entryId, ctx) -> {
                    firstFailure.compareAndSet(0, rc);
                    writesComplete.countDown();
                }, null);
            }
        }
        LOG.info("test submitted all writes");

        BookieRequestProcessor processor = server.getBookieRequestProcessor();
        while (!writesComplete.await(1L, TimeUnit.MILLISECONDS)) {
            int inProgress = processor.maxAddsInProgressCount();
            Assert.assertTrue("writes in progress should not exceed limit, got " + inProgress,
                    inProgress <= MAX_PENDING);
            LOG.info("Waiting for all writes to succeed, left {} of {}", writesComplete.getCount(), totalWrites);
        }

        int failureCode = firstFailure.get();
        if (failureCode != 0) {
            throw BKException.create(failureCode);
        }

        for (LedgerHandle handle : handles) {
            handle.close();
        }
    }

    /**
     * Add-completion callback. Records the return code on the {@link SyncObj} passed as
     * context, increments its completed-add counter and wakes one thread waiting on it.
     *
     * @param i            return code of the add operation
     * @param ledgerHandle handle the entry was added to (unused)
     * @param j            id of the added entry (unused)
     * @param obj          callback context; must be a {@link SyncObj}
     */
    @Override
    public void addComplete(int i, LedgerHandle ledgerHandle, long j, Object obj) {
        SyncObj sync = (SyncObj) obj;
        sync.setReturnCode(i);
        // The increment is not atomic by itself; the monitor makes it safe and pairs with notify().
        synchronized (sync) {
            sync.counter++;
            sync.notify();
        }
    }

    /**
     * Read-completion callback. Stores the delivered entries and the return code on the
     * {@link SyncObj} passed as context, marks it as done and wakes one thread waiting on it.
     *
     * @param i            return code of the read operation
     * @param ledgerHandle handle the entries were read from (unused)
     * @param enumeration  entries delivered by the read
     * @param obj          callback context; must be a {@link SyncObj}
     */
    @Override
    public void readComplete(int i, LedgerHandle ledgerHandle, Enumeration<LedgerEntry> enumeration, Object obj) {
        SyncObj sync = (SyncObj) obj;
        sync.setLedgerEntries(enumeration);
        sync.setReturnCode(i);
        synchronized (sync) {
            sync.value = true;
            sync.notify();
        }
    }

    /**
     * Read-last-confirmed callback. Records the return code and the reported last confirmed
     * entry id on the {@link SyncObj} passed as context and wakes one thread waiting on it.
     *
     * @param i   return code of the operation
     * @param j   last confirmed entry id reported by the operation
     * @param obj callback context; must be a {@link SyncObj}
     */
    @Override
    public void readLastConfirmedComplete(int i, long j, Object obj) {
        SyncObj sync = (SyncObj) obj;
        sync.setReturnCode(i);
        synchronized (sync) {
            sync.lastConfirmed = j;
            sync.notify();
        }
    }
}
