package org.apache.distributedlog;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.lock.ZKDistributedLock;
import org.apache.distributedlog.lock.ZKSessionLockFactory;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

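/**
 * Tests for {@link BKLogSegmentWriter}: how close, abort, fencing and non-durable writes
 * affect buffered records, the writer's tx id / DLSN bookkeeping and the underlying ledger.
 */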
public class TestBKLogSegmentWriter extends TestDistributedLogBase {

    // JUnit rule exposing the running test's method name; the tests use it to build
    // per-test lock paths and to name the log segment writer.
    @Rule
    public TestName runtime = new TestName();
    // Scheduler handed to every BKLogSegmentWriter created by createLogSegmentWriter().
    private OrderedScheduler scheduler;
    // Scheduler handed to the ZKDistributedLock and its ZKSessionLockFactory in createLock().
    private OrderedScheduler lockStateExecutor;
    // ZooKeeper client used to resolve the BKDLConfig and to hold the writer's lock.
    private ZooKeeperClient zkc;
    // Second ZooKeeper client; the tests use it for the competing lock on the same path.
    private ZooKeeperClient zkc0;
    // BookKeeper client used to create and open the ledgers under test.
    private BookKeeperClient bkc;

    /**
     * Runs the base-class setup, then builds the fixtures shared by every test: two
     * single-threaded schedulers, two ZooKeeper clients pointing at the same DL URI and one
     * BookKeeper client.
     *
     * @throws Exception if the base setup or the construction of any client fails
     */
    @Override
    @Before
    public void setup() throws Exception {
        super.setup();

        scheduler = OrderedScheduler.newSchedulerBuilder()
                .numThreads(1)
                .build();
        lockStateExecutor = OrderedScheduler.newSchedulerBuilder()
                .numThreads(1)
                .build();

        final URI uri = createDLMURI("");
        zkc = TestZooKeeperClientBuilder.newBuilder(conf)
                .name("test-zkc")
                .uri(uri)
                .build();
        zkc0 = TestZooKeeperClientBuilder.newBuilder(conf)
                .name("test-zkc0")
                .uri(uri)
                .build();

        // The ledgers path is read from the DL configuration resolved through the first
        // ZooKeeper client; the ZooKeeper servers are derived from the same URI.
        bkc = BookKeeperClientBuilder.newBuilder()
                .dlConfig(conf)
                .name("test-bkc")
                .ledgersPath(BKDLConfig.resolveDLConfig(zkc, uri).getBkLedgersPath())
                .zkServers(BKNamespaceDriver.getZKServersFromDLUri(uri))
                .build();
    }

    /**
     * Releases every fixture created in {@link #setup()} and then runs the base-class teardown.
     *
     * <p>Both ZooKeeper clients are closed; the second one ({@code zkc0}) is created in
     * {@code setup()} as well and would otherwise leak a session per test. The base-class
     * teardown runs in a {@code finally} block so that it is not skipped when releasing one of
     * the fixtures fails.
     *
     * @throws Exception if releasing a fixture or the base teardown fails
     */
    @Override
    @After
    public void teardown() throws Exception {
        try {
            if (null != this.bkc) {
                this.bkc.close();
            }
            if (null != this.zkc) {
                this.zkc.close();
            }
            if (null != this.zkc0) {
                this.zkc0.close();
            }
            if (null != this.lockStateExecutor) {
                this.lockStateExecutor.shutdown();
            }
            if (null != this.scheduler) {
                this.scheduler.shutdown();
            }
        } finally {
            super.teardown();
        }
    }

    /**
     * Creates a fresh configuration seeded with the shared test {@code conf}, so a test can
     * change settings on the returned object instead of on {@code conf} itself.
     *
     * @return a new configuration to which {@code conf} has been added
     */
    private DistributedLogConfiguration newLocalConf() {
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(conf);
        return localConf;
    }

    /**
     * Creates a distributed lock on the given ZooKeeper path, optionally acquiring it.
     *
     * @param str             ZooKeeper path of the lock; it is created as a persistent node first
     * @param zooKeeperClient client used both to create the path and to back the lock
     * @param z               {@code true} to block until the lock is acquired, {@code false} to
     *                        return the lock without acquiring it
     * @return the lock, acquired if {@code z} is {@code true}
     * @throws Exception if creating the path fails with anything other than a
     *                   {@link ZKException}, or if acquiring the lock fails
     */
    private ZKDistributedLock createLock(String str, ZooKeeperClient zooKeeperClient, boolean z) throws Exception {
        try {
            Utils.ioResult(Utils.zkAsyncCreateFullPathOptimistic(
                    zooKeeperClient,
                    str,
                    new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT));
        } catch (ZKException ignored) {
            // Deliberately ignored. NOTE(review): presumably this covers the path already
            // existing (tests create two locks on the same path); it also hides every other
            // ZooKeeper failure -- confirm whether narrowing it is wanted.
        }

        final ZKSessionLockFactory lockFactory = new ZKSessionLockFactory(
                zooKeeperClient,
                "test-lock",
                this.lockStateExecutor,
                0,
                Long.MAX_VALUE,
                conf.getZKSessionTimeoutMilliseconds(),
                NullStatsLogger.INSTANCE);
        final ZKDistributedLock lock = new ZKDistributedLock(
                this.lockStateExecutor,
                lockFactory,
                str,
                Long.MAX_VALUE,
                NullStatsLogger.INSTANCE);

        if (!z) {
            return lock;
        }
        return (ZKDistributedLock) Utils.ioResult(lock.asyncAcquire());
    }

    /**
     * Closes the writer and waits for the close to complete, then closes the lock quietly.
     * The lock is closed even when closing the writer throws.
     *
     * @param bKLogSegmentWriter writer to close
     * @param zKDistributedLock  lock to close afterwards
     * @throws Exception whatever the writer's asynchronous close fails with
     */
    private void closeWriterAndLock(final BKLogSegmentWriter bKLogSegmentWriter,
                                    final ZKDistributedLock zKDistributedLock) throws Exception {
        try {
            final CompletableFuture<Void> closeFuture = bKLogSegmentWriter.asyncClose();
            Utils.ioResult(closeFuture);
        } finally {
            Utils.closeQuietly(zKDistributedLock);
        }
    }

    /**
     * Aborts the writer, then closes the lock quietly. The lock is closed even when the abort
     * throws.
     *
     * @param bKLogSegmentWriter writer to abort
     * @param zKDistributedLock  lock to close afterwards
     * @throws IOException if aborting the writer fails
     */
    private void abortWriterAndLock(final BKLogSegmentWriter bKLogSegmentWriter,
                                    final ZKDistributedLock zKDistributedLock) throws IOException {
        try {
            // NOTE(review): the boolean flag presumably controls whether abort failures are
            // swallowed; false is passed so they reach the test -- confirm against Utils.abort.
            Utils.abort(bKLogSegmentWriter, false);
        } finally {
            Utils.closeQuietly(zKDistributedLock);
        }
    }

    /**
     * Creates a new ledger and wraps it in a {@link BKLogSegmentWriter} named after the running
     * test method.
     *
     * @param distributedLogConfiguration configuration for the writer; also supplies the ledger
     *                                    digest password
     * @param j                           passed to the writer constructor after {@code j2}.
     *                                    NOTE(review): presumably the log segment sequence number
     *                                    (tests pass 0 and later assert DLSNs with log segment
     *                                    sequence number 0) -- confirm against the constructor
     * @param j2                          passed to the writer constructor before {@code j}.
     *                                    NOTE(review): presumably the start tx id -- confirm
     * @param zKDistributedLock           lock handed to the writer
     * @return the writer backed by the freshly created ledger
     * @throws Exception if the ledger cannot be created
     */
    private BKLogSegmentWriter createLogSegmentWriter(DistributedLogConfiguration distributedLogConfiguration, long j, long j2, ZKDistributedLock zKDistributedLock) throws Exception {
        final String testName = this.runtime.getMethodName();

        final BookKeeper bookKeeper = this.bkc.get();
        final byte[] digestPassword =
                distributedLogConfiguration.getBKDigestPW().getBytes(StandardCharsets.UTF_8);
        final LedgerHandle ledger =
                bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.CRC32, digestPassword);

        return new BKLogSegmentWriter(
                testName,
                testName,
                distributedLogConfiguration,
                LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION,
                new BKLogSegmentEntryWriter(ledger),
                zKDistributedLock,
                j2,
                j,
                this.scheduler,
                NullStatsLogger.INSTANCE,
                NullStatsLogger.INSTANCE,
                new AlertStatsLogger(NullStatsLogger.INSTANCE, "test"),
                PermitLimiter.NULL_PERMIT_LIMITER,
                new SettableFeatureProvider("", 0),
                ConfUtils.getConstDynConf(distributedLogConfiguration));
    }

    /**
     * Opens a second handle on the given ledger via {@code openLedgerNoRecovery}, using the
     * digest password from the shared test configuration.
     *
     * @param ledgerHandle handle whose ledger id identifies the ledger to open
     * @return the newly opened handle
     * @throws Exception if the ledger cannot be opened
     */
    private LedgerHandle openLedgerNoRecovery(LedgerHandle ledgerHandle) throws Exception {
        final long ledgerId = ledgerHandle.getId();
        final byte[] digestPassword = conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8);
        return this.bkc.get().openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, digestPassword);
    }

    /**
     * Opens a second handle on the given ledger via {@code openLedger}, using the digest
     * password from the shared test configuration.
     *
     * @param ledgerHandle handle whose ledger id identifies the ledger to open
     * @return the newly opened handle
     * @throws Exception if the ledger cannot be opened
     */
    private LedgerHandle openLedger(LedgerHandle ledgerHandle) throws Exception {
        final long ledgerId = ledgerHandle.getId();
        final byte[] digestPassword = conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8);
        return this.bkc.get().openLedger(ledgerId, BookKeeper.DigestType.CRC32, digestPassword);
    }

    /**
     * Opens the given ledger through {@link #openLedger(LedgerHandle)} and discards the
     * returned handle.
     *
     * <p>NOTE(review): the open is presumably what fences the ledger for its original writer;
     * the tests that call this expect result code -101 from the writer afterwards.
     *
     * @param ledgerHandle handle identifying the ledger to fence
     * @throws Exception if the ledger cannot be opened
     */
    private void fenceLedger(LedgerHandle ledgerHandle) throws Exception {
        openLedger(ledgerHandle);
    }

    /**
     * Writes ten records that stay unacknowledged, closes the writer and checks that the close
     * acknowledged all of them, released the lock to the second contender and closed the ledger.
     */
    @Test(timeout = 60000)
    public void testCloseShouldFlush() throws Exception {
        final int numRecords = 10;
        final String lockPath = "/test/lock-" + this.runtime.getMethodName();

        // With this configuration the assertions below observe no acknowledgement before close.
        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);
        // Second contender on the same path; its acquisition completes once the writer's
        // lock has been closed.
        final ZKDistributedLock lock0 = createLock(lockPath, this.zkc0, false);
        final CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();

        final List<CompletableFuture<DLSN>> futureList = new ArrayList<>(numRecords);
        for (int txId = 0; txId < numRecords; txId++) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txId)));
        }

        // Before close: tx ids and position advance, nothing is acknowledged yet.
        Assert.assertEquals("Last tx id should be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should be -1",
                -1L, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should be 10",
                10L, writer.getPositionWithinLogSegment());

        closeWriterAndLock(writer, lock);
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();

        // After close: everything is acknowledged.
        Assert.assertEquals("Last tx id should still be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should become " + (numRecords - 1),
                numRecords - 1, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Position should still be 10",
                10L, writer.getPositionWithinLogSegment());

        final List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
        Assert.assertEquals("All records should be written", numRecords, dlsns.size());
        for (int slot = 0; slot < numRecords; slot++) {
            final DLSN dlsn = dlsns.get(slot);
            Assert.assertEquals("Incorrent ledger sequence number", 0L, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals("Incorrent entry id", 0L, dlsn.getEntryId());
            Assert.assertEquals("Inconsistent slot id", slot, dlsn.getSlotId());
        }
        final DLSN lastDlsn = dlsns.get(dlsns.size() - 1);
        Assert.assertEquals("Last DLSN should be " + lastDlsn, lastDlsn, writer.getLastDLSN());

        // Last add confirmed 1 means entries 0 and 1 are in the ledger.
        final LedgerHandle lh = getLedgerHandle(writer);
        final LedgerHandle readLh = openLedgerNoRecovery(lh);
        Assert.assertTrue("Ledger " + lh.getId() + " should be closed", readLh.isClosed());
        Assert.assertEquals("There should be two entries in ledger " + lh.getId(),
                1L, readLh.getLastAddConfirmed());
    }

    /**
     * Writes ten records that stay unacknowledged, aborts the writer and checks that nothing was
     * flushed: every write fails with a {@link WriteCancelledException} caused by a
     * {@link BKTransmitException}, the writer's acknowledged state is unchanged, and the ledger
     * is closed with no entries.
     */
    @Test(timeout = 60000)
    public void testAbortShouldNotFlush() throws Exception {
        final int numRecords = 10;
        final String lockPath = "/test/lock-" + this.runtime.getMethodName();

        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);
        // Second contender on the same path; acquires once the writer's lock is closed.
        final ZKDistributedLock lock0 = createLock(lockPath, this.zkc0, false);
        final CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();

        final List<CompletableFuture<DLSN>> futureList = new ArrayList<>(numRecords);
        for (int txId = 0; txId < numRecords; txId++) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txId)));
        }

        Assert.assertEquals("Last tx id should be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should be -1",
                -1L, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should be 10",
                10L, writer.getPositionWithinLogSegment());

        abortWriterAndLock(writer, lock);
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();

        // Abort must leave the acknowledged state untouched.
        Assert.assertEquals("Last tx id should still be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should still be -1",
                -1L, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should still be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should still be 10",
                10L, writer.getPositionWithinLogSegment());

        for (int i = 0; i < numRecords; i++) {
            try {
                Utils.ioResult(futureList.get(i));
                Assert.fail("Should be aborted record " + i + " with transmit exception");
            } catch (WriteCancelledException wce) {
                final Throwable cause = wce.getCause();
                Assert.assertTrue("Record " + i + " should be aborted with a transmit exception",
                        cause instanceof BKTransmitException);
                // Throwable has no getBKResultCode(); the cast is required to read the rc.
                // -15 is the BookKeeper result code for an interrupted operation.
                Assert.assertEquals("Record " + i + " should be aborted",
                        -15L, ((BKTransmitException) cause).getBKResultCode());
            }
        }

        // Abort closes the ledger without having written any entry.
        final LedgerHandle lh = getLedgerHandle(writer);
        final LedgerHandle readLh = openLedgerNoRecovery(lh);
        Assert.assertTrue("Ledger " + lh.getId() + " should be closed", readLh.isClosed());
        Assert.assertEquals("There should be no entries in ledger " + lh.getId(),
                -1L, readLh.getLastAddConfirmed());
    }

    /**
     * Runs the shared error-state scenario with result code -101, the code the fencing tests in
     * this class expect from a fenced ledger.
     */
    @Test(timeout = 60000)
    public void testCloseShouldNotFlushIfLedgerFenced() throws Exception {
        final int ledgerFencedRc = -101;
        testCloseShouldNotFlushIfInErrorState(ledgerFencedRc);
    }

    /**
     * Shared scenario: writes ten records that stay unacknowledged, forces the writer's transmit
     * result to {@code i}, then checks that close fails with that code, flushes nothing and
     * leaves the ledger open and empty.
     *
     * @param i BookKeeper result code to inject via {@code setTransmitResult}; close and every
     *          pending write are expected to report it
     * @throws Exception if the scenario fails unexpectedly
     */
    void testCloseShouldNotFlushIfInErrorState(int i) throws Exception {
        final int numRecords = 10;
        final String lockPath = "/test/lock-" + this.runtime.getMethodName();

        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);
        // Second contender on the same path; acquires once the writer's lock is closed.
        final ZKDistributedLock lock0 = createLock(lockPath, this.zkc0, false);
        final CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();

        final List<CompletableFuture<DLSN>> futureList = new ArrayList<>(numRecords);
        for (int txId = 0; txId < numRecords; txId++) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txId)));
        }

        Assert.assertEquals("Last tx id should be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should be -1",
                -1L, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should be 10",
                10L, writer.getPositionWithinLogSegment());

        // Put the writer into the error state under test.
        writer.setTransmitResult(i);

        try {
            closeWriterAndLock(writer, lock);
            Assert.fail("Close a log segment writer in error state should throw exception");
        } catch (BKTransmitException bkte) {
            Assert.assertEquals("Inconsistent rc is thrown", i, bkte.getBKResultCode());
        }
        // closeWriterAndLock releases the lock even though the close itself failed.
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();

        Assert.assertEquals("Last tx id should still be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should still be -1",
                -1L, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should still be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should still be 10",
                10L, writer.getPositionWithinLogSegment());

        for (int idx = 0; idx < numRecords; idx++) {
            try {
                Utils.ioResult(futureList.get(idx));
                Assert.fail("Should be aborted record " + idx + " with transmit exception");
            } catch (WriteCancelledException wce) {
                final Throwable cause = wce.getCause();
                Assert.assertTrue("Record " + idx + " should be aborted because of ledger fenced",
                        cause instanceof BKTransmitException);
                // Throwable has no getBKResultCode(); the cast is required to read the rc.
                Assert.assertEquals("Record " + idx + " should be aborted",
                        i, ((BKTransmitException) cause).getBKResultCode());
            }
        }

        // The failed close must not have closed the ledger or written any entry.
        final LedgerHandle lh = getLedgerHandle(writer);
        final LedgerHandle readLh = openLedgerNoRecovery(lh);
        Assert.assertFalse("Ledger " + lh.getId() + " should not be closed", readLh.isClosed());
        Assert.assertEquals("There should be no entries in ledger " + lh.getId(),
                -1L, readLh.getLastAddConfirmed());
    }

    /**
     * Writes ten records that stay unacknowledged, fences the ledger from a second handle and
     * checks that close fails with result code -101, every pending write fails with the same
     * code, and the ledger ends up closed and empty.
     */
    @Test(timeout = 60000)
    public void testCloseShouldFailIfLedgerFenced() throws Exception {
        final int numRecords = 10;
        // Result code asserted for both the failed close and the failed writes after fencing.
        final long fencedRc = -101L;
        final String lockPath = "/test/lock-" + this.runtime.getMethodName();

        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);
        // Second contender on the same path; acquires once the writer's lock is closed.
        final ZKDistributedLock lock0 = createLock(lockPath, this.zkc0, false);
        final CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();

        final List<CompletableFuture<DLSN>> futureList = new ArrayList<>(numRecords);
        for (int txId = 0; txId < numRecords; txId++) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txId)));
        }

        Assert.assertEquals("Last tx id should be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should be -1",
                -1L, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should be 10",
                10L, writer.getPositionWithinLogSegment());

        fenceLedger(getLedgerHandle(writer));

        try {
            closeWriterAndLock(writer, lock);
            Assert.fail("Close a log segment writer when ledger is fenced should throw exception");
        } catch (BKTransmitException bkte) {
            Assert.assertEquals("Inconsistent rc is thrown", fencedRc, bkte.getBKResultCode());
        }
        // closeWriterAndLock releases the lock even though the close itself failed.
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();

        Assert.assertEquals("Last tx id should still be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should still be -1",
                -1L, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should still be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should still be 10",
                10L, writer.getPositionWithinLogSegment());

        for (int idx = 0; idx < numRecords; idx++) {
            try {
                Utils.ioResult(futureList.get(idx));
                Assert.fail("Should be aborted record " + idx + " with transmit exception");
            } catch (BKTransmitException bkte) {
                Assert.assertEquals("Record " + idx + " should be aborted",
                        fencedRc, bkte.getBKResultCode());
            }
        }

        final LedgerHandle lh = getLedgerHandle(writer);
        final LedgerHandle readLh = openLedgerNoRecovery(lh);
        Assert.assertTrue("Ledger " + lh.getId() + " should be closed", readLh.isClosed());
        Assert.assertEquals("There should be no entries in ledger " + lh.getId(),
                -1L, readLh.getLastAddConfirmed());
    }

    /**
     * Flushes a first batch of ten records while their completion callbacks are held back by a
     * latch, buffers a second batch of ten, then aborts the writer. The first batch must
     * complete successfully once the latch is released; the second batch must fail with
     * {@link WriteCancelledException}; the ledger must be closed with exactly one entry.
     */
    @Test(timeout = 60000)
    public void testAbortShouldFailAllWrites() throws Exception {
        final int numRecords = 10;
        final String lockPath = "/test/lock-" + this.runtime.getMethodName();

        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);
        // Second contender on the same path; acquires once the writer's lock is closed.
        final ZKDistributedLock lock0 = createLock(lockPath, this.zkc0, false);
        final CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();

        // First batch: tx ids 0..9.
        final List<CompletableFuture<DLSN>> futureList = new ArrayList<>(numRecords);
        for (int txId = 0; txId < numRecords; txId++) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txId)));
        }
        Assert.assertEquals("Last tx id should be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should be -1",
                -1L, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should be 10",
                10, writer.getPositionWithinLogSegment());

        // Park a task on the writer's future pool until the latch is released. The assertions
        // after flush() rely on the last DLSN not having been updated yet at that point.
        final CountDownLatch deferLatch = new CountDownLatch(1);
        writer.getFuturePool().submit(() -> {
            try {
                deferLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted on deferring completion : ", e);
            }
        });

        Utils.ioResult(writer.flush());

        // Second batch: tx ids 10..19, left buffered.
        final List<CompletableFuture<DLSN>> futureList2 = new ArrayList<>(numRecords);
        for (int txId = numRecords; txId < 2 * numRecords; txId++) {
            futureList2.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txId)));
        }
        Assert.assertEquals("Last tx id should become " + ((2 * numRecords) - 1),
                (2 * numRecords) - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should become " + (numRecords - 1),
                numRecords - 1, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Last DLSN should still be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should become " + (2 * numRecords),
                2 * numRecords, writer.getPositionWithinLogSegment());

        abortWriterAndLock(writer, lock);
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();

        // Release the parked task so the first batch's futures can complete.
        deferLatch.countDown();

        final List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
        Assert.assertEquals("All first 10 records should be written", numRecords, dlsns.size());
        for (int slot = 0; slot < numRecords; slot++) {
            final DLSN dlsn = dlsns.get(slot);
            Assert.assertEquals("Incorrent ledger sequence number", 0L, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals("Incorrent entry id", 0L, dlsn.getEntryId());
            Assert.assertEquals("Inconsistent slot id", slot, dlsn.getSlotId());
        }
        for (int idx = 0; idx < numRecords; idx++) {
            try {
                Utils.ioResult(futureList2.get(idx));
                Assert.fail("Should be aborted record " + (numRecords + idx) + " with transmit exception");
            } catch (WriteCancelledException expected) {
                // Expected: the buffered second batch is cancelled by the abort.
            }
        }

        final DLSN lastFlushedDlsn = dlsns.get(futureList.size() - 1);
        Assert.assertEquals("Last tx id should still be " + ((2 * numRecords) - 1),
                (2 * numRecords) - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should be still " + (numRecords - 1),
                numRecords - 1, writer.getLastTxIdAcknowledged());
        // Report the DLSN itself in the message, not the future that produced it.
        Assert.assertEquals("Last DLSN should become " + lastFlushedDlsn,
                lastFlushedDlsn, writer.getLastDLSN());
        Assert.assertEquals("Position should become " + (2 * numRecords),
                2 * numRecords, writer.getPositionWithinLogSegment());

        // Abort closes the ledger; only the flushed first batch (entry 0) is in it.
        final LedgerHandle lh = getLedgerHandle(writer);
        final LedgerHandle readLh = openLedgerNoRecovery(lh);
        Assert.assertTrue("Ledger " + lh.getId() + " should be closed", readLh.isClosed());
        Assert.assertEquals("Only one entry is written for ledger " + lh.getId(),
                0L, lh.getLastAddPushed());
        Assert.assertEquals("Only one entry is written for ledger " + lh.getId(),
                0L, readLh.getLastAddConfirmed());
    }

    /**
     * Writes ten user records followed by one control record (tx id 9999) and checks that only
     * the user records move the writer's last tx id, position and last DLSN; the control record
     * still gets its own DLSN (entry 1, slot 0).
     */
    @Test(timeout = 60000)
    public void testUpdateLastTxIdForUserRecords() throws Exception {
        final int numRecords = 10;

        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        final ZKDistributedLock lock =
                createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);

        final List<CompletableFuture<DLSN>> futureList = new ArrayList<>(numRecords);
        for (int txId = 0; txId < numRecords; txId++) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txId)));
        }
        // The control record carries a much larger tx id that must not become the last tx id.
        final LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(9999L);
        controlRecord.setControl();
        futureList.add(writer.asyncWrite(controlRecord));

        Assert.assertEquals("Last tx id should be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last DLSN should be " + DLSN.InvalidDLSN,
                DLSN.InvalidDLSN, writer.getLastDLSN());
        Assert.assertEquals("Position should be 10",
                10, writer.getPositionWithinLogSegment());

        closeWriterAndLock(writer, lock);

        final List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
        Assert.assertEquals("All 11 records should be written", numRecords + 1, dlsns.size());
        for (int slot = 0; slot < numRecords; slot++) {
            final DLSN dlsn = dlsns.get(slot);
            Assert.assertEquals("Incorrent ledger sequence number", 0L, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals("Incorrent entry id", 0L, dlsn.getEntryId());
            Assert.assertEquals("Inconsistent slot id", slot, dlsn.getSlotId());
        }

        final DLSN controlDlsn = dlsns.get(numRecords);
        Assert.assertEquals("Incorrent ledger sequence number", 0L, controlDlsn.getLogSegmentSequenceNo());
        Assert.assertEquals("Incorrent entry id", 1L, controlDlsn.getEntryId());
        Assert.assertEquals("Inconsistent slot id", 0L, controlDlsn.getSlotId());

        Assert.assertEquals("Last tx id should be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxId());
        Assert.assertEquals("Last acked tx id should be " + (numRecords - 1),
                numRecords - 1, writer.getLastTxIdAcknowledged());
        Assert.assertEquals("Position should be 10",
                10, writer.getPositionWithinLogSegment());
        // The expected last DLSN is that of the last user record, so report that one in the
        // message rather than the control record's DLSN.
        final DLSN lastUserDlsn = dlsns.get(numRecords - 1);
        Assert.assertEquals("Last DLSN should be " + lastUserDlsn,
                lastUserDlsn, writer.getLastDLSN());
    }

    /**
     * With durable writes disabled, a write issued after the writer has been closed must fail
     * with a {@link WriteException}.
     */
    @Test(timeout = 60000)
    public void testNondurableWriteAfterWriterIsClosed() throws Exception {
        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setDurableWriteEnabled(false);

        final String lockPath = "/test/lock-" + this.runtime.getMethodName();
        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);

        closeWriterAndLock(writer, lock);
        // The writer is closed a second time on purpose; this call must succeed as well.
        Utils.ioResult(writer.asyncClose());

        try {
            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1L)));
            Assert.fail("Should fail the write if the writer is closed");
        } catch (WriteException expected) {
            // Expected: writes are rejected once the writer is closed.
        }
    }

    /**
     * With durable writes disabled, a write issued after the stream has been marked as ended
     * must fail with an {@link EndOfStreamException}.
     */
    @Test(timeout = 60000)
    public void testNondurableWriteAfterEndOfStream() throws Exception {
        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setDurableWriteEnabled(false);

        final String lockPath = "/test/lock-" + this.runtime.getMethodName();
        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);

        Utils.ioResult(writer.markEndOfStream());

        try {
            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1L)));
            Assert.fail("Should fail the write if the writer is marked as end of stream");
        } catch (EndOfStreamException expected) {
            // Expected: no write is accepted after end of stream.
        }

        closeWriterAndLock(writer, lock);
    }

    /**
     * With durable writes disabled and the ledger fenced, a control record write must fail with
     * a {@link BKTransmitException} carrying result code -101, and a following user record write
     * must fail with a {@link WriteException}.
     */
    @Test(timeout = 60000)
    public void testNondurableWriteAfterLedgerIsFenced() throws Exception {
        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setDurableWriteEnabled(false);

        final String lockPath = "/test/lock-" + this.runtime.getMethodName();
        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);

        fenceLedger(getLedgerHandle(writer));

        // First attempt: a control record, expected to surface the fencing result code.
        final LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(1L);
        controlRecord.setControl();
        try {
            Utils.ioResult(writer.asyncWrite(controlRecord));
            Assert.fail("Should fail the writer if the log segment is already fenced");
        } catch (BKTransmitException bkte) {
            Assert.assertEquals(-101L, bkte.getBKResultCode());
        }

        // Second attempt: a user record, expected to be rejected as well.
        try {
            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2L)));
            Assert.fail("Should fail the writer if the log segment is already fenced");
        } catch (WriteException expected) {
            // Expected: the writer rejects writes after the failed transmit.
        }

        abortWriterAndLock(writer, lock);
    }

    /**
     * With durable writes disabled, a write completes immediately with
     * {@link DLSN#InvalidDLSN} and nothing is pushed to the ledger (last add pushed stays -1).
     */
    @Test(timeout = 60000)
    public void testNondurableWrite() throws Exception {
        final DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setDurableWriteEnabled(false);

        final String lockPath = "/test/lock-" + this.runtime.getMethodName();
        final ZKDistributedLock lock = createLock(lockPath, this.zkc, true);
        final BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock);

        Assert.assertEquals(DLSN.InvalidDLSN,
                Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2L))));
        // Reach the ledger handle through the same helper the other tests in this class use,
        // instead of calling getLedgerHandle() directly on the writer's entry writer.
        Assert.assertEquals(-1L, getLedgerHandle(writer).getLastAddPushed());

        closeWriterAndLock(writer, lock);
    }
}
