package org.apache.distributedlog;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.ReadAheadEntryReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.com.google.common.base.Ticker;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/distributedlog/TestReadAheadEntryReader.class */
public class TestReadAheadEntryReader extends TestDistributedLogBase {
    private static final int MAX_CACHED_ENTRIES = 5;
    private static final int NUM_PREFETCH_ENTRIES = 10;

    @Rule
    public TestName runtime = new TestName();
    private DistributedLogConfiguration baseConf;
    private OrderedScheduler scheduler;
    private BookKeeperClient bkc;
    private ZooKeeperClient zkc;

    @Override // org.apache.distributedlog.TestDistributedLogBase
    @Before
    public void setup() throws Exception {
        super.setup();
        this.baseConf = new DistributedLogConfiguration();
        this.baseConf.addConfiguration(conf);
        this.baseConf.setOutputBufferSize(0);
        this.baseConf.setPeriodicFlushFrequencyMilliSeconds(0);
        this.baseConf.setImmediateFlushEnabled(false);
        this.baseConf.setReadAheadMaxRecords(5);
        this.baseConf.setNumPrefetchEntriesPerLogSegment(10);
        this.baseConf.setMaxPrefetchEntriesPerLogSegment(10);
        this.zkc = ZooKeeperClientBuilder.newBuilder().name("test-zk").zkServers(bkutil.getZkServers()).sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()).zkAclId(conf.getZkAclId()).build();
        this.bkc = BookKeeperClientBuilder.newBuilder().name("test-bk").dlConfig(conf).ledgersPath("/ledgers").zkServers(bkutil.getZkServers()).build();
        this.scheduler = OrderedScheduler.newSchedulerBuilder().name("test-read-ahead-entry-reader").numThreads(1).build();
    }

    @Override // org.apache.distributedlog.TestDistributedLogBase
    @After
    public void teardown() throws Exception {
        if (null != this.bkc) {
            this.bkc.close();
        }
        if (null != this.scheduler) {
            this.scheduler.shutdown();
        }
        if (null != this.zkc) {
            this.zkc.close();
        }
        super.teardown();
    }

    private ReadAheadEntryReader createEntryReader(String str, DLSN dlsn, BKDistributedLogManager bKDistributedLogManager, DistributedLogConfiguration distributedLogConfiguration) throws Exception {
        return new ReadAheadEntryReader(str, dlsn, distributedLogConfiguration, bKDistributedLogManager.createReadHandler(Optional.empty(), true), new BKLogSegmentEntryStore(distributedLogConfiguration, ConfUtils.getConstDynConf(distributedLogConfiguration), this.zkc, this.bkc, this.scheduler, null, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL), this.scheduler, Ticker.systemTicker(), new AlertStatsLogger(NullStatsLogger.INSTANCE, "test-alert"));
    }

    private void ensureOrderSchedulerEmpty(String str) throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        this.scheduler.executeOrdered(str, () -> {
            FutureUtils.complete(completableFuture, null);
        });
        Utils.ioResult(completableFuture);
    }

    void generateCompletedLogSegments(DistributedLogManager distributedLogManager, long j, long j2) throws Exception {
        generateCompletedLogSegments(distributedLogManager, j, j2, 1L);
    }

    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.distributedlog.api.AsyncLogWriter, long, org.apache.distributedlog.io.AsyncCloseable] */
    void generateCompletedLogSegments(DistributedLogManager distributedLogManager, long j, long j2, long j3) throws Exception {
        long j4 = j3;
        long j5 = 0;
        while (true) {
            long j6 = j5;
            if (j6 >= j) {
                return;
            }
            ?? r0 = (AsyncLogWriter) Utils.ioResult(distributedLogManager.openAsyncLogWriter());
            long j7 = 1;
            while (true) {
                long j8 = j7;
                if (j8 <= j2) {
                    j4++;
                    Utils.ioResult(r0.write(DLMTestUtil.getLogRecordInstance(r0)));
                    LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(j4);
                    logRecordInstance.setControl();
                    Utils.ioResult(r0.write(logRecordInstance));
                    j7 = j8 + 1;
                }
            }
            Utils.close((AsyncCloseable) r0);
            j5 = j6 + 1;
        }
    }

    AsyncLogWriter createInprogressLogSegment(DistributedLogManager distributedLogManager, DistributedLogConfiguration distributedLogConfiguration, long j) throws Exception {
        AsyncLogWriter asyncLogWriter = (AsyncLogWriter) Utils.ioResult(distributedLogManager.openAsyncLogWriter());
        long j2 = 1;
        while (true) {
            long j3 = j2;
            if (j3 > j) {
                return asyncLogWriter;
            }
            Utils.ioResult(asyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j3)));
            LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(j3);
            logRecordInstance.setControl();
            Utils.ioResult(asyncLogWriter.write(logRecordInstance));
            j2 = j3 + 1;
        }
    }

    void expectAlreadyTruncatedTransactionException(ReadAheadEntryReader readAheadEntryReader, String str) throws Exception {
        try {
            readAheadEntryReader.checkLastException();
            Assert.fail(str);
        } catch (AlreadyTruncatedTransactionException e) {
        }
    }

    void expectIllegalStateException(ReadAheadEntryReader readAheadEntryReader, String str) throws Exception {
        try {
            readAheadEntryReader.checkLastException();
            Assert.fail(str);
        } catch (DLIllegalStateException e) {
        }
    }

    void expectNoException(ReadAheadEntryReader readAheadEntryReader) throws Exception {
        readAheadEntryReader.checkLastException();
    }

    @Test(timeout = 60000)
    public void testStartWithEmptySegmentList() throws Exception {
        String methodName = this.runtime.getMethodName();
        BKDistributedLogManager createNewDLM = createNewDLM(this.baseConf, methodName);
        ReadAheadEntryReader createEntryReader = createEntryReader(methodName, DLSN.InitialDLSN, createNewDLM, this.baseConf);
        createEntryReader.start(Lists.newArrayList());
        ensureOrderSchedulerEmpty(methodName);
        Assert.assertFalse("ReadAhead should not be initialized with empty segment list", createEntryReader.isInitialized());
        Assert.assertTrue("ReadAhead should be empty when it isn't initialized", createEntryReader.isCacheEmpty());
        Assert.assertFalse("ReadAhead should not be marked as caught up when it isn't initialized", createEntryReader.isReadAheadCaughtUp());
        generateCompletedLogSegments(createNewDLM, 1L, 3L);
        List<LogSegmentMetadata> logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be only one", 1L, logSegments.size());
        createEntryReader.onSegmentsUpdated(logSegments);
        ensureOrderSchedulerEmpty(methodName);
        Assert.assertTrue("ReadAhead should be initialized with non-empty segment list", createEntryReader.isInitialized());
        Assert.assertNotNull("current segment reader should be initialized", createEntryReader.getCurrentSegmentReader());
        Assert.assertEquals("current segment sequence number should be " + logSegments.get(0).getLogSegmentSequenceNumber(), logSegments.get(0).getLogSegmentSequenceNumber(), createEntryReader.getCurrentSegmentSequenceNumber());
        Assert.assertNull("there should be no next segment reader", createEntryReader.getNextSegmentReader());
        Assert.assertTrue("there should be no remaining segment readers", createEntryReader.getSegmentReaders().isEmpty());
        Utils.close(createEntryReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testInitializeMultipleClosedLogSegments0() throws Exception {
        testInitializeMultipleClosedLogSegments(5, DLSN.InitialDLSN, 0);
    }

    @Test(timeout = 60000)
    public void testInitializeMultipleClosedLogSegments1() throws Exception {
        testInitializeMultipleClosedLogSegments(5, new DLSN(4L, 0L, 0L), 3);
    }

    private void testInitializeMultipleClosedLogSegments(int i, DLSN dlsn, int i2) throws Exception {
        String methodName = this.runtime.getMethodName();
        BKDistributedLogManager createNewDLM = createNewDLM(this.baseConf, methodName);
        generateCompletedLogSegments(createNewDLM, 1L, 3L, 1L);
        generateCompletedLogSegments(createNewDLM, i - 1, 1L, 7L);
        List<LogSegmentMetadata> logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be " + i, i, logSegments.size());
        ReadAheadEntryReader createEntryReader = createEntryReader(methodName, dlsn, createNewDLM, this.baseConf);
        createEntryReader.start(logSegments);
        ensureOrderSchedulerEmpty(methodName);
        Assert.assertTrue("ReadAhead should be initialized with non-empty segment list", createEntryReader.isInitialized());
        Assert.assertNotNull("current segment reader should be initialized", createEntryReader.getCurrentSegmentReader());
        Assert.assertTrue("current segment reader should be open and started", createEntryReader.getCurrentSegmentReader().isReaderOpen() && createEntryReader.getCurrentSegmentReader().isReaderStarted());
        Assert.assertEquals("current segment reader should read " + logSegments.get(i2), logSegments.get(i2), createEntryReader.getCurrentSegmentReader().getSegment());
        Assert.assertEquals("current segment sequence number should be " + logSegments.get(i2).getLogSegmentSequenceNumber(), logSegments.get(i2).getLogSegmentSequenceNumber(), createEntryReader.getCurrentSegmentSequenceNumber());
        Assert.assertNull("next segment reader should not be initialized since it is a closed log segment", createEntryReader.getNextSegmentReader());
        Assert.assertEquals("there should be " + (i - (i2 + 1)) + " remaining segment readers", i - (i2 + 1), createEntryReader.getSegmentReaders().size());
        int i3 = i2 + 1;
        Iterator<ReadAheadEntryReader.SegmentReader> it = createEntryReader.getSegmentReaders().iterator();
        while (it.hasNext()) {
            ReadAheadEntryReader.SegmentReader next = it.next();
            LogSegmentMetadata logSegmentMetadata = logSegments.get(i3);
            Assert.assertEquals("Segment should " + logSegmentMetadata, logSegmentMetadata, next.getSegment());
            Assert.assertTrue("Segment reader for " + logSegmentMetadata + " should be open", next.isReaderOpen());
            Assert.assertFalse("Segment reader for " + logSegmentMetadata + " should not be started", next.isReaderStarted());
            i3++;
        }
        Utils.close(createEntryReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testPositioningAtInvalidLogSegment() throws Exception {
        String methodName = this.runtime.getMethodName();
        BKDistributedLogManager createNewDLM = createNewDLM(this.baseConf, methodName);
        generateCompletedLogSegments(createNewDLM, 3L, 3L);
        AsyncLogWriter asyncLogWriter = (AsyncLogWriter) Utils.ioResult(createNewDLM.openAsyncLogWriter());
        Utils.ioResult(asyncLogWriter.truncate(new DLSN(2L, 1L, 0L)));
        List<LogSegmentMetadata> logSegments = createNewDLM.getLogSegments();
        ReadAheadEntryReader createEntryReader = createEntryReader(methodName, DLSN.InitialDLSN, createNewDLM, this.baseConf);
        createEntryReader.start(logSegments);
        ensureOrderSchedulerEmpty(methodName);
        expectNoException(createEntryReader);
        Entry.Reader nextReadAheadEntry = createEntryReader.getNextReadAheadEntry(DistributedLogConstants.MAX_TXID, TimeUnit.MILLISECONDS);
        Assert.assertEquals(2L, nextReadAheadEntry.getLSSN());
        Assert.assertEquals(1L, nextReadAheadEntry.getEntryId());
        nextReadAheadEntry.release();
        Utils.close(createEntryReader);
        ReadAheadEntryReader createEntryReader2 = createEntryReader(methodName, new DLSN(2L, 0L, 0L), createNewDLM, this.baseConf);
        createEntryReader2.start(logSegments);
        ensureOrderSchedulerEmpty(methodName);
        expectNoException(createEntryReader2);
        Entry.Reader nextReadAheadEntry2 = createEntryReader2.getNextReadAheadEntry(DistributedLogConstants.MAX_TXID, TimeUnit.MILLISECONDS);
        Assert.assertEquals(2L, nextReadAheadEntry2.getLSSN());
        Assert.assertEquals(1L, nextReadAheadEntry2.getEntryId());
        nextReadAheadEntry2.release();
        Utils.close(createEntryReader2);
        ReadAheadEntryReader createEntryReader3 = createEntryReader(methodName, new DLSN(2L, 2L, 0L), createNewDLM, this.baseConf);
        createEntryReader3.start(logSegments);
        ensureOrderSchedulerEmpty(methodName);
        expectNoException(createEntryReader3);
        Entry.Reader nextReadAheadEntry3 = createEntryReader3.getNextReadAheadEntry(DistributedLogConstants.MAX_TXID, TimeUnit.MILLISECONDS);
        Assert.assertEquals(2L, nextReadAheadEntry3.getLSSN());
        Assert.assertEquals(2L, nextReadAheadEntry3.getEntryId());
        nextReadAheadEntry3.release();
        Utils.close(createEntryReader3);
        Utils.close(asyncLogWriter);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testPositioningIgnoreTruncationStatus() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.baseConf);
        distributedLogConfiguration.setIgnoreTruncationStatus(true);
        String methodName = this.runtime.getMethodName();
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        generateCompletedLogSegments(createNewDLM, 3L, 2L);
        AsyncLogWriter asyncLogWriter = (AsyncLogWriter) Utils.ioResult(createNewDLM.openAsyncLogWriter());
        Utils.ioResult(asyncLogWriter.truncate(new DLSN(2L, 1L, 0L)));
        List<LogSegmentMetadata> logSegments = createNewDLM.getLogSegments();
        ReadAheadEntryReader createEntryReader = createEntryReader(methodName, DLSN.InitialDLSN, createNewDLM, distributedLogConfiguration);
        createEntryReader.start(logSegments);
        ensureOrderSchedulerEmpty(methodName);
        expectNoException(createEntryReader);
        Utils.close(createEntryReader);
        ReadAheadEntryReader createEntryReader2 = createEntryReader(methodName, new DLSN(2L, 0L, 0L), createNewDLM, distributedLogConfiguration);
        createEntryReader2.start(logSegments);
        ensureOrderSchedulerEmpty(methodName);
        expectNoException(createEntryReader2);
        Utils.close(createEntryReader2);
        ReadAheadEntryReader createEntryReader3 = createEntryReader(methodName, new DLSN(2L, 1L, 0L), createNewDLM, distributedLogConfiguration);
        createEntryReader3.start(logSegments);
        ensureOrderSchedulerEmpty(methodName);
        expectNoException(createEntryReader3);
        Utils.close(createEntryReader3);
        Utils.close(asyncLogWriter);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testLogSegmentSequenceNumberGap() throws Exception {
        String methodName = this.runtime.getMethodName();
        BKDistributedLogManager createNewDLM = createNewDLM(this.baseConf, methodName);
        generateCompletedLogSegments(createNewDLM, 3L, 2L);
        List<LogSegmentMetadata> logSegments = createNewDLM.getLogSegments();
        ReadAheadEntryReader createEntryReader = createEntryReader(methodName, DLSN.InitialDLSN, createNewDLM, this.baseConf);
        createEntryReader.start(logSegments.subList(0, 1));
        ensureOrderSchedulerEmpty(methodName);
        Assert.assertTrue("ReadAhead should be initialized with non-empty segment list", createEntryReader.isInitialized());
        Assert.assertNotNull("current segment reader should be initialized", createEntryReader.getCurrentSegmentReader());
        Assert.assertTrue("current segment reader should be open and started", createEntryReader.getCurrentSegmentReader().isReaderOpen() && createEntryReader.getCurrentSegmentReader().isReaderStarted());
        Assert.assertEquals("current segment reader should read " + logSegments.get(0), logSegments.get(0), createEntryReader.getCurrentSegmentReader().getSegment());
        Assert.assertEquals("current segment sequence number should be " + logSegments.get(0).getLogSegmentSequenceNumber(), logSegments.get(0).getLogSegmentSequenceNumber(), createEntryReader.getCurrentSegmentSequenceNumber());
        Assert.assertNull("next segment reader should not be initialized since it is a closed log segment", createEntryReader.getNextSegmentReader());
        createEntryReader.onSegmentsUpdated(logSegments.subList(2, 3));
        ensureOrderSchedulerEmpty(methodName);
        expectIllegalStateException(createEntryReader, "inconsistent log segment found");
        Utils.close(createEntryReader);
        createNewDLM.close();
    }
}
