package org.apache.distributedlog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/TestAsyncBulkWrite.class */
/**
 * Tests for {@code BKAsyncLogWriter.writeBulk}: full success, partial failure caused by an
 * oversized record, and total failure when the {@code FP_TransmitComplete} failpoint is armed.
 */
public class TestAsyncBulkWrite extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestAsyncBulkWrite.class);

    /** Output buffer size (bytes) applied to every per-test configuration. */
    private static final int OUTPUT_BUFFER_BYTES = 1024;

    /**
     * Size (bytes) of a record the writer is expected to reject as too long.
     * NOTE(review): presumably LogRecord.MAX_LOGRECORD_SIZE + 1 -- confirm against LogRecord.
     */
    private static final int OVERSIZED_RECORD_BYTES = 1040385;

    /** Upper bound, in seconds, for waiting on any single future in the check helpers. */
    private static final long IO_TIMEOUT_SECONDS = 10L;

    @Rule
    public TestName runtime = new TestName();
    protected final DistributedLogConfiguration testConf = new DistributedLogConfiguration();

    /**
     * Builds the shared test configuration on top of the base class {@code conf} and sets the
     * reader idle error threshold to 1,200,000 ms.
     */
    public TestAsyncBulkWrite() {
        this.testConf.addConfiguration(conf);
        this.testConf.setReaderIdleErrorThresholdMillis(1200000);
    }

    /**
     * A bulk of 10 records + 1 oversized record + 10 records must yield one future per record:
     * the first 10 succeed, the oversized one fails with {@link LogRecordTooLongException}, and
     * every record after it fails with {@link WriteCancelledException}.
     */
    @Test(timeout = 60000)
    public void testAsyncBulkWritePartialFailureBufferFailure() throws Exception {
        final int goodRecs = 10;
        final int totalRecs = 2 * goodRecs + 1;

        BKDistributedLogManager dlm =
                createNewDLM(newLocalConf(), "distrlog-testAsyncBulkWritePartialFailure");
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        // Layout: goodRecs good records, one oversized record, goodRecs more good records.
        List<LogRecord> records = DLMTestUtil.getLargeLogRecordInstanceList(1L, goodRecs);
        records.add(DLMTestUtil.getLogRecordInstance(goodRecs, OVERSIZED_RECORD_BYTES));
        records.addAll(DLMTestUtil.getLargeLogRecordInstanceList(1L, goodRecs));

        List<CompletableFuture<DLSN>> results =
                DLMTestUtil.validateFutureSucceededAndGetResult(writer.writeBulk(records));

        // One future is returned for each submitted record.
        Assert.assertEquals(totalRecs, results.size());

        // Records ahead of the oversized one must have been written successfully.
        for (int idx = 0; idx < goodRecs; idx++) {
            DLMTestUtil.validateFutureSucceededAndGetResult(results.get(idx));
        }

        // The oversized record itself is rejected.
        DLMTestUtil.validateFutureFailed(results.get(goodRecs), LogRecordTooLongException.class);

        // Everything queued behind the rejected record is cancelled.
        for (int idx = goodRecs + 1; idx < totalRecs; idx++) {
            DLMTestUtil.validateFutureFailed(results.get(idx), WriteCancelledException.class);
        }

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * With the {@code FP_TransmitComplete} failpoint armed, a bulk write is accepted but every
     * per-record future must fail with {@link IOException}.
     */
    @Test(timeout = 60000)
    public void testAsyncBulkWriteTotalFailureTransmitFailure() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(
                newLocalConf(), "distrlog-testAsyncBulkWriteTotalFailureDueToTransmitFailure");
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        checkAllFailedUnderTransmitFailpoint(writer, 100, 1024, 1L);

        writer.abort();
        dlm.close();
    }

    /**
     * With a 1024-byte max log segment size and time-based rolling disabled (interval 0), writes
     * keep landing in log segment 1, and after a bulk containing an oversized record the next
     * bulk write is expected to fail with {@link WriteException}.
     */
    @Test(timeout = 60000)
    public void testAsyncBulkWriteNoLedgerRollWithPartialFailures() throws Exception {
        DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setMaxLogSegmentBytes(1024L);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        BKDistributedLogManager dlm = createNewDLM(
                confLocal, "distrlog-testAsyncBulkWriteNoLedgerRollWithPartialFailures");
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        long txId = 1L;

        // A single write larger than the segment limit still lands in segment 1.
        DLSN firstDlsn = DLMTestUtil.validateFutureSucceededAndGetResult(
                writer.write(DLMTestUtil.getLogRecordInstance(txId++, 2048)));
        Assert.assertEquals(1L, firstDlsn.getLogSegmentSequenceNo());

        // Bulk of one good record followed by one oversized record; only the first is checked.
        List<LogRecord> mixedBulk = new ArrayList<>(2);
        mixedBulk.add(DLMTestUtil.getLogRecordInstance(txId++, 2048));
        mixedBulk.add(DLMTestUtil.getLogRecordInstance(txId++, OVERSIZED_RECORD_BYTES));
        List<CompletableFuture<DLSN>> mixedResults =
                DLMTestUtil.validateFutureSucceededAndGetResult(writer.writeBulk(mixedBulk));
        DLSN bulkDlsn = DLMTestUtil.validateFutureSucceededAndGetResult(mixedResults.get(0));
        Assert.assertEquals(1L, bulkDlsn.getLogSegmentSequenceNo());

        // The follow-up bulk write must fail as a whole.
        List<LogRecord> followUpBulk = new ArrayList<>(1);
        followUpBulk.add(DLMTestUtil.getLogRecordInstance(txId, 2048));
        DLMTestUtil.validateFutureFailed(writer.writeBulk(followUpBulk), WriteException.class);

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * Two bulk writes on one writer stay in log segment 1; after closing and reopening the
     * writer, a third bulk write must land in log segment 2.
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncBulkWriteSpanningEntryAndLedger() throws Exception {
        final int batchSize = 100;
        final int recSize = 1024;

        BKDistributedLogManager dlm = createNewDLM(
                newLocalConf(), "distrlog-testSimpleAsyncBulkWriteSpanningEntryAndLedger");
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        long ledgerIndex = 1L;
        long entryIndex = 0L;
        long txIndex = 1L;

        // First batch.
        checkAllSucceeded(writer, batchSize, recSize, ledgerIndex, entryIndex, 0L, txIndex);
        txIndex += batchSize;
        entryIndex++;

        // Second batch on the same writer: same log segment.
        checkAllSucceeded(writer, batchSize, recSize, ledgerIndex, entryIndex, 0L, txIndex);
        txIndex += batchSize;

        // Reopen the writer; the next batch is expected in the following log segment.
        writer.closeAndComplete();
        writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
        ledgerIndex++;
        entryIndex = 0L;

        checkAllSucceeded(writer, batchSize, recSize, ledgerIndex, entryIndex, 0L, txIndex);

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * A bulk sized just over four times {@code LogRecord.MAX_LOGRECORDSET_SIZE} must succeed,
     * with the last record reported at entry id 4 of log segment 1.
     */
    @Test(timeout = 60000)
    public void testAsyncBulkWriteSpanningPackets() throws Exception {
        final int recSize = 10240;
        final int batchSize = ((4 * LogRecord.MAX_LOGRECORDSET_SIZE) + 1) / recSize;

        BKDistributedLogManager dlm =
                createNewDLM(newLocalConf(), "distrlog-testAsyncBulkWriteSpanningPackets");
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        DLSN lastDlsn = checkAllSucceeded(writer, batchSize, recSize, 1L, 0L, 0L, 1L);
        Assert.assertEquals(4L, lastDlsn.getEntryId());
        Assert.assertEquals(1L, lastDlsn.getLogSegmentSequenceNo());

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * Same multi-entry bulk as {@link #testAsyncBulkWriteSpanningPackets()}, then the same bulk
     * again with the {@code FP_TransmitComplete} failpoint armed: every future of the second
     * bulk must fail with {@link IOException}.
     */
    @Test(timeout = 60000)
    public void testAsyncBulkWriteSpanningPacketsWithTransmitFailure() throws Exception {
        final int recSize = 10240;
        final int batchSize = ((4 * LogRecord.MAX_LOGRECORDSET_SIZE) + 1) / recSize;

        BKDistributedLogManager dlm = createNewDLM(
                newLocalConf(), "distrlog-testAsyncBulkWriteSpanningPacketsWithTransmitFailure");
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        DLSN lastDlsn = checkAllSucceeded(writer, batchSize, recSize, 1L, 0L, 0L, 1L);
        Assert.assertEquals(4L, lastDlsn.getEntryId());
        Assert.assertEquals(1L, lastDlsn.getLogSegmentSequenceNo());

        checkAllFailedUnderTransmitFailpoint(writer, batchSize, recSize, 1L);

        writer.abort();
        dlm.close();
    }

    /** Returns a fresh configuration loaded from {@link #testConf} with a 1024-byte output buffer. */
    private DistributedLogConfiguration newLocalConf() {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(this.testConf);
        confLocal.setOutputBufferSize(OUTPUT_BUFFER_BYTES);
        return confLocal;
    }

    /**
     * Arms the {@code FP_TransmitComplete} failpoint, runs
     * {@link #checkAllSubmittedButFailed}, and always removes the failpoint afterwards so a
     * failing assertion cannot leak it out of the calling test.
     */
    private void checkAllFailedUnderTransmitFailpoint(BKAsyncLogWriter writer,
                                                      int batchSize,
                                                      int recSize,
                                                      long txIndex) throws Exception {
        FailpointUtils.setFailpoint(
                FailpointUtils.FailPointName.FP_TransmitComplete,
                FailpointUtils.FailPointActions.FailPointAction_Default);
        try {
            checkAllSubmittedButFailed(writer, batchSize, recSize, txIndex);
        } finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_TransmitComplete);
        }
    }

    /**
     * Bulk-writes {@code batchSize} records of {@code recSize} bytes starting at transaction id
     * {@code txIndex} and asserts that every record succeeds in log segment {@code ledgerIndex}
     * with consecutive slot ids.
     *
     * @param writer      writer under test
     * @param batchSize   number of records to generate
     * @param recSize     size argument passed to the record generator
     * @param ledgerIndex expected log segment sequence number of every returned DLSN
     * @param entryIndex  not used by the checks; kept so existing call sites stay unchanged
     * @param slotIndex   expected slot id of the first record
     * @param txIndex     first transaction id passed to the record generator
     * @return the DLSN of the last record, or {@code null} if no futures were returned
     */
    private DLSN checkAllSucceeded(BKAsyncLogWriter writer,
                                   int batchSize,
                                   int recSize,
                                   long ledgerIndex,
                                   long entryIndex,
                                   long slotIndex,
                                   long txIndex) throws Exception {
        List<LogRecord> records = DLMTestUtil.getLogRecordInstanceList(txIndex, batchSize, recSize);
        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = writer.writeBulk(records);
        Assert.assertNotNull(futureResults);
        List<CompletableFuture<DLSN>> results =
                Utils.ioResult(futureResults, IO_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertNotNull(results);
        Assert.assertEquals(records.size(), results.size());

        long prevEntryId = 0L;
        long expectedSlot = slotIndex;
        DLSN lastDlsn = null;
        for (CompletableFuture<DLSN> result : results) {
            DLSN dlsn = Utils.ioResult(result, IO_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            lastDlsn = dlsn;

            // Moving to a higher entry id restarts slot numbering at 0.
            if (dlsn.getEntryId() > prevEntryId) {
                expectedSlot = 0L;
            }
            Assert.assertEquals(ledgerIndex, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals(expectedSlot, dlsn.getSlotId());
            expectedSlot++;
            prevEntryId = dlsn.getEntryId();
        }
        return lastDlsn;
    }

    /**
     * Bulk-writes {@code batchSize} records of {@code recSize} bytes starting at transaction id
     * {@code txIndex} and asserts that the bulk call itself completes with one future per
     * record while every per-record future fails with {@link IOException}.
     */
    private void checkAllSubmittedButFailed(BKAsyncLogWriter writer,
                                            int batchSize,
                                            int recSize,
                                            long txIndex) throws Exception {
        List<LogRecord> records = DLMTestUtil.getLogRecordInstanceList(txIndex, batchSize, recSize);
        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = writer.writeBulk(records);
        Assert.assertNotNull(futureResults);
        List<CompletableFuture<DLSN>> results =
                Utils.ioResult(futureResults, IO_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertNotNull(results);
        Assert.assertEquals(records.size(), results.size());
        for (CompletableFuture<DLSN> result : results) {
            DLMTestUtil.validateFutureFailed(result, IOException.class);
        }
    }
}
