package org.apache.distributedlog.impl.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LockCancelledException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.lock.SessionLockFactory;
import org.apache.distributedlog.lock.ZKDistributedLock;
import org.apache.distributedlog.lock.ZKSessionLockFactory;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForReader;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.LimitedPermitManager;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.class */
public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
    private static final Logger LOG;
    private final String clientId;
    private final DistributedLogConfiguration conf;
    private final ZooKeeperClient zooKeeperClient;
    private final OrderedScheduler scheduler;
    private final StatsLogger statsLogger;
    private final LogSegmentMetadataStore logSegmentStore;
    private final LimitedPermitManager permitManager;
    private SessionLockFactory lockFactory;
    private OrderedScheduler lockStateExecutor;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore$MetadataIndex.class */
    static class MetadataIndex {
        static final int LOG_ROOT_PARENT = 0;
        static final int LOG_ROOT = 1;
        static final int MAX_TXID = 2;
        static final int VERSION = 3;
        static final int LOCK = 4;
        static final int READ_LOCK = 5;
        static final int LOGSEGMENTS = 6;
        static final int ALLOCATION = 7;

        MetadataIndex() {
        }
    }

    public ZKLogStreamMetadataStore(String str, DistributedLogConfiguration distributedLogConfiguration, ZooKeeperClient zooKeeperClient, OrderedScheduler orderedScheduler, StatsLogger statsLogger) {
        this.clientId = str;
        this.conf = distributedLogConfiguration;
        this.zooKeeperClient = zooKeeperClient;
        this.scheduler = orderedScheduler;
        this.statsLogger = statsLogger;
        this.logSegmentStore = new ZKLogSegmentMetadataStore(distributedLogConfiguration, this.zooKeeperClient, orderedScheduler);
        this.permitManager = new LimitedPermitManager(distributedLogConfiguration.getLogSegmentRollingConcurrency(), 1, TimeUnit.MINUTES, orderedScheduler);
        this.zooKeeperClient.register(this.permitManager);
    }

    private synchronized OrderedScheduler getLockStateExecutor(boolean z) {
        if (z && null == this.lockStateExecutor) {
            this.lockStateExecutor = OrderedScheduler.newSchedulerBuilder().name("DLM-LockState").numThreads(this.conf.getNumLockStateThreads()).build();
        }
        return this.lockStateExecutor;
    }

    private synchronized SessionLockFactory getLockFactory(boolean z) {
        if (z && null == this.lockFactory) {
            this.lockFactory = new ZKSessionLockFactory(this.zooKeeperClient, this.clientId, getLockStateExecutor(z), this.conf.getZKNumRetries(), this.conf.getLockTimeoutMilliSeconds(), this.conf.getZKRetryBackoffStartMillis(), this.statsLogger);
        }
        return this.lockFactory;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.zooKeeperClient.unregister(this.permitManager);
        this.permitManager.close();
        this.logSegmentStore.close();
        SchedulerUtils.shutdownScheduler(getLockStateExecutor(false), this.conf.getSchedulerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public LogSegmentMetadataStore getLogSegmentMetadataStore() {
        return this.logSegmentStore;
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public PermitManager getPermitManager() {
        return this.permitManager;
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public Transaction<Object> newTransaction() {
        return new ZKTransaction(this.zooKeeperClient);
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public CompletableFuture<Void> logExists(URI uri, final String str) {
        final String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, str, this.conf.getUnpartitionedStreamName());
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            final ZooKeeper zooKeeper = this.zooKeeperClient.get();
            zooKeeper.sync(logSegmentsPath, new AsyncCallback.VoidCallback() { // from class: org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.1
                public void processResult(int i, String str2, Object obj) {
                    if (KeeperException.Code.NONODE.intValue() == i) {
                        completableFuture.completeExceptionally(new LogNotFoundException(String.format("Log %s does not exist or has been deleted", str)));
                    } else if (KeeperException.Code.OK.intValue() != i) {
                        completableFuture.completeExceptionally(new ZKException("Error on checking log existence for " + str, KeeperException.create(KeeperException.Code.get(i))));
                    } else {
                        zooKeeper.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() { // from class: org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.1.1
                            public void processResult(int i2, String str3, Object obj2, Stat stat) {
                                if (KeeperException.Code.OK.intValue() == i2) {
                                    completableFuture.complete(null);
                                } else if (KeeperException.Code.NONODE.intValue() == i2) {
                                    completableFuture.completeExceptionally(new LogNotFoundException(String.format("Log %s does not exist or has been deleted", str)));
                                } else {
                                    completableFuture.completeExceptionally(new ZKException("Error on checking log existence for " + str, KeeperException.create(KeeperException.Code.get(i2))));
                                }
                            }
                        }, (Object) null);
                    }
                }
            }, (Object) null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while reading {}", logSegmentsPath, e);
            completableFuture.completeExceptionally(new DLInterruptedException("Interrupted while checking " + logSegmentsPath, e));
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            completableFuture.completeExceptionally(e2);
        }
        return completableFuture;
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public DistributedLock createWriteLock(LogMetadataForWriter logMetadataForWriter) {
        return new ZKDistributedLock(getLockStateExecutor(true), getLockFactory(true), logMetadataForWriter.getLockPath(), this.conf.getLockTimeoutMilliSeconds(), this.statsLogger);
    }

    private CompletableFuture<Void> ensureReadLockPathExist(final LogMetadata logMetadata, String str) {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        completableFuture.whenComplete((r9, th) -> {
            if (th instanceof CancellationException) {
                FutureUtils.completeExceptionally(completableFuture, new LockCancelledException(str, "Could not ensure read lock path", th));
            }
        });
        Utils.zkAsyncCreateFullPathOptimisticRecursive(this.zooKeeperClient, str, Optional.of(logMetadata.getLogRootPath()), new byte[0], this.zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { // from class: org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.2
            public void processResult(int i, String str2, Object obj, String str3) {
                if (KeeperException.Code.NONODE.intValue() == i) {
                    FutureUtils.completeExceptionally(completableFuture, new LogNotFoundException(String.format("Log %s does not exist or has been deleted", logMetadata.getFullyQualifiedName())));
                    return;
                }
                if (KeeperException.Code.OK.intValue() == i) {
                    FutureUtils.complete(completableFuture, (Object) null);
                    ZKLogStreamMetadataStore.LOG.trace("Created path {}.", str2);
                    return;
                }
                if (KeeperException.Code.NODEEXISTS.intValue() == i) {
                    FutureUtils.complete(completableFuture, (Object) null);
                    ZKLogStreamMetadataStore.LOG.trace("Path {} is already existed.", str2);
                } else if (-2147483646 == i) {
                    FutureUtils.completeExceptionally(completableFuture, new ZooKeeperClient.ZooKeeperConnectionException(str2));
                } else if (-2147483647 == i) {
                    FutureUtils.completeExceptionally(completableFuture, new DLInterruptedException(str2));
                } else {
                    FutureUtils.completeExceptionally(completableFuture, KeeperException.create(KeeperException.Code.get(i)));
                }
            }
        }, null);
        return completableFuture;
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public CompletableFuture<DistributedLock> createReadLock(LogMetadataForReader logMetadataForReader, Optional<String> optional) {
        String readLockPath = logMetadataForReader.getReadLockPath(optional);
        return ensureReadLockPathExist(logMetadataForReader, readLockPath).thenApplyAsync(r12 -> {
            return new ZKDistributedLock(getLockStateExecutor(true), getLockFactory(true), readLockPath, this.conf.getLockTimeoutMilliSeconds(), this.statsLogger.scope("read_lock"));
        }, (Executor) this.scheduler.chooseThread(readLockPath));
    }

    static int bytesToInt(byte[] bArr) {
        if ($assertionsDisabled || bArr.length >= 4) {
            return (bArr[0] << 24) | (bArr[1] << 16) | (bArr[2] << 8) | bArr[3];
        }
        throw new AssertionError();
    }

    static byte[] intToBytes(int i) {
        return new byte[]{(byte) (i >> 24), (byte) (i >> 16), (byte) (i >> 8), (byte) i};
    }

    static CompletableFuture<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zooKeeper, String str, boolean z) {
        String parent = Utils.getParent(str);
        String str2 = str + LogMetadata.LOGSEGMENTS_PATH;
        String str3 = str + LogMetadata.MAX_TXID_PATH;
        String str4 = str + LogMetadata.LOCK_PATH;
        String str5 = str + LogMetadata.READ_LOCK_PATH;
        String str6 = str + LogMetadata.VERSION_PATH;
        String str7 = str + LogMetadata.ALLOCATION_PATH;
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(z ? 8 : 7);
        newArrayListWithExpectedSize.add(Utils.zkGetData(zooKeeper, parent, false));
        newArrayListWithExpectedSize.add(Utils.zkGetData(zooKeeper, str, false));
        newArrayListWithExpectedSize.add(Utils.zkGetData(zooKeeper, str3, false));
        newArrayListWithExpectedSize.add(Utils.zkGetData(zooKeeper, str6, false));
        newArrayListWithExpectedSize.add(Utils.zkGetData(zooKeeper, str4, false));
        newArrayListWithExpectedSize.add(Utils.zkGetData(zooKeeper, str5, false));
        newArrayListWithExpectedSize.add(Utils.zkGetData(zooKeeper, str2, false));
        if (z) {
            newArrayListWithExpectedSize.add(Utils.zkGetData(zooKeeper, str7, false));
        }
        return FutureUtils.collect(newArrayListWithExpectedSize);
    }

    static boolean pathExists(Versioned<byte[]> versioned) {
        return (null == versioned.getValue() || null == versioned.getVersion()) ? false : true;
    }

    static void ensureMetadataExist(Versioned<byte[]> versioned) {
        Preconditions.checkNotNull((byte[]) versioned.getValue());
        Preconditions.checkNotNull(versioned.getVersion());
    }

    static void createMissingMetadata(final ZooKeeper zooKeeper, String str, final String str2, final List<Versioned<byte[]>> list, final List<ACL> list2, boolean z, boolean z2, final CompletableFuture<List<Versioned<byte[]>>> completableFuture) {
        final ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(list.size());
        final ArrayList newArrayListWithExpectedSize2 = Lists.newArrayListWithExpectedSize(list.size());
        final CreateMode createMode = CreateMode.PERSISTENT;
        String parent = Utils.getParent(str2);
        if (pathExists(list.get(0))) {
            newArrayListWithExpectedSize.add(null);
        } else {
            newArrayListWithExpectedSize.add(DistributedLogConstants.EMPTY_BYTES);
            newArrayListWithExpectedSize2.add(Op.create(parent, DistributedLogConstants.EMPTY_BYTES, list2, createMode));
        }
        if (pathExists(list.get(1))) {
            newArrayListWithExpectedSize.add(null);
        } else {
            newArrayListWithExpectedSize.add(DistributedLogConstants.EMPTY_BYTES);
            newArrayListWithExpectedSize2.add(Op.create(str2, DistributedLogConstants.EMPTY_BYTES, list2, createMode));
        }
        if (pathExists(list.get(2))) {
            newArrayListWithExpectedSize.add(null);
        } else {
            byte[] serializeTransactionId = DLUtils.serializeTransactionId(0L);
            newArrayListWithExpectedSize.add(serializeTransactionId);
            newArrayListWithExpectedSize2.add(Op.create(str2 + LogMetadata.MAX_TXID_PATH, serializeTransactionId, list2, createMode));
        }
        if (pathExists(list.get(3))) {
            newArrayListWithExpectedSize.add(null);
        } else {
            byte[] intToBytes = intToBytes(-1);
            newArrayListWithExpectedSize.add(intToBytes);
            newArrayListWithExpectedSize2.add(Op.create(str2 + LogMetadata.VERSION_PATH, intToBytes, list2, createMode));
        }
        if (pathExists(list.get(4))) {
            newArrayListWithExpectedSize.add(null);
        } else {
            newArrayListWithExpectedSize.add(DistributedLogConstants.EMPTY_BYTES);
            newArrayListWithExpectedSize2.add(Op.create(str2 + LogMetadata.LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, list2, createMode));
        }
        if (pathExists(list.get(5))) {
            newArrayListWithExpectedSize.add(null);
        } else {
            newArrayListWithExpectedSize.add(DistributedLogConstants.EMPTY_BYTES);
            newArrayListWithExpectedSize2.add(Op.create(str2 + LogMetadata.READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, list2, createMode));
        }
        if (pathExists(list.get(6))) {
            newArrayListWithExpectedSize.add(null);
        } else {
            byte[] serializeLogSegmentSequenceNumber = DLUtils.serializeLogSegmentSequenceNumber(0L);
            newArrayListWithExpectedSize.add(serializeLogSegmentSequenceNumber);
            newArrayListWithExpectedSize2.add(Op.create(str2 + LogMetadata.LOGSEGMENTS_PATH, serializeLogSegmentSequenceNumber, list2, createMode));
        }
        if (z) {
            if (pathExists(list.get(7))) {
                newArrayListWithExpectedSize.add(null);
            } else {
                newArrayListWithExpectedSize.add(DistributedLogConstants.EMPTY_BYTES);
                newArrayListWithExpectedSize2.add(Op.create(str2 + LogMetadata.ALLOCATION_PATH, DistributedLogConstants.EMPTY_BYTES, list2, createMode));
            }
        }
        if (newArrayListWithExpectedSize2.isEmpty()) {
            completableFuture.complete(list);
        } else if (z2) {
            getMissingPaths(zooKeeper, str, Utils.getParent(parent)).whenComplete((BiConsumer<? super List<String>, ? super Throwable>) new FutureEventListener<List<String>>() { // from class: org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.3
                public void onSuccess(List<String> list3) {
                    for (String str3 : list3) {
                        newArrayListWithExpectedSize.add(DistributedLogConstants.EMPTY_BYTES);
                        newArrayListWithExpectedSize2.add(0, Op.create(str3, DistributedLogConstants.EMPTY_BYTES, list2, createMode));
                    }
                    ZKLogStreamMetadataStore.executeCreateMissingPathTxn(zooKeeper, newArrayListWithExpectedSize2, newArrayListWithExpectedSize, list, str2, completableFuture);
                }

                public void onFailure(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }
            });
        } else {
            completableFuture.completeExceptionally(new LogNotFoundException("Log " + str2 + " not found"));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void executeCreateMissingPathTxn(ZooKeeper zooKeeper, List<Op> list, final List<byte[]> list2, final List<Versioned<byte[]>> list3, final String str, final CompletableFuture<List<Versioned<byte[]>>> completableFuture) {
        zooKeeper.multi(list, new AsyncCallback.MultiCallback() { // from class: org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.4
            public void processResult(int i, String str2, Object obj, List<OpResult> list4) {
                if (KeeperException.Code.OK.intValue() == i) {
                    ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(list3.size());
                    for (int i2 = 0; i2 < list2.size(); i2++) {
                        byte[] bArr = (byte[]) list2.get(i2);
                        if (null == bArr) {
                            newArrayListWithExpectedSize.add((Versioned) list3.get(i2));
                        } else {
                            newArrayListWithExpectedSize.add(new Versioned(bArr, new LongVersion(0L)));
                        }
                    }
                    completableFuture.complete(newArrayListWithExpectedSize);
                    return;
                }
                if (KeeperException.Code.NODEEXISTS.intValue() == i) {
                    completableFuture.completeExceptionally(new LogExistsException("Someone just created log " + str));
                    return;
                }
                if (ZKLogStreamMetadataStore.LOG.isDebugEnabled()) {
                    StringBuilder sb = new StringBuilder();
                    Iterator<OpResult> it = list4.iterator();
                    while (it.hasNext()) {
                        OpResult.ErrorResult errorResult = (OpResult) it.next();
                        if (errorResult instanceof OpResult.ErrorResult) {
                            sb.append(errorResult.getErr()).append(",");
                        } else {
                            sb.append(0).append(",");
                        }
                    }
                    ZKLogStreamMetadataStore.LOG.debug("Failed to create log, full rc list = {}", sb.substring(0, sb.length() - 1));
                }
                completableFuture.completeExceptionally(new ZKException("Failed to create log " + str, KeeperException.Code.get(i)));
            }
        }, (Object) null);
    }

    static LogMetadataForWriter processLogMetadatas(URI uri, String str, String str2, List<Versioned<byte[]>> list, boolean z) throws UnexpectedException {
        Versioned<byte[]> versioned;
        try {
            Versioned<byte[]> versioned2 = list.get(2);
            ensureMetadataExist(versioned2);
            Versioned<byte[]> versioned3 = list.get(3);
            ensureMetadataExist(versioned2);
            Preconditions.checkArgument(-1 == bytesToInt((byte[]) versioned3.getValue()));
            ensureMetadataExist(list.get(4));
            ensureMetadataExist(list.get(5));
            Versioned<byte[]> versioned4 = list.get(6);
            ensureMetadataExist(versioned4);
            try {
                DLUtils.deserializeLogSegmentSequenceNumber((byte[]) versioned4.getValue());
                if (z) {
                    versioned = list.get(7);
                    ensureMetadataExist(versioned);
                } else {
                    versioned = new Versioned<>((Object) null, (Version) null);
                }
                return new LogMetadataForWriter(uri, str, str2, versioned4, versioned2, versioned);
            } catch (NumberFormatException e) {
                throw new UnexpectedException("Invalid max sequence number found in log " + str, e);
            }
        } catch (IllegalArgumentException e2) {
            throw new UnexpectedException("Invalid log " + str, e2);
        } catch (NullPointerException e3) {
            throw new UnexpectedException("Invalid log " + str, e3);
        }
    }

    static CompletableFuture<LogMetadataForWriter> getLog(URI uri, String str, String str2, ZooKeeperClient zooKeeperClient, boolean z, boolean z2) {
        String logRootPath = LogMetadata.getLogRootPath(uri, str, str2);
        try {
            PathUtils.validatePath(logRootPath);
            try {
                ZooKeeper zooKeeper = zooKeeperClient.get();
                return checkLogMetadataPaths(zooKeeper, logRootPath, z).thenCompose(list -> {
                    CompletableFuture completableFuture = new CompletableFuture();
                    createMissingMetadata(zooKeeper, uri.getPath(), logRootPath, list, zooKeeperClient.getDefaultACL(), z, z2, completableFuture);
                    return completableFuture;
                }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) list2 -> {
                    try {
                        return FutureUtils.value(processLogMetadatas(uri, str, str2, list2, z));
                    } catch (UnexpectedException e) {
                        return FutureUtils.exception(e);
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return FutureUtils.exception(new DLInterruptedException("Interrupted on creating log " + str, e));
            } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
                return FutureUtils.exception(new ZKException("Encountered zookeeper connection issue on creating log " + str, KeeperException.Code.CONNECTIONLOSS));
            }
        } catch (IllegalArgumentException e3) {
            LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, str, e3});
            return FutureUtils.exception(new InvalidStreamNameException(str, "Log name is invalid"));
        }
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public CompletableFuture<LogMetadataForWriter> getLog(URI uri, String str, boolean z, boolean z2) {
        return getLog(uri, str, this.conf.getUnpartitionedStreamName(), this.zooKeeperClient, z, z2);
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public CompletableFuture<Void> deleteLog(URI uri, final String str) {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            ZKUtil.deleteRecursive(this.zooKeeperClient.get(), LogMetadata.getLogStreamPath(uri, str), new AsyncCallback.VoidCallback() { // from class: org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.5
                public void processResult(int i, String str2, Object obj) {
                    if (KeeperException.Code.OK.intValue() != i) {
                        FutureUtils.completeExceptionally(completableFuture, new ZKException("Encountered zookeeper issue on deleting log stream " + str, KeeperException.Code.get(i)));
                    } else {
                        FutureUtils.complete(completableFuture, (Object) null);
                    }
                }
            }, (Object) null);
        } catch (KeeperException e) {
            FutureUtils.completeExceptionally(completableFuture, new ZKException("Encountered zookeeper issue on deleting log stream " + str, e));
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            FutureUtils.completeExceptionally(completableFuture, new DLInterruptedException("Interrupted while deleting log stream " + str));
        } catch (ZooKeeperClient.ZooKeeperConnectionException e3) {
            FutureUtils.completeExceptionally(completableFuture, new ZKException("Encountered zookeeper issue on deleting log stream " + str, KeeperException.Code.CONNECTIONLOSS));
        }
        return completableFuture;
    }

    @Override // org.apache.distributedlog.metadata.LogStreamMetadataStore
    public CompletableFuture<Void> renameLog(URI uri, String str, String str2) {
        return getLog(uri, str, true, false).thenCompose(logMetadataForWriter -> {
            return renameLogMetadata(uri, logMetadataForWriter, str2);
        });
    }

    private CompletableFuture<Void> renameLogMetadata(URI uri, LogMetadataForWriter logMetadataForWriter, String str) {
        LinkedList newLinkedList = Lists.newLinkedList();
        LinkedList newLinkedList2 = Lists.newLinkedList();
        List<ACL> defaultACL = this.zooKeeperClient.getDefaultACL();
        String logRootPath = logMetadataForWriter.getLogRootPath();
        String logRootPath2 = LogMetadata.getLogRootPath(uri, str, this.conf.getUnpartitionedStreamName());
        newLinkedList2.addFirst(Op.delete(LogMetadata.getLogStreamPath(uri, logMetadataForWriter.getLogName()), -1));
        newLinkedList.addLast(Op.create(logRootPath2, DistributedLogConstants.EMPTY_BYTES, defaultACL, CreateMode.PERSISTENT));
        newLinkedList2.addFirst(Op.delete(logRootPath, -1));
        deleteOldPathAndCreateNewPath(logRootPath, LogMetadata.MAX_TXID_PATH, logMetadataForWriter.getMaxTxIdData(), logRootPath2, DLUtils.serializeTransactionId(0L), defaultACL, newLinkedList, newLinkedList2);
        newLinkedList.addLast(Op.create(logRootPath2 + LogMetadata.VERSION_PATH, intToBytes(-1), defaultACL, CreateMode.PERSISTENT));
        newLinkedList2.addFirst(Op.delete(logRootPath + LogMetadata.VERSION_PATH, -1));
        newLinkedList.addLast(Op.create(logRootPath2 + LogMetadata.LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, defaultACL, CreateMode.PERSISTENT));
        newLinkedList2.addFirst(Op.delete(logRootPath + LogMetadata.LOCK_PATH, -1));
        newLinkedList.addLast(Op.create(logRootPath2 + LogMetadata.READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, defaultACL, CreateMode.PERSISTENT));
        newLinkedList2.addFirst(Op.delete(logRootPath + LogMetadata.READ_LOCK_PATH, -1));
        deleteOldPathAndCreateNewPath(logRootPath, LogMetadata.ALLOCATION_PATH, logMetadataForWriter.getAllocationData(), logRootPath2, DistributedLogConstants.EMPTY_BYTES, defaultACL, newLinkedList, newLinkedList2);
        Versioned<byte[]> maxLSSNData = logMetadataForWriter.getMaxLSSNData();
        deleteOldPathAndCreateNewPath(logRootPath, LogMetadata.LOGSEGMENTS_PATH, maxLSSNData, logRootPath2, DLUtils.serializeLogSegmentSequenceNumber(0L), defaultACL, newLinkedList, newLinkedList2);
        return (pathExists(maxLSSNData) ? getLogSegments(this.zooKeeperClient, logRootPath + LogMetadata.LOGSEGMENTS_PATH) : FutureUtils.value(Collections.emptyList())).thenApply((Function<? super List<LogSegmentMetadata>, ? extends U>) list -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                deleteOldSegmentAndCreateNewSegment((LogSegmentMetadata) it.next(), logRootPath2 + LogMetadata.LOGSEGMENTS_PATH, defaultACL, newLinkedList, newLinkedList2);
            }
            return null;
        }).thenCompose(obj -> {
            return getMissingPaths(this.zooKeeperClient, uri, str);
        }).thenCompose(list2 -> {
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                newLinkedList.addFirst(Op.create((String) it.next(), DistributedLogConstants.EMPTY_BYTES, defaultACL, CreateMode.PERSISTENT));
            }
            return executeRenameTxn(logRootPath, logRootPath2, newLinkedList, newLinkedList2);
        });
    }

    @VisibleForTesting
    static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zooKeeperClient, URI uri, String str) {
        try {
            return getMissingPaths(zooKeeperClient.get(), uri.getPath(), LogMetadata.getLogStreamPath(uri, str));
        } catch (InterruptedException | ZooKeeperClient.ZooKeeperConnectionException e) {
            return FutureUtils.exception(e);
        }
    }

    @VisibleForTesting
    static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zooKeeper, String str, String str2) {
        LinkedList newLinkedList = Lists.newLinkedList();
        CompletableFuture<List<String>> createFuture = FutureUtils.createFuture();
        existPath(zooKeeper, str2, str, newLinkedList, createFuture);
        return createFuture;
    }

    private static void existPath(ZooKeeper zooKeeper, String str, String str2, LinkedList<String> linkedList, CompletableFuture<List<String>> completableFuture) {
        if (str2.equals(str)) {
            completableFuture.complete(linkedList);
        } else {
            zooKeeper.exists(str, false, (i, str3, obj, stat) -> {
                if (KeeperException.Code.OK.intValue() != i && KeeperException.Code.NONODE.intValue() != i) {
                    completableFuture.completeExceptionally(new ZKException("Failed to check existence of path " + str3, KeeperException.Code.get(i)));
                } else if (KeeperException.Code.OK.intValue() == i) {
                    completableFuture.complete(linkedList);
                } else {
                    linkedList.addLast(str);
                    existPath(zooKeeper, Utils.getParent(str), str2, linkedList, completableFuture);
                }
            }, (Object) null);
        }
    }

    private CompletableFuture<Void> executeRenameTxn(String str, String str2, LinkedList<Op> linkedList, LinkedList<Op> linkedList2) {
        CompletableFuture<Void> createFuture = FutureUtils.createFuture();
        ArrayList<Op.Delete> newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(linkedList.size() + linkedList2.size());
        newArrayListWithExpectedSize.addAll(linkedList);
        newArrayListWithExpectedSize.addAll(linkedList2);
        if (LOG.isDebugEnabled()) {
            for (Op.Delete delete : newArrayListWithExpectedSize) {
                if (delete instanceof Op.Create) {
                    LOG.debug("op : create {}", ((Op.Create) delete).getPath());
                } else if (delete instanceof Op.Delete) {
                    LOG.debug("op : delete {}, record = {}", delete.getPath(), delete.toRequestRecord());
                } else {
                    LOG.debug("op : {}", delete);
                }
            }
        }
        try {
            this.zooKeeperClient.get().multi(newArrayListWithExpectedSize, (i, str3, obj, list) -> {
                if (KeeperException.Code.OK.intValue() == i) {
                    createFuture.complete(null);
                    return;
                }
                if (KeeperException.Code.NODEEXISTS.intValue() == i) {
                    createFuture.completeExceptionally(new LogExistsException("Someone just created new log " + str2));
                    return;
                }
                if (KeeperException.Code.NOTEMPTY.intValue() == i) {
                    createFuture.completeExceptionally(new LockingException(str + LogMetadata.LOCK_PATH, "Someone is holding a lock on log " + str));
                } else if (KeeperException.Code.NONODE.intValue() == i) {
                    createFuture.completeExceptionally(new LogNotFoundException("Log " + str2 + " is not found"));
                } else {
                    createFuture.completeExceptionally(new ZKException("Failed to rename log " + str + " to " + str2 + " at path " + str3, KeeperException.Code.get(i)));
                }
            }, (Object) null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            createFuture.completeExceptionally(e);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            createFuture.completeExceptionally(e2);
        }
        return createFuture;
    }

    private static void deleteOldSegmentAndCreateNewSegment(LogSegmentMetadata logSegmentMetadata, String str, List<ACL> list, LinkedList<Op> linkedList, LinkedList<Op> linkedList2) {
        linkedList.addLast(Op.create(str + "/" + logSegmentMetadata.getZNodeName(), logSegmentMetadata.getFinalisedData().getBytes(StandardCharsets.UTF_8), list, CreateMode.PERSISTENT));
        linkedList2.addFirst(Op.delete(logSegmentMetadata.getZkPath(), -1));
    }

    private static void deleteOldPathAndCreateNewPath(String str, String str2, Versioned<byte[]> versioned, String str3, byte[] bArr, List<ACL> list, LinkedList<Op> linkedList, LinkedList<Op> linkedList2) {
        if (!pathExists(versioned)) {
            linkedList.addLast(Op.create(str3 + str2, bArr, list, CreateMode.PERSISTENT));
        } else {
            linkedList.addLast(Op.create(str3 + str2, (byte[]) versioned.getValue(), list, CreateMode.PERSISTENT));
            linkedList2.addFirst(Op.delete(str + str2, (int) versioned.getVersion().getLongVersion()));
        }
    }

    @VisibleForTesting
    static CompletableFuture<List<LogSegmentMetadata>> getLogSegments(ZooKeeperClient zooKeeperClient, String str) {
        CompletableFuture<List<LogSegmentMetadata>> createFuture = FutureUtils.createFuture();
        try {
            zooKeeperClient.get().getChildren(str, false, (i, str2, obj, list, stat) -> {
                if (KeeperException.Code.OK.intValue() != i) {
                    if (KeeperException.Code.NONODE.intValue() == i) {
                        createFuture.completeExceptionally(new LogNotFoundException("Log " + str2 + " not found"));
                        return;
                    } else {
                        createFuture.completeExceptionally(new ZKException("Failed to get log segments from " + str2, KeeperException.Code.get(i)));
                        return;
                    }
                }
                ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(list.size());
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    newArrayListWithExpectedSize.add(LogSegmentMetadata.read(zooKeeperClient, str + "/" + ((String) it.next())));
                }
                FutureUtils.proxyTo(FutureUtils.collect(newArrayListWithExpectedSize), createFuture);
            }, (Object) null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            createFuture.completeExceptionally(e);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            createFuture.completeExceptionally(e2);
        }
        return createFuture;
    }

    static {
        $assertionsDisabled = !ZKLogStreamMetadataStore.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);
    }
}
