package org.apache.distributedlog;

import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/TestTruncate.class */
public class TestTruncate extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestTruncate.class);
    protected static DistributedLogConfiguration conf = new DistributedLogConfiguration().setLockTimeout(10).setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10).setSchedulerShutdownTimeoutMs(0).setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value);

    static void updateCompletionTime(ZooKeeperClient zooKeeperClient, LogSegmentMetadata logSegmentMetadata, long j) throws Exception {
        DLMTestUtil.updateSegmentMetadata(zooKeeperClient, logSegmentMetadata.mutator().setCompletionTime(j).build());
    }

    static void setTruncationStatus(ZooKeeperClient zooKeeperClient, LogSegmentMetadata logSegmentMetadata, LogSegmentMetadata.TruncationStatus truncationStatus) throws Exception {
        DLMTestUtil.updateSegmentMetadata(zooKeeperClient, logSegmentMetadata.mutator().setTruncationStatus(truncationStatus).build());
    }

    /* JADX WARN: Type inference failed for: r0v31, types: [org.apache.distributedlog.api.AsyncLogWriter, long, org.apache.distributedlog.io.AsyncCloseable] */
    @Test(timeout = 60000)
    public void testPurgeLogs() throws Exception {
        URI createDLMURI = createDLMURI("/distrlog-purge-logs");
        populateData(new HashMap(), conf, "distrlog-purge-logs", 10, 10, false);
        BKDistributedLogManager createNewDLM = createNewDLM(conf, "distrlog-purge-logs");
        List<LogSegmentMetadata> logSegments = createNewDLM.getLogSegments();
        LOG.info("Segments before modifying completion time : {}", logSegments);
        ZooKeeperClient build = TestZooKeeperClientBuilder.newBuilder(conf).uri(createDLMURI).build();
        long currentTimeMillis = System.currentTimeMillis() - 7200000;
        for (int i = 0; i < 5; i++) {
            updateCompletionTime(build, logSegments.get(i), currentTimeMillis + i);
        }
        build.close();
        LOG.info("Segments after modifying completion time : {}", createNewDLM.getLogSegments());
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setRetentionPeriodHours(1);
        distributedLogConfiguration.setExplicitTruncationByApplication(false);
        BKDistributedLogManager createNewDLM2 = createNewDLM(distributedLogConfiguration, "distrlog-purge-logs");
        ?? startAsyncLogSegmentNonPartitioned = createNewDLM2.startAsyncLogSegmentNonPartitioned();
        long j = 101;
        for (int i2 = 1; i2 <= 10; i2++) {
            long j2 = j;
            j = startAsyncLogSegmentNonPartitioned + 1;
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j2)));
        }
        BKAsyncLogWriter bKAsyncLogWriter = (BKAsyncLogWriter) startAsyncLogSegmentNonPartitioned;
        CompletableFuture<List<LogSegmentMetadata>> lastTruncationAttempt = bKAsyncLogWriter.getLastTruncationAttempt();
        while (true) {
            CompletableFuture<List<LogSegmentMetadata>> completableFuture = lastTruncationAttempt;
            if (completableFuture != null && completableFuture.isDone()) {
                Assert.assertEquals(6L, createNewDLM.getLogSegments().size());
                Utils.close((AsyncCloseable) startAsyncLogSegmentNonPartitioned);
                createNewDLM2.close();
                createNewDLM.close();
                return;
            }
            TimeUnit.MILLISECONDS.sleep(20L);
            lastTruncationAttempt = bKAsyncLogWriter.getLastTruncationAttempt();
        }
    }

    @Test(timeout = 60000)
    public void testTruncation() throws Exception {
        HashMap hashMap = new HashMap();
        Pair<DistributedLogManager, AsyncLogWriter> populateData = populateData(hashMap, conf, "distrlog-truncation", 4, 10, true);
        Thread.sleep(1000L);
        Assert.assertFalse(((Boolean) Utils.ioResult(populateData.getRight().truncate(DLSN.InvalidDLSN))).booleanValue());
        verifyEntries("distrlog-truncation", 1L, 1L, 50);
        for (int i = 1; i <= 4; i++) {
            Assert.assertTrue(((Boolean) Utils.ioResult(populateData.getRight().truncate(hashMap.get(Long.valueOf(((i - 1) * 10) + i))))).booleanValue());
            verifyEntries("distrlog-truncation", 1L, ((i - 1) * 10) + 1, ((5 - i) + 1) * 10);
        }
        Assert.assertTrue(((Boolean) Utils.ioResult(populateData.getRight().truncate(hashMap.get(Long.valueOf(43))))).booleanValue());
        verifyEntries("distrlog-truncation", 1L, 41L, 10);
        Utils.close(populateData.getRight());
        populateData.getLeft().close();
    }

    @Test(timeout = 60000)
    public void testExplicitTruncation() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setExplicitTruncationByApplication(true);
        HashMap hashMap = new HashMap();
        Pair<DistributedLogManager, AsyncLogWriter> populateData = populateData(hashMap, distributedLogConfiguration, "distrlog-truncation-explicit", 4, 10, true);
        Thread.sleep(1000L);
        for (int i = 1; i <= 4; i++) {
            Assert.assertTrue(((Boolean) Utils.ioResult(populateData.getRight().truncate(hashMap.get(Long.valueOf(((i - 1) * 10) + i))))).booleanValue());
            verifyEntries("distrlog-truncation-explicit", 1L, ((i - 1) * 10) + 1, ((5 - i) + 1) * 10);
        }
        Assert.assertTrue(((Boolean) Utils.ioResult(populateData.getRight().truncate(hashMap.get(Long.valueOf(43))))).booleanValue());
        verifyEntries("distrlog-truncation-explicit", 1L, 41L, 10);
        Utils.close(populateData.getRight());
        populateData.getLeft().close();
        Utils.ioResult(createNewDLM(distributedLogConfiguration, "distrlog-truncation-explicit").createWriteHandler(true).purgeLogSegmentsOlderThanTxnId(2147483647L));
        verifyEntries("distrlog-truncation-explicit", 1L, 41L, 10);
    }

    /* JADX WARN: Type inference failed for: r0v41, types: [org.apache.distributedlog.api.AsyncLogWriter, long, org.apache.distributedlog.io.AsyncCloseable] */
    @Test(timeout = 60000)
    public void testOnlyPurgeSegmentsBeforeNoneFullyTruncatedSegment() throws Exception {
        URI createDLMURI = createDLMURI("/distrlog-only-purge-segments-before-none-fully-truncated-segment");
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setExplicitTruncationByApplication(true);
        populateData(new HashMap(), distributedLogConfiguration, "distrlog-only-purge-segments-before-none-fully-truncated-segment", 4, 10, false);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-only-purge-segments-before-none-fully-truncated-segment");
        List<LogSegmentMetadata> logSegments = createNewDLM.getLogSegments();
        LOG.info("Segments before modifying segment status : {}", logSegments);
        ZooKeeperClient build = TestZooKeeperClientBuilder.newBuilder(conf).uri(createDLMURI).build();
        setTruncationStatus(build, logSegments.get(0), LogSegmentMetadata.TruncationStatus.PARTIALLY_TRUNCATED);
        for (int i = 1; i < 4; i++) {
            setTruncationStatus(build, logSegments.get(i), LogSegmentMetadata.TruncationStatus.TRUNCATED);
        }
        List<LogSegmentMetadata> logSegments2 = createNewDLM.getLogSegments();
        createNewDLM.purgeLogsOlderThan(999999L);
        List<LogSegmentMetadata> logSegments3 = createNewDLM.getLogSegments();
        LOG.info("Segments after purge segments older than 999999 : {}", logSegments3);
        Assert.assertArrayEquals(logSegments2.toArray(new LogSegmentMetadata[logSegments2.size()]), logSegments3.toArray(new LogSegmentMetadata[logSegments3.size()]));
        createNewDLM.close();
        long currentTimeMillis = System.currentTimeMillis() - 36000000;
        for (int i2 = 0; i2 < 4; i2++) {
            updateCompletionTime(build, logSegments3.get(i2), currentTimeMillis + i2);
        }
        DistributedLogConfiguration distributedLogConfiguration2 = new DistributedLogConfiguration();
        distributedLogConfiguration2.addConfiguration(distributedLogConfiguration);
        distributedLogConfiguration2.setRetentionPeriodHours(1);
        BKDistributedLogManager createNewDLM2 = createNewDLM(distributedLogConfiguration2, "distrlog-only-purge-segments-before-none-fully-truncated-segment");
        ?? startAsyncLogSegmentNonPartitioned = createNewDLM2.startAsyncLogSegmentNonPartitioned();
        long j = 41;
        for (int i3 = 1; i3 <= 10; i3++) {
            long j2 = j;
            j = startAsyncLogSegmentNonPartitioned + 1;
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j2)));
        }
        LOG.info("Get last dlsn of stream {} : {}", "distrlog-only-purge-segments-before-none-fully-truncated-segment", (DLSN) Utils.ioResult(createNewDLM2.getLastDLSNAsync()));
        Assert.assertEquals(5L, createNewDLM2.getLogSegments().size());
        Utils.close((AsyncCloseable) startAsyncLogSegmentNonPartitioned);
        createNewDLM2.close();
        build.close();
    }

    @Test(timeout = 60000)
    public void testPartiallyTruncateTruncatedSegments() throws Exception {
        URI createDLMURI = createDLMURI("/distrlog-partially-truncate-truncated-segments");
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setExplicitTruncationByApplication(true);
        HashMap hashMap = new HashMap();
        populateData(hashMap, distributedLogConfiguration, "distrlog-partially-truncate-truncated-segments", 4, 10, false);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-partially-truncate-truncated-segments");
        List<LogSegmentMetadata> logSegments = createNewDLM.getLogSegments();
        LOG.info("Segments before modifying segment status : {}", logSegments);
        ZooKeeperClient build = TestZooKeeperClientBuilder.newBuilder(conf).uri(createDLMURI).build();
        for (int i = 0; i < 4; i++) {
            setTruncationStatus(build, logSegments.get(i), LogSegmentMetadata.TruncationStatus.TRUNCATED);
        }
        List<LogSegmentMetadata> logSegments2 = createNewDLM.getLogSegments();
        LOG.info("Segments after changing truncation status : {}", logSegments2);
        createNewDLM.close();
        BKDistributedLogManager createNewDLM2 = createNewDLM(distributedLogConfiguration, "distrlog-partially-truncate-truncated-segments");
        AsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM2.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.truncate(hashMap.get(15L)));
        Assert.assertArrayEquals(logSegments2.toArray(new LogSegmentMetadata[4]), createNewDLM2.getLogSegments().toArray(new LogSegmentMetadata[4]));
        Utils.close(startAsyncLogSegmentNonPartitioned);
        createNewDLM2.close();
        build.close();
    }

    private Pair<DistributedLogManager, AsyncLogWriter> populateData(Map<Long, DLSN> map, DistributedLogConfiguration distributedLogConfiguration, String str, int i, int i2, boolean z) throws Exception {
        long j = 1;
        long j2 = 1;
        while (true) {
            long j3 = j2;
            if (j3 > i) {
                break;
            }
            LOG.info("Writing Log Segment {}.", Long.valueOf(j3));
            BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, str);
            AsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
            for (int i3 = 1; i3 <= i2; i3++) {
                long j4 = j;
                j = j4 + 1;
                map.put(Long.valueOf(j4), (DLSN) Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j4))));
            }
            Utils.close(startAsyncLogSegmentNonPartitioned);
            createNewDLM.close();
            j2 = j3 + 1;
        }
        if (!z) {
            return null;
        }
        BKDistributedLogManager createNewDLM2 = createNewDLM(distributedLogConfiguration, str);
        AsyncLogWriter startAsyncLogSegmentNonPartitioned2 = createNewDLM2.startAsyncLogSegmentNonPartitioned();
        for (int i4 = 1; i4 <= 10; i4++) {
            long j5 = j;
            j = j5 + 1;
            map.put(Long.valueOf(j5), (DLSN) Utils.ioResult(startAsyncLogSegmentNonPartitioned2.write(DLMTestUtil.getLogRecordInstance(j5))));
        }
        return new ImmutablePair(createNewDLM2, startAsyncLogSegmentNonPartitioned2);
    }

    private void verifyEntries(String str, long j, long j2, int i) throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(conf, str);
        LogReader inputStream = createNewDLM.getInputStream(j);
        long j3 = j2;
        int i2 = 0;
        LogRecordWithDLSN readNext = inputStream.readNext(false);
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN = readNext;
            if (null == logRecordWithDLSN) {
                Assert.assertEquals(i, i2);
                inputStream.close();
                createNewDLM.close();
                return;
            } else {
                DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                long j4 = j3;
                j3 = j4 + 1;
                Assert.assertEquals(j4, logRecordWithDLSN.getTransactionId());
                i2++;
                readNext = inputStream.readNext(false);
            }
        }
    }
}
