package org.apache.distributedlog;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.io.Abortable;
import org.apache.distributedlog.io.Abortables;
import org.apache.distributedlog.io.AsyncAbortable;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/distributedlog/BKAbstractLogWriter.class */
public abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable {
    static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class);
    protected final DistributedLogConfiguration conf;
    private final DynamicDistributedLogConfiguration dynConf;
    protected final BKDistributedLogManager bkDistributedLogManager;
    private CompletableFuture<Void> closePromise = null;
    private volatile boolean forceRolling = false;
    private boolean forceRecovery = false;
    private CompletableFuture<List<LogSegmentMetadata>> lastTruncationAttempt = null;

    @VisibleForTesting
    private Long minTimestampToKeepOverride = null;
    protected BKLogSegmentWriter segmentWriter = null;
    protected CompletableFuture<BKLogSegmentWriter> segmentWriterFuture = null;
    protected BKLogSegmentWriter allocatedSegmentWriter = null;
    protected BKLogWriteHandler writeHandler = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BKAbstractLogWriter(DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, BKDistributedLogManager bKDistributedLogManager) {
        this.conf = distributedLogConfiguration;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.bkDistributedLogManager = bKDistributedLogManager;
        LOG.debug("Initial retention period for {} : {}", bKDistributedLogManager.getStreamName(), Long.valueOf(TimeUnit.MILLISECONDS.convert(dynamicDistributedLogConfiguration.getRetentionPeriodHours(), TimeUnit.HOURS)));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public CompletableFuture<List<LogSegmentMetadata>> getLastTruncationAttempt() {
        return this.lastTruncationAttempt;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized BKLogWriteHandler getCachedWriteHandler() {
        return this.writeHandler;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BKLogWriteHandler getWriteHandler() throws IOException {
        BKLogWriteHandler createAndCacheWriteHandler = createAndCacheWriteHandler();
        createAndCacheWriteHandler.checkMetadataException();
        return createAndCacheWriteHandler;
    }

    protected BKLogWriteHandler createAndCacheWriteHandler() throws IOException {
        return createAndCacheWriteHandler(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BKLogWriteHandler createAndCacheWriteHandler(LedgerMetadata ledgerMetadata) throws IOException {
        BKLogWriteHandler bKLogWriteHandler;
        synchronized (this) {
            if (this.writeHandler != null) {
                return this.writeHandler;
            }
            BKLogWriteHandler bKLogWriteHandler2 = (BKLogWriteHandler) Utils.ioResult(this.bkDistributedLogManager.asyncCreateWriteHandler(false, ledgerMetadata));
            boolean z = false;
            try {
                synchronized (this) {
                    if (this.writeHandler == null) {
                        this.writeHandler = bKLogWriteHandler2;
                        z = true;
                    }
                    bKLogWriteHandler = this.writeHandler;
                }
                return bKLogWriteHandler;
            } finally {
                if (!z) {
                    bKLogWriteHandler2.asyncAbort();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized BKLogSegmentWriter getCachedLogWriter() {
        return this.segmentWriter;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized CompletableFuture<BKLogSegmentWriter> getCachedLogWriterFuture() {
        return this.segmentWriterFuture;
    }

    protected synchronized void cacheLogWriter(BKLogSegmentWriter bKLogSegmentWriter) {
        this.segmentWriter = bKLogSegmentWriter;
        this.segmentWriterFuture = FutureUtils.value(bKLogSegmentWriter);
    }

    protected synchronized BKLogSegmentWriter removeCachedLogWriter() {
        try {
            return this.segmentWriter;
        } finally {
            this.segmentWriter = null;
            this.segmentWriterFuture = null;
        }
    }

    protected synchronized BKLogSegmentWriter getAllocatedLogWriter() {
        return this.allocatedSegmentWriter;
    }

    protected synchronized void cacheAllocatedLogWriter(BKLogSegmentWriter bKLogSegmentWriter) {
        this.allocatedSegmentWriter = bKLogSegmentWriter;
    }

    protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() {
        try {
            return this.allocatedSegmentWriter;
        } finally {
            this.allocatedSegmentWriter = null;
        }
    }

    private CompletableFuture<Void> asyncCloseAndComplete(boolean z) {
        BKLogSegmentWriter cachedLogWriter = getCachedLogWriter();
        BKLogWriteHandler cachedWriteHandler = getCachedWriteHandler();
        if (null == cachedLogWriter || null == cachedWriteHandler) {
            return closeNoThrow();
        }
        cancelTruncation();
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        asyncCloseAndComplete(cachedLogWriter, cachedWriteHandler, completableFuture, z);
        return completableFuture;
    }

    private void asyncCloseAndComplete(BKLogSegmentWriter bKLogSegmentWriter, BKLogWriteHandler bKLogWriteHandler, final CompletableFuture<Void> completableFuture, final boolean z) {
        bKLogWriteHandler.completeAndCloseLogSegment(bKLogSegmentWriter).whenComplete((BiConsumer<? super LogSegmentMetadata, ? super Throwable>) new FutureEventListener<LogSegmentMetadata>() { // from class: org.apache.distributedlog.BKAbstractLogWriter.1
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onSuccess(LogSegmentMetadata logSegmentMetadata) {
                BKAbstractLogWriter.this.removeCachedLogWriter();
                complete(null);
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onFailure(Throwable th) {
                BKAbstractLogWriter.LOG.error("Completing Log segments encountered exception", th);
                complete(th);
            }

            private void complete(Throwable th) {
                CompletableFuture<Void> closeNoThrow = BKAbstractLogWriter.this.closeNoThrow();
                boolean z2 = z;
                CompletableFuture completableFuture2 = completableFuture;
                FutureUtils.ensure(closeNoThrow, () -> {
                    if (null == th || !z2) {
                        FutureUtils.complete(completableFuture2, null);
                    } else {
                        FutureUtils.completeExceptionally(completableFuture2, th);
                    }
                });
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void closeAndComplete() throws IOException {
        Utils.ioResult(asyncCloseAndComplete(true));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> asyncCloseAndComplete() {
        return asyncCloseAndComplete(true);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Utils.ioResult(asyncClose());
    }

    @Override // org.apache.distributedlog.io.AsyncCloseable
    public CompletableFuture<Void> asyncClose() {
        return asyncCloseAndComplete(false);
    }

    protected CompletableFuture<Void> closeNoThrow() {
        synchronized (this) {
            if (null != this.closePromise) {
                return this.closePromise;
            }
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.closePromise = completableFuture;
            cancelTruncation();
            FutureUtils.proxyTo(Utils.closeSequence(this.bkDistributedLogManager.getScheduler(), true, getCachedLogWriter(), getAllocatedLogWriter(), getCachedWriteHandler()), completableFuture);
            return completableFuture;
        }
    }

    @Override // org.apache.distributedlog.io.Abortable
    public void abort() throws IOException {
        Utils.ioResult(asyncAbort());
    }

    @Override // org.apache.distributedlog.io.AsyncAbortable
    public CompletableFuture<Void> asyncAbort() {
        synchronized (this) {
            if (null != this.closePromise) {
                return this.closePromise;
            }
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.closePromise = completableFuture;
            cancelTruncation();
            FutureUtils.proxyTo(Abortables.abortSequence(this.bkDistributedLogManager.getScheduler(), getCachedLogWriter(), getAllocatedLogWriter(), getCachedWriteHandler()), completableFuture);
            return completableFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BKLogSegmentWriter getLedgerWriter(long j, boolean z) throws IOException {
        CompletableFuture<BKLogSegmentWriter> asyncGetLedgerWriter = asyncGetLedgerWriter(true);
        BKLogSegmentWriter bKLogSegmentWriter = null;
        if (null != asyncGetLedgerWriter) {
            bKLogSegmentWriter = (BKLogSegmentWriter) Utils.ioResult(asyncGetLedgerWriter);
        }
        if (null == bKLogSegmentWriter || shouldStartNewSegment(bKLogSegmentWriter) || this.forceRolling) {
            bKLogSegmentWriter = (BKLogSegmentWriter) Utils.ioResult(rollLogSegmentIfNecessary(bKLogSegmentWriter, j, true, z));
        }
        return bKLogSegmentWriter;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized CompletableFuture<BKLogSegmentWriter> asyncGetLedgerWriter(boolean z) {
        final BKLogSegmentWriter cachedLogWriter = getCachedLogWriter();
        CompletableFuture<BKLogSegmentWriter> cachedLogWriterFuture = getCachedLogWriterFuture();
        if (null == cachedLogWriterFuture || null == cachedLogWriter) {
            return null;
        }
        if ((cachedLogWriter.isLogSegmentInError() || this.forceRecovery) && z) {
            return (cachedLogWriter.isLogSegmentInError() ? cachedLogWriter.asyncAbort() : cachedLogWriter.asyncClose()).thenCompose((Function<? super Void, ? extends CompletionStage<U>>) new Function<Void, CompletionStage<BKLogSegmentWriter>>() { // from class: org.apache.distributedlog.BKAbstractLogWriter.2
                @Override // java.util.function.Function
                public CompletableFuture<BKLogSegmentWriter> apply(Void r6) {
                    BKAbstractLogWriter.this.removeCachedLogWriter();
                    if (cachedLogWriter.isLogSegmentInError()) {
                        return FutureUtils.value(null);
                    }
                    try {
                        BKLogWriteHandler writeHandler = BKAbstractLogWriter.this.getWriteHandler();
                        return (null == writeHandler || !BKAbstractLogWriter.this.forceRecovery) ? FutureUtils.value(null) : writeHandler.completeAndCloseLogSegment(cachedLogWriter).thenApply((Function<? super LogSegmentMetadata, ? extends U>) new Function<LogSegmentMetadata, BKLogSegmentWriter>() { // from class: org.apache.distributedlog.BKAbstractLogWriter.2.1
                            @Override // java.util.function.Function
                            public BKLogSegmentWriter apply(LogSegmentMetadata logSegmentMetadata) {
                                return null;
                            }
                        });
                    } catch (IOException e) {
                        return FutureUtils.exception(e);
                    }
                }
            });
        }
        return cachedLogWriterFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shouldStartNewSegment(BKLogSegmentWriter bKLogSegmentWriter) throws IOException {
        return null == bKLogSegmentWriter || getWriteHandler().shouldStartNewSegment(bKLogSegmentWriter) || this.forceRolling;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void truncateLogSegmentsIfNecessary(BKLogWriteHandler bKLogWriteHandler) {
        boolean z = false;
        long j = 0;
        long convert = TimeUnit.MILLISECONDS.convert(this.dynConf.getRetentionPeriodHours(), TimeUnit.HOURS);
        if (convert > 0) {
            j = Utils.nowInMillis() - convert;
            z = true;
        }
        if (null != this.minTimestampToKeepOverride) {
            j = this.minTimestampToKeepOverride.longValue();
            z = true;
        }
        synchronized (this) {
            if (z) {
                if (this.lastTruncationAttempt == null || this.lastTruncationAttempt.isDone()) {
                    this.lastTruncationAttempt = bKLogWriteHandler.purgeLogSegmentsOlderThanTimestamp(j);
                }
            }
        }
    }

    private CompletableFuture<BKLogSegmentWriter> asyncStartNewLogSegment(BKLogWriteHandler bKLogWriteHandler, long j, boolean z) {
        return bKLogWriteHandler.recoverIncompleteLogSegments().thenCompose(l -> {
            return bKLogWriteHandler.asyncStartLogSegment(j, false, z).thenApply(bKLogSegmentWriter -> {
                cacheLogWriter(bKLogSegmentWriter);
                return bKLogSegmentWriter;
            });
        });
    }

    private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(BKLogSegmentWriter bKLogSegmentWriter, BKLogWriteHandler bKLogWriteHandler, long j, boolean z, boolean z2) {
        PermitManager.Permit acquirePermit = this.bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit();
        if (acquirePermit.isAllowed()) {
            return FutureUtils.ensure(FutureUtils.rescue(closeOldLogSegmentAndStartNewOne(bKLogSegmentWriter, bKLogWriteHandler, j, z, z2), th -> {
                if (th instanceof LockingException) {
                    LOG.warn("We lost lock during completeAndClose log segment for {}.Disable ledger rolling until it is recovered : ", bKLogWriteHandler.getFullyQualifiedName(), th);
                    this.bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(acquirePermit);
                    return FutureUtils.value(bKLogSegmentWriter);
                }
                if (th instanceof ZKException) {
                    ZKException zKException = (ZKException) th;
                    if (ZKException.isRetryableZKException(zKException)) {
                        LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : {}", bKLogWriteHandler.getFullyQualifiedName(), zKException.getKeeperExceptionCode());
                        this.bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(acquirePermit);
                        return FutureUtils.value(bKLogSegmentWriter);
                    }
                }
                return FutureUtils.exception(th);
            }), () -> {
                this.bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(acquirePermit);
            });
        }
        this.bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(acquirePermit);
        return FutureUtils.value(bKLogSegmentWriter);
    }

    private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(final BKLogSegmentWriter bKLogSegmentWriter, final BKLogWriteHandler bKLogWriteHandler, final long j, final boolean z, boolean z2) {
        BKLogSegmentWriter allocatedLogWriter = getAllocatedLogWriter();
        if (null != allocatedLogWriter) {
            return completeOldSegmentAndCacheNewLogSegmentWriter(bKLogSegmentWriter, allocatedLogWriter);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Allocating a new log segment from {} for {}.", Long.valueOf(j), bKLogWriteHandler.getFullyQualifiedName());
        }
        return bKLogWriteHandler.asyncStartLogSegment(j, z, z2).thenCompose((Function<? super BKLogSegmentWriter, ? extends CompletionStage<U>>) new Function<BKLogSegmentWriter, CompletableFuture<BKLogSegmentWriter>>() { // from class: org.apache.distributedlog.BKAbstractLogWriter.3
            @Override // java.util.function.Function
            public CompletableFuture<BKLogSegmentWriter> apply(BKLogSegmentWriter bKLogSegmentWriter2) {
                if (null == bKLogSegmentWriter2) {
                    return z ? FutureUtils.value(bKLogSegmentWriter) : FutureUtils.exception(new UnexpectedException("StartLogSegment returns null for bestEffort rolling"));
                }
                BKAbstractLogWriter.this.cacheAllocatedLogWriter(bKLogSegmentWriter2);
                if (BKAbstractLogWriter.LOG.isDebugEnabled()) {
                    BKAbstractLogWriter.LOG.debug("Allocated a new log segment from {} for {}.", Long.valueOf(j), bKLogWriteHandler.getFullyQualifiedName());
                }
                return BKAbstractLogWriter.this.completeOldSegmentAndCacheNewLogSegmentWriter(bKLogSegmentWriter, bKLogSegmentWriter2);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(BKLogSegmentWriter bKLogSegmentWriter, final BKLogSegmentWriter bKLogSegmentWriter2) {
        final CompletableFuture<BKLogSegmentWriter> completableFuture = new CompletableFuture<>();
        this.writeHandler.completeAndCloseLogSegment(bKLogSegmentWriter).whenComplete((BiConsumer<? super LogSegmentMetadata, ? super Throwable>) new FutureEventListener<LogSegmentMetadata>() { // from class: org.apache.distributedlog.BKAbstractLogWriter.4
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onSuccess(LogSegmentMetadata logSegmentMetadata) {
                BKAbstractLogWriter.this.cacheLogWriter(bKLogSegmentWriter2);
                BKAbstractLogWriter.this.removeAllocatedLogWriter();
                FutureUtils.complete(completableFuture, bKLogSegmentWriter2);
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onFailure(Throwable th) {
                FutureUtils.completeExceptionally(completableFuture, th);
            }
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized CompletableFuture<BKLogSegmentWriter> rollLogSegmentIfNecessary(final BKLogSegmentWriter bKLogSegmentWriter, long j, boolean z, boolean z2) {
        try {
            final BKLogWriteHandler writeHandler = getWriteHandler();
            return ((null == bKLogSegmentWriter || !(writeHandler.shouldStartNewSegment(bKLogSegmentWriter) || this.forceRolling)) ? null == bKLogSegmentWriter ? asyncStartNewLogSegment(writeHandler, j, z2) : FutureUtils.value(bKLogSegmentWriter) : closeOldLogSegmentAndStartNewOneWithPermit(bKLogSegmentWriter, writeHandler, j, z, z2)).thenApply((Function<? super BKLogSegmentWriter, ? extends U>) new Function<BKLogSegmentWriter, BKLogSegmentWriter>() { // from class: org.apache.distributedlog.BKAbstractLogWriter.5
                @Override // java.util.function.Function
                public BKLogSegmentWriter apply(BKLogSegmentWriter bKLogSegmentWriter2) {
                    if (bKLogSegmentWriter == bKLogSegmentWriter2) {
                        return bKLogSegmentWriter2;
                    }
                    BKAbstractLogWriter.this.truncateLogSegmentsIfNecessary(writeHandler);
                    return bKLogSegmentWriter2;
                }
            });
        } catch (IOException e) {
            return FutureUtils.exception(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void checkClosedOrInError(String str) throws AlreadyClosedException {
        if (null != this.closePromise) {
            LOG.error("Executing " + str + " on already closed Log Writer");
            throw new AlreadyClosedException("Executing " + str + " on already closed Log Writer");
        }
    }

    @VisibleForTesting
    public void setForceRolling(boolean z) {
        this.forceRolling = z;
    }

    @VisibleForTesting
    public synchronized void overRideMinTimeStampToKeep(Long l) {
        this.minTimestampToKeepOverride = l;
    }

    protected synchronized void cancelTruncation() {
        if (null != this.lastTruncationAttempt) {
            this.lastTruncationAttempt.cancel(true);
            this.lastTruncationAttempt = null;
        }
    }

    @VisibleForTesting
    public synchronized void setForceRecovery(boolean z) {
        this.forceRecovery = z;
    }
}
