package org.apache.distributedlog;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.base.Stopwatch;
import dlshade.com.google.common.util.concurrent.FutureCallback;
import dlshade.com.google.common.util.concurrent.Futures;
import dlshade.com.google.common.util.concurrent.MoreExecutors;
import dlshade.com.sun.jna.Callback;
import dlshade.org.apache.bookkeeper.client.AsyncCallback;
import dlshade.org.apache.bookkeeper.client.BKException;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.feature.FeatureProvider;
import dlshade.org.apache.bookkeeper.stats.AlertStatsLogger;
import dlshade.org.apache.bookkeeper.stats.Counter;
import dlshade.org.apache.bookkeeper.stats.Gauge;
import dlshade.org.apache.bookkeeper.stats.OpStatsLogger;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import dlshade.org.apache.bookkeeper.util.MathUtils;
import dlshade.org.apache.commons.lang3.mutable.MutableObject;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.common.stats.OpStatsListener;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.common.util.Sizable;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.FlushException;
import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.feature.CoreFeatureKeys;
import org.apache.distributedlog.injector.FailureInjector;
import org.apache.distributedlog.injector.RandomDelayFailureInjector;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.io.CompressionUtils;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.logsegment.LogSegmentWriter;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.SimplePermitLimiter;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/distributedlog/BKLogSegmentWriter.class */
public class BKLogSegmentWriter implements LogSegmentWriter, AsyncCallback.AddCallback, Runnable, Sizable {
    // Class logger. Declared without an initializer, so it is assigned in a static initializer outside this excerpt.
    static final Logger LOG;
    // "<streamName>:<second constructor argument>"; used in log and exception messages.
    private final String fullyQualifiedLogSegment;
    private final String streamName;
    // Passed to LogSegmentMetadata.supportsEnvelopedEntries(...) to decide whether entries are enveloped.
    private final int logSegmentMetadataVersion;
    // Saved by abortTransmitPacketOnClose() and consulted by flush() when there is nothing new to transmit.
    private BKTransmitPacket packetPrevious;
    // Buffer that currently accumulates records; replaced by REJECT_WRITES_WRITER on close.
    private Entry.Writer recordSetWriter;
    // Atomic accessor for the volatile field outstandingTransmits (sampled by the "requests" gauge).
    private static final AtomicIntegerFieldUpdater<BKLogSegmentWriter> outstandingTransmitsUpdater;
    // Output buffer size from the dynamic configuration, capped at LogRecord.MAX_LOGRECORDSET_SIZE by the constructor.
    private final int transmissionThreshold;
    protected final LogSegmentEntryWriter entryWriter;
    private final CompressionCodec.Type compressionType;
    // Atomic accessor for the volatile field transmitResult; a non-zero value marks the segment as in error.
    private static final AtomicIntegerFieldUpdater<BKLogSegmentWriter> transmitResultUpdater;
    private final DistributedLock lock;
    // When false, asyncWrite() answers user writes with DLSN.InvalidDLSN instead of the record's own future.
    private final boolean isDurableWriteEnabled;
    private final long startTxId;
    // Transaction id of the last non-control record handed to writeInternal().
    private long lastTxId;
    private long lastTxIdAcknowledged;
    private final int periodicKeepAliveMs;
    private boolean immediateFlushEnabled;
    private int minDelayBetweenImmediateFlushMs;
    private Stopwatch lastTransmit;
    // Handles of the periodic tasks scheduled by the constructor; null when the task is not scheduled.
    private final ScheduledFuture<?> periodicFlushSchedule;
    private final ScheduledFuture<?> periodicKeepAliveSchedule;
    // Atomic accessors for the volatile fields transmitSchedFutureRef, immFlushSchedFutureRef and scheduledFlushException.
    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, ScheduledFuture> transmitSchedFutureRefUpdater;
    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, ScheduledFuture> immFlushSchedFutureRefUpdater;
    private static final AtomicReferenceFieldUpdater<BKLogSegmentWriter, Exception> scheduledFlushExceptionUpdater;
    // When true, writeInternal() stamps each record with its position within the log segment.
    private final boolean enableRecordCounts;
    private final long logSegmentSequenceNumber;
    private final DistributedLogConfiguration conf;
    private final OrderedScheduler scheduler;
    // Stats handles below are all obtained from the StatsLogger scopes set up in the constructor.
    private final StatsLogger transmitOutstandingLogger;
    private final Counter transmitDataSuccesses;
    private final Counter transmitDataMisses;
    private final Gauge<Number> transmitOutstandingGauge;
    private final OpStatsLogger transmitDataPacketSize;
    private final Counter transmitControlSuccesses;
    private final Counter pFlushSuccesses;
    private final Counter pFlushMisses;
    private final OpStatsLogger writeTime;
    private final OpStatsLogger addCompleteTime;
    private final OpStatsLogger addCompleteQueuedTime;
    private final OpStatsLogger addCompleteDeferredTime;
    // Incremented by writeUserRecord() before a write and decremented when it completes or fails.
    private final Counter pendingWrites;
    private final AlertStatsLogger alertStatsLogger;
    // Permit limiter acquired per user write in writeUserRecord() and closed in closeInternal().
    private final WriteLimiter writeLimiter;
    // Invoked before every user write; FailureInjector.NULL unless write-delay injection is configured.
    private final FailureInjector writeDelayInjector;
    // Compiler-generated flag backing the `assert` statement in the constructor.
    static final /* synthetic */ boolean $assertionsDisabled;
    /**
     * Writer that refuses every record. {@code abortTransmitPacketOnClose} installs it as
     * {@code recordSetWriter}, so any write arriving after that point fails with a
     * {@link WriteException}. It always reports an empty buffer and ignores transmit callbacks.
     */
    final Entry.Writer REJECT_WRITES_WRITER = new Entry.Writer() { // from class: org.apache.distributedlog.BKLogSegmentWriter.1
        /** Always throws: no record is accepted by this writer. */
        @Override // org.apache.distributedlog.Entry.Writer
        public void writeRecord(LogRecord logRecord, CompletableFuture<DLSN> completableFuture) throws LogRecordTooLongException, WriteException {
            throw new WriteException(BKLogSegmentWriter.this.getFullyQualifiedLogSegment(), "Write record is cancelled.");
        }

        /** This writer never holds user records. */
        @Override // org.apache.distributedlog.EntryBuffer
        public boolean hasUserRecords() {
            return false;
        }

        /** This writer never holds any records. */
        @Override // org.apache.distributedlog.EntryBuffer
        public int getNumRecords() {
            return 0;
        }

        /** This writer never buffers any bytes. */
        @Override // org.apache.distributedlog.EntryBuffer
        public int getNumBytes() {
            return 0;
        }

        /** Returns -999, the same value the constructor initially assigns to lastTxId. */
        @Override // org.apache.distributedlog.EntryBuffer
        public long getMaxTxId() {
            return -999L;
        }

        /** There is no buffer to expose; always throws. */
        @Override // org.apache.distributedlog.EntryBuffer
        public ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException {
            throw new IOException("GetBuffer is not supported.");
        }

        /** Builds a DLSN from the two given ids with -1 as the third component. */
        @Override // org.apache.distributedlog.TransmitListener
        public DLSN finalizeTransmit(long j, long j2) {
            return new DLSN(j, j2, -1L);
        }

        /** No-op: nothing was buffered, so there is nothing to complete. */
        @Override // org.apache.distributedlog.TransmitListener
        public void completeTransmit(long j, long j2) {
        }

        /** No-op: nothing was buffered, so there is nothing to abort. */
        @Override // org.apache.distributedlog.TransmitListener
        public void abortTransmit(Throwable th) {
        }
    };
    // Accessed through outstandingTransmitsUpdater; exported by the "requests" gauge.
    private volatile int outstandingTransmits = 0;
    private final ReentrantLock transmitLock = new ReentrantLock();
    // Accessed through transmitResultUpdater; 0 means no transmit error has been recorded.
    private volatile int transmitResult = 0;
    private DLSN lastDLSN = DLSN.InvalidDLSN;
    // Grows by (20 + payload length) for every non-control record accepted by writeInternal().
    private long outstandingBytes = 0;
    // numBytes / numFlushesSinceRestart is the average transmit size used to size new record-set buffers.
    private long numFlushesSinceRestart = 0;
    private long numBytes = 0;
    private long lastEntryId = Long.MIN_VALUE;
    private long lastTransmitNanos = Long.MIN_VALUE;
    private volatile boolean controlFlushNeeded = false;
    // Set by markEndOfStream(); once true, writeUserRecord() rejects further user records.
    private boolean streamEnded = false;
    // Accessed through the matching *Updater fields declared above.
    private volatile ScheduledFuture transmitSchedFutureRef = null;
    private volatile ScheduledFuture immFlushSchedFutureRef = null;
    private volatile Exception scheduledFlushException = null;
    // Cleared by closeInternal() just before the final flush of a graceful close.
    private boolean enforceLock = true;
    // Non-null once a close or abort has started; later close calls return this same future.
    private CompletableFuture<Void> closeFuture = null;
    // Advanced by writeUserRecord() for every user record (or every record in a record set).
    private int positionWithinLogSegment = 0;
    /**
     * Maps a transmit result code to the flush outcome: a zero code yields the last
     * acknowledged transaction id, any other code yields a failed future carrying a
     * {@link BKTransmitException} with that code.
     */
    private final Function<Integer, CompletableFuture<Long>> GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC = rc -> {
        if (0 != rc.intValue()) {
            return FutureUtils.exception(new BKTransmitException("Failed to transmit entry", rc.intValue()));
        }
        return FutureUtils.value(Long.valueOf(getLastTxIdAcknowledged()));
    };
    /** Continuation that ignores the flushed transaction id and runs {@link #commit()}. */
    final Function<Long, CompletableFuture<Long>> COMMIT_AFTER_FLUSH_FUNC = ignoredTxId -> commit();

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v113, types: [org.apache.distributedlog.common.util.PermitLimiter] */
    /**
     * Creates a writer for one log segment: wires up stats, verifies lock ownership, allocates
     * the first record-set buffer and schedules the periodic flush and keep-alive tasks.
     *
     * @param str stream name; also the first half of the fully qualified segment name
     * @param str2 second half of the fully qualified segment name
     * @param distributedLogConfiguration static configuration (limits, compression, flush and keep-alive settings)
     * @param i log segment metadata version, used to decide whether entries are enveloped
     * @param logSegmentEntryWriter underlying entry writer for this segment
     * @param distributedLock lock whose ownership is checked (and re-acquired) here
     * @param j start transaction id; also the initial last and last-acknowledged transaction id
     * @param j2 log segment sequence number
     * @param orderedScheduler scheduler for periodic tasks; may be null, in which case none are scheduled
     * @param statsLogger stats logger for the writer's counters and latency stats
     * @param statsLogger2 stats logger under which the outstanding-transmits gauge is registered
     * @param alertStatsLogger alert logger stored for later use
     * @param permitLimiter additional limiter handed to the {@code WriteLimiter} together with the per-writer one
     * @param featureProvider source of the "disable write limit" feature flag
     * @param dynamicDistributedLogConfiguration dynamic configuration (buffer size, flush frequency, durability)
     * @throws IOException declared by the constructor; NOTE(review): presumably raised by the lock check — confirm
     */
    public BKLogSegmentWriter(String str, String str2, DistributedLogConfiguration distributedLogConfiguration, int i, LogSegmentEntryWriter logSegmentEntryWriter, DistributedLock distributedLock, long j, long j2, OrderedScheduler orderedScheduler, StatsLogger statsLogger, StatsLogger statsLogger2, AlertStatsLogger alertStatsLogger, PermitLimiter permitLimiter, FeatureProvider featureProvider, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration) throws IOException {
        this.lastTxId = -999L;
        this.lastTxIdAcknowledged = -999L;
        this.immediateFlushEnabled = false;
        this.minDelayBetweenImmediateFlushMs = 0;
        // A negative per-writer limit disables the per-writer limiter (NULL_PERMIT_LIMITER).
        this.writeLimiter = new WriteLimiter(str, distributedLogConfiguration.getPerWriterOutstandingWriteLimit() < 0 ? PermitLimiter.NULL_PERMIT_LIMITER : new SimplePermitLimiter(distributedLogConfiguration.getOutstandingWriteLimitDarkmode(), distributedLogConfiguration.getPerWriterOutstandingWriteLimit(), statsLogger.scope("streamWriteLimiter"), false, featureProvider.getFeature(CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase())), permitLimiter);
        this.alertStatsLogger = alertStatsLogger;
        StatsLogger scope = statsLogger.scope("flush").scope("periodic");
        this.pFlushSuccesses = scope.getCounter("success");
        this.pFlushMisses = scope.getCounter("miss");
        StatsLogger scope2 = statsLogger.scope("data");
        this.transmitDataSuccesses = scope2.getCounter("success");
        this.transmitDataMisses = scope2.getCounter("miss");
        this.transmitDataPacketSize = statsLogger.scope("transmit").getOpStatsLogger("packetsize");
        this.transmitControlSuccesses = statsLogger.scope("control").getCounter("success");
        StatsLogger scope3 = statsLogger.scope("seg_writer");
        this.writeTime = scope3.getOpStatsLogger("write");
        // NOTE(review): Callback.METHOD_NAME comes from the shaded JNA import; it looks like a decompiler
        // substitution for a string literal — confirm the intended stat name.
        this.addCompleteTime = scope3.scope("add_complete").getOpStatsLogger(Callback.METHOD_NAME);
        this.addCompleteQueuedTime = scope3.scope("add_complete").getOpStatsLogger("queued");
        this.addCompleteDeferredTime = scope3.scope("add_complete").getOpStatsLogger("deferred");
        this.pendingWrites = scope3.getCounter("pending");
        this.transmitOutstandingLogger = statsLogger2.scope("transmit").scope("outstanding");
        // Gauge that samples the current number of outstanding transmits; unregistered again on close.
        this.transmitOutstandingGauge = new Gauge<Number>() { // from class: org.apache.distributedlog.BKLogSegmentWriter.2
            @Override // dlshade.org.apache.bookkeeper.stats.Gauge
            public Number getDefaultValue() {
                return 0;
            }

            @Override // dlshade.org.apache.bookkeeper.stats.Gauge
            public Number getSample() {
                return Integer.valueOf(BKLogSegmentWriter.outstandingTransmitsUpdater.get(BKLogSegmentWriter.this));
            }
        };
        this.transmitOutstandingLogger.registerGauge("requests", this.transmitOutstandingGauge);
        this.fullyQualifiedLogSegment = str + ":" + str2;
        this.streamName = str;
        this.logSegmentMetadataVersion = i;
        this.entryWriter = logSegmentEntryWriter;
        this.lock = distributedLock;
        this.lock.checkOwnershipAndReacquire();
        int outputBufferSize = dynamicDistributedLogConfiguration.getOutputBufferSize();
        // Cap the threshold at the maximum record-set size.
        // NOTE(review): 1044480 looks like the inlined value of LogRecord.MAX_LOGRECORDSET_SIZE — confirm.
        if (outputBufferSize > 1044480) {
            LOG.warn("Setting output buffer size {} greater than max transmission size {} for log segment {}", new Object[]{Integer.valueOf(outputBufferSize), Integer.valueOf(LogRecord.MAX_LOGRECORDSET_SIZE), this.fullyQualifiedLogSegment});
            this.transmissionThreshold = LogRecord.MAX_LOGRECORDSET_SIZE;
        } else {
            this.transmissionThreshold = outputBufferSize;
        }
        this.compressionType = CompressionUtils.stringToType(distributedLogConfiguration.getCompressionType());
        this.logSegmentSequenceNumber = j2;
        // First buffer is sized from the threshold, but never below 1024.
        this.recordSetWriter = Entry.newEntry(str, Math.max(this.transmissionThreshold, 1024), envelopeBeforeTransmit(), this.compressionType);
        this.packetPrevious = null;
        this.startTxId = j;
        this.lastTxId = j;
        this.lastTxIdAcknowledged = j;
        this.enableRecordCounts = distributedLogConfiguration.getEnableRecordCounts();
        this.immediateFlushEnabled = distributedLogConfiguration.getImmediateFlushEnabled();
        this.isDurableWriteEnabled = dynamicDistributedLogConfiguration.isDurableWriteEnabled();
        this.scheduler = orderedScheduler;
        if (distributedLogConfiguration.getEIInjectWriteDelay()) {
            this.writeDelayInjector = new RandomDelayFailureInjector(dynamicDistributedLogConfiguration);
        } else {
            this.writeDelayInjector = FailureInjector.NULL;
        }
        int periodicFlushFrequencyMilliSeconds = dynamicDistributedLogConfiguration.getPeriodicFlushFrequencyMilliSeconds();
        if (this.immediateFlushEnabled && 0 == this.transmissionThreshold) {
            // Immediate flush with a zero threshold: no periodic flush task, only a minimum delay between flushes.
            this.minDelayBetweenImmediateFlushMs = distributedLogConfiguration.getMinDelayBetweenImmediateFlushMs();
            this.periodicFlushSchedule = null;
        } else if (periodicFlushFrequencyMilliSeconds <= 0 || orderedScheduler == null) {
            this.periodicFlushSchedule = null;
        } else {
            // This writer is itself the Runnable; it runs at half the configured flush frequency.
            this.periodicFlushSchedule = orderedScheduler.scheduleAtFixedRate(this, periodicFlushFrequencyMilliSeconds / 2, periodicFlushFrequencyMilliSeconds / 2, TimeUnit.MILLISECONDS);
        }
        this.periodicKeepAliveMs = distributedLogConfiguration.getPeriodicKeepAliveMilliSeconds();
        if (this.periodicKeepAliveMs <= 0 || orderedScheduler == null) {
            this.periodicKeepAliveSchedule = null;
        } else {
            this.periodicKeepAliveSchedule = orderedScheduler.scheduleAtFixedRate(new Runnable() { // from class: org.apache.distributedlog.BKLogSegmentWriter.3
                @Override // java.lang.Runnable
                public void run() {
                    BKLogSegmentWriter.this.keepAlive();
                }
            }, this.periodicKeepAliveMs, this.periodicKeepAliveMs, TimeUnit.MILLISECONDS);
        }
        this.conf = distributedLogConfiguration;
        // Expanded form of an `assert`: immediate flush requires a scheduler.
        if (!$assertionsDisabled && this.immediateFlushEnabled && null == this.scheduler) {
            throw new AssertionError();
        }
        this.lastTransmit = Stopwatch.createStarted();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the name assembled by the constructor from the stream name and the second
     * name argument, joined by {@code ':'}.
     *
     * @return the fully qualified log segment name
     */
    public String getFullyQualifiedLogSegment() {
        return fullyQualifiedLogSegment;
    }

    /**
     * Exposes the distributed lock handed to the constructor.
     *
     * @return the lock guarding this log segment
     */
    @VisibleForTesting
    DistributedLock getLock() {
        return lock;
    }

    /**
     * Picks the scheduler thread associated with this writer's stream name.
     *
     * @return the executor chosen by the ordered scheduler for the stream name key
     */
    @VisibleForTesting
    ScheduledExecutorService getFuturePool() {
        // Typed as Object so the Object-keyed overload of chooseThread is selected.
        final Object orderingKey = streamName;
        return scheduler.chooseThread(orderingKey);
    }

    /**
     * Overwrites the recorded transmit result code.
     *
     * @param i the result code to store; any non-zero value marks the segment as in error
     */
    @VisibleForTesting
    void setTransmitResult(int i) {
        transmitResultUpdater.set(this, i);
    }

    /**
     * Exposes the underlying entry writer.
     *
     * @return the entry writer supplied at construction
     */
    @VisibleForTesting
    protected final LogSegmentEntryWriter getEntryWriter() {
        return entryWriter;
    }

    /**
     * Returns the log segment id reported by the underlying entry writer.
     *
     * @return the log segment id
     */
    @Override // org.apache.distributedlog.logsegment.LogSegmentWriter
    public long getLogSegmentId() {
        final LogSegmentEntryWriter writer = entryWriter;
        return writer.getLogSegmentId();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the sequence number assigned to this log segment at construction.
     *
     * @return the log segment sequence number
     */
    public final long getLogSegmentSequenceNumber() {
        return logSegmentSequenceNumber;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the transaction id this segment was started with.
     *
     * @return the start transaction id
     */
    public final long getStartTxId() {
        return startTxId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the transaction id of the last non-control record accepted for writing.
     *
     * @return the last transaction id, read under this writer's monitor
     */
    public synchronized long getLastTxId() {
        return lastTxId;
    }

    /**
     * Returns the last acknowledged transaction id.
     *
     * @return the last acknowledged transaction id, read under this writer's monitor
     */
    synchronized long getLastTxIdAcknowledged() {
        return lastTxIdAcknowledged;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the running record position maintained by {@code writeUserRecord}.
     *
     * @return the current position within the log segment
     */
    public synchronized int getPositionWithinLogSegment() {
        return positionWithinLogSegment;
    }

    /**
     * Returns the last entry id field; it starts out as {@code Long.MIN_VALUE}.
     *
     * @return the last entry id (read without synchronization)
     */
    @VisibleForTesting
    long getLastEntryId() {
        return lastEntryId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the last DLSN field; it starts out as {@code DLSN.InvalidDLSN}.
     *
     * @return the last DLSN, read under this writer's monitor
     */
    public synchronized DLSN getLastDLSN() {
        return lastDLSN;
    }

    /**
     * Returns the size reported by the underlying entry writer.
     *
     * @return the entry writer's size
     */
    @Override // org.apache.distributedlog.common.util.Sizable
    public long size() {
        return getEntryWriter().size();
    }

    /**
     * Computes the average number of bytes per flush ({@code numBytes / numFlushesSinceRestart}).
     *
     * @return the average, or 0 when no flush has been counted yet
     * @throws IllegalArgumentException if the average does not fit in an {@code int}
     */
    private synchronized int getAverageTransmitSize() {
        if (numFlushesSinceRestart > 0) {
            final long average = numBytes / numFlushesSinceRestart;
            if (average >= Integer.MIN_VALUE && average <= Integer.MAX_VALUE) {
                return (int) average;
            }
            throw new IllegalArgumentException(average + " transmit size should never exceed max transmit size");
        }
        return 0;
    }

    /**
     * Creates a fresh record-set buffer for this stream. The size argument is the larger of
     * the transmission threshold and the average transmit size observed so far.
     *
     * @return a new entry writer using this writer's envelope setting and compression type
     */
    private Entry.Writer newRecordSetWriter() {
        final int sizeArgument = Math.max(transmissionThreshold, getAverageTransmitSize());
        final boolean enveloped = envelopeBeforeTransmit();
        return Entry.newEntry(streamName, sizeArgument, enveloped, compressionType);
    }

    /**
     * Tells whether this segment's metadata version supports enveloped entries.
     *
     * @return the answer of {@code LogSegmentMetadata.supportsEnvelopedEntries} for this segment's version
     */
    private boolean envelopeBeforeTransmit() {
        final int metadataVersion = logSegmentMetadataVersion;
        return LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion);
    }

    /**
     * Closes the writer gracefully (non-abort path of {@code closeInternal}).
     *
     * @return the close future; repeated calls return the same future
     */
    @Override // org.apache.distributedlog.io.AsyncCloseable
    public CompletableFuture<Void> asyncClose() {
        final boolean abort = false;
        return closeInternal(abort);
    }

    /**
     * Aborts the writer (abort path of {@code closeInternal}, which skips the final flush).
     *
     * @return the close future; repeated calls return the same future
     */
    @Override // org.apache.distributedlog.io.AsyncAbortable
    public CompletableFuture<Void> asyncAbort() {
        final boolean abort = true;
        return closeInternal(abort);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Aborts every record buffered in the given packet with a {@link WriteCancelledException}
     * and logs how many writes were aborted.
     *
     * @param bKTransmitPacket the packet to abort; {@code null} is tolerated and counts as zero writes
     */
    public synchronized void abortPacket(BKTransmitPacket bKTransmitPacket) {
        long abortedWrites = 0L;
        if (bKTransmitPacket != null) {
            final EntryBuffer pending = bKTransmitPacket.getRecordSet();
            abortedWrites = pending.getNumRecords();
            final int recordedRc = transmitResultUpdater.get(this);
            // With no transmit error recorded, fall back to code -15
            // (NOTE(review): presumably BKException.Code.InterruptedException — confirm).
            final int abortRc = (recordedRc == 0) ? -15 : recordedRc;
            pending.abortTransmit(new WriteCancelledException(streamName, Utils.transmitException(abortRc)));
        }
        LOG.info("Stream {} aborted {} writes", fullyQualifiedLogSegment, Long.valueOf(abortedWrites));
    }

    /**
     * Counts the records sitting in the current record-set buffer.
     *
     * @return the buffered record count, or 0 when there is no buffer
     */
    private synchronized long getWritesPendingTransmit() {
        return (recordSetWriter == null) ? 0L : recordSetWriter.getNumRecords();
    }

    /**
     * Starts the close (or abort) sequence at most once.
     *
     * @param z {@code true} to abort, {@code false} to close gracefully
     * @return the future of the close already in progress, or a new one if this call started it
     */
    private CompletableFuture<Void> closeInternal(boolean z) {
        final CompletableFuture<Void> promise;
        synchronized (this) {
            if (closeFuture != null) {
                // A close is already in flight (or done); hand back its future.
                return closeFuture;
            }
            promise = new CompletableFuture<>();
            closeFuture = promise;
            closeInternal(z, new MutableObject<>(null), promise);
        }
        return promise;
    }

    /**
     * Performs the first stage of closing: unregisters the gauge, closes the write limiter,
     * cancels the periodic tasks, then either aborts right away or flushes and commits first.
     *
     * @param z {@code true} to abort without flushing
     * @param mutableObject holder for the first error seen during close
     * @param completableFuture promise completed when the close sequence finishes
     */
    private void closeInternal(final boolean z, final MutableObject<Throwable> mutableObject, final CompletableFuture<Void> completableFuture) {
        transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
        writeLimiter.close();
        if (periodicKeepAliveSchedule != null) {
            if (!periodicKeepAliveSchedule.cancel(false)) {
                LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
            }
        }
        if (periodicFlushSchedule != null) {
            if (!periodicFlushSchedule.cancel(false)) {
                LOG.info("Periodic flush for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
            }
        }
        if (!z && !isLogSegmentInError()) {
            // Graceful close of a healthy segment: stop enforcing the lock, flush, then continue closing.
            enforceLock = false;
            LOG.info("Flushing before closing log segment {}", getFullyQualifiedLogSegment());
            final CompletableFuture<Long> flushed = flushAndCommit();
            flushed.whenComplete((BiConsumer<? super Long, ? super Throwable>) new FutureEventListener<Long>() { // from class: org.apache.distributedlog.BKLogSegmentWriter.4
                @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                public void onSuccess(Long lastTxId) {
                    BKLogSegmentWriter.this.abortTransmitPacketOnClose(z, mutableObject, completableFuture);
                }

                @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                public void onFailure(Throwable cause) {
                    // Remember the flush failure so the close promise reports it.
                    mutableObject.setValue(cause);
                    BKLogSegmentWriter.this.abortTransmitPacketOnClose(z, mutableObject, completableFuture);
                }
            });
            return;
        }
        // Abort requested or segment already in error: skip the flush.
        abortTransmitPacketOnClose(z, mutableObject, completableFuture);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Second stage of closing: swaps in {@code REJECT_WRITES_WRITER}, aborts whatever was
     * still buffered, and moves on to closing the ledger.
     *
     * <p>If an earlier packet is still recorded, the buffered records are aborted once that
     * packet's transmit completes. They are aborted on failure of that transmit as well;
     * otherwise the futures of the buffered writes would never be completed.
     *
     * @param z {@code true} when closing because of an abort
     * @param mutableObject holder for the first error seen during close
     * @param completableFuture promise completed when the close sequence finishes
     */
    public void abortTransmitPacketOnClose(boolean z, MutableObject<Throwable> mutableObject, CompletableFuture<Void> completableFuture) {
        BKTransmitPacket previousPacket;
        final BKTransmitPacket currentPacket;
        LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} : lastDLSN = {} outstandingTransmits = {} writesPendingTransmit = {}", new Object[]{Boolean.valueOf(z), this.fullyQualifiedLogSegment, getLastDLSN(), Integer.valueOf(outstandingTransmitsUpdater.get(this)), Long.valueOf(getWritesPendingTransmit())});
        synchronized (this) {
            previousPacket = this.packetPrevious;
            currentPacket = new BKTransmitPacket(this.recordSetWriter);
            // From here on every write is rejected.
            this.recordSetWriter = this.REJECT_WRITES_WRITER;
        }
        if (null != previousPacket) {
            previousPacket.addTransmitCompleteListener(new FutureEventListener<Integer>() { // from class: org.apache.distributedlog.BKLogSegmentWriter.5
                @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                public void onSuccess(Integer num) {
                    BKLogSegmentWriter.this.abortPacket(currentPacket);
                }

                @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                public void onFailure(Throwable th) {
                    BKLogSegmentWriter.LOG.error("Unexpected error on transmit completion ", th);
                    // Still release the buffered writes so their futures do not hang forever.
                    BKLogSegmentWriter.this.abortPacket(currentPacket);
                }
            });
        } else {
            abortPacket(currentPacket);
        }
        closeLedgerOnClose(z, mutableObject, completableFuture);
    }

    /**
     * Third stage of closing: closes the underlying entry writer unless an error was already
     * recorded or the segment is in error, then completes the close promise.
     *
     * @param z {@code true} when closing because of an abort; close failures are then ignored
     * @param mutableObject holder for the first error seen during close
     * @param completableFuture promise completed when the close sequence finishes
     */
    private void closeLedgerOnClose(final boolean z, final MutableObject<Throwable> mutableObject, final CompletableFuture<Void> completableFuture) {
        // MutableObject exposes getValue(); the decompiler-generated name getValue2() does not exist.
        if (null != mutableObject.getValue() || isLogSegmentInError()) {
            completeClosePromise(z, mutableObject, completableFuture);
        } else {
            this.entryWriter.asyncClose(new AsyncCallback.CloseCallback() { // from class: org.apache.distributedlog.BKLogSegmentWriter.6
                @Override // dlshade.org.apache.bookkeeper.client.AsyncCallback.CloseCallback
                public void closeComplete(int i, LedgerHandle ledgerHandle, Object obj) {
                    // Result codes 0 and -11 are not treated as failures
                    // (NOTE(review): -11 is presumably BKException.Code.LedgerClosedException — confirm).
                    if (0 != i && -11 != i && !z) {
                        mutableObject.setValue(new IOException("Failed to close ledger for " + BKLogSegmentWriter.this.fullyQualifiedLogSegment + " : " + BKException.getMessage(i)));
                    }
                    BKLogSegmentWriter.this.completeClosePromise(z, mutableObject, completableFuture);
                }
            }, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Final stage of closing: completes the close promise, exceptionally if an error was
     * recorded. A graceful close of a segment whose transmit result demands failure
     * (see {@code shouldFailCompleteLogSegment}) is turned into a {@link BKTransmitException}.
     *
     * @param z {@code true} when closing because of an abort
     * @param mutableObject holder for the first error seen during close
     * @param completableFuture promise to complete
     */
    public void completeClosePromise(boolean z, MutableObject<Throwable> mutableObject, CompletableFuture<Void> completableFuture) {
        // MutableObject exposes getValue(); the decompiler-generated name getValue2() does not exist.
        if (!z && null == mutableObject.getValue() && shouldFailCompleteLogSegment()) {
            mutableObject.setValue(new BKTransmitException("Closing an errored stream : ", transmitResultUpdater.get(this)));
        }
        final Throwable failure = mutableObject.getValue();
        if (null == failure) {
            FutureUtils.complete(completableFuture, null);
        } else {
            FutureUtils.completeExceptionally(completableFuture, failure);
        }
    }

    /**
     * Buffers one user record, then flushes if a flush is due.
     *
     * @param logRecord the record to write
     * @throws IOException if the record is rejected or the flush fails
     */
    @Override // org.apache.distributedlog.logsegment.LogSegmentWriter
    public synchronized void write(LogRecord logRecord) throws IOException {
        // The returned future is intentionally not used on this synchronous path.
        writeUserRecord(logRecord);
        flushIfNeeded();
    }

    /**
     * Writes a record asynchronously, flushing afterwards if a flush is due.
     *
     * @param logRecord the record to write
     * @return a future for the record's DLSN
     */
    @Override // org.apache.distributedlog.logsegment.LogSegmentWriter
    public synchronized CompletableFuture<DLSN> asyncWrite(LogRecord logRecord) {
        final boolean flushAfterWrite = true;
        return asyncWrite(logRecord, flushAfterWrite);
    }

    /**
     * Writes a record asynchronously.
     *
     * <p>A control record is never packed with user records: the current buffer is transmitted,
     * the control record is written, and the buffer is transmitted again. Any failure on that
     * path is reported as a {@link WriteCancelledException}. A user record is buffered and,
     * when durable writes are disabled, answered immediately with {@code DLSN.InvalidDLSN}.
     *
     * @param logRecord the record to write
     * @param z whether to flush (if a flush is due) after buffering a user record
     * @return a future for the record's DLSN; failed if the write or the flush failed
     */
    public synchronized CompletableFuture<DLSN> asyncWrite(LogRecord logRecord, boolean z) {
        CompletableFuture<DLSN> result = null;
        try {
            if (logRecord.isControl()) {
                try {
                    transmit();
                    result = writeControlLogRecord(logRecord);
                    transmit();
                } catch (IOException e) {
                    return FutureUtils.exception(new WriteCancelledException(this.fullyQualifiedLogSegment, e));
                }
            } else {
                result = writeUserRecord(logRecord);
                if (!this.isDurableWriteEnabled) {
                    // Without durability the DLSN is unknown; report the invalid marker right away.
                    result = FutureUtils.value(DLSN.InvalidDLSN);
                }
                if (z) {
                    flushIfNeeded();
                }
            }
        } catch (IOException e2) {
            // The record may already have been accepted when the flush failed; make the
            // replacement of its future visible in the log.
            if (null != result) {
                LOG.error("Overriding first result with flush failure {}", result);
            }
            result = FutureUtils.exception(e2);
            // Make sure earlier writes issued without a flush still get flushed despite the failure.
            flushIfNeededNoThrow();
        }
        return result;
    }

    /**
     * Validates and buffers a user record, accounting for write permits, the pending-write
     * counter and the position within the log segment.
     *
     * @param logRecord the user record (or record set) to write
     * @return a future for the record's DLSN; the permit and pending counter are released when it completes
     * @throws WriteException if the writer is closing/closed or a transmit error was recorded
     * @throws EndOfStreamException if the stream was already marked as ended
     * @throws TransactionIdOutOfOrderException if the transaction id is negative or equals {@code MAX_TXID}
     * @throws IOException if acquiring the permit or buffering the record fails
     */
    private synchronized CompletableFuture<DLSN> writeUserRecord(LogRecord logRecord) throws IOException {
        if (closeFuture != null) {
            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(-11));
        }
        if (transmitResultUpdater.get(this) != 0) {
            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(transmitResultUpdater.get(this)));
        }
        if (streamEnded) {
            throw new EndOfStreamException("Writing to a stream after it has been marked as completed");
        }
        if (logRecord.getTransactionId() < 0 || logRecord.getTransactionId() == DistributedLogConstants.MAX_TXID) {
            throw new TransactionIdOutOfOrderException(logRecord.getTransactionId());
        }
        writeDelayInjector.inject();
        writeLimiter.acquire();
        pendingWrites.inc();
        try {
            // Advance by one up front; rolled back in the catch block if the write fails.
            positionWithinLogSegment++;
            final int recordCount = logRecord.isRecordSet() ? LogRecordSet.numRecords(logRecord) : 1;
            final CompletableFuture<DLSN> written = writeInternal(logRecord);
            // One position was already consumed above; add the remainder for a record set.
            positionWithinLogSegment += recordCount - 1;
            return FutureUtils.ensure(written, () -> {
                pendingWrites.dec();
                writeLimiter.release();
            });
        } catch (IOException failure) {
            writeLimiter.release();
            pendingWrites.dec();
            positionWithinLogSegment--;
            throw failure;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether a transmit error has been recorded for this segment.
     *
     * @return {@code true} when the recorded transmit result is non-zero
     */
    public boolean isLogSegmentInError() {
        final int rc = transmitResultUpdater.get(this);
        return 0 != rc;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether completing this segment must fail because of the recorded transmit result.
     *
     * @return {@code true} unless the recorded result is 0 or -11
     */
    public boolean shouldFailCompleteLogSegment() {
        // NOTE(review): -11 is presumably BKException.Code.LedgerClosedException — confirm.
        return transmitResultUpdater.get(this) != 0 && transmitResultUpdater.get(this) != -11;
    }

    /**
     * Appends a record to the current record-set buffer, transmitting the buffer first if the
     * record would push it past the maximum record-set size.
     *
     * @param logRecord the record to buffer (user, control or end-of-stream)
     * @return a future completed with the record's DLSN; its latency is recorded in the write stat
     * @throws LogRecordTooLongException if the record's persistent size exceeds {@code LogRecord.MAX_LOGRECORD_SIZE}
     * @throws LockingException declared; NOTE(review): presumably raised by the write-lock check — confirm
     * @throws BKTransmitException if transmitting the full buffer fails
     * @throws WriteException if the pre-transmit state check or the buffer rejects the write
     * @throws InvalidEnvelopedEntryException if the buffer cannot be enveloped for transmit
     */
    public synchronized CompletableFuture<DLSN> writeInternal(LogRecord logRecord) throws LogRecordTooLongException, LockingException, BKTransmitException, WriteException, InvalidEnvelopedEntryException {
        final int persistentSize = logRecord.getPersistentSize();
        // Use the named limits instead of duplicated literals so the check and the message agree.
        if (persistentSize > LogRecord.MAX_LOGRECORD_SIZE) {
            throw new LogRecordTooLongException(String.format("Log Record of size %d written when only %d is allowed", Integer.valueOf(persistentSize), Integer.valueOf(LogRecord.MAX_LOGRECORD_SIZE)));
        }
        if (this.recordSetWriter.getNumBytes() + persistentSize > LogRecord.MAX_LOGRECORDSET_SIZE) {
            // The record does not fit in the current buffer; ship the buffer first.
            checkStateAndTransmit();
        }
        checkWriteLock();
        if (this.enableRecordCounts) {
            logRecord.setPositionWithinLogSegment(this.positionWithinLogSegment);
        }
        CompletableFuture<DLSN> completableFuture = new CompletableFuture<>();
        completableFuture.whenComplete((BiConsumer<? super DLSN, ? super Throwable>) new OpStatsListener(this.writeTime));
        this.recordSetWriter.writeRecord(logRecord, completableFuture);
        if (logRecord.getTransactionId() < this.lastTxId) {
            LOG.info("Log Segment {} TxId decreased Last: {} Record: {}", new Object[]{this.fullyQualifiedLogSegment, Long.valueOf(this.lastTxId), Long.valueOf(logRecord.getTransactionId())});
        }
        if (!logRecord.isControl()) {
            // Only non-control records move lastTxId and count towards outstanding bytes
            // (payload plus a fixed 20 bytes per record).
            this.lastTxId = logRecord.getTransactionId();
            this.outstandingBytes += 20 + logRecord.getPayload().length;
        }
        return completableFuture;
    }

    /**
     * Builds a control record stamped with the last transaction id and writes it.
     *
     * @return a future for the control record's DLSN
     */
    private synchronized CompletableFuture<DLSN> writeControlLogRecord() throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException, LogRecordTooLongException {
        final LogRecord controlRecord = new LogRecord(lastTxId, DistributedLogConstants.CONTROL_RECORD_CONTENT);
        controlRecord.setControl();
        return writeControlLogRecord(controlRecord);
    }

    /**
     * Writes the given control record through {@link #writeInternal(LogRecord)}.
     *
     * @param logRecord the control record to write
     * @return a future for the control record's DLSN
     */
    private synchronized CompletableFuture<DLSN> writeControlLogRecord(LogRecord logRecord) throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException, LogRecordTooLongException {
        final CompletableFuture<DLSN> written = writeInternal(logRecord);
        return written;
    }

    /**
     * Writes the end-of-stream marker: a record with transaction id {@code MAX_TXID} and the
     * UTF-8 payload "endOfStream", flagged as end of stream.
     *
     * @throws IOException if the marker cannot be buffered
     */
    private synchronized void writeEndOfStreamMarker() throws IOException {
        final byte[] markerPayload = "endOfStream".getBytes(StandardCharsets.UTF_8);
        final LogRecord marker = new LogRecord(DistributedLogConstants.MAX_TXID, markerPayload);
        marker.setEndOfStream();
        writeInternal(marker);
    }

    /**
     * Marks the stream as ended: writes the end-of-stream marker, records that the stream has
     * ended, then flushes and commits.
     *
     * @return the flush-and-commit future, or a failed future if the marker could not be written
     */
    public CompletableFuture<Long> markEndOfStream() {
        synchronized (this) {
            try {
                writeEndOfStreamMarker();
            } catch (IOException markerFailure) {
                return FutureUtils.exception(markerFailure);
            }
            // Reached only when the marker was buffered successfully.
            streamEnded = true;
        }
        return flushAndCommit();
    }

    /**
     * Writes every record of the list in order through {@link #write(LogRecord)}.
     *
     * @param list the records to write
     * @return the number of records written
     * @throws IOException on the first record that fails; earlier records stay written
     */
    public synchronized int writeBulk(List<LogRecord> list) throws IOException {
        int written = 0;
        for (LogRecord record : list) {
            write(record);
            written++;
        }
        return written;
    }

    /**
     * Evaluates the {@code FP_TransmitBeforeAddEntry} failpoint and converts a triggered
     * failpoint into a {@link WriteException}.
     *
     * @throws WriteException if the failpoint fires; the triggering exception is attached as suppressed
     */
    private void checkStateBeforeTransmit() throws WriteException {
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitBeforeAddEntry);
        } catch (IOException e) {
            final WriteException failure = new WriteException(this.streamName, "Fail transmit before adding entries");
            // Keep the triggering exception for diagnostics instead of dropping it.
            failure.addSuppressed(e);
            throw failure;
        }
    }

    /**
     * Runs the pre-transmit fail point check and then transmits the buffered records.
     * The future returned by {@code transmit()} is discarded.
     */
    synchronized void checkStateAndTransmit() throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException {
        this.checkStateBeforeTransmit();
        this.transmit();
    }

    /**
     * Transmits whatever is currently buffered and returns a future for the last acknowledged
     * transaction id.
     *
     * <p>When there is nothing to transmit, the future of the previously sent packet is reused;
     * if no packet was ever sent, the current last acknowledged transaction id is returned
     * immediately.
     *
     * @return a future completing once the relevant transmit has finished, or an exceptionally
     *         completed future if the pre-transmit check or the transmit itself failed
     */
    @Override // org.apache.distributedlog.logsegment.LogSegmentWriter
    public synchronized CompletableFuture<Long> flush() {
        try {
            checkStateBeforeTransmit();
            CompletableFuture<Integer> transmit = transmit();
            if (null == transmit) {
                if (null == this.packetPrevious) {
                    return FutureUtils.value(Long.valueOf(getLastTxIdAcknowledged()));
                }
                transmit = this.packetPrevious.getTransmitFuture();
            }
            // No cast here: the previous cast named an undeclared type variable and could not compile.
            return transmit.thenCompose(this.GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
        } catch (BKTransmitException e) {
            return FutureUtils.exception(e);
        } catch (InvalidEnvelopedEntryException e) {
            return FutureUtils.exception(e);
        } catch (LockingException e) {
            return FutureUtils.exception(e);
        } catch (WriteException e) {
            return FutureUtils.exception(e);
        }
    }

    /**
     * Commits the data written so far.
     *
     * <p>If records are buffered they are transmitted and the returned future tracks that
     * transmit. Otherwise a control record is written and {@link #flush()} is invoked to send it.
     *
     * @return a future for the last acknowledged transaction id, or an exceptionally completed
     *         future if transmitting or writing the control record failed with an
     *         {@link IOException}
     */
    @Override // org.apache.distributedlog.logsegment.LogSegmentWriter
    public synchronized CompletableFuture<Long> commit() {
        try {
            CompletableFuture<Integer> transmit = transmit();
            if (null != transmit) {
                // No cast here: the previous cast named an undeclared type variable and could not compile.
                return transmit.thenCompose(this.GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
            }
            // Nothing was buffered: send a control record on its own.
            writeControlLogRecord();
            return flush();
        } catch (IOException e) {
            return FutureUtils.exception(e);
        }
    }

    /**
     * Flushes and, once the flush future completes, chains {@code COMMIT_AFTER_FLUSH_FUNC}.
     *
     * @return the future produced by composing the flush with the commit function
     */
    public CompletableFuture<Long> flushAndCommit() {
        // No cast here: the previous cast named an undeclared type variable and could not compile.
        return flush().thenCompose(this.COMMIT_AFTER_FLUSH_FUNC);
    }

    /**
     * Best-effort variant of {@link #flushIfNeeded()}: an {@link IOException} is logged at error
     * level and not propagated.
     */
    void flushIfNeededNoThrow() {
        try {
            this.flushIfNeeded();
        } catch (IOException flushError) {
            LOG.error("Encountered exception while flushing log records to stream {}", this.fullyQualifiedLogSegment, flushError);
        }
    }

    /**
     * Schedules {@code callable} on the scheduler unless a previously scheduled task tracked by
     * {@code atomicReferenceFieldUpdater} is still pending.
     *
     * <p>The delay is the remainder of {@code minDelayBetweenImmediateFlushMs} since the last
     * transmit, never below zero. When the task runs it clears the tracked future, invokes the
     * callable and records the outcome in {@code scheduledFlushExceptionUpdater}: {@code null} on
     * success, the thrown exception on failure.
     *
     * @param callable                    the flush action to run after the delay
     * @param atomicReferenceFieldUpdater updater for the field that holds the pending scheduled future
     */
    void scheduleFlushWithDelayIfNeeded(final Callable<?> callable, final AtomicReferenceFieldUpdater<BKLogSegmentWriter, ScheduledFuture> atomicReferenceFieldUpdater) {
        final long sinceLastTransmitMs = this.lastTransmit.elapsed(TimeUnit.MILLISECONDS);
        final long delayMs = Math.max(0L, this.minDelayBetweenImmediateFlushMs - sinceLastTransmitMs);
        final ScheduledFuture pending = atomicReferenceFieldUpdater.get(this);
        if (null != pending && !pending.isDone()) {
            // A flush is already scheduled and has not run yet.
            return;
        }
        final Runnable delayedFlush = new Runnable() {
            @Override
            public void run() {
                // NOTE(review): 'this' here is the Runnable, not the enclosing writer.
                synchronized (this) {
                    atomicReferenceFieldUpdater.set(BKLogSegmentWriter.this, null);
                    try {
                        callable.call();
                        BKLogSegmentWriter.scheduledFlushExceptionUpdater.set(BKLogSegmentWriter.this, null);
                    } catch (Exception flushError) {
                        BKLogSegmentWriter.scheduledFlushExceptionUpdater.set(BKLogSegmentWriter.this, flushError);
                        BKLogSegmentWriter.LOG.error("Delayed flush failed", flushError);
                    }
                }
            }
        };
        atomicReferenceFieldUpdater.set(this, this.scheduler.schedule(delayedFlush, delayMs, TimeUnit.MILLISECONDS));
    }

    /**
     * Triggers a transmit once the outstanding byte count exceeds {@code transmissionThreshold}.
     *
     * <p>With no minimum delay configured the transmit happens inline. Otherwise a delayed
     * transmit is scheduled, and any exception recorded by an earlier scheduled flush is surfaced
     * to the caller.
     *
     * @throws FlushException if a previously scheduled flush recorded an exception
     */
    void flushIfNeeded() throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException, FlushException {
        if (this.outstandingBytes <= this.transmissionThreshold) {
            return;
        }
        if (0 == this.minDelayBetweenImmediateFlushMs) {
            checkStateAndTransmit();
            return;
        }
        final Callable<Void> delayedTransmit = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                BKLogSegmentWriter.this.checkStateAndTransmit();
                return null;
            }
        };
        scheduleFlushWithDelayIfNeeded(delayedTransmit, transmitSchedFutureRefUpdater);
        final Exception previousFailure = scheduledFlushExceptionUpdater.get(this);
        if (previousFailure != null) {
            throw new FlushException("Last flush encountered an error while writing data to the backend", getLastTxId(), getLastTxIdAcknowledged(), previousFailure);
        }
    }

    /**
     * Verifies that this writer may still write: first the {@code FP_WriteInternalLostLock} fail
     * point, then, when {@code enforceLock} is set, real ownership of the lock.
     *
     * <p>A {@link LockingException} raised by the ownership check is propagated unchanged.
     * Previously it was caught by the surrounding {@code catch (IOException)} and replaced with the
     * synthetic "failpoint" exception, which hid the real lock path and message from callers.
     *
     * @throws LockingException if the fail point fires or raises an {@link IOException}, or if the
     *                          lock ownership check fails
     */
    private void checkWriteLock() throws LockingException {
        try {
            if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_WriteInternalLostLock)) {
                // Same text callers observed before, when the catch below rewrote this exception.
                throw new LockingException("/failpoint/lockpath", "failpoint is simulating a lost lock for " + getFullyQualifiedLogSegment());
            }
            if (this.enforceLock) {
                this.lock.checkOwnershipAndReacquire();
            }
        } catch (LockingException e) {
            // Keep genuine lock failures (and the simulated one above) intact.
            throw e;
        } catch (IOException e) {
            throw new LockingException("/failpoint/lockpath", "failpoint is simulating a lost lock for " + getFullyQualifiedLogSegment());
        }
    }

    /**
     * Hands the currently buffered record set to the entry writer and starts a fresh buffer.
     *
     * <p>Steps, all performed while holding {@code transmitLock} and this writer's monitor:
     * verify the write lock, refuse to write if a previous transmit recorded a non-zero result,
     * swap in a new record set writer, serialize the old one and submit it asynchronously with
     * this object as the add callback.
     *
     * <p>{@code transmitLock} is released exactly once, in the {@code finally} block. An extra
     * unlock on the success path was removed: it made the {@code finally} unlock throw
     * {@link IllegalMonitorStateException} (or unbalanced a reentrant hold).
     *
     * @return the transmit future of the submitted packet, or {@code null} when there were no
     *         buffered records
     * @throws BKTransmitException            if an earlier transmit already failed
     * @throws LockingException               if the write lock check fails
     * @throws InvalidEnvelopedEntryException if serializing the record set reports an invalid entry
     * @throws WriteException                 if serializing the record set fails for another I/O reason
     */
    private CompletableFuture<Integer> transmit() throws BKTransmitException, LockingException, WriteException, InvalidEnvelopedEntryException {
        this.transmitLock.lock();
        try {
            synchronized (this) {
                checkWriteLock();
                // compareAndSet(0, 0) is used as an atomic "is the result still 0" test.
                if (!transmitResultUpdater.compareAndSet(this, 0, 0)) {
                    LOG.error("Log Segment {} Trying to write to an errored stream; Error is {}", this.fullyQualifiedLogSegment, BKException.getMessage(transmitResultUpdater.get(this)));
                    throw new BKTransmitException("Trying to write to an errored stream; Error code : (" + transmitResultUpdater.get(this) + ") " + BKException.getMessage(transmitResultUpdater.get(this)), transmitResultUpdater.get(this));
                }
                if (this.recordSetWriter.getNumRecords() == 0) {
                    this.transmitDataMisses.inc();
                    return null;
                }
                // Detach the filled buffer and start collecting into a new one.
                Entry.Writer writer = this.recordSetWriter;
                this.recordSetWriter = newRecordSetWriter();
                this.outstandingBytes = 0L;
                if (writer.hasUserRecords()) {
                    this.numBytes += writer.getNumBytes();
                    this.numFlushesSinceRestart++;
                }
                try {
                    ByteBuf buffer = writer.getBuffer();
                    FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
                    this.lastTransmitNanos = MathUtils.nowInNano();
                    BKTransmitPacket bKTransmitPacket = new BKTransmitPacket(writer);
                    this.packetPrevious = bKTransmitPacket;
                    this.entryWriter.asyncAddEntry(buffer, this, bKTransmitPacket);
                    if (writer.hasUserRecords()) {
                        this.transmitDataSuccesses.inc();
                    } else {
                        this.transmitControlSuccesses.inc();
                    }
                    this.lastTransmit.reset().start();
                    outstandingTransmitsUpdater.incrementAndGet(this);
                    this.controlFlushNeeded = false;
                    return bKTransmitPacket.getTransmitFuture();
                } catch (IOException e) {
                    final boolean invalidEntry = e instanceof InvalidEnvelopedEntryException;
                    if (invalidEntry) {
                        // Raised once; the alert used to be raised a second time just before the rethrow.
                        this.alertStatsLogger.raise("Invalid enveloped entry for segment {} : ", this.fullyQualifiedLogSegment, e);
                    }
                    LOG.error("Exception while enveloping entries for segment: {}", new Object[]{this.fullyQualifiedLogSegment}, e);
                    // Mark the segment as failed so later transmits are rejected.
                    // -12 is presumably BKException.Code.WriteException; confirm against BookKeeper.
                    transmitResultUpdater.set(this, -12);
                    if (!invalidEntry) {
                        throw new WriteException(this.streamName, "Envelope Error");
                    }
                    throw ((InvalidEnvelopedEntryException) e);
                }
            }
        } finally {
            this.transmitLock.unlock();
        }
    }

    /**
     * Reports whether a transmit would currently send something.
     *
     * @return {@code true} when the recorded transmit result is still 0 and the current record set
     *         holds at least one record
     */
    private synchronized boolean haveDataToTransmit() {
        // compareAndSet(0, 0) acts as an atomic "result is still 0" check.
        if (!transmitResultUpdater.compareAndSet(this, 0, 0)) {
            return false;
        }
        return this.recordSetWriter.getNumRecords() > 0;
    }

    /**
     * Add-entry callback: records the outcome of one transmitted packet.
     *
     * <p>The effective result code is {@code i}, unless the {@code FP_TransmitComplete} fail point
     * fires or throws, in which case it becomes -999. The method updates {@code lastEntryId} and
     * (for successful packets with user records) {@code lastTxIdAcknowledged}, then runs
     * {@link #addCompleteDeferredProcessing} either inline (no scheduler) or as an ordered task
     * keyed by the stream name.
     *
     * <p>The deferred-processing latency is measured and registered in microseconds; it was
     * previously registered with {@code TimeUnit.MILLISECONDS} although the value was in
     * microseconds.
     *
     * @param i            result code reported for the add
     * @param ledgerHandle ledger handle the entry was added to (unused here)
     * @param j            id of the entry that was added
     * @param obj          callback context; expected to be the {@code BKTransmitPacket} that was sent
     */
    @Override // dlshade.org.apache.bookkeeper.client.AsyncCallback.AddCallback
    public void addComplete(final int i, LedgerHandle ledgerHandle, final long j, Object obj) {
        int i2 = i;
        try {
            if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitComplete)) {
                i2 = -999;
            }
        } catch (Exception e) {
            i2 = -999;
        }
        final int i3 = i2;
        if (j > -1 && this.lastEntryId >= j) {
            LOG.error("Log segment {} saw out of order entry {} lastEntryId {}", new Object[]{this.fullyQualifiedLogSegment, Long.valueOf(j), Long.valueOf(this.lastEntryId)});
        }
        this.lastEntryId = j;
        if (!$assertionsDisabled && !(obj instanceof BKTransmitPacket)) {
            throw new AssertionError();
        }
        final BKTransmitPacket bKTransmitPacket = (BKTransmitPacket) obj;
        this.addCompleteTime.registerSuccessfulEvent(TimeUnit.MICROSECONDS.convert(System.nanoTime() - bKTransmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS), TimeUnit.MICROSECONDS);
        // Note: this checks the reported code i, not the fail-point-adjusted code i3.
        if (0 == i) {
            EntryBuffer recordSet = bKTransmitPacket.getRecordSet();
            if (recordSet.hasUserRecords()) {
                synchronized (this) {
                    this.lastTxIdAcknowledged = Math.max(this.lastTxIdAcknowledged, recordSet.getMaxTxId());
                }
            }
        }
        if (null == this.scheduler) {
            bKTransmitPacket.notifyTransmitComplete(i3);
            outstandingTransmitsUpdater.getAndDecrement(this);
            addCompleteDeferredProcessing(bKTransmitPacket, j, i3);
        } else {
            final Stopwatch createStarted = Stopwatch.createStarted();
            Futures.addCallback(this.scheduler.submitOrdered(this.streamName, new Callable<Void>() { // from class: org.apache.distributedlog.BKLogSegmentWriter.10
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() {
                    Stopwatch createStarted2 = Stopwatch.createStarted();
                    BKLogSegmentWriter.this.addCompleteQueuedTime.registerSuccessfulEvent(createStarted.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                    BKLogSegmentWriter.this.addCompleteDeferredProcessing(bKTransmitPacket, j, i3);
                    // Value and declared unit now agree (both microseconds).
                    BKLogSegmentWriter.this.addCompleteDeferredTime.registerSuccessfulEvent(createStarted2.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                    return null;
                }

                @Override
                public String toString() {
                    return String.format("AddComplete(Stream=%s, entryId=%d, rc=%d)", BKLogSegmentWriter.this.fullyQualifiedLogSegment, Long.valueOf(j), Integer.valueOf(i));
                }
            }), new FutureCallback<Void>() { // from class: org.apache.distributedlog.BKLogSegmentWriter.9
                @Override // dlshade.com.google.common.util.concurrent.FutureCallback
                public void onSuccess(Void r2) {
                }

                @Override // dlshade.com.google.common.util.concurrent.FutureCallback
                public void onFailure(Throwable th) {
                    BKLogSegmentWriter.LOG.error("addComplete processing failed for {} entry {} lastTxId {} rc {} with error", new Object[]{BKLogSegmentWriter.this.fullyQualifiedLogSegment, Long.valueOf(j), Long.valueOf(bKTransmitPacket.getRecordSet().getMaxTxId()), Integer.valueOf(i), th});
                }
            }, MoreExecutors.directExecutor());
            // The packet's waiters are notified right after the deferred task is submitted.
            bKTransmitPacket.notifyTransmitComplete(i3);
            outstandingTransmitsUpdater.getAndDecrement(this);
        }
    }

    /**
     * Second stage of add-completion handling for one transmitted packet.
     *
     * <p>Under this writer's monitor it tries to record {@code i} as the segment's transmit result
     * (only possible while the stored result is still 0), updates packet-size stats, requests or
     * schedules a control flush after a successful user-data packet, and advances {@code lastDLSN}.
     * Outside the monitor it completes or aborts the packet's record set, and, if this call was the
     * one that recorded a non-zero result, aborts whatever is currently buffered.
     *
     * @param bKTransmitPacket the packet whose add completed
     * @param j                entry id reported for the add
     * @param i                effective result code for the add (0 is treated as success)
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void addCompleteDeferredProcessing(BKTransmitPacket bKTransmitPacket, long j, int i) {
        BKTransmitPacket bKTransmitPacket2;
        // z: true only when this call flipped the stored transmit result from 0 to a non-zero code.
        boolean z = false;
        EntryBuffer recordSet = bKTransmitPacket.getRecordSet();
        synchronized (this) {
            if (transmitResultUpdater.compareAndSet(this, 0, i)) {
                z = 0 != i;
            } else {
                // The stored result was already non-zero; it is kept and this code is only logged.
                LOG.warn("Log segment {} entryId {}: Tried to set transmit result to ({}) but is already ({})", new Object[]{this.fullyQualifiedLogSegment, Long.valueOf(j), Integer.valueOf(i), Integer.valueOf(transmitResultUpdater.get(this))});
            }
            if (transmitResultUpdater.get(this) != 0) {
                if (recordSet.hasUserRecords()) {
                    // NOTE(review): a byte count is registered with a time unit; confirm this is intended.
                    this.transmitDataPacketSize.registerFailedEvent(recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
                }
            } else if (recordSet.hasUserRecords()) {
                this.transmitDataPacketSize.registerSuccessfulEvent(recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
                // User data was acknowledged: mark that a control flush is now needed.
                this.controlFlushNeeded = true;
                if (this.immediateFlushEnabled) {
                    if (0 == this.minDelayBetweenImmediateFlushMs) {
                        backgroundFlush(true);
                    } else {
                        scheduleFlushWithDelayIfNeeded(new Callable<Void>() { // from class: org.apache.distributedlog.BKLogSegmentWriter.11
                            /* JADX WARN: Can't rename method to resolve collision */
                            @Override // java.util.concurrent.Callable
                            public Void call() throws Exception {
                                BKLogSegmentWriter.this.backgroundFlush(true);
                                return null;
                            }
                        }, immFlushSchedFutureRefUpdater);
                    }
                }
            }
            if (0 == transmitResultUpdater.get(this)) {
                DLSN finalizeTransmit = recordSet.finalizeTransmit(this.logSegmentSequenceNumber, j);
                // lastDLSN only moves forward, and only for packets carrying user records.
                if (recordSet.hasUserRecords() && null != finalizeTransmit && this.lastDLSN.compareTo(finalizeTransmit) < 0) {
                    this.lastDLSN = finalizeTransmit;
                }
            }
        }
        // Completion/abort of the record set is performed outside the monitor.
        if (0 == transmitResultUpdater.get(this)) {
            recordSet.completeTransmit(this.logSegmentSequenceNumber, j);
        } else {
            recordSet.abortTransmit(Utils.transmitException(transmitResultUpdater.get(this)));
        }
        if (z) {
            // First failure on this segment: detach the records still buffered and cancel them.
            synchronized (this) {
                bKTransmitPacket2 = new BKTransmitPacket(this.recordSetWriter);
                this.recordSetWriter = newRecordSetWriter();
            }
            bKTransmitPacket2.getRecordSet().abortTransmit(new WriteCancelledException(this.streamName, Utils.transmitException(transmitResultUpdater.get(this))));
        }
    }

    /**
     * {@link Runnable} entry point: performs a background flush that is not restricted to
     * control flushes, so pending data is transmitted as well.
     */
    @Override // java.lang.Runnable
    public synchronized void run() {
        final boolean controlFlushOnly = false;
        this.backgroundFlush(controlFlushOnly);
    }

    /**
     * Flushes in the background unless the segment is closing.
     *
     * <p>A flush happens when a control flush is pending, or when {@code z} is {@code false} and
     * there is buffered data. If a flush is due but nothing is buffered, a control record is
     * written first so there is something to transmit. I/O failures are logged, not thrown.
     *
     * @param z when {@code true}, buffered data alone does not trigger a flush; only a pending
     *          control flush does
     */
    public synchronized void backgroundFlush(boolean z) {
        if (null != this.closeFuture) {
            LOG.debug("Skip background flushing since log segment {} is closing.", getFullyQualifiedLogSegment());
            return;
        }
        try {
            final boolean hasBufferedData = haveDataToTransmit();
            final boolean flushDue = this.controlFlushNeeded || (!z && hasBufferedData);
            if (!flushDue) {
                this.pFlushMisses.inc();
                return;
            }
            if (!hasBufferedData) {
                writeControlLogRecord();
            }
            transmit();
            this.pFlushSuccesses.inc();
        } catch (IOException flushError) {
            LOG.error("Log Segment {}: Error encountered by the periodic flush", this.fullyQualifiedLogSegment, flushError);
        }
    }

    /**
     * Writes a keep-alive control record when at least {@code periodicKeepAliveMs} milliseconds
     * have elapsed since the last transmit. Does nothing while the segment is closing.
     * The future returned by {@code asyncWrite} is not observed.
     */
    public synchronized void keepAlive() {
        if (null != this.closeFuture) {
            LOG.debug("Skip sending keepAlive control record since log segment {} is closing.", getFullyQualifiedLogSegment());
            return;
        }
        final boolean transmittedRecently = MathUtils.elapsedMSec(this.lastTransmitNanos) < this.periodicKeepAliveMs;
        if (transmittedRecently) {
            return;
        }
        final LogRecord keepAliveRecord = new LogRecord(this.lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
        keepAliveRecord.setControl();
        asyncWrite(keepAliveRecord);
    }

    static {
        $assertionsDisabled = !BKLogSegmentWriter.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(BKLogSegmentWriter.class);
        outstandingTransmitsUpdater = AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentWriter.class, "outstandingTransmits");
        transmitResultUpdater = AtomicIntegerFieldUpdater.newUpdater(BKLogSegmentWriter.class, "transmitResult");
        transmitSchedFutureRefUpdater = AtomicReferenceFieldUpdater.newUpdater(BKLogSegmentWriter.class, ScheduledFuture.class, "transmitSchedFutureRef");
        immFlushSchedFutureRefUpdater = AtomicReferenceFieldUpdater.newUpdater(BKLogSegmentWriter.class, ScheduledFuture.class, "immFlushSchedFutureRef");
        scheduledFlushExceptionUpdater = AtomicReferenceFieldUpdater.newUpdater(BKLogSegmentWriter.class, Exception.class, "scheduledFlushException");
    }
}
