package org.apache.distributedlog;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.LogWriter;
import org.apache.distributedlog.api.MetadataAccessor;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.LogReadException;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
import org.apache.distributedlog.io.Abortables;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/TestBKDistributedLogManager.class */
public class TestBKDistributedLogManager extends TestDistributedLogBase {
    static final Logger LOG;
    private static final Random RAND;

    @Rule
    public TestName testNames = new TestName();
    private static final long DEFAULT_SEGMENT_SIZE = 1000;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX WARN: Type inference failed for: r0v44, types: [org.apache.distributedlog.BKSyncLogWriter, long] */
    private void testNonPartitionedWritesInternal(String str, DistributedLogConfiguration distributedLogConfiguration) throws Exception {
        long j;
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, str);
        long j2 = 1;
        long j3 = 0;
        while (true) {
            long j4 = j3;
            if (j4 >= 3) {
                break;
            }
            long j5 = j2;
            ?? startLogSegmentNonPartitioned = createNewDLM.startLogSegmentNonPartitioned();
            long j6 = 1;
            while (true) {
                long j7 = j6;
                if (j7 <= 1000) {
                    j2++;
                    startLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(startLogSegmentNonPartitioned));
                    j6 = j7 + 1;
                }
            }
            BKLogSegmentWriter cachedLogWriter = startLogSegmentNonPartitioned.getCachedLogWriter();
            startLogSegmentNonPartitioned.closeAndComplete();
            BKLogWriteHandler createWriteHandler = createNewDLM.createWriteHandler(true);
            j = j2 - 1;
            Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j5, j, cachedLogWriter.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(createWriteHandler.asyncClose());
            j3 = j4 + 1;
        }
        BKSyncLogWriter startLogSegmentNonPartitioned2 = createNewDLM.startLogSegmentNonPartitioned();
        long j8 = 1;
        while (true) {
            long j9 = j8;
            if (j9 > 500) {
                break;
            }
            long j10 = j;
            j = 1;
            j2++;
            startLogSegmentNonPartitioned2.write(DLMTestUtil.getLogRecordInstance(j10));
            j8 = j9 + 1;
        }
        startLogSegmentNonPartitioned2.flush();
        startLogSegmentNonPartitioned2.commit();
        startLogSegmentNonPartitioned2.close();
        LogReader inputStream = createNewDLM.getInputStream(1L);
        long j11 = 0;
        long j12 = -1;
        for (LogRecordWithDLSN readNext = inputStream.readNext(false); null != readNext; readNext = inputStream.readNext(false)) {
            DLMTestUtil.verifyLogRecord(readNext);
            if (!$assertionsDisabled && j12 >= readNext.getTransactionId()) {
                throw new AssertionError();
            }
            j12 = readNext.getTransactionId();
            j11++;
        }
        inputStream.close();
        Assert.assertEquals(j2 - 1, j11);
    }

    @Test(timeout = 60000)
    public void testSimpleWrite() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-simplewrite");
        BKSyncLogWriter startLogSegmentNonPartitioned = createNewDLM.startLogSegmentNonPartitioned();
        long j = 1;
        while (true) {
            long j2 = j;
            if (j2 > 100) {
                BKLogSegmentWriter cachedLogWriter = startLogSegmentNonPartitioned.getCachedLogWriter();
                startLogSegmentNonPartitioned.closeAndComplete();
                BKLogWriteHandler createWriteHandler = createNewDLM.createWriteHandler(true);
                Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(1L, 100L, cachedLogWriter.getLogSegmentSequenceNumber()), false));
                Utils.ioResult(createWriteHandler.asyncClose());
                return;
            }
            startLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j2));
            j = j2 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testNumberOfTransactions() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-txncount");
        BKSyncLogWriter bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
        long j = 1;
        while (true) {
            long j2 = j;
            if (j2 > 100) {
                bKSyncLogWriter.closeAndComplete();
                Assert.assertEquals(100L, DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, "distrlog-txncount"), 1L));
                createNewDLM.close();
                return;
            }
            bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j2));
            j = j2 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testContinuousReaders() throws Exception {
        long j;
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-continuous");
        long j2 = 1;
        long j3 = 0;
        while (true) {
            long j4 = j3;
            if (j4 >= 3) {
                break;
            }
            long j5 = j2;
            BKSyncLogWriter startLogSegmentNonPartitioned = createNewDLM.startLogSegmentNonPartitioned();
            long j6 = 1;
            while (true) {
                long j7 = j6;
                if (j7 <= 1000) {
                    long j8 = j2;
                    j2 = j8 + 1;
                    startLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j8));
                    j6 = j7 + 1;
                }
            }
            BKLogSegmentWriter cachedLogWriter = startLogSegmentNonPartitioned.getCachedLogWriter();
            startLogSegmentNonPartitioned.closeAndComplete();
            BKLogWriteHandler createWriteHandler = createNewDLM.createWriteHandler(true);
            j = j2 - 1;
            Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j5, j, cachedLogWriter.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(createWriteHandler.asyncClose());
            j3 = j4 + 1;
        }
        BKSyncLogWriter startLogSegmentNonPartitioned2 = createNewDLM.startLogSegmentNonPartitioned();
        long j9 = 1;
        while (true) {
            long j10 = j9;
            if (j10 > 500) {
                break;
            }
            long j11 = j2;
            j2 = j + 1;
            startLogSegmentNonPartitioned2.write(DLMTestUtil.getLogRecordInstance(j11));
            j9 = j10 + 1;
        }
        startLogSegmentNonPartitioned2.flush();
        startLogSegmentNonPartitioned2.commit();
        startLogSegmentNonPartitioned2.close();
        createNewDLM.close();
        BKDistributedLogManager createNewDLM2 = createNewDLM(conf, "distrlog-continuous");
        LogReader inputStream = createNewDLM2.getInputStream(1L);
        long j12 = 0;
        LogRecordWithDLSN readNext = inputStream.readNext(false);
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN = readNext;
            if (null == logRecordWithDLSN) {
                Assert.assertEquals(j2 - 1, j12);
                Assert.assertEquals(j2 - 1, createNewDLM2.getLogRecordCount());
                inputStream.close();
                createNewDLM2.close();
                return;
            }
            DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
            j12++;
            readNext = inputStream.readNext(false);
        }
    }

    @Test(timeout = 60000)
    public void testWriteRestartFrom1() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-restartFrom1");
        long j = 1;
        BKSyncLogWriter bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
        long j2 = 1;
        while (true) {
            long j3 = j2;
            if (j3 > 1000) {
                break;
            }
            long j4 = j;
            j = j4 + 1;
            bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j4));
            j2 = j3 + 1;
        }
        bKSyncLogWriter.closeAndComplete();
        try {
            try {
                bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
                bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(1L));
                Assert.fail("Shouldn't be able to start another journal from 1 when one already exists");
                bKSyncLogWriter.close();
            } catch (Exception e) {
                LOG.info("Caught exception as expected", e);
                bKSyncLogWriter.close();
            }
            try {
                try {
                    bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
                    bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(999L));
                    Assert.fail("Shouldn't be able to start another journal from 999 when one already exists");
                    bKSyncLogWriter.close();
                } catch (Throwable th) {
                    bKSyncLogWriter.close();
                    throw th;
                }
            } catch (TransactionIdOutOfOrderException e2) {
                LOG.info("Caught exception as expected", e2);
                bKSyncLogWriter.close();
            }
            long j5 = 1001;
            bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
            Assert.assertNotNull(bKSyncLogWriter);
            long j6 = 1;
            while (true) {
                long j7 = j6;
                if (j7 > 1000) {
                    bKSyncLogWriter.closeAndComplete();
                    BKSyncLogWriter bKSyncLogWriter2 = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
                    bKSyncLogWriter2.write(DLMTestUtil.getLogRecordInstance(4000L));
                    bKSyncLogWriter2.close();
                    createNewDLM.close();
                    return;
                }
                long j8 = j5;
                j5 = j8 + 1;
                bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j8));
                j6 = j7 + 1;
            }
        } catch (Throwable th2) {
            bKSyncLogWriter.close();
            throw th2;
        }
    }

    @Test(timeout = 60000)
    public void testTwoWritersOnLockDisabled() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setWriteLockEnabled(false);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-two-writers-lock-disabled");
        AsyncLogWriter asyncLogWriter = (AsyncLogWriter) Utils.ioResult(createNewDLM.openAsyncLogWriter());
        Utils.ioResult(asyncLogWriter.write(DLMTestUtil.getLogRecordInstance(1L)));
        Utils.ioResult(((AsyncLogWriter) Utils.ioResult(createNewDLM.openAsyncLogWriter())).write(DLMTestUtil.getLogRecordInstance(2L)));
        try {
            Utils.ioResult(asyncLogWriter.write(DLMTestUtil.getLogRecordInstance(3L)));
            Assert.fail("Should fail writing record to writer 1 again as writer 2 took over the ownership");
        } catch (BKTransmitException e) {
            Assert.assertEquals(-101L, e.getBKResultCode());
        }
    }

    @Test(timeout = 60000)
    public void testSimpleRead() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-simpleread");
        BKSyncLogWriter bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
        long j = 1;
        while (true) {
            long j2 = j;
            if (j2 > 10000) {
                bKSyncLogWriter.closeAndComplete();
                Assert.assertEquals(10000L, DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, "distrlog-simpleread"), 1L));
                createNewDLM.close();
                return;
            }
            bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j2));
            j = j2 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
        long j;
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-inprogressAtEnd");
        long j2 = 1;
        long j3 = 0;
        while (true) {
            long j4 = j3;
            if (j4 >= 3) {
                break;
            }
            long j5 = j2;
            BKSyncLogWriter bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
            long j6 = 1;
            while (true) {
                long j7 = j6;
                if (j7 <= 1000) {
                    long j8 = j2;
                    j2 = j8 + 1;
                    bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j8));
                    j6 = j7 + 1;
                }
            }
            BKLogSegmentWriter cachedLogWriter = bKSyncLogWriter.getCachedLogWriter();
            bKSyncLogWriter.closeAndComplete();
            BKLogWriteHandler createWriteHandler = createNewDLM.createWriteHandler(true);
            j = j2 - 1;
            Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j5, j, cachedLogWriter.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(createWriteHandler.asyncClose());
            j3 = j4 + 1;
        }
        BKSyncLogWriter bKSyncLogWriter2 = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
        long j9 = 1;
        while (true) {
            long j10 = j9;
            if (j10 > 500) {
                bKSyncLogWriter2.flush();
                bKSyncLogWriter2.commit();
                bKSyncLogWriter2.close();
                Assert.assertEquals(j2 - 1, DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, "distrlog-inprogressAtEnd"), 1L));
                return;
            }
            long j11 = j2;
            j2 = j + 1;
            bKSyncLogWriter2.write(DLMTestUtil.getLogRecordInstance(j11));
            j9 = j10 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testContinuousReaderBulk() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-continuous-bulk");
        long j = 1;
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= 3) {
                break;
            }
            BKSyncLogWriter bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
            long j4 = 1;
            while (true) {
                long j5 = j4;
                if (j5 <= 1000) {
                    long j6 = j;
                    j = j6 + 1;
                    bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j6));
                    j4 = j5 + 1;
                }
            }
            bKSyncLogWriter.closeAndComplete();
            j2 = j3 + 1;
        }
        BKSyncLogWriter bKSyncLogWriter2 = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
        long j7 = 1;
        while (true) {
            long j8 = j7;
            if (j8 > 500) {
                break;
            }
            long j9 = j;
            j = j9 + 1;
            bKSyncLogWriter2.write(DLMTestUtil.getLogRecordInstance(j9));
            j7 = j8 + 1;
        }
        bKSyncLogWriter2.flush();
        bKSyncLogWriter2.commit();
        bKSyncLogWriter2.close();
        createNewDLM.close();
        LogReader inputStream = createNewDLM(conf, "distrlog-continuous-bulk").getInputStream(1L);
        long j10 = 0;
        long j11 = -1;
        for (List<LogRecordWithDLSN> readBulk = inputStream.readBulk(false, 13); !readBulk.isEmpty(); readBulk = inputStream.readBulk(false, 13)) {
            for (LogRecordWithDLSN logRecordWithDLSN : readBulk) {
                if (!$assertionsDisabled && j11 >= logRecordWithDLSN.getTransactionId()) {
                    throw new AssertionError();
                }
                j11 = logRecordWithDLSN.getTransactionId();
                DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                j10++;
            }
        }
        inputStream.close();
        Assert.assertEquals(j - 1, j10);
    }

    @Test(timeout = 60000)
    public void testContinuousReadersWithEmptyLedgers() throws Exception {
        long j;
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-continuous-emptyledgers");
        long j2 = 1;
        long j3 = 0;
        while (true) {
            long j4 = j3;
            if (j4 >= 3) {
                break;
            }
            long j5 = j2;
            BKSyncLogWriter bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
            long j6 = 1;
            while (true) {
                long j7 = j6;
                if (j7 <= 1000) {
                    long j8 = j2;
                    j2 = j8 + 1;
                    bKSyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j8));
                    j6 = j7 + 1;
                }
            }
            BKLogSegmentWriter cachedLogWriter = bKSyncLogWriter.getCachedLogWriter();
            bKSyncLogWriter.closeAndComplete();
            BKLogWriteHandler createWriteHandler = createNewDLM.createWriteHandler(true);
            Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j5, j2 - 1, cachedLogWriter.getLogSegmentSequenceNumber()), false));
            BKLogSegmentWriter startLogSegment = createWriteHandler.startLogSegment(j2 - 1);
            createWriteHandler.completeAndCloseLogSegment(startLogSegment.getLogSegmentSequenceNumber(), startLogSegment.getLogSegmentId(), j2 - 1, j2 - 1, 0);
            j = j2 - 1;
            Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j2 - 1, j, startLogSegment.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(createWriteHandler.asyncClose());
            j3 = j4 + 1;
        }
        BKSyncLogWriter bKSyncLogWriter2 = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
        long j9 = 1;
        while (true) {
            long j10 = j9;
            if (j10 > 500) {
                break;
            }
            long j11 = j2;
            j2 = j + 1;
            bKSyncLogWriter2.write(DLMTestUtil.getLogRecordInstance(j11));
            j9 = j10 + 1;
        }
        bKSyncLogWriter2.flush();
        bKSyncLogWriter2.commit();
        bKSyncLogWriter2.close();
        createNewDLM.close();
        BKDistributedLogManager createNewDLM2 = createNewDLM(conf, "distrlog-continuous-emptyledgers");
        AsyncLogReader asyncLogReader = createNewDLM2.getAsyncLogReader(DLSN.InvalidDLSN);
        long j12 = 0;
        Object ioResult = Utils.ioResult(asyncLogReader.readNext());
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) ioResult;
            if (null == logRecordWithDLSN) {
                break;
            }
            DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
            j12++;
            if (j12 >= j2 - 1) {
                break;
            } else {
                ioResult = Utils.ioResult(asyncLogReader.readNext());
            }
        }
        Assert.assertEquals(j2 - 1, j12);
        Utils.close(asyncLogReader);
        LogReader inputStream = createNewDLM2.getInputStream(1L);
        long j13 = 0;
        LogRecordWithDLSN readNext = inputStream.readNext(false);
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN2 = readNext;
            if (null == logRecordWithDLSN2) {
                Assert.assertEquals(j2 - 1, j13);
                inputStream.close();
                Assert.assertEquals(j2 - 1, createNewDLM2.getLogRecordCount());
                createNewDLM2.close();
                return;
            }
            DLMTestUtil.verifyLogRecord(logRecordWithDLSN2);
            j13++;
            readNext = inputStream.readNext(false);
        }
    }

    @Test(timeout = 60000)
    public void testNonPartitionedWrites() throws Exception {
        testNonPartitionedWritesInternal("distrlog-non-partitioned-bulk", conf);
    }

    /* JADX WARN: Type inference failed for: r0v5, types: [long, org.apache.distributedlog.api.LogWriter] */
    @Test(timeout = 60000)
    public void testCheckLogExists() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-check-log-exists");
        long j = 1;
        ?? startLogSegmentNonPartitioned = createNewDLM.startLogSegmentNonPartitioned();
        long j2 = 1;
        while (true) {
            long j3 = j2;
            if (j3 > 500) {
                break;
            }
            j++;
            startLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(startLogSegmentNonPartitioned));
            j2 = j3 + 1;
        }
        startLogSegmentNonPartitioned.flush();
        startLogSegmentNonPartitioned.commit();
        startLogSegmentNonPartitioned.close();
        createNewDLM.close();
        Namespace build = NamespaceBuilder.newBuilder().conf(conf).uri(createDLMURI("/distrlog-check-log-exists")).build();
        Assert.assertTrue(build.logExists("distrlog-check-log-exists"));
        Assert.assertFalse(build.logExists("non-existent-log"));
        Assert.assertFalse(NamespaceBuilder.newBuilder().conf(conf).uri(createDLMURI("/non-existent-ns")).build().logExists("distrlog-check-log-exists"));
        int i = 0;
        Iterator<String> logs = build.getLogs();
        while (logs.hasNext()) {
            i++;
            Assert.assertEquals("distrlog-check-log-exists", logs.next());
        }
        Assert.assertEquals(1L, i);
        build.close();
    }

    @Test(timeout = 60000)
    public void testMetadataAccessor() throws Exception {
        MetadataAccessor createNewMetadataAccessor = DLMTestUtil.createNewMetadataAccessor(conf, "distrlog-metadata-accessor", createDLMURI("/distrlog-metadata-accessor"));
        Assert.assertEquals("distrlog-metadata-accessor", createNewMetadataAccessor.getStreamName());
        createNewMetadataAccessor.createOrUpdateMetadata("distrlog-metadata-accessor".getBytes());
        Assert.assertEquals("distrlog-metadata-accessor", new String(createNewMetadataAccessor.getMetadata()));
        createNewMetadataAccessor.deleteMetadata();
        Assert.assertEquals((Object) null, createNewMetadataAccessor.getMetadata());
    }

    @Test(timeout = 60000)
    public void testSubscriptionsStore() throws Exception {
        DLSN dlsn = new DLSN(4L, 33L, 5L);
        DLSN dlsn2 = new DLSN(4L, 34L, 5L);
        DLSN dlsn3 = new DLSN(5L, 34L, 5L);
        DLSN dlsn4 = new DLSN(6L, 35L, 6L);
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-subscriptions-store");
        SubscriptionsStore subscriptionsStore = createNewDLM.getSubscriptionsStore();
        Assert.assertEquals(Utils.ioResult(subscriptionsStore.getLastCommitPosition("subscriber-0")), DLSN.NonInclusiveLowerBound);
        Assert.assertEquals(Utils.ioResult(subscriptionsStore.getLastCommitPosition("subscriber-1")), DLSN.NonInclusiveLowerBound);
        Assert.assertEquals(Utils.ioResult(subscriptionsStore.getLastCommitPosition("subscriber-2")), DLSN.NonInclusiveLowerBound);
        Assert.assertTrue(((Map) Utils.ioResult(subscriptionsStore.getLastCommitPositions())).isEmpty());
        Utils.ioResult(subscriptionsStore.advanceCommitPosition("subscriber-0", dlsn));
        Assert.assertEquals(dlsn, Utils.ioResult(subscriptionsStore.getLastCommitPosition("subscriber-0")));
        Map map = (Map) Utils.ioResult(subscriptionsStore.getLastCommitPositions());
        Assert.assertEquals(1L, map.size());
        Assert.assertEquals(dlsn, map.get("subscriber-0"));
        Utils.ioResult(subscriptionsStore.advanceCommitPosition("subscriber-1", dlsn2));
        Assert.assertEquals(dlsn2, Utils.ioResult(subscriptionsStore.getLastCommitPosition("subscriber-1")));
        Map map2 = (Map) Utils.ioResult(subscriptionsStore.getLastCommitPositions());
        Assert.assertEquals(2L, map2.size());
        Assert.assertEquals(dlsn, map2.get("subscriber-0"));
        Assert.assertEquals(dlsn2, map2.get("subscriber-1"));
        Utils.ioResult(subscriptionsStore.advanceCommitPosition("subscriber-2", dlsn3));
        Assert.assertEquals(dlsn3, Utils.ioResult(subscriptionsStore.getLastCommitPosition("subscriber-2")));
        Map map3 = (Map) Utils.ioResult(subscriptionsStore.getLastCommitPositions());
        Assert.assertEquals(3L, map3.size());
        Assert.assertEquals(dlsn, map3.get("subscriber-0"));
        Assert.assertEquals(dlsn2, map3.get("subscriber-1"));
        Assert.assertEquals(dlsn3, map3.get("subscriber-2"));
        BKDistributedLogManager createNewDLM2 = createNewDLM(conf, "distrlog-subscriptions-store");
        SubscriptionsStore subscriptionsStore2 = createNewDLM2.getSubscriptionsStore();
        Utils.ioResult(subscriptionsStore2.advanceCommitPosition("subscriber-2", dlsn4));
        subscriptionsStore2.close();
        createNewDLM2.close();
        Map map4 = (Map) Utils.ioResult(subscriptionsStore.getLastCommitPositions());
        Assert.assertEquals(3L, map4.size());
        Assert.assertEquals(dlsn, map4.get("subscriber-0"));
        Assert.assertEquals(dlsn2, map4.get("subscriber-1"));
        Assert.assertEquals(dlsn4, map4.get("subscriber-2"));
        createNewDLM.close();
    }

    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.distributedlog.BKSyncLogWriter, long] */
    private long writeAndMarkEndOfStream(DistributedLogManager distributedLogManager, long j) throws Exception {
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= 3) {
                return j;
            }
            long j4 = j;
            ?? r0 = (BKSyncLogWriter) distributedLogManager.startLogSegmentNonPartitioned();
            long j5 = 1;
            while (true) {
                long j6 = j5;
                if (j6 > 1000) {
                    break;
                }
                j++;
                r0.write(DLMTestUtil.getLogRecordInstance(r0));
                j5 = j6 + 1;
            }
            BKLogSegmentWriter cachedLogWriter = r0.getCachedLogWriter();
            if (j3 < 2) {
                r0.closeAndComplete();
                BKLogWriteHandler createWriteHandler = ((BKDistributedLogManager) distributedLogManager).createWriteHandler(true);
                Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j4, j - 1, cachedLogWriter.getLogSegmentSequenceNumber()), false));
                Utils.ioResult(createWriteHandler.asyncClose());
            } else {
                r0.markEndOfStream();
                BKLogWriteHandler createWriteHandler2 = ((BKDistributedLogManager) distributedLogManager).createWriteHandler(true);
                Assert.assertNotNull(this.zkc.exists(createWriteHandler2.completedLedgerZNode(j4, DistributedLogConstants.MAX_TXID, cachedLogWriter.getLogSegmentSequenceNumber()), false));
                Utils.ioResult(createWriteHandler2.asyncClose());
            }
            j2 = j3 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testMarkEndOfStream() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-mark-end-of-stream");
        long writeAndMarkEndOfStream = writeAndMarkEndOfStream(createNewDLM, 1L);
        LogReader inputStream = createNewDLM.getInputStream(1L);
        long j = 0;
        boolean z = false;
        LogRecordWithDLSN logRecordWithDLSN = null;
        try {
            logRecordWithDLSN = inputStream.readNext(false);
            long j2 = 1;
            while (null != logRecordWithDLSN) {
                DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                Assert.assertEquals(j2, logRecordWithDLSN.getTransactionId());
                j2++;
                j++;
                logRecordWithDLSN = inputStream.readNext(false);
            }
        } catch (EndOfStreamException e) {
            LOG.info("Encountered EndOfStream on reading records after {}", logRecordWithDLSN);
            z = true;
        }
        Assert.assertEquals(writeAndMarkEndOfStream - 1, j);
        Assert.assertTrue(z);
        boolean z2 = false;
        try {
            inputStream.readNext(false);
        } catch (EndOfStreamException e2) {
            z2 = true;
        }
        Assert.assertTrue(z2);
        inputStream.close();
    }

    /* JADX WARN: Type inference failed for: r14v0 */
    /* JADX WARN: Type inference failed for: r14v1 */
    /* JADX WARN: Type inference failed for: r14v3, types: [long, org.apache.distributedlog.api.LogWriter] */
    @Test(timeout = 60000)
    public void testWriteFailsAfterMarkEndOfStream() throws Exception {
        LogWriter logWriter;
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-mark-end-failure");
        long writeAndMarkEndOfStream = writeAndMarkEndOfStream(createNewDLM, 1L);
        Assert.assertEquals(writeAndMarkEndOfStream - 1, createNewDLM.getLastTxId());
        LogRecordWithDLSN lastLogRecord = createNewDLM.getLastLogRecord();
        Assert.assertEquals(writeAndMarkEndOfStream - 1, lastLogRecord.getTransactionId());
        DLMTestUtil.verifyLogRecord(lastLogRecord);
        Assert.assertTrue(createNewDLM.isEndOfStreamMarked());
        ?? r14 = 0;
        boolean z = false;
        try {
            r14 = createNewDLM.startLogSegmentNonPartitioned();
            for (long j = 1; j <= 500; j++) {
                writeAndMarkEndOfStream++;
                r14.write(DLMTestUtil.getLogRecordInstance(r14));
            }
            logWriter = r14;
        } catch (EndOfStreamException e) {
            z = true;
            logWriter = r14;
        }
        logWriter.close();
        Assert.assertTrue(z);
    }

    @Test(timeout = 60000)
    public void testMarkEndOfStreamOnEmptyStream() throws Exception {
        markEndOfStreamOnEmptyLogSegment(0);
    }

    @Test(timeout = 60000)
    public void testMarkEndOfStreamOnClosedStream() throws Exception {
        markEndOfStreamOnEmptyLogSegment(3);
    }

    private void markEndOfStreamOnEmptyLogSegment(int i) throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-mark-end-empty-" + i);
        DLMTestUtil.generateCompletedLogSegments(createNewDLM, conf, i, 1000L);
        ((BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned()).markEndOfStream();
        LogReader inputStream = createNewDLM.getInputStream(1L);
        long j = 0;
        boolean z = false;
        try {
            long j2 = -1;
            for (LogRecordWithDLSN readNext = inputStream.readNext(false); null != readNext; readNext = inputStream.readNext(false)) {
                DLMTestUtil.verifyLogRecord(readNext);
                if (!$assertionsDisabled && j2 >= readNext.getTransactionId()) {
                    throw new AssertionError();
                }
                j2 = readNext.getTransactionId();
                j++;
            }
        } catch (EndOfStreamException e) {
            z = true;
        }
        Assert.assertEquals(i * 1000, j);
        Assert.assertTrue(z);
        boolean z2 = false;
        try {
            inputStream.readNext(false);
        } catch (EndOfStreamException e2) {
            z2 = true;
        }
        Assert.assertTrue(z2);
        inputStream.close();
    }

    @Test(timeout = 60000, expected = LogRecordTooLongException.class)
    public void testMaxLogRecSize() throws Exception {
        Utils.ioResult(((AsyncLogWriter) Utils.ioResult(createNewDLM(conf, "distrlog-maxlogRecSize").openAsyncLogWriter())).write(new LogRecord(1L, DLMTestUtil.repeatString(DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes())));
    }

    @Test(timeout = 60000)
    public void testMaxTransmissionSize() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setOutputBufferSize(1048576);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-transmissionSize");
        AsyncLogWriter asyncLogWriter = (AsyncLogWriter) Utils.ioResult(createNewDLM.openAsyncLogWriter());
        boolean z = false;
        byte[] bArr = new byte[522242];
        RAND.nextBytes(bArr);
        try {
            CompletableFuture<DLSN> write = asyncLogWriter.write(new LogRecord(1L, bArr));
            asyncLogWriter.write(new LogRecord(2L, bArr));
            Utils.ioResult(write);
            Utils.ioResult(asyncLogWriter.asyncClose());
        } catch (LogRecordTooLongException e) {
            z = true;
            Utils.ioResult(asyncLogWriter.asyncClose());
        } catch (Throwable th) {
            Utils.ioResult(asyncLogWriter.asyncClose());
            throw th;
        }
        Assert.assertFalse(z);
        Abortables.abortQuietly(asyncLogWriter);
        createNewDLM.close();
    }

    /* JADX WARN: Type inference failed for: r0v39, types: [org.apache.distributedlog.BKSyncLogWriter, long] */
    @Test(timeout = 60000)
    public void deleteDuringRead() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-delete-with-reader");
        long j = 1;
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= 3) {
                break;
            }
            long j4 = j;
            ?? r0 = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
            long j5 = 1;
            while (true) {
                long j6 = j5;
                if (j6 <= 1000) {
                    j++;
                    r0.write(DLMTestUtil.getLogRecordInstance(r0));
                    j5 = j6 + 1;
                }
            }
            BKLogSegmentWriter cachedLogWriter = r0.getCachedLogWriter();
            r0.closeAndComplete();
            BKLogWriteHandler createWriteHandler = createNewDLM.createWriteHandler(true);
            Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j4, j - 1, cachedLogWriter.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(createWriteHandler.asyncClose());
            j2 = j3 + 1;
        }
        LogReader inputStream = createNewDLM.getInputStream(1L);
        LogRecordWithDLSN readNext = inputStream.readNext(false);
        if (!$assertionsDisabled && null == readNext) {
            throw new AssertionError();
        }
        DLMTestUtil.verifyLogRecord(readNext);
        long transactionId = readNext.getTransactionId();
        createNewDLM.delete();
        try {
            for (LogRecordWithDLSN readNext2 = inputStream.readNext(false); null != readNext2; readNext2 = inputStream.readNext(false)) {
                DLMTestUtil.verifyLogRecord(readNext2);
                if (!$assertionsDisabled && transactionId >= readNext2.getTransactionId()) {
                    throw new AssertionError();
                }
                transactionId = readNext2.getTransactionId();
            }
            while (true) {
                inputStream.readNext(false);
            }
        } catch (DLIllegalStateException | LogNotFoundException | LogReadException e) {
            Assert.assertTrue(true);
            inputStream.close();
        }
    }

    @Test(timeout = 60000)
    public void testImmediateFlush() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        testNonPartitionedWritesInternal("distrlog-immediate-flush", distributedLogConfiguration);
    }

    @Test(timeout = 60000)
    public void testLastLogRecordWithEmptyLedgers() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-lastLogRec-emptyledgers");
        long j = 1;
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= 3) {
                BKSyncLogWriter bKSyncLogWriter = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
                LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(j);
                logRecordInstance.setControl();
                bKSyncLogWriter.write(logRecordInstance);
                bKSyncLogWriter.flush();
                bKSyncLogWriter.commit();
                bKSyncLogWriter.abort();
                createNewDLM.close();
                BKDistributedLogManager createNewDLM2 = createNewDLM(conf, "distrlog-lastLogRec-emptyledgers");
                Assert.assertEquals(j - 1, createNewDLM2.getLastTxId());
                LogRecordWithDLSN lastLogRecord = createNewDLM2.getLastLogRecord();
                Assert.assertEquals(j - 1, lastLogRecord.getTransactionId());
                DLMTestUtil.verifyLogRecord(lastLogRecord);
                Assert.assertEquals(j - 1, createNewDLM2.getLogRecordCount());
                createNewDLM2.close();
                return;
            }
            long j4 = j;
            BKSyncLogWriter bKSyncLogWriter2 = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
            long j5 = 1;
            while (true) {
                long j6 = j5;
                if (j6 <= 1000) {
                    long j7 = j;
                    j = j7 + 1;
                    bKSyncLogWriter2.write(DLMTestUtil.getLogRecordInstance(j7));
                    j5 = j6 + 1;
                }
            }
            BKLogSegmentWriter cachedLogWriter = bKSyncLogWriter2.getCachedLogWriter();
            bKSyncLogWriter2.closeAndComplete();
            BKLogWriteHandler createWriteHandler = createNewDLM.createWriteHandler(true);
            Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j4, j - 1, cachedLogWriter.getLogSegmentSequenceNumber()), false));
            BKLogSegmentWriter startLogSegment = createWriteHandler.startLogSegment(j - 1);
            createWriteHandler.completeAndCloseLogSegment(startLogSegment.getLogSegmentSequenceNumber(), startLogSegment.getLogSegmentId(), j - 1, j - 1, 0);
            Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j - 1, j - 1, startLogSegment.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(createWriteHandler.asyncClose());
            j2 = j3 + 1;
        }
    }

    /* JADX WARN: Type inference failed for: r0v8, types: [long, java.util.concurrent.atomic.AtomicReference] */
    @Test(timeout = 60000)
    public void testLogSegmentListener() throws Exception {
        final CountDownLatch[] countDownLatchArr = new CountDownLatch[3 + 1];
        for (int i = 0; i < 3 + 1; i++) {
            countDownLatchArr[i] = new CountDownLatch(1);
        }
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final ?? atomicReference = new AtomicReference();
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-logsegment-listener");
        Utils.ioResult(createNewDLM.getWriterMetadataStore().getLog(createNewDLM.getUri(), "distrlog-logsegment-listener", true, true));
        createNewDLM.registerListener(new LogSegmentListener() { // from class: org.apache.distributedlog.TestBKDistributedLogManager.1
            @Override // org.apache.distributedlog.callback.LogSegmentListener
            public void onSegmentsUpdated(List<LogSegmentMetadata> list) {
                int size = list.size();
                boolean z = false;
                Iterator<LogSegmentMetadata> it = list.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    } else if (it.next().isInProgress()) {
                        z = true;
                        break;
                    }
                }
                if (z) {
                    return;
                }
                if (size >= 1 && list.get(list.size() - 1).getLogSegmentSequenceNumber() != size) {
                    atomicInteger.incrementAndGet();
                }
                atomicReference.set(list);
                countDownLatchArr[size].countDown();
            }

            @Override // org.apache.distributedlog.callback.LogSegmentListener
            public void onLogStreamDeleted() {
            }
        });
        long j = 1;
        for (int i2 = 0; i2 < 3; i2++) {
            LOG.info("Waiting for creating log segment {}.", Integer.valueOf(i2));
            countDownLatchArr[i2].await();
            LOG.info("Creating log segment {}.", Integer.valueOf(i2));
            BKSyncLogWriter startLogSegmentNonPartitioned = createNewDLM.startLogSegmentNonPartitioned();
            LOG.info("Created log segment {}.", Integer.valueOf(i2));
            long j2 = 1;
            while (true) {
                long j3 = j2;
                if (j3 <= 1000) {
                    long j4 = j;
                    j = atomicReference + 1;
                    startLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j4));
                    j2 = j3 + 1;
                }
            }
            startLogSegmentNonPartitioned.closeAndComplete();
            LOG.info("Completed log segment {}.", Integer.valueOf(i2));
        }
        countDownLatchArr[3].await();
        Assert.assertEquals(0L, atomicInteger.get());
        Assert.assertNotNull(atomicReference.get());
        Assert.assertEquals(3, ((Collection) atomicReference.get()).size());
        int i3 = 1;
        for (LogSegmentMetadata logSegmentMetadata : (Collection) atomicReference.get()) {
            Assert.assertEquals(i3, logSegmentMetadata.getLogSegmentSequenceNumber());
            Assert.assertEquals(((i3 - 1) * 1000) + 1, logSegmentMetadata.getFirstTxId());
            Assert.assertEquals(i3 * 1000, logSegmentMetadata.getLastTxId());
            i3++;
        }
        createNewDLM.close();
    }

    /* JADX WARN: Type inference failed for: r0v25, types: [org.apache.distributedlog.BKAsyncLogWriter, long] */
    @Test(timeout = 60000)
    public void testGetLastDLSN() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setFirstNumEntriesPerReadLastRecordScan(2);
        distributedLogConfiguration.setMaxNumEntriesPerReadLastRecordScan(4);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setOutputBufferSize(0);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-get-last-dlsn");
        BKAsyncLogWriter bKAsyncLogWriter = (BKAsyncLogWriter) createNewDLM.startAsyncLogSegmentNonPartitioned();
        long j = 1;
        LOG.info("Writing 10 control records");
        for (int i = 0; i < 10; i++) {
            long j2 = j;
            j = j2 + 1;
            LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(j2);
            logRecordInstance.setControl();
            Utils.ioResult(bKAsyncLogWriter.writeControlRecord(logRecordInstance));
        }
        LOG.info("10 control records are written");
        try {
            createNewDLM.getLastDLSN();
            Assert.fail("Should fail on getting last dlsn from an empty log.");
        } catch (LogEmptyException e) {
        }
        bKAsyncLogWriter.closeAndComplete();
        LOG.info("Completed first log segment");
        ?? r0 = (BKAsyncLogWriter) createNewDLM.startAsyncLogSegmentNonPartitioned();
        long j3 = j;
        long j4 = r0 + 1;
        Utils.ioResult(r0.write(DLMTestUtil.getLogRecordInstance(j3)));
        LOG.info("Completed second log segment");
        LOG.info("Writing another 10 control records");
        for (int i2 = 1; i2 < 10; i2++) {
            long j5 = j4;
            j4 = j5 + 1;
            LogRecord logRecordInstance2 = DLMTestUtil.getLogRecordInstance(j5);
            logRecordInstance2.setControl();
            Utils.ioResult(r0.write(logRecordInstance2));
        }
        Assert.assertEquals(new DLSN(2L, 0L, 0L), createNewDLM.getLastDLSN());
        r0.closeAndComplete();
        LOG.info("Completed third log segment");
        Assert.assertEquals(new DLSN(2L, 0L, 0L), createNewDLM.getLastDLSN());
        r0.close();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testGetLogRecordCountAsync() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, this.testNames.getMethodName());
        BKAsyncLogWriter bKAsyncLogWriter = (BKAsyncLogWriter) createNewDLM.startAsyncLogSegmentNonPartitioned();
        DLMTestUtil.generateCompletedLogSegments(createNewDLM, conf, 2L, 10L);
        Assert.assertEquals(20L, ((Long) Utils.ioResult(createNewDLM.getLogRecordCountAsync(DLSN.InitialDLSN), 2L, TimeUnit.SECONDS)).longValue());
        bKAsyncLogWriter.close();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testInvalidStreamFromInvalidZkPath() throws Exception {
        Namespace build = NamespaceBuilder.newBuilder().conf(conf).uri(createDLMURI("/" + this.testNames.getMethodName())).build();
        DistributedLogManager distributedLogManager = null;
        AsyncLogWriter asyncLogWriter = null;
        try {
            distributedLogManager = build.openLog("��blah");
            asyncLogWriter = distributedLogManager.startAsyncLogSegmentNonPartitioned();
            Assert.fail("should have thrown");
            if (null != asyncLogWriter) {
                Utils.close(asyncLogWriter);
            }
            if (null != distributedLogManager) {
                distributedLogManager.close();
            }
            build.close();
        } catch (InvalidStreamNameException e) {
            if (null != asyncLogWriter) {
                Utils.close(asyncLogWriter);
            }
            if (null != distributedLogManager) {
                distributedLogManager.close();
            }
            build.close();
        } catch (Throwable th) {
            if (null != asyncLogWriter) {
                Utils.close(asyncLogWriter);
            }
            if (null != distributedLogManager) {
                distributedLogManager.close();
            }
            build.close();
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [long, org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler] */
    @Test(timeout = 60000)
    public void testTruncationValidation() throws Exception {
        URI createDLMURI = createDLMURI("/distrlog-truncation-validation");
        ZooKeeperClient build = TestZooKeeperClientBuilder.newBuilder().uri(createDLMURI).build();
        OrderedScheduler build2 = OrderedScheduler.newSchedulerBuilder().name("test-truncation-validation").numThreads(1).build();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setLogSegmentCacheEnabled(false);
        ZKLogSegmentMetadataStore zKLogSegmentMetadataStore = new ZKLogSegmentMetadataStore(distributedLogConfiguration, build, build2);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-truncation-validation");
        DLSN dlsn = DLSN.InitialDLSN;
        DLSN dlsn2 = DLSN.InitialDLSN;
        long j = 1;
        long j2 = 1;
        long j3 = 0;
        while (true) {
            long j4 = j3;
            if (j4 >= 3) {
                break;
            }
            BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
            long j5 = 1;
            while (true) {
                long j6 = j5;
                if (j6 <= 10) {
                    long j7 = j2;
                    j2 = build2 + 1;
                    LogRecord largeLogRecordInstance = DLMTestUtil.getLargeLogRecordInstance(j7);
                    CompletableFuture<DLSN> write = startAsyncLogSegmentNonPartitioned.write(largeLogRecordInstance);
                    if (j4 == 1 && j6 == 2) {
                        dlsn = (DLSN) Utils.ioResult(write);
                    } else if (j4 == 2 && j6 == 3) {
                        dlsn2 = (DLSN) Utils.ioResult(write);
                        j = largeLogRecordInstance.getTransactionId();
                    } else if (j6 == 10) {
                        Utils.ioResult(write);
                    }
                    j5 = j6 + 1;
                }
            }
            startAsyncLogSegmentNonPartitioned.close();
            j3 = j4 + 1;
        }
        LogReader inputStream = createNewDLM.getInputStream(DLSN.InitialDLSN);
        LogRecordWithDLSN readNext = inputStream.readNext(false);
        Assert.assertTrue(readNext != null && readNext.getDlsn().compareTo(DLSN.InitialDLSN) == 0);
        inputStream.close();
        Map<Long, LogSegmentMetadata> readLogSegments = DLMTestUtil.readLogSegments(build, LogMetadata.getLogSegmentsPath(createDLMURI, "distrlog-truncation-validation", distributedLogConfiguration.getUnpartitionedStreamName()));
        LOG.info("Read segments before truncating first segment : {}", readLogSegments);
        Utils.ioResult(LogSegmentMetadataStoreUpdater.createMetadataUpdater(distributedLogConfiguration, zKLogSegmentMetadataStore).setLogSegmentTruncated(readLogSegments.get(1L)));
        Map<Long, LogSegmentMetadata> readLogSegments2 = DLMTestUtil.readLogSegments(build, LogMetadata.getLogSegmentsPath(createDLMURI, "distrlog-truncation-validation", distributedLogConfiguration.getUnpartitionedStreamName()));
        LOG.info("Read segments after truncated first segment : {}", readLogSegments2);
        LogReader inputStream2 = createNewDLM.getInputStream(DLSN.InitialDLSN);
        LogRecordWithDLSN readNext2 = inputStream2.readNext(false);
        Assert.assertTrue("Unexpected record : " + readNext2, readNext2 != null && readNext2.getDlsn().compareTo(new DLSN(2L, 0L, 0L)) == 0);
        inputStream2.close();
        LogReader inputStream3 = createNewDLM.getInputStream(1L);
        LogRecordWithDLSN readNext3 = inputStream3.readNext(false);
        Assert.assertTrue(readNext3 != null && readNext3.getDlsn().compareTo(new DLSN(2L, 0L, 0L)) == 0);
        inputStream3.close();
        Utils.ioResult(LogSegmentMetadataStoreUpdater.createMetadataUpdater(distributedLogConfiguration, zKLogSegmentMetadataStore).setLogSegmentActive(readLogSegments2.get(1L)));
        Map<Long, LogSegmentMetadata> readLogSegments3 = DLMTestUtil.readLogSegments(build, LogMetadata.getLogSegmentsPath(createDLMURI, "distrlog-truncation-validation", distributedLogConfiguration.getUnpartitionedStreamName()));
        LOG.info("Read segments after marked first segment as active : {}", readLogSegments3);
        Utils.ioResult(LogSegmentMetadataStoreUpdater.createMetadataUpdater(distributedLogConfiguration, zKLogSegmentMetadataStore).setLogSegmentTruncated(readLogSegments3.get(2L)));
        Map<Long, LogSegmentMetadata> readLogSegments4 = DLMTestUtil.readLogSegments(build, LogMetadata.getLogSegmentsPath(createDLMURI, "distrlog-truncation-validation", distributedLogConfiguration.getUnpartitionedStreamName()));
        LOG.info("Read segments after truncated second segment : {}", readLogSegments4);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        long j8 = 1;
        boolean z = false;
        for (int i = 0; i < 30; i++) {
            try {
                LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext());
                DLMTestUtil.verifyLargeLogRecord(logRecordWithDLSN);
                Assert.assertEquals(j8, logRecordWithDLSN.getTransactionId());
                j8++;
            } catch (AlreadyTruncatedTransactionException e) {
                z = true;
            }
        }
        Assert.assertTrue(z);
        Utils.close(asyncLogReader);
        Utils.ioResult(LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, zKLogSegmentMetadataStore).setLogSegmentActive(readLogSegments4.get(2L)));
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned2 = createNewDLM.startAsyncLogSegmentNonPartitioned();
        Assert.assertTrue(((Boolean) Utils.ioResult(startAsyncLogSegmentNonPartitioned2.truncate(dlsn))).booleanValue());
        for (LogSegmentMetadata logSegmentMetadata : startAsyncLogSegmentNonPartitioned2.getCachedWriteHandler().getCachedLogSegments(LogSegmentMetadata.COMPARATOR)) {
            if (logSegmentMetadata.getLastDLSN().compareTo(dlsn) < 0) {
                Assert.assertTrue(logSegmentMetadata.isTruncated());
                Assert.assertTrue(!logSegmentMetadata.isPartiallyTruncated());
            } else if (logSegmentMetadata.getFirstDLSN().compareTo(dlsn) < 0) {
                Assert.assertTrue(!logSegmentMetadata.isTruncated());
                Assert.assertTrue(logSegmentMetadata.isPartiallyTruncated());
            } else {
                Assert.assertTrue(!logSegmentMetadata.isTruncated());
                Assert.assertTrue(!logSegmentMetadata.isPartiallyTruncated());
            }
        }
        Assert.assertTrue(DLMTestUtil.readLogSegments(build, LogMetadata.getLogSegmentsPath(createDLMURI, "distrlog-truncation-validation", conf.getUnpartitionedStreamName())).get(Long.valueOf(dlsn.getLogSegmentSequenceNo())).getMinActiveDLSN().compareTo(dlsn) == 0);
        LogReader inputStream4 = createNewDLM.getInputStream(DLSN.InitialDLSN);
        LogRecordWithDLSN readNext4 = inputStream4.readNext(false);
        Assert.assertTrue(readNext4 != null);
        Assert.assertEquals(dlsn, readNext4.getDlsn());
        inputStream4.close();
        LogReader inputStream5 = createNewDLM.getInputStream(1L);
        LogRecordWithDLSN readNext5 = inputStream5.readNext(false);
        Assert.assertTrue(readNext5 != null);
        Assert.assertEquals(dlsn, readNext5.getDlsn());
        inputStream5.close();
        AsyncLogReader asyncLogReader2 = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        LogRecordWithDLSN logRecordWithDLSN2 = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader2.readNext());
        Assert.assertTrue(logRecordWithDLSN2 != null);
        Assert.assertEquals(dlsn, logRecordWithDLSN2.getDlsn());
        Utils.close(asyncLogReader2);
        LogReader inputStream6 = createNewDLM.getInputStream(dlsn2);
        LogRecordWithDLSN readNext6 = inputStream6.readNext(false);
        Assert.assertTrue(readNext6 != null);
        Assert.assertEquals(dlsn2, readNext6.getDlsn());
        inputStream6.close();
        LogReader inputStream7 = createNewDLM.getInputStream(j);
        LogRecordWithDLSN readNext7 = inputStream7.readNext(false);
        Assert.assertTrue(readNext7 != null);
        Assert.assertEquals(dlsn2, readNext7.getDlsn());
        Assert.assertEquals(j, readNext7.getTransactionId());
        inputStream7.close();
        AsyncLogReader asyncLogReader3 = createNewDLM.getAsyncLogReader(dlsn2);
        LogRecordWithDLSN logRecordWithDLSN3 = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader3.readNext());
        Assert.assertTrue(logRecordWithDLSN3 != null);
        Assert.assertEquals(dlsn2, logRecordWithDLSN3.getDlsn());
        Utils.close(asyncLogReader3);
        build.close();
    }

    /* JADX WARN: Type inference failed for: r0v6, types: [org.apache.distributedlog.BKSyncLogWriter, long] */
    @Test(timeout = 60000)
    public void testDeleteLog() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "delete-log-should-delete-ledgers");
        long j = 1;
        ?? r0 = (BKSyncLogWriter) createNewDLM.startLogSegmentNonPartitioned();
        long j2 = 1;
        while (true) {
            long j3 = j2;
            if (j3 > 1000) {
                break;
            }
            j++;
            r0.write(DLMTestUtil.getLogRecordInstance(r0));
            j2 = j3 + 1;
        }
        BKLogSegmentWriter cachedLogWriter = r0.getCachedLogWriter();
        r0.closeAndComplete();
        BKLogWriteHandler createWriteHandler = createNewDLM.createWriteHandler(true);
        Assert.assertNotNull(this.zkc.exists(createWriteHandler.completedLedgerZNode(j, j - 1, cachedLogWriter.getLogSegmentSequenceNumber()), false));
        Utils.ioResult(createWriteHandler.asyncClose());
        long logSegmentId = cachedLogWriter.getLogSegmentId();
        BKNamespaceDriver bKNamespaceDriver = (BKNamespaceDriver) createNewDLM.getNamespaceDriver();
        bKNamespaceDriver.getReaderBKC().get().openLedgerNoRecovery(logSegmentId, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
        createNewDLM.delete();
        try {
            bKNamespaceDriver.getReaderBKC().get().openLedgerNoRecovery(logSegmentId, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
            Assert.fail("Should fail to open ledger after we delete the log");
        } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException e) {
        }
        try {
            createNewDLM.delete();
        } catch (IOException e2) {
            Assert.fail("Delete log twice should not throw any exception");
        }
    }

    @Test(timeout = 60000)
    public void testSyncLogWithLedgerMetadata() throws Exception {
        LedgerMetadata ledgerMetadata = new LedgerMetadata();
        ledgerMetadata.setApplication("myapplication");
        ledgerMetadata.setComponent("mycomponent");
        ledgerMetadata.addCustomMetadata("custom", "mycustommetadata");
        BKSyncLogWriter openLogWriter = createNewDLM(conf, "distrlog-writemetadata").openLogWriter(ledgerMetadata);
        openLogWriter.write(DLMTestUtil.getLogRecordInstance(1L));
        Map<String, byte[]> customMetadata = getLedgerHandle(openLogWriter.getCachedLogWriter()).getCustomMetadata();
        Assert.assertEquals("myapplication", new String(customMetadata.get("application"), StandardCharsets.UTF_8));
        Assert.assertEquals("mycomponent", new String(customMetadata.get("component"), StandardCharsets.UTF_8));
        Assert.assertEquals("mycustommetadata", new String(customMetadata.get("custom"), StandardCharsets.UTF_8));
        openLogWriter.closeAndComplete();
    }

    @Test(timeout = 60000)
    public void testAsyncLogWithLedgerMetadata() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setWriteLockEnabled(false);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-writemetadata");
        LedgerMetadata ledgerMetadata = new LedgerMetadata();
        ledgerMetadata.setApplication("myapplication");
        ledgerMetadata.addCustomMetadata("custom", "mycustommetadata");
        AsyncLogWriter asyncLogWriter = (AsyncLogWriter) Utils.ioResult(createNewDLM.openAsyncLogWriter(ledgerMetadata));
        Utils.ioResult(asyncLogWriter.write(DLMTestUtil.getLogRecordInstance(2L)));
        Map<String, byte[]> customMetadata = getLedgerHandle(((BKAsyncLogWriter) asyncLogWriter).getCachedLogWriter()).getCustomMetadata();
        Assert.assertEquals("myapplication", new String(customMetadata.get("application"), StandardCharsets.UTF_8));
        Assert.assertNull(customMetadata.get("component"));
        Assert.assertEquals("mycustommetadata", new String(customMetadata.get("custom"), StandardCharsets.UTF_8));
    }

    static {
        $assertionsDisabled = !TestBKDistributedLogManager.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(TestBKDistributedLogManager.class);
        RAND = new Random(System.currentTimeMillis());
    }
}
