package org.apache.pulsar.broker.namespace;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.util.concurrent.MoreExecutors;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/namespace/OwnershipCache.class */
/**
 * Tracks the namespace bundles for which this broker has acquired a resource lock.
 *
 * <p>Ownership of a bundle is represented by an entry in {@link #ownedBundlesCache}. Loading an
 * entry acquires a {@link ResourceLock} on the bundle's service-unit path through the
 * {@link LockManager}; the acquired lock is remembered in {@link #locallyAcquiredLocks} so it can
 * later be released or have its value updated.
 */
public class OwnershipCache {
    private static final Logger log = LoggerFactory.getLogger(OwnershipCache.class);

    private final String ownerBrokerUrl;
    private final String ownerBrokerUrlTls;

    /**
     * Ephemeral data published when this broker acquires a lock. Reassigned by
     * {@link #refreshSelfOwnerInfo()} and read from asynchronous callbacks without holding the
     * monitor, hence {@code volatile}.
     */
    private volatile NamespaceEphemeralData selfOwnerInfo;

    /** Same data as the initial {@link #selfOwnerInfo}, but built with the boolean flag set to {@code true}. */
    private final NamespaceEphemeralData selfOwnerInfoDisabled;

    private final LockManager<NamespaceEphemeralData> lockManager;

    /** Locks acquired by the cache loader, keyed by bundle. */
    private final Map<NamespaceBundle, ResourceLock<NamespaceEphemeralData>> locallyAcquiredLocks =
            new ConcurrentHashMap<>();

    /** Cache of bundles owned by this broker; a miss on {@code get} triggers lock acquisition. */
    private final AsyncLoadingCache<NamespaceBundle, OwnedBundle> ownedBundlesCache = Caffeine.newBuilder()
            .executor(MoreExecutors.directExecutor())
            .recordStats()
            .buildAsync(new OwnedServiceUnitCacheLoader());

    private final NamespaceBundleFactory bundleFactory;
    private final NamespaceService namespaceService;
    private final PulsarService pulsar;

    /**
     * Cache loader that acquires the resource lock for a bundle and wires up the handling of the
     * lock's expiration.
     */
    private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader<NamespaceBundle, OwnedBundle> {
        private OwnedServiceUnitCacheLoader() {
        }

        /**
         * Acquires the lock on the bundle's service-unit path using the current
         * {@code selfOwnerInfo} and, once acquired, yields a new {@link OwnedBundle}.
         *
         * @param namespaceBundle the bundle being loaded into the cache
         * @param executor        executor supplied by the cache (not used here)
         * @return a future completing with the owned bundle, or failing if the lock acquisition fails
         */
        @Override
        public CompletableFuture<OwnedBundle> asyncLoad(NamespaceBundle namespaceBundle, Executor executor) {
            return lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo)
                    .thenApply(resourceLock -> {
                        locallyAcquiredLocks.put(namespaceBundle, resourceLock);
                        resourceLock.getLockExpiredFuture().thenRun(() -> {
                            log.info("Resource lock for {} has expired", resourceLock.getPath());
                            namespaceService.unloadNamespaceBundle(namespaceBundle);
                            // Forget the expired lock so that removeOwnership/disableOwnership do not
                            // operate on it later. The two-argument remove leaves the entry alone if
                            // a different lock has since been stored for this bundle.
                            locallyAcquiredLocks.remove(namespaceBundle, resourceLock);
                            ownedBundlesCache.synchronous().invalidate(namespaceBundle);
                            namespaceService.onNamespaceBundleUnload(namespaceBundle);
                        });
                        return new OwnedBundle(namespaceBundle);
                    });
        }
    }

    /**
     * Creates the cache, builds the two ephemeral-data descriptors for this broker from the
     * service's URLs and advertised listeners, obtains the lock manager from the coordination
     * service and registers the cache under the name {@code "owned-bundles"} for metrics.
     *
     * @param pulsarService          broker service supplying URLs, configuration and coordination service
     * @param namespaceBundleFactory bundle factory retained by this cache
     * @param namespaceService       service notified about bundle ownership changes
     */
    public OwnershipCache(PulsarService pulsarService, NamespaceBundleFactory namespaceBundleFactory, NamespaceService namespaceService) {
        this.namespaceService = namespaceService;
        this.pulsar = pulsarService;
        this.ownerBrokerUrl = pulsarService.getBrokerServiceUrl();
        this.ownerBrokerUrlTls = pulsarService.getBrokerServiceUrlTls();
        this.selfOwnerInfo = new NamespaceEphemeralData(this.ownerBrokerUrl, this.ownerBrokerUrlTls,
                pulsarService.getSafeWebServiceAddress(), pulsarService.getWebServiceAddressTls(),
                false, pulsarService.getAdvertisedListeners());
        this.selfOwnerInfoDisabled = new NamespaceEphemeralData(this.ownerBrokerUrl, this.ownerBrokerUrlTls,
                pulsarService.getSafeWebServiceAddress(), pulsarService.getWebServiceAddressTls(),
                true, pulsarService.getAdvertisedListeners());
        this.bundleFactory = namespaceBundleFactory;
        this.lockManager = pulsarService.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
        CacheMetricsCollector.CAFFEINE.addCache("owned-bundles", this.ownedBundlesCache);
    }

    /**
     * Checks whether the bundle is present in the local cache and active.
     *
     * @param namespaceBundle the bundle to check
     * @return a future yielding {@code true} only if a cached {@link OwnedBundle} exists and is
     *         active; {@code false} immediately when there is no cache entry
     */
    public CompletableFuture<Boolean> checkOwnershipAsync(NamespaceBundle namespaceBundle) {
        Optional<CompletableFuture<OwnedBundle>> cached = getOwnedBundleAsync(namespaceBundle);
        if (!cached.isPresent()) {
            return CompletableFuture.completedFuture(false);
        }
        return cached.get().thenApply(ownedBundle -> ownedBundle != null && ownedBundle.isActive());
    }

    /**
     * Looks up the owner information of a bundle.
     *
     * <p>If the bundle has a local cache entry, the result is this broker's own data (the
     * "disabled" variant when the owned bundle is not active). Otherwise the lock at the bundle's
     * service-unit path is read through the lock manager.
     *
     * @param namespaceBundle the bundle whose owner is requested
     * @return a future with the owner data, as returned locally or by {@code readLock}
     */
    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle namespaceBundle) {
        CompletableFuture<OwnedBundle> cached = this.ownedBundlesCache.getIfPresent(namespaceBundle);
        if (cached == null) {
            return this.lockManager.readLock(ServiceUnitUtils.path(namespaceBundle));
        }
        return cached.thenApply(ownedBundle ->
                Optional.of(ownedBundle.isActive() ? this.selfOwnerInfo : this.selfOwnerInfoDisabled));
    }

    /**
     * Attempts to take ownership of a bundle by loading it into the cache, which acquires the
     * underlying resource lock if no entry exists yet.
     *
     * @param namespaceBundle the bundle to own
     * @return a future yielding this broker's owner data once the bundle is in the cache; a failed
     *         future if {@link #refreshSelfOwnerInfo()} reports that no broker URL is available
     * @throws Exception declared for callers; not thrown directly by the visible code
     */
    public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) throws Exception {
        if (!refreshSelfOwnerInfo()) {
            return FutureUtil.failedFuture(new RuntimeException("Namespace service does not ready for acquiring ownership"));
        }
        log.info("Trying to acquire ownership of {}", namespaceBundle);
        return this.ownedBundlesCache.get(namespaceBundle).thenApply(ownedBundle -> {
            log.info("Successfully acquired ownership of {}", ownedBundle);
            this.namespaceService.onNamespaceBundleOwned(namespaceBundle);
            return this.selfOwnerInfo;
        });
    }

    /**
     * Releases the locally held lock for a bundle, if any.
     *
     * @param namespaceBundle the bundle to give up
     * @return the future returned by the lock's {@code release()}, or an already completed future
     *         when no lock is recorded for the bundle
     */
    public CompletableFuture<Void> removeOwnership(NamespaceBundle namespaceBundle) {
        ResourceLock<NamespaceEphemeralData> resourceLock = this.locallyAcquiredLocks.get(namespaceBundle);
        if (resourceLock == null) {
            return CompletableFuture.completedFuture(null);
        }
        return resourceLock.release();
    }

    /**
     * Releases the locks of every bundle in the given set that currently has a successfully
     * loaded cache entry.
     *
     * @param namespaceBundles the bundles to give up
     * @return a future completing when all individual releases have completed
     */
    public CompletableFuture<Void> removeOwnership(NamespaceBundles namespaceBundles) {
        List<CompletableFuture<Void>> releases = new ArrayList<>();
        for (NamespaceBundle namespaceBundle : namespaceBundles.getBundles()) {
            if (getOwnedBundle(namespaceBundle) != null) {
                releases.add(removeOwnership(namespaceBundle));
            }
        }
        return FutureUtil.waitForAll(releases);
    }

    /**
     * @return the synchronous map view of the owned-bundles cache
     */
    public Map<NamespaceBundle, OwnedBundle> getOwnedBundles() {
        return this.ownedBundlesCache.synchronous().asMap();
    }

    /**
     * @param namespaceBundle the bundle to check
     * @return {@code true} if the bundle has a successfully loaded cache entry that is active
     */
    public boolean isNamespaceBundleOwned(NamespaceBundle namespaceBundle) {
        OwnedBundle ownedBundle = getOwnedBundle(namespaceBundle);
        return ownedBundle != null && ownedBundle.isActive();
    }

    /**
     * Returns the cached {@link OwnedBundle} for a bundle without triggering a load.
     *
     * @param namespaceBundle the bundle to look up
     * @return the owned bundle, or {@code null} when there is no cache entry, the entry is still
     *         loading, or the load failed
     * @throws RuntimeException wrapping an interruption, timeout or the cause of an execution
     *         failure while reading the future
     */
    public OwnedBundle getOwnedBundle(NamespaceBundle namespaceBundle) {
        CompletableFuture<OwnedBundle> cached = this.ownedBundlesCache.getIfPresent(namespaceBundle);
        if (cached == null || !cached.isDone() || cached.isCompletedExceptionally()) {
            return null;
        }
        try {
            return cached.get(this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    /**
     * @param namespaceBundle the bundle to look up
     * @return the cache's future for the bundle if an entry exists, otherwise an empty optional
     */
    public Optional<CompletableFuture<OwnedBundle>> getOwnedBundleAsync(NamespaceBundle namespaceBundle) {
        return Optional.ofNullable(this.ownedBundlesCache.getIfPresent(namespaceBundle));
    }

    /**
     * Marks the cached bundle as inactive and then updates the value of the locally held lock to
     * the "disabled" owner data.
     *
     * @param namespaceBundle the bundle to disable
     * @return a future completing after the lock value update, or immediately after the state
     *         change when no lock is recorded for the bundle
     */
    public CompletableFuture<Void> disableOwnership(NamespaceBundle namespaceBundle) {
        return updateBundleState(namespaceBundle, false).thenCompose(ignored -> {
            ResourceLock<NamespaceEphemeralData> resourceLock = this.locallyAcquiredLocks.get(namespaceBundle);
            if (resourceLock == null) {
                return CompletableFuture.completedFuture(null);
            }
            return resourceLock.updateValue(this.selfOwnerInfoDisabled);
        });
    }

    /**
     * Sets the active flag of the cached bundle, if it has been loaded successfully.
     *
     * @param namespaceBundle the bundle whose state is changed
     * @param z               the new active flag passed to {@code OwnedBundle.setActive}
     * @return a future completing once the flag is set; already completed when there is no usable
     *         cache entry
     */
    public CompletableFuture<Void> updateBundleState(NamespaceBundle namespaceBundle, boolean z) {
        CompletableFuture<OwnedBundle> cached = this.ownedBundlesCache.getIfPresent(namespaceBundle);
        if (cached == null || !cached.isDone() || cached.isCompletedExceptionally()) {
            return CompletableFuture.completedFuture(null);
        }
        return cached.thenAccept(ownedBundle -> ownedBundle.setActive(z));
    }

    /** Drops every entry from the local owned-bundles cache. */
    public void invalidateLocalOwnerCache() {
        this.ownedBundlesCache.synchronous().invalidateAll();
    }

    /**
     * Rebuilds {@code selfOwnerInfo} from the current values exposed by the broker service.
     *
     * @return {@code true} if at least one of the native URL and the native TLS URL of the
     *         refreshed data is non-null
     */
    public synchronized boolean refreshSelfOwnerInfo() {
        NamespaceEphemeralData refreshed = new NamespaceEphemeralData(this.pulsar.getBrokerServiceUrl(),
                this.pulsar.getBrokerServiceUrlTls(), this.pulsar.getSafeWebServiceAddress(),
                this.pulsar.getWebServiceAddressTls(), false, this.pulsar.getAdvertisedListeners());
        this.selfOwnerInfo = refreshed;
        return refreshed.getNativeUrl() != null || refreshed.getNativeUrlTls() != null;
    }
}
