package org.apache.distributedlog.lock;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.base.Stopwatch;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.stats.Counter;
import dlshade.org.apache.bookkeeper.stats.OpStatsLogger;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.common.concurrent.AsyncSemaphore;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/lock/ZKDistributedLock.class */
public class ZKDistributedLock implements LockListener, DistributedLock {
    // Class-wide logger.
    static final Logger LOG = LoggerFactory.getLogger(ZKDistributedLock.class);
    // Factory used to create the SessionLock for each acquire attempt.
    private final SessionLockFactory lockFactory;
    // Executor used for lock state transitions; work is keyed by lockPath.
    private final OrderedScheduler lockStateExecutor;
    // Path of the lock; also used as the ordering key on lockStateExecutor.
    private final String lockPath;
    // Timeout handed to SessionLock.asyncTryLock, in milliseconds.
    private final long lockTimeout;
    // Context passed to the factory whenever a session lock is created.
    private final DistributedLockContext lockContext = new DistributedLockContext();
    // Gates acquire attempts; presumably a single permit (constructed with 1) - confirm against AsyncSemaphore.
    private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1, Optional.empty());
    // Future returned by asyncAcquire(); non-null once an acquire has been requested.
    private CompletableFuture<ZKDistributedLock> lockAcquireFuture = null;
    // In-flight reacquire future; reset to null when a reacquire succeeds.
    private CompletableFuture<ZKDistributedLock> lockReacquireFuture = null;
    // Session lock created by the latest acquire attempt, or null if none was created yet.
    private SessionLock internalLock = null;
    // Pending asyncTryLock result; cleared once the waiter has been obtained.
    private CompletableFuture<LockWaiter> tryLockFuture = null;
    // Waiter produced by the latest successful asyncTryLock.
    private LockWaiter lockWaiter = null;
    // Failure recorded by a failed reacquire; later state checks rethrow it.
    private LockingException lockReacquireException = null;
    // Set to true by asyncClose(); never reset.
    private volatile boolean closed = false;
    // Future created by the first asyncClose() call and returned to later callers.
    private CompletableFuture<Void> closeFuture = null;
    // Number of reacquire attempts started (exposed for tests).
    private final AtomicInteger reacquireCount = new AtomicInteger(0);
    // Stats scope "lock" and the metrics derived from it.
    private final StatsLogger lockStatsLogger;
    private final OpStatsLogger acquireStats;
    private final OpStatsLogger reacquireStats;
    private final Counter internalTryRetries;

    public ZKDistributedLock(OrderedScheduler orderedScheduler, SessionLockFactory sessionLockFactory, String str, long j, StatsLogger statsLogger) {
        this.lockStateExecutor = orderedScheduler;
        this.lockPath = str;
        this.lockTimeout = j;
        this.lockFactory = sessionLockFactory;
        this.lockStatsLogger = statsLogger.scope("lock");
        this.acquireStats = this.lockStatsLogger.getOpStatsLogger("acquire");
        this.reacquireStats = this.lockStatsLogger.getOpStatsLogger("reacquire");
        this.internalTryRetries = this.lockStatsLogger.getCounter("internalTryRetries");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the exception reported when an operation is attempted on a closed lock.
     *
     * @return a new {@link LockClosedException} for this lock's path
     */
    public LockClosedException newLockClosedException() {
        final String reason = "Lock is already closed";
        return new LockClosedException(this.lockPath, reason);
    }

    /**
     * Verifies that the lock is still usable.
     *
     * @throws LockingException if the lock has been closed, or if an earlier
     *         reacquire attempt failed (the recorded failure is rethrown)
     */
    private synchronized void checkLockState() throws LockingException {
        if (this.closed) {
            throw newLockClosedException();
        }
        final LockingException reacquireFailure = this.lockReacquireException;
        if (reacquireFailure != null) {
            throw reacquireFailure;
        }
    }

    /**
     * Starts acquiring the lock asynchronously.
     *
     * <p>Only one acquire may be requested per instance: a second call gets a
     * future failed with {@link UnexpectedException}. If the returned future
     * fails or is cancelled, the lock is closed.
     *
     * @return a future completed with this lock once it is acquired
     */
    @Override // org.apache.distributedlog.lock.DistributedLock
    public synchronized CompletableFuture<ZKDistributedLock> asyncAcquire() {
        if (this.lockAcquireFuture != null) {
            final String reason = "Someone is already acquiring/acquired lock " + this.lockPath;
            return FutureUtils.exception(new UnexpectedException(reason));
        }

        final CompletableFuture<ZKDistributedLock> acquirePromise = FutureUtils.createFuture();

        // On cancellation, schedule a close on the lock state executor.
        acquirePromise.whenComplete((ignoredLock, cause) -> {
            if (cause instanceof CancellationException) {
                this.lockStateExecutor.submitOrdered(this.lockPath, () -> {
                    return asyncClose();
                });
            }
        });

        // Record acquire latency; any failure also closes the lock.
        final Stopwatch acquireTimer = Stopwatch.createStarted();
        acquirePromise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onSuccess(ZKDistributedLock acquiredLock) {
                final long elapsedMicros = acquireTimer.stop().elapsed(TimeUnit.MICROSECONDS);
                ZKDistributedLock.this.acquireStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
            }

            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onFailure(Throwable cause) {
                final long elapsedMicros = acquireTimer.stop().elapsed(TimeUnit.MICROSECONDS);
                ZKDistributedLock.this.acquireStats.registerFailedEvent(elapsedMicros, TimeUnit.MICROSECONDS);
                ZKDistributedLock.this.asyncClose();
            }
        });

        this.lockAcquireFuture = acquirePromise;
        this.lockStateExecutor.submitOrdered(this.lockPath, () -> {
            doAsyncAcquireWithSemaphore(acquirePromise, this.lockTimeout);
        });
        return acquirePromise;
    }

    /**
     * Runs one acquire attempt through {@code lockSemaphore}.
     *
     * @param completableFuture promise completed with the outcome of the attempt; it is also
     *        the future handed back to the semaphore for the submitted task
     * @param j try-lock timeout in milliseconds
     */
    void doAsyncAcquireWithSemaphore(CompletableFuture<ZKDistributedLock> completableFuture, long j) {
        this.lockSemaphore.acquireAndRun(() -> {
            doAsyncAcquire(completableFuture, j);
            return completableFuture;
        });
    }

    /**
     * Performs one acquire attempt.
     *
     * <p>Fails the promise if the lock is closed or a previous reacquire failed,
     * completes it immediately if the lock is already held, and otherwise creates
     * a new session lock and tries to lock it.
     *
     * @param completableFuture promise completed with the outcome of the attempt
     * @param j try-lock timeout in milliseconds
     */
    void doAsyncAcquire(final CompletableFuture<ZKDistributedLock> completableFuture, final long j) {
        LOG.trace("Async Lock Acquire {}", this.lockPath);
        try {
            checkLockState();
            if (haveLock()) {
                FutureUtils.complete(completableFuture, this);
                return;
            }
            this.lockFactory.createLock(this.lockPath, this.lockContext).whenCompleteAsync(new FutureEventListener<SessionLock>() {
                @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                public void onSuccess(SessionLock sessionLock) {
                    // One monitor section covers the closed check, publishing the new
                    // session lock and starting the try-lock (which updates tryLockFuture).
                    synchronized (ZKDistributedLock.this) {
                        if (ZKDistributedLock.this.closed) {
                            ZKDistributedLock.LOG.info("Skipping tryLocking lock {} since it is already closed", ZKDistributedLock.this.lockPath);
                            FutureUtils.completeExceptionally(completableFuture, ZKDistributedLock.this.newLockClosedException());
                            return;
                        }
                        ZKDistributedLock.this.internalLock = sessionLock;
                        ZKDistributedLock.this.internalLock.setLockListener(ZKDistributedLock.this);
                        ZKDistributedLock.this.asyncTryLock(sessionLock, completableFuture, j);
                    }
                }

                @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                public void onFailure(Throwable th) {
                    FutureUtils.completeExceptionally(completableFuture, th);
                }
            }, (Executor) this.lockStateExecutor.chooseThread(this.lockPath));
        } catch (IOException e) {
            FutureUtils.completeExceptionally(completableFuture, e);
        }
    }

    /**
     * Issues a try-lock on the given session lock and, once a waiter is obtained,
     * waits for that waiter to resolve.
     *
     * <p>Any try-lock that is still tracked in {@code tryLockFuture} is cancelled first.
     *
     * @param sessionLock session lock to try
     * @param completableFuture promise completed with the outcome of the attempt
     * @param j try-lock timeout in milliseconds
     */
    void asyncTryLock(SessionLock sessionLock, final CompletableFuture<ZKDistributedLock> completableFuture, long j) {
        final CompletableFuture<LockWaiter> previousAttempt = this.tryLockFuture;
        if (previousAttempt != null) {
            previousAttempt.cancel(true);
        }

        final CompletableFuture<LockWaiter> currentAttempt = sessionLock.asyncTryLock(j, TimeUnit.MILLISECONDS);
        this.tryLockFuture = currentAttempt;
        currentAttempt.whenCompleteAsync(new FutureEventListener<LockWaiter>() {
            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onSuccess(LockWaiter waiter) {
                synchronized (ZKDistributedLock.this) {
                    if (!ZKDistributedLock.this.closed) {
                        ZKDistributedLock.this.tryLockFuture = null;
                        ZKDistributedLock.this.lockWaiter = waiter;
                        ZKDistributedLock.this.waitForAcquire(waiter, completableFuture);
                        return;
                    }
                    // Closed while the try-lock was in flight: fail both the waiter and the caller.
                    ZKDistributedLock.LOG.info("Skipping acquiring lock {} since it is already closed", ZKDistributedLock.this.lockPath);
                    waiter.getAcquireFuture().completeExceptionally(new LockingException(ZKDistributedLock.this.lockPath, "lock is already closed."));
                    FutureUtils.completeExceptionally(completableFuture, ZKDistributedLock.this.newLockClosedException());
                }
            }

            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(completableFuture, cause);
            }
        }, (Executor) this.lockStateExecutor.chooseThread(this.lockPath));
    }

    /**
     * Waits for the waiter's acquire future and translates its result onto the promise.
     *
     * <p>A {@code true} result completes the promise with this lock. Any other result
     * fails it with {@link OwnershipAcquireFailedException} naming the current owner.
     * A failure of the acquire future is propagated unchanged.
     *
     * @param lockWaiter waiter returned by the session lock's try-lock
     * @param completableFuture promise completed with the outcome
     */
    void waitForAcquire(final LockWaiter lockWaiter, final CompletableFuture<ZKDistributedLock> completableFuture) {
        lockWaiter.getAcquireFuture().whenCompleteAsync(new FutureEventListener<Boolean>() {
            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onSuccess(Boolean bool) {
                // Null-safe check: an exception thrown here would leave the promise uncompleted.
                if (Boolean.TRUE.equals(bool)) {
                    ZKDistributedLock.LOG.info("{} acquired lock {}", lockWaiter, ZKDistributedLock.this.lockPath);
                    FutureUtils.complete(completableFuture, ZKDistributedLock.this);
                } else {
                    // Log the real outcome instead of reporting every completion as "acquired".
                    ZKDistributedLock.LOG.info("{} failed to acquire lock {}", lockWaiter, ZKDistributedLock.this.lockPath);
                    FutureUtils.completeExceptionally(completableFuture, new OwnershipAcquireFailedException(ZKDistributedLock.this.lockPath, lockWaiter.getCurrentOwner()));
                }
            }

            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onFailure(Throwable th) {
                FutureUtils.completeExceptionally(completableFuture, th);
            }
        }, (Executor) this.lockStateExecutor.chooseThread(this.lockPath));
    }

    /**
     * Listener callback: starts a reacquire of the lock.
     *
     * <p>The future returned by the reacquire is not used here, and a
     * {@link LockingException} thrown while starting it is logged, not rethrown.
     */
    @Override // org.apache.distributedlog.lock.LockListener
    public void onExpired() {
        try {
            // false: do not rethrow a previously recorded reacquire failure.
            reacquireLock(false);
        } catch (LockingException e) {
            LOG.error("Locking exception on re-acquiring lock {} : ", this.lockPath, e);
        }
    }

    /**
     * Checks that the lock is held and starts a reacquire if it is not.
     *
     * @throws LockingException if the initial acquire has not completed yet, if the lock
     *         is closed, or if an earlier reacquire attempt failed
     */
    @Override // org.apache.distributedlog.lock.DistributedLock
    public synchronized void checkOwnershipAndReacquire() throws LockingException {
        final CompletableFuture<ZKDistributedLock> acquireFuture = this.lockAcquireFuture;
        final boolean acquireFinished = acquireFuture != null && acquireFuture.isDone();
        if (!acquireFinished) {
            throw new LockingException(this.lockPath, "check ownership before acquiring");
        }
        if (!haveLock()) {
            reacquireLock(true);
        }
    }

    /**
     * Checks that the lock is currently held, without attempting to reacquire it.
     *
     * @throws LockingException if the initial acquire has not completed yet, or if the
     *         lock is no longer held
     */
    @Override // org.apache.distributedlog.lock.DistributedLock
    public synchronized void checkOwnership() throws LockingException {
        final CompletableFuture<ZKDistributedLock> acquireFuture = this.lockAcquireFuture;
        final boolean acquireFinished = acquireFuture != null && acquireFuture.isDone();
        if (!acquireFinished) {
            throw new LockingException(this.lockPath, "check ownership before acquiring");
        }
        if (haveLock()) {
            return;
        }
        throw new LockingException(this.lockPath, "Lost lock ownership");
    }

    /** Returns how many reacquire attempts have been started so far (test hook). */
    @VisibleForTesting
    int getReacquireCount() {
        return this.reacquireCount.get();
    }

    /** Returns the current reacquire future, or null if none is set (test hook). */
    @VisibleForTesting
    synchronized CompletableFuture<ZKDistributedLock> getLockReacquireFuture() {
        return this.lockReacquireFuture;
    }

    /** Returns the future created by asyncAcquire(), or null if acquire was never requested (test hook). */
    @VisibleForTesting
    synchronized CompletableFuture<ZKDistributedLock> getLockAcquireFuture() {
        return this.lockAcquireFuture;
    }

    /** Returns the current session lock, or null if none has been created yet (test hook). */
    @VisibleForTesting
    synchronized SessionLock getInternalLock() {
        return this.internalLock;
    }

    /**
     * Returns the waiter from the latest successful try-lock, or null if there is none (test hook).
     *
     * <p>Synchronized because {@code lockWaiter} is written while holding this object's
     * monitor; reading it under the same monitor guarantees the latest value is seen.
     */
    @VisibleForTesting
    synchronized LockWaiter getLockWaiter() {
        return this.lockWaiter;
    }

    /**
     * Tells whether this lock is currently held.
     *
     * @return true only if the lock is not closed, a session lock exists and that
     *         session lock reports itself as held
     */
    synchronized boolean haveLock() {
        if (this.closed) {
            return false;
        }
        final SessionLock current = this.internalLock;
        return current != null && current.isLockHeld();
    }

    /**
     * Close step: cancels the given waiter and then unlocks the session lock.
     *
     * <p>With no waiter, the pending try-lock (if any) is interrupted instead.
     *
     * @param lockWaiter waiter to cancel, or null if no waiter was obtained
     * @param completableFuture promise completed when the close sequence finishes
     */
    void closeWaiter(LockWaiter lockWaiter, final CompletableFuture<Void> completableFuture) {
        if (lockWaiter == null) {
            interruptTryLock(this.tryLockFuture, completableFuture);
            return;
        }
        // Register the unlock step first so it runs once the acquire future completes,
        // whatever the outcome, then cancel the acquire future.
        lockWaiter.getAcquireFuture().whenCompleteAsync(
                (ignoredResult, ignoredCause) -> unlockInternalLock(completableFuture),
                (Executor) this.lockStateExecutor.chooseThread(this.lockPath));
        lockWaiter.getAcquireFuture().cancel(true);
    }

    /**
     * Close step: cancels a pending try-lock and continues the close sequence.
     *
     * <p>If the try-lock still yields a waiter, that waiter is closed; if it fails,
     * or there is no pending try-lock, the session lock is unlocked directly.
     *
     * @param completableFuture pending try-lock future, or null if none
     * @param completableFuture2 promise completed when the close sequence finishes
     */
    void interruptTryLock(CompletableFuture<LockWaiter> completableFuture, final CompletableFuture<Void> completableFuture2) {
        if (completableFuture == null) {
            unlockInternalLock(completableFuture2);
            return;
        }
        final FutureEventListener<LockWaiter> onTryLockDone = new FutureEventListener<LockWaiter>() {
            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onSuccess(LockWaiter waiter) {
                ZKDistributedLock.this.closeWaiter(waiter, completableFuture2);
            }

            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onFailure(Throwable cause) {
                ZKDistributedLock.this.unlockInternalLock(completableFuture2);
            }
        };
        // Register the continuation before cancelling.
        completableFuture.whenCompleteAsync(onTryLockDone, (Executor) this.lockStateExecutor.chooseThread(this.lockPath));
        completableFuture.cancel(true);
    }

    /**
     * Final close step: unlocks the session lock, if one exists, and completes the promise.
     *
     * <p>The promise is always completed normally. An unlock failure is logged
     * and does not fail the close.
     *
     * @param completableFuture promise completed once the unlock has finished
     */
    synchronized void unlockInternalLock(CompletableFuture<Void> completableFuture) {
        if (this.internalLock == null) {
            FutureUtils.complete(completableFuture, null);
            return;
        }
        this.internalLock.asyncUnlock().whenComplete((ignoredResult, th) -> {
            if (th != null) {
                // Best effort: record the failure but still let the close finish.
                LOG.warn("Failed to unlock {} while closing the lock", this.lockPath, th);
            }
            FutureUtils.complete(completableFuture, null);
        });
    }

    /**
     * Closes the lock asynchronously.
     *
     * <p>The first call marks the lock closed and schedules the close sequence
     * (starting with {@code closeWaiter}) on the lock state executor. Later calls
     * return the future created by the first call.
     *
     * @return a future completed once the close sequence has finished
     */
    @Override // org.apache.distributedlog.io.AsyncCloseable
    public CompletableFuture<Void> asyncClose() {
        synchronized (this) {
            if (this.closed) {
                return this.closeFuture;
            }
            this.closed = true;
            final CompletableFuture<Void> closePromise = new CompletableFuture<>();
            this.closeFuture = closePromise;

            // Internal promise driven by the close sequence. Whatever its outcome,
            // the caller-visible promise completes normally.
            final CompletableFuture<Void> closeSequenceDone = new CompletableFuture<>();
            closeSequenceDone.whenCompleteAsync(
                    (ignoredValue, ignoredCause) -> FutureUtils.complete(closePromise, null),
                    (Executor) this.lockStateExecutor.chooseThread(this.lockPath));

            this.lockStateExecutor.submitOrdered(this.lockPath, () -> {
                closeWaiter(this.lockWaiter, closeSequenceDone);
            });
            return closePromise;
        }
    }

    /**
     * Schedules one reacquire attempt on the lock state executor, keyed by the lock path.
     *
     * @param atomicInteger remaining retry budget shared across attempts
     * @param j timeout value carried along to the attempt
     * @param completableFuture promise completed with the final reacquire outcome
     */
    void internalReacquireLock(AtomicInteger atomicInteger, long j, CompletableFuture<ZKDistributedLock> completableFuture) {
        this.lockStateExecutor.submitOrdered(
                this.lockPath,
                () -> doInternalReacquireLock(atomicInteger, j, completableFuture));
    }

    /**
     * Runs one reacquire attempt and decides whether to retry on failure.
     *
     * <p>The attempt is a zero-timeout acquire through the semaphore. A failure ends
     * the reacquire when it is an {@link OwnershipAcquireFailedException}, when the
     * retry budget is used up, or when the lock has been closed. Otherwise another
     * attempt is scheduled.
     *
     * @param atomicInteger remaining retry budget; decremented on each retryable failure
     * @param j timeout value forwarded to the next scheduled attempt
     * @param completableFuture promise completed with the final reacquire outcome
     */
    void doInternalReacquireLock(final AtomicInteger atomicInteger, final long j, final CompletableFuture<ZKDistributedLock> completableFuture) {
        this.internalTryRetries.inc();
        final CompletableFuture<ZKDistributedLock> attemptPromise = new CompletableFuture<>();
        attemptPromise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onSuccess(ZKDistributedLock reacquired) {
                FutureUtils.complete(completableFuture, reacquired);
            }

            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onFailure(Throwable cause) {
                // Order matters: an ownership conflict must not consume retry budget.
                final boolean giveUp = cause instanceof OwnershipAcquireFailedException
                        || atomicInteger.getAndDecrement() <= 0
                        || ZKDistributedLock.this.closed;
                if (giveUp) {
                    FutureUtils.completeExceptionally(completableFuture, cause);
                    return;
                }
                ZKDistributedLock.this.internalReacquireLock(atomicInteger, j, completableFuture);
            }
        });
        doAsyncAcquireWithSemaphore(attemptPromise, 0L);
    }

    /**
     * Starts reacquiring the lock, or joins a reacquire that is already in flight.
     *
     * <p>A successful reacquire clears the in-flight future so a later expiry can
     * start a new one. A failed reacquire records its cause, which later state
     * checks rethrow.
     *
     * @param z whether a previously recorded reacquire failure should be rethrown
     * @return the reacquire future, or null when a previous reacquire already failed
     *         and {@code z} is false
     * @throws LockingException if the lock is closed, or if a previous reacquire
     *         failed and {@code z} is true
     */
    private CompletableFuture<ZKDistributedLock> reacquireLock(boolean z) throws LockingException {
        final Stopwatch reacquireTimer = Stopwatch.createStarted();
        synchronized (this) {
            if (this.closed) {
                throw newLockClosedException();
            }
            if (this.lockReacquireException != null) {
                if (z) {
                    throw this.lockReacquireException;
                }
                return null;
            }
            if (this.lockReacquireFuture != null) {
                return this.lockReacquireFuture;
            }

            LOG.info("reacquiring lock at {}", this.lockPath);
            final CompletableFuture<ZKDistributedLock> reacquirePromise = new CompletableFuture<>();
            this.lockReacquireFuture = reacquirePromise;
            reacquirePromise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
                @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                public void onSuccess(ZKDistributedLock zKDistributedLock) {
                    synchronized (ZKDistributedLock.this) {
                        ZKDistributedLock.this.lockReacquireFuture = null;
                    }
                    ZKDistributedLock.this.reacquireStats.registerSuccessfulEvent(reacquireTimer.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                }

                @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                public void onFailure(Throwable th) {
                    synchronized (ZKDistributedLock.this) {
                        if (th instanceof LockingException) {
                            ZKDistributedLock.this.lockReacquireException = (LockingException) th;
                        } else {
                            ZKDistributedLock.this.lockReacquireException = new LockingException(ZKDistributedLock.this.lockPath, "Exception on re-acquiring lock", th);
                        }
                    }
                    ZKDistributedLock.this.reacquireStats.registerFailedEvent(reacquireTimer.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                }
            });
            this.reacquireCount.incrementAndGet();
            // Largest possible retry budget, stated directly rather than borrowed from an
            // unrelated configuration default. Retries still stop on an ownership conflict
            // or once the lock is closed.
            internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0L, reacquirePromise);
            return reacquirePromise;
        }
    }
}
