package org.apache.pulsar.packages.management.storage.bookkeeper;

import com.google.common.base.Strings;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.class */
public class BookKeeperPackagesStorage implements PackagesStorage {
    private static final Logger log = LoggerFactory.getLogger(BookKeeperPackagesStorage.class);
    private static final String NS_CLIENT_ID = "packages-management";
    final BookKeeperPackagesStorageConfiguration configuration;
    private Namespace namespace;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BookKeeperPackagesStorage(PackagesStorageConfiguration packagesStorageConfiguration) {
        this.configuration = new BookKeeperPackagesStorageConfiguration(packagesStorageConfiguration);
    }

    public void initialize() {
        DistributedLogConfiguration lockTimeout = new DistributedLogConfiguration().setImmediateFlushEnabled(true).setOutputBufferSize(0).setWriteQuorumSize(this.configuration.getPackagesReplicas()).setEnsembleSize(this.configuration.getPackagesReplicas()).setAckQuorumSize(this.configuration.getPackagesReplicas()).setLockTimeout(0L);
        if (!Strings.isNullOrEmpty(this.configuration.getBookkeeperClientAuthenticationPlugin())) {
            lockTimeout.setProperty("bkc.clientAuthProviderFactoryClass", this.configuration.getBookkeeperClientAuthenticationPlugin());
            if (!Strings.isNullOrEmpty(this.configuration.getBookkeeperClientAuthenticationParametersName())) {
                lockTimeout.setProperty("bkc." + this.configuration.getBookkeeperClientAuthenticationParametersName(), this.configuration.getBookkeeperClientAuthenticationParameters());
            }
        }
        try {
            this.namespace = NamespaceBuilder.newBuilder().conf(lockTimeout).clientId(NS_CLIENT_ID).uri(initializeDlogNamespace()).build();
            log.info("Packages management bookKeeper storage initialized successfully");
        } catch (IOException e) {
            throw new RuntimeException("Initialize distributed log for packages management service failed.", e);
        }
    }

    private URI initializeDlogNamespace() throws IOException {
        String packagesManagementLedgerRootPath;
        String zookeeperServers;
        String property = this.configuration.getProperty("bookkeeperMetadataServiceUri");
        if (StringUtils.isNotBlank(property)) {
            URI create = URI.create(property);
            zookeeperServers = create.getAuthority().replace(";", ",");
            packagesManagementLedgerRootPath = create.getPath();
        } else {
            packagesManagementLedgerRootPath = this.configuration.getPackagesManagementLedgerRootPath();
            zookeeperServers = this.configuration.getZookeeperServers();
        }
        DLMetadata create2 = DLMetadata.create(new BKDLConfig(zookeeperServers, packagesManagementLedgerRootPath));
        URI create3 = URI.create(String.format("distributedlog://%s/pulsar/packages", this.configuration.getZookeeperServers()));
        try {
            create2.create(create3);
        } catch (ZKException e) {
            if (e.getKeeperExceptionCode() == KeeperException.Code.NODEEXISTS) {
                return create3;
            }
        }
        return create3;
    }

    private CompletableFuture<DistributedLogManager> openLogManagerAsync(String str) {
        CompletableFuture<DistributedLogManager> completableFuture = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                completableFuture.complete(this.namespace.openLog(str));
            } catch (IOException e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    public CompletableFuture<Void> writeAsync(String str, InputStream inputStream) {
        return openLogManagerAsync(str).thenCompose(DLOutputStream::openWriterAsync).thenCompose((Function<? super U, ? extends CompletionStage<U>>) dLOutputStream -> {
            return dLOutputStream.writeAsync(inputStream);
        }).thenCompose((v0) -> {
            return v0.closeAsync();
        });
    }

    public CompletableFuture<Void> readAsync(String str, OutputStream outputStream) {
        return openLogManagerAsync(str).thenCompose(DLInputStream::openReaderAsync).thenCompose((Function<? super U, ? extends CompletionStage<U>>) dLInputStream -> {
            return dLInputStream.readAsync(outputStream);
        }).thenCompose((v0) -> {
            return v0.closeAsync();
        });
    }

    public CompletableFuture<Void> deleteAsync(String str) {
        return this.namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation(str).thenCompose(optional -> {
            return (CompletionStage) optional.map(uri -> {
                return this.namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).deleteLog(uri, str);
            }).orElse(null);
        });
    }

    public CompletableFuture<List<String>> listAsync(String str) {
        return this.namespace.getNamespaceDriver().getLogMetadataStore().getLogs(str).thenApply(it -> {
            ArrayList arrayList = new ArrayList();
            Objects.requireNonNull(arrayList);
            it.forEachRemaining((v1) -> {
                r1.add(v1);
            });
            return arrayList;
        });
    }

    public CompletableFuture<Boolean> existAsync(String str) {
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        this.namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation(str).whenComplete((optional, th) -> {
            if (th != null) {
                completableFuture.complete(false);
            } else if (optional.isPresent()) {
                this.namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).logExists((URI) optional.get(), str).whenComplete((r4, th) -> {
                    if (th != null) {
                        completableFuture.complete(false);
                    } else {
                        completableFuture.complete(true);
                    }
                });
            } else {
                completableFuture.complete(false);
            }
        });
        return completableFuture;
    }

    public CompletableFuture<Void> closeAsync() {
        return CompletableFuture.runAsync(() -> {
            this.namespace.close();
        });
    }
}
