package org.apache.distributedlog;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.IdleReaderException;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore
/* loaded from: input_file:org/apache/distributedlog/TestNonBlockingReads.class */
public class TestNonBlockingReads extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestNonBlockingReads.class);

    /**
     * Reads a stream in non-blocking mode while a background task writes records to it.
     *
     * <p>The writer task is scheduled to start 100 ms after submission. If the writer fails, it
     * logs the failure and interrupts the test thread; the test then fails on the interrupt-flag
     * assertion (or earlier, if the interrupt aborts a blocking call).
     *
     * @throws Exception if reading, waiting for the writer task, or closing the log manager fails
     */
    @Test(timeout = 100000)
    public void testNonBlockingRead() throws Exception {
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setReadAheadBatchSize(1);
        confLocal.setReadAheadMaxRecords(1);
        confLocal.setReaderIdleWarnThresholdMillis(100);
        confLocal.setReadLACLongPollTimeout(49);
        final BKDistributedLogManager dlm = createNewDLM(confLocal, "distrlog-non-blocking-reader");
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> writerFuture = null;
        try {
            final Thread testThread = Thread.currentThread();
            writerFuture = executor.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        NonBlockingReadsTestUtil.writeRecordsForNonBlockingReads(confLocal, dlm, false);
                    } catch (Exception e) {
                        // Record why the writer failed before signalling the test thread.
                        LOG.error("Failed to write records for non-blocking reads", e);
                        testThread.interrupt();
                    }
                }
            }, 100L, TimeUnit.MILLISECONDS);
            NonBlockingReadsTestUtil.readNonBlocking(dlm, false);
            Assert.assertFalse(testThread.isInterrupted());
        } finally {
            try {
                if (writerFuture != null) {
                    // Wait for the writer task to complete before the executor is shut down
                    // and the log manager is closed.
                    writerFuture.get();
                }
            } finally {
                // Runs exactly once, even when waiting for the writer task fails.
                executor.shutdown();
                dlm.close();
            }
        }
    }

    /**
     * Reads a stream in non-blocking mode while a background task writes records to it, with the
     * writer helper invoked with {@code true} as its third argument (presumably selecting the
     * recovery path; confirm against {@code NonBlockingReadsTestUtil}).
     *
     * <p>The writer task is scheduled to start 100 ms after submission. If the writer fails, it
     * logs the failure and interrupts the test thread.
     *
     * @throws Exception if reading, waiting for the writer task, or closing the log manager fails
     */
    @Test(timeout = 100000)
    public void testNonBlockingReadRecovery() throws Exception {
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setReadAheadBatchSize(10);
        confLocal.setReadAheadMaxRecords(10);
        final BKDistributedLogManager dlm = createNewDLM(confLocal, "distrlog-non-blocking-reader-recovery");
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> writerFuture = null;
        try {
            final Thread testThread = Thread.currentThread();
            writerFuture = executor.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        NonBlockingReadsTestUtil.writeRecordsForNonBlockingReads(confLocal, dlm, true);
                    } catch (Exception e) {
                        // Record why the writer failed before signalling the test thread.
                        LOG.error("Failed to write records for non-blocking reads", e);
                        testThread.interrupt();
                    }
                }
            }, 100L, TimeUnit.MILLISECONDS);
            NonBlockingReadsTestUtil.readNonBlocking(dlm, false);
            Assert.assertFalse(testThread.isInterrupted());
        } finally {
            try {
                if (writerFuture != null) {
                    // Wait for the writer task to complete before the executor is shut down
                    // and the log manager is closed.
                    writerFuture.get();
                }
            } finally {
                // Runs exactly once, even when waiting for the writer task fails.
                executor.shutdown();
                dlm.close();
            }
        }
    }

    /**
     * Expects a non-blocking read to fail with {@link IdleReaderException} when the reader idle
     * error threshold is configured to 100 ms.
     *
     * <p>A background writer task is scheduled to start 100 ms after submission. If the writer
     * fails, it logs the failure and interrupts the test thread.
     *
     * @throws Exception if reading fails with anything other than {@code IdleReaderException},
     *     or if waiting for the writer task or closing the log manager fails
     */
    @Test(timeout = 100000)
    public void testNonBlockingReadIdleError() throws Exception {
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setReadAheadBatchSize(1);
        confLocal.setReadAheadMaxRecords(1);
        confLocal.setReadLACLongPollTimeout(24);
        confLocal.setReaderIdleWarnThresholdMillis(50);
        confLocal.setReaderIdleErrorThresholdMillis(100);
        final BKDistributedLogManager dlm = createNewDLM(confLocal, "distrlog-non-blocking-reader-error");
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> writerFuture = null;
        try {
            final Thread testThread = Thread.currentThread();
            writerFuture = executor.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        NonBlockingReadsTestUtil.writeRecordsForNonBlockingReads(confLocal, dlm, false);
                    } catch (Exception e) {
                        // Record why the writer failed before signalling the test thread.
                        LOG.error("Failed to write records for non-blocking reads", e);
                        testThread.interrupt();
                    }
                }
            }, 100L, TimeUnit.MILLISECONDS);
            boolean idleReaderErrorSeen = false;
            try {
                NonBlockingReadsTestUtil.readNonBlocking(dlm, false, 1000L, true);
            } catch (IdleReaderException expected) {
                // This is the outcome the test is looking for.
                idleReaderErrorSeen = true;
            }
            Assert.assertTrue("Expected the reader to fail with IdleReaderException", idleReaderErrorSeen);
            Assert.assertFalse("Writer task failed and interrupted the test thread", testThread.isInterrupted());
        } finally {
            try {
                if (writerFuture != null) {
                    // Wait for the writer task to complete before the executor is shut down
                    // and the log manager is closed.
                    writerFuture.get();
                }
            } finally {
                // Runs exactly once, even when waiting for the writer task fails.
                executor.shutdown();
                dlm.close();
            }
        }
    }

    /**
     * Expects a non-blocking read to complete without an {@link IdleReaderException} when the
     * reader idle error threshold is configured to 30 seconds.
     *
     * <p>A background writer task is scheduled to start 10 ms after submission. If the writer
     * fails, it logs the failure and interrupts the test thread.
     *
     * @throws Exception if reading, waiting for the writer task, or closing the log manager fails
     */
    @Test(timeout = 60000)
    public void testNonBlockingReadAheadStall() throws Exception {
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setReadAheadBatchSize(1);
        confLocal.setReadAheadMaxRecords(3);
        confLocal.setReadLACLongPollTimeout(249);
        confLocal.setReaderIdleWarnThresholdMillis(500);
        confLocal.setReaderIdleErrorThresholdMillis(30000);
        final BKDistributedLogManager dlm = createNewDLM(confLocal, "distrlog-non-blocking-reader-stall");
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> writerFuture = null;
        try {
            final Thread testThread = Thread.currentThread();
            writerFuture = executor.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        NonBlockingReadsTestUtil.writeRecordsForNonBlockingReads(confLocal, dlm, false, 3L);
                    } catch (Exception e) {
                        // Record why the writer failed before signalling the test thread.
                        LOG.error("Failed to write records for non-blocking reads", e);
                        testThread.interrupt();
                    }
                }
            }, 10L, TimeUnit.MILLISECONDS);
            boolean idleReaderErrorSeen = false;
            try {
                NonBlockingReadsTestUtil.readNonBlocking(dlm, false, 3L, false);
            } catch (IdleReaderException e) {
                LOG.info("Exception encountered", e);
                idleReaderErrorSeen = true;
            }
            Assert.assertFalse(idleReaderErrorSeen);
            Assert.assertFalse(testThread.isInterrupted());
        } finally {
            try {
                if (writerFuture != null) {
                    // Wait for the writer task to complete before the executor is shut down
                    // and the log manager is closed.
                    writerFuture.get();
                }
            } finally {
                // Runs exactly once, even when waiting for the writer task fails.
                executor.shutdown();
                dlm.close();
            }
        }
    }

    /**
     * Creates a stream whose stored metadata for one completed log segment disagrees with the
     * records that were actually written.
     *
     * <p>Steps:
     * <ol>
     *   <li>Write 3 completed segments of 10 records each, with transaction ids 1..30.</li>
     *   <li>Read the metadata of the completed segment covering transaction ids 21..30, delete
     *       its znode, and write it back with both the last entry id and the last transaction id
     *       increased by 100.</li>
     *   <li>Write 3 more completed segments of 10 records each, with transaction ids continuing
     *       100 past where the first batch stopped.</li>
     * </ol>
     *
     * @param str name of the stream to create
     * @return the total number of records written to the stream
     * @throws Exception if writing records or updating the metadata fails
     */
    private long createStreamWithInconsistentMetadata(String str) throws Exception {
        final int segmentsPerBatch = 3;
        final int recordsPerSegment = 10;
        final long metadataSkew = 100;

        BKDistributedLogManager dlm = createNewDLM(conf, str);
        try {
            ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder().uri(createDLMURI("/")).build();

            final long firstBatchStart = 1;
            long nextTxId = writeCompletedSegments(dlm, firstBatchStart, segmentsPerBatch, recordsPerSegment);
            long recordsWritten = nextTxId - firstBatchStart;

            // Locate the znode of the last segment written so far by its transaction id range.
            // The third argument (3L) is kept from the original call; presumably the sequence
            // number of that segment (confirm against BKLogWriteHandler).
            String completedZNode = dlm.createWriteHandler(true)
                    .completedLedgerZNode(nextTxId - recordsPerSegment, nextTxId - 1, 3L);
            LogSegmentMetadata metadata =
                    (LogSegmentMetadata) Utils.ioResult(LogSegmentMetadata.read(zkc, completedZNode));
            zkc.get().delete(completedZNode, -1);
            // Rewrite the metadata so it claims more entries and transactions than exist.
            metadata.mutator()
                    .setLastEntryId(metadata.getLastEntryId() + metadataSkew)
                    .setLastTxId(metadata.getLastTxId() + metadataSkew)
                    .build()
                    .write(zkc);

            // Continue past the transaction id range claimed by the altered metadata.
            final long secondBatchStart = nextTxId + metadataSkew;
            nextTxId = writeCompletedSegments(dlm, secondBatchStart, segmentsPerBatch, recordsPerSegment);
            recordsWritten += nextTxId - secondBatchStart;

            return recordsWritten;
        } finally {
            dlm.close();
        }
    }

    /**
     * Writes {@code segmentCount} completed log segments of {@code recordsPerSegment} records
     * each, using consecutive transaction ids starting at {@code firstTxId}.
     *
     * @return the first transaction id that was not used
     */
    private static long writeCompletedSegments(BKDistributedLogManager dlm, long firstTxId,
                                               int segmentCount, int recordsPerSegment) throws Exception {
        long txId = firstTxId;
        for (int segment = 0; segment < segmentCount; segment++) {
            BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
            for (int record = 0; record < recordsPerSegment; record++) {
                Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
            }
            writer.closeAndComplete();
        }
        return txId;
    }

    /**
     * Reads a stream with inconsistent segment metadata through a reader opened with
     * {@code getInputStream(45L)} and expects to read half of the written records with strictly
     * increasing transaction ids.
     *
     * <p>The first record is fetched before the loop and each following record at the end of an
     * iteration, so one extra read is issued after the last counted record.
     *
     * @throws Exception if creating the stream, reading, or closing fails
     */
    @Test(timeout = 60000)
    public void testHandleInconsistentMetadata() throws Exception {
        final String streamName = "distrlog-inconsistent-metadata-blocking-read";
        final long recordsWritten = createStreamWithInconsistentMetadata(streamName);
        final long recordsExpected = recordsWritten / 2;
        BKDistributedLogManager dlm = createNewDLM(conf, streamName);
        try {
            LogReader reader = dlm.getInputStream(45L);
            long recordsRead = 0;
            try {
                long lastTxId = -1;
                LogRecord record = reader.readNext(false);
                while (recordsRead < recordsExpected) {
                    if (record != null) {
                        DLMTestUtil.verifyLogRecord(record);
                        Assert.assertTrue(lastTxId < record.getTransactionId());
                        lastTxId = record.getTransactionId();
                        recordsRead++;
                    } else {
                        // Nothing available yet; back off briefly before polling again.
                        Thread.sleep(1L);
                    }
                    record = reader.readNext(false);
                }
            } finally {
                // Close the reader even when a read or an assertion fails.
                reader.close();
            }
            Assert.assertEquals(recordsExpected, recordsRead);
        } finally {
            dlm.close();
        }
    }

    /**
     * Polls a stream with inconsistent segment metadata through a reader opened with
     * {@code getInputStream(45L)} until half of the written records have been read, checking
     * that transaction ids are strictly increasing.
     *
     * @throws Exception if creating the stream, reading, or closing fails
     */
    @Test(timeout = 15000)
    public void testHandleInconsistentMetadataNonBlocking() throws Exception {
        final String streamName = "distrlog-inconsistent-metadata-nonblocking-read";
        final long recordsWritten = createStreamWithInconsistentMetadata(streamName);
        final long recordsExpected = recordsWritten / 2;
        BKDistributedLogManager dlm = createNewDLM(conf, streamName);
        try {
            LogReader reader = dlm.getInputStream(45L);
            try {
                long recordsRead = 0;
                long lastTxId = -1;
                while (recordsRead < recordsExpected) {
                    LogRecordWithDLSN record = reader.readNext(false);
                    if (record == null) {
                        // Nothing available yet; back off briefly before polling again.
                        Thread.sleep(1L);
                        continue;
                    }
                    DLMTestUtil.verifyLogRecord(record);
                    Assert.assertTrue(lastTxId < record.getTransactionId());
                    lastTxId = record.getTransactionId();
                    recordsRead++;
                }
            } finally {
                // Close the reader even when a read or an assertion fails.
                reader.close();
            }
        } finally {
            dlm.close();
        }
    }

    /**
     * Polls a stream with inconsistent segment metadata through a reader opened at
     * {@code DLSN.InitialDLSN} until every written record has been read, checking that
     * transaction ids are strictly increasing.
     *
     * @throws Exception if creating the stream, reading, or closing fails
     */
    @Test(timeout = 15000)
    public void testHandleInconsistentMetadataDLSNNonBlocking() throws Exception {
        final String streamName = "distrlog-inconsistent-metadata-nonblocking-read-dlsn";
        final long recordsWritten = createStreamWithInconsistentMetadata(streamName);
        BKDistributedLogManager dlm = createNewDLM(conf, streamName);
        try {
            LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
            try {
                long recordsRead = 0;
                long lastTxId = -1;
                while (recordsRead < recordsWritten) {
                    LogRecordWithDLSN record = reader.readNext(false);
                    if (record == null) {
                        // Nothing available yet; back off briefly before polling again.
                        Thread.sleep(1L);
                        continue;
                    }
                    DLMTestUtil.verifyLogRecord(record);
                    Assert.assertTrue(lastTxId < record.getTransactionId());
                    lastTxId = record.getTransactionId();
                    recordsRead++;
                }
            } finally {
                // Close the reader even when a read or an assertion fails.
                reader.close();
            }
        } finally {
            dlm.close();
        }
    }

    // Class-wide setup applied to the shared `conf`, which is not declared in this class
    // (presumably inherited from TestDistributedLogBase; confirm).
    static {
        // Zero output buffer plus immediate flush: presumably so every written record is
        // flushed right away rather than buffered (TODO confirm against
        // DistributedLogConfiguration).
        conf.setOutputBufferSize(0);
        conf.setImmediateFlushEnabled(true);
    }
}
