package org.apache.distributedlog.bk;

import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.versioning.LongVersion;
import dlshade.org.apache.bookkeeper.versioning.Version;
import dlshade.org.apache.bookkeeper.versioning.Versioned;
import dlshade.org.apache.zookeeper.CreateMode;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.Op;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.distributedlog.zk.ZKVersionedSetOp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/bk/SimpleLedgerAllocator.class */
public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListener<LedgerHandle>, Transaction.OpListener<Version> {
    /** Class-wide SLF4J logger. */
    static final Logger LOG = LoggerFactory.getLogger(SimpleLedgerAllocator.class);
    /** ZooKeeper client used to update and delete the allocation znode. */
    final ZooKeeperClient zkc;
    /** BookKeeper client used to create and delete ledgers. */
    final BookKeeperClient bkc;
    /** ZooKeeper path of the allocation znode this allocator works against. */
    final String allocatePath;
    /** Current phase of the allocator; the constructor starts it at HANDED_OVER. */
    Phase phase;
    /** Latest znode version accepted by setVersion; the constructor starts it at -1. */
    LongVersion version;
    /** Future of the current allocation; created by allocateLedger and cleared when an obtain is confirmed. */
    CompletableFuture<LedgerHandle> allocatePromise;
    /** Transaction registered by the pending tryObtain call, or null when no obtain is pending. */
    Transaction<Object> tryObtainTxn;
    /** Listener registered by the pending tryObtain call, or null when no obtain is pending. */
    Transaction.OpListener<LedgerHandle> tryObtainListener;
    /** Ledger id parsed from the allocation znode at construction time, or null if there was none. */
    Long ledgerIdLeftFromPrevAllocation;
    /** Ledger that has been allocated but not yet handed over; cleared when an obtain is confirmed. */
    LedgerHandle allocatedLh;
    /** Ledger metadata passed to ledger creation by allocate(); may be null. */
    LedgerMetadata ledgerMetadata;
    /** Non-null once closeInternal has been called; doubles as the "closing" flag. */
    CompletableFuture<Void> closeFuture;
    /** Outstanding ledger-deletion futures; accessed while synchronizing on the list itself. */
    final LinkedList<CompletableFuture<Void>> ledgerDeletions;
    /** Supplies the ensemble/write/ack quorum sizes for each new ledger. */
    private final QuorumConfigProvider quorumConfigProvider;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/bk/SimpleLedgerAllocator$AllocationException.class */
    /**
     * I/O failure reported by the allocator, tagged with an allocator {@link Phase}.
     */
    public static class AllocationException extends IOException {
        private static final long serialVersionUID = -1111397872059426882L;

        /** Phase supplied by whoever raised the exception. */
        private final Phase phase;

        /**
         * @param phase   allocator phase to attach to this exception
         * @param message detail message forwarded to {@link IOException}
         */
        public AllocationException(Phase phase, String message) {
            super(message);
            this.phase = phase;
        }

        /**
         * @return the phase that was passed to the constructor
         */
        public Phase getPhase() {
            return phase;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/bk/SimpleLedgerAllocator$ConcurrentObtainException.class */
    /**
     * Specialised {@link AllocationException} that {@code tryObtain} reports when the ledger is
     * already being handed over, has been handed over, or another obtain is pending.
     */
    public static class ConcurrentObtainException extends AllocationException {
        private static final long serialVersionUID = -8532471098537176913L;

        /**
         * @param phase   allocator phase at the time the conflicting obtain was attempted
         * @param message detail message
         */
        public ConcurrentObtainException(Phase phase, String message) {
            super(phase, message);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/bk/SimpleLedgerAllocator$Phase.class */
    /**
     * Phases the allocator moves through while allocating a ledger and handing it over.
     */
    public enum Phase {
        /** Set by allocateLedger just before BookKeeper is asked to create a ledger. */
        ALLOCATING,
        /** Set once the new ledger id is recorded in the allocation znode, or when a hand-over transaction aborts. */
        ALLOCATED,
        /** Set by completeAllocation after the znode-reset op has been added to the obtain transaction. */
        HANDING_OVER,
        /** Initial phase; set again when the obtain transaction commits. A new allocation may only start from here. */
        HANDED_OVER,
        /** Set when ledger creation fails, marking the znode fails, or the transaction aborts with BADVERSION. */
        ERROR
    }

    /**
     * Reads the allocation znode and creates it when the read yields nothing usable.
     *
     * <p>The read result is treated as unusable when it is {@code null} or when its version or
     * value is {@code null}; in that case {@link #createAllocationData} is invoked. Otherwise
     * the read result is returned as-is.
     *
     * @param str             ZooKeeper path of the allocation znode
     * @param zooKeeperClient client used for the read (and, if needed, the create)
     * @return future holding the versioned allocation data
     */
    static CompletableFuture<Versioned<byte[]>> getAndCreateAllocationData(final String str, final ZooKeeperClient zooKeeperClient) {
        // A lambda replaces the decompiler-generated anonymous Function, whose cast referenced an
        // undeclared type variable and therefore could not compile.
        return Utils.zkGetData(zooKeeperClient, str, false).thenCompose(versioned -> {
            boolean unusable = null == versioned || null == versioned.getVersion() || null == versioned.getValue();
            if (unusable) {
                return createAllocationData(str, zooKeeperClient);
            }
            return FutureUtils.value(versioned);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asynchronously creates the allocation znode as a persistent node with empty data.
     *
     * <p>Outcomes, by ZooKeeper result code:
     * <ul>
     *   <li>OK: the future completes with empty data at version 0;</li>
     *   <li>NODEEXISTS: the future is fed from a fresh read of the existing znode;</li>
     *   <li>anything else: the future fails with the corresponding wrapped ZooKeeper exception.</li>
     * </ul>
     * If obtaining the ZooKeeper handle is interrupted or fails to connect, an already-failed
     * future is returned (the interrupt flag is restored in the former case).
     *
     * @param allocatePath ZooKeeper path of the allocation znode
     * @param zkc          client used to reach ZooKeeper
     * @return future holding the versioned allocation data
     */
    public static CompletableFuture<Versioned<byte[]>> createAllocationData(String allocatePath, ZooKeeperClient zkc) {
        final CompletableFuture<Versioned<byte[]>> result = new CompletableFuture<>();
        try {
            zkc.get().create(
                    allocatePath,
                    DistributedLogConstants.EMPTY_BYTES,
                    zkc.getDefaultACL(),
                    CreateMode.PERSISTENT,
                    (rc, createdPath, ctx, name) -> {
                        if (KeeperException.Code.OK.intValue() == rc) {
                            result.complete(new Versioned<>(DistributedLogConstants.EMPTY_BYTES, new LongVersion(0L)));
                            return;
                        }
                        if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                            // Somebody else created it first: use whatever is stored there now.
                            FutureUtils.proxyTo(Utils.zkGetData(zkc, allocatePath, false), result);
                            return;
                        }
                        KeeperException failure = KeeperException.create(KeeperException.Code.get(rc));
                        result.completeExceptionally(Utils.zkException(failure, allocatePath));
                    },
                    null);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return FutureUtils.exception(Utils.zkException(interrupted, allocatePath));
        } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
            return FutureUtils.exception(Utils.zkException(connectionFailure, allocatePath));
        }
        return result;
    }

    /**
     * Builds an allocator without ledger metadata; delegates to the six-argument overload with
     * {@code null} metadata.
     *
     * @param allocatePath         ZooKeeper path of the allocation znode
     * @param allocationData       previously read allocation data, may be {@code null}
     * @param quorumConfigProvider source of quorum sizes for new ledgers
     * @param zkc                  ZooKeeper client
     * @param bkc                  BookKeeper client
     * @return future holding the constructed allocator
     */
    public static CompletableFuture<SimpleLedgerAllocator> of(String allocatePath, Versioned<byte[]> allocationData, QuorumConfigProvider quorumConfigProvider, ZooKeeperClient zkc, BookKeeperClient bkc) {
        final LedgerMetadata noMetadata = null;
        return of(allocatePath, allocationData, quorumConfigProvider, zkc, bkc, noMetadata);
    }

    /**
     * Builds an allocator, first fetching (or creating) the allocation data if none was supplied.
     *
     * <p>When {@code allocationData} is non-null and carries both a value and a version, the
     * allocator is constructed immediately. Otherwise the data is obtained through
     * {@link #getAndCreateAllocationData} and the allocator is constructed from that result.
     *
     * @param allocatePath         ZooKeeper path of the allocation znode
     * @param allocationData       previously read allocation data, may be {@code null}
     * @param quorumConfigProvider source of quorum sizes for new ledgers
     * @param zkc                  ZooKeeper client
     * @param bkc                  BookKeeper client
     * @param ledgerMetadata       metadata handed to the constructor, may be {@code null}
     * @return future holding the constructed allocator
     */
    public static CompletableFuture<SimpleLedgerAllocator> of(String allocatePath, Versioned<byte[]> allocationData, QuorumConfigProvider quorumConfigProvider, ZooKeeperClient zkc, BookKeeperClient bkc, LedgerMetadata ledgerMetadata) {
        final boolean dataUsable = null != allocationData
                && null != allocationData.getValue()
                && null != allocationData.getVersion();
        if (dataUsable) {
            return FutureUtils.value(new SimpleLedgerAllocator(allocatePath, allocationData, quorumConfigProvider, zkc, bkc, ledgerMetadata));
        }
        return getAndCreateAllocationData(allocatePath, zkc)
                .thenApply(fetched -> new SimpleLedgerAllocator(allocatePath, fetched, quorumConfigProvider, zkc, bkc, ledgerMetadata));
    }

    /**
     * Creates an allocator without ledger metadata; delegates to the six-argument constructor
     * with {@code null} metadata.
     *
     * @param allocatePath         ZooKeeper path of the allocation znode
     * @param allocationData       allocation data read from that znode
     * @param quorumConfigProvider source of quorum sizes for new ledgers
     * @param zkc                  ZooKeeper client
     * @param bkc                  BookKeeper client
     */
    public SimpleLedgerAllocator(String allocatePath, Versioned<byte[]> allocationData, QuorumConfigProvider quorumConfigProvider, ZooKeeperClient zkc, BookKeeperClient bkc) {
        this(allocatePath, allocationData, quorumConfigProvider, zkc, bkc, (LedgerMetadata) null);
    }

    /**
     * Creates an allocator in phase HANDED_OVER with version -1, then applies the supplied
     * allocation data through {@code initialize}.
     *
     * @param allocatePath         ZooKeeper path of the allocation znode
     * @param allocationData       allocation data read from that znode; dereferenced, so must be non-null
     * @param quorumConfigProvider source of quorum sizes for new ledgers
     * @param zkc                  ZooKeeper client
     * @param bkc                  BookKeeper client
     * @param ledgerMetadata       metadata used by {@code allocate()} when creating ledgers, may be {@code null}
     */
    public SimpleLedgerAllocator(String allocatePath, Versioned<byte[]> allocationData, QuorumConfigProvider quorumConfigProvider, ZooKeeperClient zkc, BookKeeperClient bkc, LedgerMetadata ledgerMetadata) {
        // Collaborators and configuration.
        this.allocatePath = allocatePath;
        this.zkc = zkc;
        this.bkc = bkc;
        this.quorumConfigProvider = quorumConfigProvider;
        this.ledgerMetadata = ledgerMetadata;

        // Mutable state: nothing allocated, nothing pending, not closing.
        this.phase = Phase.HANDED_OVER;
        this.version = new LongVersion(-1L);
        this.ledgerDeletions = new LinkedList<>();
        this.ledgerIdLeftFromPrevAllocation = null;
        this.allocatedLh = null;
        this.tryObtainTxn = null;
        this.tryObtainListener = null;
        this.closeFuture = null;

        // Must run last: it logs the path and compares against the initial version.
        initialize(allocationData);
    }

    /**
     * Adopts the version of the supplied allocation data and, when the data is non-empty,
     * remembers the ledger id it encodes as left over from a previous allocation.
     *
     * <p>Data that cannot be parsed as a ledger id is logged at WARN level and otherwise ignored.
     *
     * @param allocationData versioned content of the allocation znode
     */
    private void initialize(Versioned<byte[]> allocationData) {
        setVersion((LongVersion) allocationData.getVersion());
        final byte[] payload = allocationData.getValue();
        final boolean hasPayload = null != payload && payload.length > 0;
        if (hasPayload) {
            try {
                this.ledgerIdLeftFromPrevAllocation = Long.valueOf(DLUtils.bytes2LogSegmentId(payload));
            } catch (NumberFormatException malformed) {
                LOG.warn("Invalid data found in allocator path {} : ", this.allocatePath, malformed);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deletes the ledger remembered from a previous allocation, if there is one, and forgets its id.
     */
    public synchronized void deleteLedgerLeftFromPreviousAllocationIfNecessary() {
        final Long leftover = this.ledgerIdLeftFromPrevAllocation;
        if (null == leftover) {
            return;
        }
        LOG.info("Deleting allocated-but-unused ledger left from previous allocation {}.", leftover);
        deleteLedger(leftover.longValue());
        this.ledgerIdLeftFromPrevAllocation = null;
    }

    /**
     * Starts allocating a ledger if the previous one has already been handed over.
     *
     * <p>Does nothing in any phase other than HANDED_OVER and ERROR.
     *
     * @throws IOException an {@link AllocationException} when the allocator is in the ERROR phase
     */
    @Override // org.apache.distributedlog.util.Allocator
    public synchronized void allocate() throws IOException {
        final Phase current = this.phase;
        if (Phase.ERROR == current) {
            throw new AllocationException(Phase.ERROR, "Error on ledger allocator for " + this.allocatePath);
        }
        if (Phase.HANDED_OVER != current) {
            return;
        }
        allocateLedger(this.ledgerMetadata);
    }

    /**
     * Registers a transaction and listener that want to take ownership of the allocated ledger.
     *
     * <p>Fails immediately with an {@link AllocationException} in the ERROR phase, and with a
     * {@link ConcurrentObtainException} when the ledger is being or has been handed over, or when
     * another obtain is already registered. If a ledger is already allocated, the hand-over is
     * started right away.
     *
     * @param transaction transaction the hand-over operation will be added to
     * @param opListener  listener notified when that operation commits or aborts
     * @return the allocation future, or an already-failed future in the cases above
     */
    @Override // org.apache.distributedlog.util.Allocator
    public synchronized CompletableFuture<LedgerHandle> tryObtain(Transaction<Object> transaction, Transaction.OpListener<LedgerHandle> opListener) {
        final Phase current = this.phase;
        if (Phase.ERROR == current) {
            AllocationException failure = new AllocationException(Phase.ERROR, "Error on allocating ledger under " + this.allocatePath);
            return FutureUtils.exception(failure);
        }
        final boolean handover = Phase.HANDING_OVER == current || Phase.HANDED_OVER == current;
        if (handover || null != this.tryObtainTxn) {
            ConcurrentObtainException conflict = new ConcurrentObtainException(current, "Ledger handle is handling over to another thread : " + current);
            return FutureUtils.exception(conflict);
        }
        this.tryObtainTxn = transaction;
        this.tryObtainListener = opListener;
        final LedgerHandle ready = this.allocatedLh;
        if (null != ready) {
            completeAllocation(ready);
        }
        return this.allocatePromise;
    }

    /**
     * Transaction callback: the hand-over operation committed.
     *
     * @param version new version of the allocation znode; cast to {@link LongVersion}
     */
    @Override // org.apache.distributedlog.util.Transaction.OpListener
    public void onCommit(Version version) {
        confirmObtain((LongVersion) version);
    }

    /**
     * Finalises a hand-over after its transaction committed.
     *
     * <p>Only acts in the HANDING_OVER phase: moves to HANDED_OVER, adopts the new znode version,
     * clears the per-obtain state, notifies the obtain listener (outside the lock) and finally
     * triggers the next allocation. In any other phase the call is ignored.
     *
     * @param longVersion version of the allocation znode after the commit
     */
    private void confirmObtain(LongVersion longVersion) {
        final Transaction.OpListener<LedgerHandle> listener;
        final LedgerHandle handedOver;
        synchronized (this) {
            if (Phase.HANDING_OVER != this.phase) {
                return;
            }
            setPhase(Phase.HANDED_OVER);
            setVersion(longVersion);
            listener = this.tryObtainListener;
            handedOver = this.allocatedLh;
            // Reset everything tied to the obtain that just completed.
            this.allocatedLh = null;
            this.allocatePromise = null;
            this.tryObtainTxn = null;
            this.tryObtainListener = null;
        }
        if (null != listener && null != handedOver) {
            listener.onCommit(handedOver);
        }
        allocateLedger();
    }

    /**
     * Transaction callback: the hand-over operation aborted.
     *
     * <p>A ZooKeeper BADVERSION failure moves the allocator to ERROR. Any other failure during
     * HANDING_OVER rolls back to ALLOCATED and clears the registered obtain. The listener that
     * was registered when the abort arrived is notified outside the lock.
     *
     * @param th cause of the abort
     */
    @Override // org.apache.distributedlog.util.Transaction.OpListener
    public void onAbort(Throwable th) {
        final boolean badVersion = th instanceof KeeperException
                && ((KeeperException) th).code() == KeeperException.Code.BADVERSION;
        final Transaction.OpListener<LedgerHandle> listener;
        synchronized (this) {
            listener = this.tryObtainListener;
            if (badVersion) {
                LOG.info("Set ledger allocator {} to ERROR state after hit bad version : version = {}", this.allocatePath, getVersion());
                setPhase(Phase.ERROR);
            } else if (Phase.HANDING_OVER == this.phase) {
                setPhase(Phase.ALLOCATED);
                this.tryObtainTxn = null;
                this.tryObtainListener = null;
            }
        }
        if (null == listener) {
            return;
        }
        listener.onAbort(th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Switches the allocator to the given phase and logs the transition at INFO level.
     *
     * @param phase phase to move to
     */
    public synchronized void setPhase(Phase phase) {
        this.phase = phase;
        LOG.info("Ledger allocator {} moved to phase {} : version = {}.", this.allocatePath, phase, this.version);
    }

    /**
     * Starts allocating the next ledger using the allocator's configured ledger metadata.
     *
     * <p>Forwards {@code this.ledgerMetadata} (which may be {@code null}) so that ledgers
     * allocated after a hand-over carry the same metadata as the ledger requested through
     * {@code allocate()}, instead of silently dropping it.
     */
    private synchronized void allocateLedger() {
        allocateLedger(this.ledgerMetadata);
    }

    /**
     * Asks BookKeeper for a new ledger, provided the previous one has been handed over.
     *
     * <p>In the HANDED_OVER phase this moves to ALLOCATING, creates a fresh allocation future and
     * issues the create-ledger request with this allocator registered as completion callback.
     * In any other phase it logs an error and does nothing.
     *
     * @param metadata ledger metadata forwarded to the create-ledger call, may be {@code null}
     */
    private synchronized void allocateLedger(LedgerMetadata metadata) {
        if (Phase.HANDED_OVER == this.phase) {
            setPhase(Phase.ALLOCATING);
            this.allocatePromise = new CompletableFuture<>();
            final QuorumConfig quorum = this.quorumConfigProvider.getQuorumConfig();
            this.bkc
                    .createLedger(quorum.getEnsembleSize(), quorum.getWriteQuorumSize(), quorum.getAckQuorumSize(), metadata)
                    .whenComplete(this);
        } else {
            LOG.error("Trying allocate ledger for {} in phase {}, giving up.", this.allocatePath, this.phase);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the allocated ledger and, if an obtain is registered, starts handing it over.
     *
     * <p>The hand-over adds a versioned set-data op to the obtain transaction that resets the
     * allocation znode to empty data (conditioned on the current version, with this allocator
     * as op listener), moves to HANDING_OVER and completes the allocation future with the ledger.
     *
     * @param ledgerHandle the newly allocated ledger
     */
    public synchronized void completeAllocation(LedgerHandle ledgerHandle) {
        this.allocatedLh = ledgerHandle;
        final Transaction<Object> obtainTxn = this.tryObtainTxn;
        if (null != obtainTxn) {
            final int expectedVersion = (int) this.version.getLongVersion();
            final Op resetZnode = Op.setData(this.allocatePath, DistributedLogConstants.EMPTY_BYTES, expectedVersion);
            obtainTxn.addOp(new ZKVersionedSetOp(resetZnode, this));
            setPhase(Phase.HANDING_OVER);
            this.allocatePromise.complete(ledgerHandle);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails the current allocation future with the given cause.
     *
     * @param th reason the allocation failed
     */
    public synchronized void failAllocation(Throwable th) {
        // NOTE(review): assumes allocatePromise is non-null here, i.e. allocateLedger ran first.
        this.allocatePromise.completeExceptionally(th);
    }

    /**
     * Ledger-creation callback: the ledger was created, so record its id in the allocation znode.
     *
     * @param ledgerHandle the newly created ledger
     */
    @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
    public void onSuccess(LedgerHandle ledgerHandle) {
        markAsAllocated(ledgerHandle);
    }

    /**
     * Ledger-creation callback: creation failed. Logs the error, moves the allocator to ERROR
     * and fails the allocation future with the same cause.
     *
     * @param th reason ledger creation failed
     */
    @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
    public void onFailure(Throwable th) {
        LOG.error("Error creating ledger for allocating {} : ", this.allocatePath, th);
        setPhase(Phase.ERROR);
        failAllocation(th);
    }

    /**
     * @return the allocation znode version currently held by this allocator, read under the instance lock
     */
    private synchronized LongVersion getVersion() {
        return this.version;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adopts a new znode version, but only if it is strictly newer than the current one.
     *
     * <p>A version that does not compare as AFTER the current version is logged at WARN level
     * and ignored.
     *
     * @param longVersion candidate version; must be non-null
     */
    public synchronized void setVersion(LongVersion longVersion) {
        final boolean newer = longVersion.compare(this.version) == Version.Occurred.AFTER;
        if (newer) {
            LOG.info("Ledger allocator for {} moved version from {} to {}.", this.allocatePath, this.version, longVersion);
            this.version = longVersion;
            return;
        }
        LOG.warn("Ledger allocator for {} received an old version {}, current version is {}.", this.allocatePath, longVersion, this.version);
    }

    /**
     * Writes the id of the freshly created ledger into the allocation znode.
     *
     * <p>On success: deletes any ledger left from a previous allocation, adopts the new znode
     * version, moves to ALLOCATED and completes the allocation. On failure: moves to ERROR,
     * deletes the ledger that was just created, logs the error and fails the allocation future.
     *
     * @param ledgerHandle the newly created ledger
     */
    private void markAsAllocated(final LedgerHandle ledgerHandle) {
        final byte[] encodedId = DLUtils.logSegmentId2Bytes(ledgerHandle.getId());
        Utils.zkSetData(this.zkc, this.allocatePath, encodedId, getVersion()).whenComplete((newVersion, cause) -> {
            if (null != cause) {
                setPhase(Phase.ERROR);
                deleteLedger(ledgerHandle.getId());
                LOG.error("Fail mark ledger {} as allocated under {} : ", ledgerHandle.getId(), this.allocatePath, cause);
                failAllocation(cause);
                return;
            }
            deleteLedgerLeftFromPreviousAllocationIfNecessary();
            setVersion(newVersion);
            setPhase(Phase.ALLOCATED);
            completeAllocation(ledgerHandle);
        });
    }

    /**
     * Deletes a ledger asynchronously and tracks the request until it finishes.
     *
     * <p>The deletion future is kept in {@code ledgerDeletions} while in flight. If the deletion
     * fails, the error is logged and the deletion is issued again unless the allocator is
     * closing; the finished future is then removed from the tracking list.
     *
     * @param ledgerId id of the ledger to delete
     */
    void deleteLedger(long ledgerId) {
        final CompletableFuture<Void> deletion = this.bkc.deleteLedger(ledgerId, true);
        synchronized (this.ledgerDeletions) {
            this.ledgerDeletions.add(deletion);
        }
        deletion.whenComplete((ignored, cause) -> {
            final boolean failed = null != cause;
            if (failed) {
                LOG.error("Error deleting ledger {} for ledger allocator {}, retrying : ", ledgerId, this.allocatePath, cause);
                if (!isClosing()) {
                    deleteLedger(ledgerId);
                }
            }
            synchronized (this.ledgerDeletions) {
                this.ledgerDeletions.remove(deletion);
            }
        });
    }

    /**
     * @return {@code true} once a close future has been installed by closeInternal
     */
    private synchronized boolean isClosing() {
        return this.closeFuture != null;
    }

    /**
     * Closes the allocator once; later calls return the future created by the first call.
     *
     * @param cleanup when {@code true}, runs {@code cleanupAndClose}; when {@code false}, logs
     *                that the allocator is aborted without cleanup and completes the future
     *                immediately
     * @return the close future
     */
    private CompletableFuture<Void> closeInternal(boolean cleanup) {
        synchronized (this) {
            if (null == this.closeFuture) {
                final CompletableFuture<Void> promise = new CompletableFuture<>();
                this.closeFuture = promise;
                if (cleanup) {
                    cleanupAndClose(promise);
                } else {
                    LOG.info("Abort ledger allocator without cleaning up on {}.", this.allocatePath);
                    FutureUtils.complete(promise, null);
                }
            }
            return this.closeFuture;
        }
    }

    /**
     * Closes the allocator after reclaiming the ledger it currently holds.
     *
     * <p>The allocator obtains its own allocated ledger through a private ZooKeeper transaction.
     * Once the ledger is obtained it is deleted; when all outstanding ledger deletions have
     * finished, the transaction is executed, and its commit or abort completes the close future.
     * If the ledger cannot be obtained, or the deletions fail, the close future is completed
     * anyway: cleanup is best effort and never fails the close.
     *
     * @param completableFuture close future to complete when cleanup is over
     */
    private void cleanupAndClose(final CompletableFuture<Void> completableFuture) {
        LOG.info("Closing ledger allocator on {}.", this.allocatePath);
        final ZKTransaction zKTransaction = new ZKTransaction(this.zkc);
        tryObtain(zKTransaction, new Transaction.OpListener<LedgerHandle>() {
            @Override // org.apache.distributedlog.util.Transaction.OpListener
            public void onCommit(LedgerHandle ledgerHandle) {
                complete();
            }

            @Override // org.apache.distributedlog.util.Transaction.OpListener
            public void onAbort(Throwable th) {
                complete();
            }

            // Commit and abort both end the close.
            private void complete() {
                completableFuture.complete(null);
                SimpleLedgerAllocator.LOG.info("Closed ledger allocator on {}.", SimpleLedgerAllocator.this.allocatePath);
            }
        }).whenComplete(new FutureEventListener<LedgerHandle>() {
            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onSuccess(LedgerHandle ledgerHandle) {
                // The obtained ledger will never be used: delete it.
                SimpleLedgerAllocator.this.deleteLedger(ledgerHandle.getId());
                // Snapshot the in-flight deletions under the list lock.
                final List<CompletableFuture<Void>> pendingDeletions;
                synchronized (SimpleLedgerAllocator.this.ledgerDeletions) {
                    pendingDeletions = Lists.newArrayList(SimpleLedgerAllocator.this.ledgerDeletions);
                }
                FutureUtils.collect(pendingDeletions).whenComplete(new FutureEventListener<List<Void>>() {
                    @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                    public void onSuccess(List<Void> list) {
                        // Executing the transaction resets the znode; its listener completes the close.
                        zKTransaction.execute();
                    }

                    @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
                    public void onFailure(Throwable th) {
                        // This branch is a deletion failure, not an obtain failure, so say so.
                        SimpleLedgerAllocator.LOG.debug("Fail to delete outstanding ledgers when closing the allocator : ", th);
                        FutureUtils.complete(completableFuture, null);
                    }
                });
            }

            @Override // dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener
            public void onFailure(Throwable th) {
                SimpleLedgerAllocator.LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", th);
                FutureUtils.complete(completableFuture, null);
            }
        });
    }

    /**
     * No-op in this implementation.
     */
    @Override // org.apache.distributedlog.bk.LedgerAllocator
    public void start() {
    }

    /**
     * Closes the allocator without cleanup: delegates to {@code closeInternal(false)}, which
     * skips the ledger-reclaiming step.
     *
     * @return the close future
     */
    @Override // org.apache.distributedlog.io.AsyncCloseable
    public CompletableFuture<Void> asyncClose() {
        return closeInternal(false);
    }

    /**
     * Closes the allocator with cleanup and then deletes the allocation znode at the version
     * the allocator holds once the close has finished.
     *
     * @return future that completes when the znode deletion completes
     */
    @Override // org.apache.distributedlog.io.AsyncDeleteable
    public CompletableFuture<Void> delete() {
        final CompletableFuture<Void> closed = closeInternal(true);
        return closed.thenCompose(ignored -> Utils.zkDelete(this.zkc, this.allocatePath, getVersion()));
    }
}
