package org.apache.pulsar.broker.namespace;

import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.manager.RedirectManager;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.BundleSplitOption;
import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/namespace/NamespaceService.class */
public class NamespaceService implements AutoCloseable {
    // Broker configuration, read from the PulsarService in the constructor.
    private final ServiceConfiguration config;
    // Shared holder for the active load manager, obtained from the PulsarService in the constructor.
    private final AtomicReference<LoadManager> loadManager;
    // The broker service this namespace service belongs to.
    private final PulsarService pulsar;
    // Ownership cache, created in the constructor from the PulsarService, the bundle factory and this service.
    private final OwnershipCache ownershipCache;
    // Metadata cache of LocalBrokerData; createLookupResult reads "/loadbalance/brokers/<broker>" entries from it.
    private final MetadataCache<LocalBrokerData> localBrokerDataCache;
    // Factory for namespace bundles, created in the constructor with a CRC32 hash function.
    private final NamespaceBundleFactory bundleFactory;
    // Advertised address of this broker, as reported by the PulsarService at construction time.
    private final String host;
    // NOTE(review): presumably the maximum number of bundle-split attempts; its use is outside the visible code.
    public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
    public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
    // Format templates for heartbeat and SLA-monitor namespace names (the code that fills them in is not shown here).
    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
    public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
    public static final String SLA_NAMESPACE_FMT = "sla-monitor/%s/%s";
    // Consulted before every lookup (see findRedirectLookupResultAsync) to decide whether to redirect.
    private final RedirectManager redirectManager;
    // NOTE(review): two loggers are declared for the same class; the visible methods only use LOG.
    private static final Logger log = LoggerFactory.getLogger(NamespaceService.class);
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);
    // Regexes for the namespace name layouts above; group 1 captures the trailing "<text>:<digits>" segment.
    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)");
    public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile("sla-monitor/[^/]+/([^:]+:\\d+)");
    // Lookup metrics; all four are updated from getBrokerServiceUrlAsync.
    private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", "-").register();
    private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", "-").register();
    private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", "-").register();
    private static final Summary lookupLatency = (Summary) Summary.build("pulsar_broker_lookup", "-").quantile(0.5d).quantile(0.99d).quantile(0.999d).quantile(1.0d).register();
    // Not referenced by the methods visible in this part of the file.
    private ConcurrentHashMap<String, CompletableFuture<List<String>>> inProgressQueryUserTopics = new ConcurrentHashMap<>();
    // In-flight owner lookups keyed by bundle, one map per authoritative / non-authoritative mode
    // (see findBrokerServiceUrl, which inserts entries and removes them once the future completes).
    private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> findingBundlesAuthoritative = ConcurrentOpenHashMap.newBuilder().build();
    private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> findingBundlesNotAuthoritative = ConcurrentOpenHashMap.newBuilder().build();
    // Pulsar clients keyed by cluster data; not referenced by the methods visible in this part of the file.
    private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients = ConcurrentOpenHashMap.newBuilder().build();
    // NOTE(review): the list is instantiated with a raw CopyOnWriteArrayList (no diamond), which yields an unchecked warning.
    private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners = new CopyOnWriteArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.namespace.NamespaceService$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/namespace/NamespaceService$1.class */
    /*
     * Holder for a lookup table that maps CommandGetTopicsOfNamespace.Mode ordinals to small
     * integers (ALL -> 1, NON_PERSISTENT -> 2, PERSISTENT -> 3). Slots for any other constant
     * keep the array default of 0.
     *
     * NOTE(review): this looks like the decompiler's rendering of the compiler-generated helper
     * for a `switch` over the Mode enum; the switch itself is not in the visible part of the file.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Sized by the number of Mode constants present at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$api$proto$CommandGetTopicsOfNamespace$Mode = new int[CommandGetTopicsOfNamespace.Mode.values().length];

        static {
            // Each assignment is wrapped separately so that a NoSuchFieldError for one constant
            // is ignored and the remaining entries are still populated.
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$CommandGetTopicsOfNamespace$Mode[CommandGetTopicsOfNamespace.Mode.ALL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$CommandGetTopicsOfNamespace$Mode[CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$CommandGetTopicsOfNamespace$Mode[CommandGetTopicsOfNamespace.Mode.PERSISTENT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Creates the namespace service for the given broker.
     *
     * <p>All collaborators are taken from, or built on top of, {@code pulsarService}. The
     * assignment order matters: {@code this} is handed to the {@link OwnershipCache}
     * constructor, so the fields assigned before that line are already set when it runs.
     *
     * @param pulsarService the broker service that owns this namespace service
     */
    public NamespaceService(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.host = pulsarService.getAdvertisedAddress();
        this.config = pulsarService.getConfiguration();
        this.loadManager = pulsarService.getLoadManager();
        // Bundles are hashed with CRC32.
        this.bundleFactory = new NamespaceBundleFactory(pulsarService, Hashing.crc32());
        // Receives a reference to this partially constructed instance.
        this.ownershipCache = new OwnershipCache(pulsarService, this.bundleFactory, this);
        this.localBrokerDataCache = pulsarService.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
        this.redirectManager = new RedirectManager(pulsarService);
    }

    /**
     * Refreshes this broker's self owner info through the ownership cache.
     *
     * @throws RuntimeException if the ownership cache reports that the refresh did not succeed
     */
    public void initialize() {
        boolean refreshed = getOwnershipCache().refreshSelfOwnerInfo();
        if (refreshed) {
            return;
        }
        throw new RuntimeException("Failed to refresh self owner info.");
    }

    /**
     * Looks up the broker for the bundle that contains {@code topicName}.
     *
     * <p>The bundle is resolved first. If the redirect manager supplies a result it is returned
     * as is; otherwise the lookup is delegated to the load manager (when the load manager
     * extension is enabled) or to {@code findBrokerServiceUrl}.
     *
     * <p>Lookup metrics are recorded as a side effect on a derived stage, so they never alter
     * the returned future: latency and the redirect/answer counters on success, the failure
     * counter when the lookup (or the metrics callback) completes exceptionally.
     *
     * @param topicName     topic whose bundle should be looked up
     * @param lookupOptions options forwarded to the underlying lookup
     * @return future holding the lookup result, or an empty Optional if none was produced
     */
    public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topicName, LookupOptions lookupOptions) {
        final long startNanos = System.nanoTime();

        // Fully parameterized (the previous raw CompletableFuture required an unchecked
        // conversion on return and a cast to LookupResult in the metrics callback).
        CompletableFuture<Optional<LookupResult>> lookupFuture = getBundleAsync(topicName).thenCompose(bundle ->
                findRedirectLookupResultAsync(bundle).thenCompose(redirect -> {
                    if (redirect.isPresent()) {
                        LOG.info("[{}] Redirect lookup request to {} for topic {}",
                                this.pulsar.getBrokerId(), redirect.get(), topicName);
                        return CompletableFuture.completedFuture(redirect);
                    }
                    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                        return this.loadManager.get().findBrokerServiceUrl(Optional.of(topicName), bundle, lookupOptions);
                    }
                    return findBrokerServiceUrl(bundle, lookupOptions);
                }));

        lookupFuture.thenAccept(result -> {
            lookupLatency.observe(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
            if (result.isPresent()) {
                if (result.get().isRedirect()) {
                    lookupRedirects.inc();
                } else {
                    lookupAnswers.inc();
                }
            }
        }).exceptionally(failure -> {
            lookupFailures.inc();
            return null;
        });

        return lookupFuture;
    }

    /**
     * Asks the redirect manager for a redirect target, except for SLA and heartbeat
     * namespaces, which always yield an already-completed empty result.
     *
     * @param serviceUnitId service unit whose namespace decides whether redirection is considered
     * @return future holding the redirect lookup result, if any
     */
    private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId serviceUnitId) {
        String namespace = serviceUnitId.getNamespaceObject().toString();
        if (isSLAOrHeartbeatNamespace(namespace)) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        return this.redirectManager.findRedirectLookupResultAsync();
    }

    /**
     * Asynchronously resolves the bundle that contains the given topic.
     *
     * @param topicName topic to locate
     * @return future completed with the bundle found for the topic among its namespace's bundles
     */
    public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topicName) {
        NamespaceName namespace = topicName.getNamespaceObject();
        return this.bundleFactory.getBundlesAsync(namespace)
                .thenApply(bundles -> bundles.findBundle(topicName));
    }

    /**
     * Resolves the bundle that contains the given topic, but only if the bundle factory
     * reports the namespace's bundles as present.
     *
     * @param topicName topic to locate
     * @return the matching bundle, or an empty Optional when the bundles are not present
     */
    public Optional<NamespaceBundle> getBundleIfPresent(TopicName topicName) {
        NamespaceName namespace = topicName.getNamespaceObject();
        return this.bundleFactory.getBundlesIfPresent(namespace)
                .map(bundles -> bundles.findBundle(topicName));
    }

    /**
     * Synchronously resolves the bundle that contains the given topic.
     *
     * @param topicName topic to locate
     * @return the bundle found for the topic among its namespace's bundles
     */
    public NamespaceBundle getBundle(TopicName topicName) {
        NamespaceBundles bundles = this.bundleFactory.getBundles(topicName.getNamespaceObject());
        return bundles.findBundle(topicName);
    }

    /**
     * Returns the number of bundles the bundle factory reports for a namespace.
     *
     * @param namespaceName namespace to inspect
     * @return size of the namespace's bundle set
     * @throws Exception declared by the signature; propagated from the underlying lookup
     */
    public int getBundleCount(NamespaceName namespaceName) throws Exception {
        NamespaceBundles bundles = this.bundleFactory.getBundles(namespaceName);
        return bundles.size();
    }

    /**
     * Fetches the full bundle of a namespace from the bundle factory.
     *
     * @param namespaceName namespace whose full bundle is requested
     * @return the full bundle returned by the factory
     * @throws Exception declared by the signature; propagated from the factory
     */
    private NamespaceBundle getFullBundle(NamespaceName namespaceName) throws Exception {
        NamespaceBundle fullBundle = this.bundleFactory.getFullBundle(namespaceName);
        return fullBundle;
    }

    /**
     * Asynchronous variant of the full-bundle lookup; delegates to the bundle factory.
     *
     * @param namespaceName namespace whose full bundle is requested
     * @return future supplied by the bundle factory
     */
    private CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName namespaceName) {
        CompletableFuture<NamespaceBundle> fullBundleFuture = this.bundleFactory.getFullBundleAsync(namespaceName);
        return fullBundleFuture;
    }

    /**
     * Resolves the web service URL for a service unit.
     *
     * <p>Dispatches on the runtime type of {@code serviceUnitId}:
     * <ul>
     *   <li>{@link TopicName}: the topic's bundle is resolved first and the topic is passed along;</li>
     *   <li>{@link NamespaceName}: the namespace's full bundle is used;</li>
     *   <li>{@link NamespaceBundle}: the bundle is used directly.</li>
     * </ul>
     *
     * @param serviceUnitId topic, namespace or bundle to look up
     * @param lookupOptions options forwarded to the lookup
     * @return future holding the web service URL, if one could be determined
     * @throws IllegalArgumentException if {@code serviceUnitId} is none of the supported types
     */
    public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId serviceUnitId, LookupOptions lookupOptions) {
        if (serviceUnitId instanceof TopicName) {
            final TopicName topic = (TopicName) serviceUnitId;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Getting web service URL of topic: {} - options: {}", topic, lookupOptions);
            }
            return getBundleAsync(topic)
                    .thenCompose(bundle -> internalGetWebServiceUrl(topic, bundle, lookupOptions));
        } else if (serviceUnitId instanceof NamespaceName) {
            final NamespaceName namespace = (NamespaceName) serviceUnitId;
            return getFullBundleAsync(namespace)
                    .thenCompose(fullBundle -> internalGetWebServiceUrl(null, fullBundle, lookupOptions));
        } else if (serviceUnitId instanceof NamespaceBundle) {
            final NamespaceBundle bundle = (NamespaceBundle) serviceUnitId;
            return internalGetWebServiceUrl(null, bundle, lookupOptions);
        }
        String className = serviceUnitId.getClass().getName();
        throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + className);
    }

    /**
     * Blocking variant of {@link #getWebServiceUrlAsync}: waits for the asynchronous lookup,
     * bounded by the configured metadata-store operation timeout (in seconds).
     *
     * @param serviceUnitId topic, namespace or bundle to look up
     * @param lookupOptions options forwarded to the lookup
     * @return the web service URL, if one could be determined
     * @throws Exception if the lookup fails, is interrupted, or does not finish within the timeout
     */
    public Optional<URL> getWebServiceUrl(ServiceUnitId serviceUnitId, LookupOptions lookupOptions) throws Exception {
        CompletableFuture<Optional<URL>> urlFuture = getWebServiceUrlAsync(serviceUnitId, lookupOptions);
        long timeoutSeconds = this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        return urlFuture.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Resolves the web service URL of the broker responsible for {@code namespaceBundle}.
     *
     * <p>A redirect result from the redirect manager takes precedence. Otherwise the owner is
     * looked up through the load manager (when the load manager extension is enabled) or
     * through {@code findBrokerServiceUrl}. In both cases the HTTP or HTTPS URL of the lookup
     * data is chosen according to {@code lookupOptions.isRequestHttps()}.
     *
     * @param serviceUnitId   the topic the lookup is for, or {@code null} when the lookup was
     *                        started from a namespace or bundle
     * @param namespaceBundle bundle whose owner should be located
     * @param lookupOptions   options forwarded to the lookup; also selects HTTP vs. HTTPS
     * @return future holding the URL, or an empty Optional when no lookup result was found or
     *         its URL could not be turned into a {@link URL}
     */
    private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable ServiceUnitId serviceUnitId, NamespaceBundle namespaceBundle, LookupOptions lookupOptions) {
        // Each lambda parameter gets its own name: re-declaring the enclosing lambda's
        // parameter inside a nested lambda does not compile.
        return findRedirectLookupResultAsync(namespaceBundle).thenCompose(redirectResult -> {
            if (redirectResult.isPresent()) {
                LOG.info("[{}] Redirect lookup request to {} for topic {}",
                        this.pulsar.getBrokerId(), redirectResult.get(), serviceUnitId);
                return CompletableFuture.completedFuture(toWebServiceUrl(redirectResult.get(), lookupOptions));
            }

            CompletableFuture<Optional<LookupResult>> ownerLookup;
            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                ownerLookup = this.loadManager.get()
                        .findBrokerServiceUrl(Optional.ofNullable(serviceUnitId), namespaceBundle, lookupOptions);
            } else {
                ownerLookup = findBrokerServiceUrl(namespaceBundle, lookupOptions);
            }
            return ownerLookup.thenApply(ownerResult ->
                    ownerResult.flatMap(result -> toWebServiceUrl(result, lookupOptions)));
        });
    }

    /**
     * Converts a lookup result into its web service URL (HTTPS when requested, HTTP otherwise).
     * Any failure while building the URL is logged and reported as an empty Optional.
     */
    private Optional<URL> toWebServiceUrl(LookupResult lookupResult, LookupOptions lookupOptions) {
        try {
            LookupData lookupData = lookupResult.getLookupData();
            String address = lookupOptions.isRequestHttps() ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
            return Optional.of(new URL(address));
        } catch (Exception e) {
            // Deliberately best-effort: a malformed or missing URL yields "no URL", not a failed lookup.
            LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
            return Optional.empty();
        }
    }

    /**
     * Registers this broker's two heartbeat namespaces and every configured bootstrap namespace.
     *
     * <p>The heartbeat namespaces are registered with the second argument of
     * {@link #registerNamespace} set to {@code true}, so finding them owned by another broker
     * is an error. Bootstrap namespaces are registered with {@code false}.
     *
     * @throws PulsarServerException if any registration fails
     */
    public void registerBootstrapNamespaces() throws PulsarServerException {
        final String selfBrokerId = this.pulsar.getBrokerId();

        boolean heartbeatRegistered = registerNamespace(getHeartbeatNamespace(selfBrokerId, this.config), true);
        if (heartbeatRegistered) {
            LOG.info("added heartbeat namespace name in local cache: ns={}", getHeartbeatNamespace(selfBrokerId, this.config));
        }

        boolean heartbeatV2Registered = registerNamespace(getHeartbeatNamespaceV2(selfBrokerId, this.config), true);
        if (heartbeatV2Registered) {
            LOG.info("added heartbeat namespace name in local cache: ns={}", getHeartbeatNamespaceV2(selfBrokerId, this.config));
        }

        for (String bootstrapNamespace : this.config.getBootstrapNamespaces()) {
            boolean registered = registerNamespace(NamespaceName.get(bootstrapNamespace), false);
            if (registered) {
                LOG.info("added bootstrap namespace name in local cache: ns={}", bootstrapNamespace);
            }
        }
    }

    /**
     * Tries to acquire ownership of a namespace's full bundle for this broker.
     *
     * <p>Ownership is requested through the extensible load manager when that extension is
     * enabled, otherwise through the ownership cache; the call blocks until the request
     * completes. The namespace counts as owned by this broker when the owner's native URL
     * (plain or TLS) equals this broker's corresponding service URL, in which case the
     * namespace's topics are loaded.
     *
     * @param namespaceName namespace to register
     * @param z             when {@code true}, ownership by another broker is treated as an error
     *                      instead of returning {@code false}
     * @return {@code true} if this broker owns the namespace, {@code false} if another broker
     *         owns it and {@code z} is {@code false}
     * @throws PulsarServerException wrapping any failure, including the {@link IllegalStateException}
     *         raised when another broker owns the namespace and {@code z} is {@code true}
     */
    public boolean registerNamespace(NamespaceName namespaceName, boolean z) throws PulsarServerException {
        try {
            NamespaceBundle bundle = this.bundleFactory.getFullBundle(namespaceName);

            NamespaceEphemeralData owner;
            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                owner = ExtensibleLoadManagerImpl.get(this.loadManager.get()).tryAcquiringOwnership(bundle).get();
            } else {
                owner = this.ownershipCache.tryAcquiringOwnership(bundle).get();
            }

            boolean ownedBySelf = StringUtils.equals(this.pulsar.getBrokerServiceUrl(), owner.getNativeUrl())
                    || StringUtils.equals(this.pulsar.getBrokerServiceUrlTls(), owner.getNativeUrlTls());
            if (ownedBySelf) {
                if (bundle != null) {
                    this.pulsar.loadNamespaceTopics(bundle);
                }
                return true;
            }

            String message = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s", namespaceName, StringUtils.defaultString(this.pulsar.getBrokerServiceUrl(), this.pulsar.getBrokerServiceUrlTls()), StringUtils.defaultString(owner.getNativeUrl(), owner.getNativeUrlTls()));
            if (!z) {
                LOG.info(message);
                return false;
            }
            throw new IllegalStateException(message);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new PulsarServerException(e);
        }
    }

    /**
     * Finds the broker that owns {@code namespaceBundle}, sharing one in-flight lookup per bundle.
     *
     * <p>Concurrent callers for the same bundle receive the same future: it is stored in
     * {@code findingBundlesAuthoritative} or {@code findingBundlesNotAuthoritative}, depending on
     * {@code lookupOptions.isAuthoritative()}, and removed again once it completes.
     *
     * <p>NOTE(review): the raw {@code CompletableFuture}, the unused lambda parameter
     * {@code namespaceBundle2} and the {@code concurrentOpenHashMap2} alias look like decompiler
     * artifacts.
     *
     * @param namespaceBundle bundle whose owner is requested
     * @param lookupOptions   authoritative / read-only flags and the optional advertised listener name
     * @return future holding the lookup result; empty for a read-only lookup of an unowned bundle;
     *         failed if the owner is disabled, the requested listener is missing, or the owner
     *         check itself fails
     */
    private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle namespaceBundle, LookupOptions lookupOptions) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("findBrokerServiceUrl: {} - options: {}", namespaceBundle, lookupOptions);
        }
        // Authoritative and non-authoritative lookups are de-duplicated independently.
        ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> concurrentOpenHashMap = lookupOptions.isAuthoritative() ? this.findingBundlesAuthoritative : this.findingBundlesNotAuthoritative;
        ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> concurrentOpenHashMap2 = concurrentOpenHashMap;
        return (CompletableFuture) concurrentOpenHashMap.computeIfAbsent(namespaceBundle, namespaceBundle2 -> {
            CompletableFuture completableFuture = new CompletableFuture();
            this.ownershipCache.getOwnerAsync(namespaceBundle).thenAccept(optional -> {
                if (optional.isEmpty()) {
                    // No owner recorded for the bundle.
                    if (lookupOptions.isReadOnly()) {
                        // Read-only lookups do not search for a candidate; report "no owner".
                        completableFuture.complete(Optional.empty());
                        return;
                    } else {
                        // Hand the candidate search to the broker executor; it completes the future.
                        this.pulsar.getExecutor().execute(() -> {
                            searchForCandidateBroker(namespaceBundle, completableFuture, lookupOptions);
                        });
                        return;
                    }
                }
                if (((NamespaceEphemeralData) optional.get()).isDisabled()) {
                    // A disabled owner entry is reported to callers as "being unloaded".
                    completableFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is being unloaded", namespaceBundle)));
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Namespace bundle {} already owned by {} ", namespaceBundle, optional);
                }
                if (!lookupOptions.hasAdvertisedListenerName()) {
                    // No specific listener requested: answer with the owner data as is.
                    completableFuture.complete(Optional.of(new LookupResult((NamespaceEphemeralData) optional.get())));
                    return;
                }
                // A specific advertised listener was requested: use that listener's service URLs.
                AdvertisedListener advertisedListener = ((NamespaceEphemeralData) optional.get()).getAdvertisedListeners().get(lookupOptions.getAdvertisedListenerName());
                if (advertisedListener == null) {
                    completableFuture.completeExceptionally(new PulsarServerException("the broker do not have " + lookupOptions.getAdvertisedListenerName() + " listener"));
                    return;
                }
                URI brokerServiceUrl = advertisedListener.getBrokerServiceUrl();
                URI brokerServiceUrlTls = advertisedListener.getBrokerServiceUrlTls();
                // Either listener URL may be null and is passed through as null.
                completableFuture.complete(Optional.of(new LookupResult((NamespaceEphemeralData) optional.get(), brokerServiceUrl == null ? null : brokerServiceUrl.toString(), brokerServiceUrlTls == null ? null : brokerServiceUrlTls.toString())));
            }).exceptionally(th -> {
                // Covers failures of getOwnerAsync and exceptions thrown by the callback above.
                LOG.warn("Failed to check owner for bundle {}: {}", new Object[]{namespaceBundle, th.getMessage(), th});
                completableFuture.completeExceptionally(th);
                return null;
            });
            // Drop the in-flight entry once the lookup finishes; the removal itself is
            // dispatched to the broker executor rather than run inside this callback.
            completableFuture.whenComplete((optional2, th2) -> {
                this.pulsar.getExecutor().execute(() -> {
                    concurrentOpenHashMap2.remove(namespaceBundle);
                });
            });
            return completableFuture;
        });
    }

    /**
     * Determines the broker id encoded in a heartbeat or SLA-monitor service unit.
     *
     * <p>The heartbeat checks (v1, then v2) are tried first and their result is returned
     * directly. For an SLA-monitor broker name, {@code function} is consulted and the name is
     * only returned when it yields {@code true}.
     *
     * @param serviceUnitId service unit to inspect
     * @param function      asynchronous predicate applied to the SLA-monitor broker name
     * @return future holding the broker id, or {@code null} when none applies or the predicate
     *         rejected the SLA-monitor broker
     */
    public CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnitId, Function<String, CompletableFuture<Boolean>> function) {
        String heartbeatBroker = checkHeartbeatNamespace(serviceUnitId);
        if (heartbeatBroker != null) {
            return CompletableFuture.completedFuture(heartbeatBroker);
        }

        String heartbeatBrokerV2 = checkHeartbeatNamespaceV2(serviceUnitId);
        if (heartbeatBrokerV2 != null) {
            return CompletableFuture.completedFuture(heartbeatBrokerV2);
        }

        String slaMonitorBroker = getSLAMonitorBrokerName(serviceUnitId);
        if (slaMonitorBroker == null) {
            return CompletableFuture.completedFuture(null);
        }
        return function.apply(slaMonitorBroker)
                .thenApply(accepted -> accepted.booleanValue() ? slaMonitorBroker : null);
    }

    /**
     * Picks a broker for an unowned bundle and completes {@code completableFuture} accordingly.
     *
     * <p>Candidate selection, in order:
     * <ol>
     *   <li>the broker id derived from a heartbeat / SLA-monitor namespace, if any;</li>
     *   <li>this broker, when the lookup is authoritative;</li>
     *   <li>the least loaded broker from the load manager, when this broker may decide itself
     *       (load manager not centralized, this broker is leader, or the leader is unknown or
     *       inactive);</li>
     *   <li>otherwise the current leader broker.</li>
     * </ol>
     * If the candidate is this broker, ownership is acquired locally; otherwise the future is
     * completed with a lookup result pointing at the candidate.
     *
     * <p>Every path completes the future (normally or exceptionally); nothing is thrown to the caller.
     *
     * @param namespaceBundle   bundle that needs an owner
     * @param completableFuture future to complete with the outcome
     * @param lookupOptions     authoritative flag, topic pre-loading flag and advertised listener name
     */
    private void searchForCandidateBroker(NamespaceBundle namespaceBundle, CompletableFuture<Optional<LookupResult>> completableFuture, LookupOptions lookupOptions) {
        if (null == this.pulsar.getLeaderElectionService()) {
            LOG.warn("The leader election has not yet been completed! NamespaceBundle[{}]", namespaceBundle);
            completableFuture.completeExceptionally(new IllegalStateException("The leader election has not yet been completed!"));
            return;
        }
        LeaderElectionService leaderElectionService = this.pulsar.getLeaderElectionService();
        // NOTE(review): this second null check can only trigger if the getter returns a different
        // value than it did a few lines above; otherwise it is unreachable.
        if (leaderElectionService == null) {
            LOG.warn("Leader election service isn't initialized yet. Returning empty result to lookup. NamespaceBundle[{}]", namespaceBundle);
            completableFuture.complete(Optional.empty());
            return;
        }
        // Passed to createLookupResult below; forced to true when the load manager picks the candidate.
        boolean isLeader = leaderElectionService.isLeader();
        try {
            // Blocks the calling thread for up to the metadata-store operation timeout.
            String str = getHeartbeatOrSLAMonitorBrokerId(namespaceBundle, str2 -> {
                return CompletableFuture.completedFuture(Boolean.valueOf(isBrokerActive(str2)));
            }).get(this.config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            if (str == null) {
                Optional<LeaderBroker> currentLeader = this.pulsar.getLeaderElectionService().getCurrentLeader();
                if (lookupOptions.isAuthoritative()) {
                    // Authoritative lookup: this broker is the candidate.
                    str = this.pulsar.getBrokerId();
                } else {
                    // z == true means this broker asks the load manager itself instead of
                    // redirecting to the leader.
                    boolean z = !this.loadManager.get().isCentralized() || leaderElectionService.isLeader();
                    if (!z) {
                        if (!(currentLeader.isPresent() && isBrokerActive(currentLeader.get().getBrokerId()))) {
                            // Leader unknown or not active: fall back to deciding locally.
                            z = true;
                            if (currentLeader.isEmpty()) {
                                LOG.warn("The information about the current leader broker wasn't available. Handling load manager decisions in a decentralized way. NamespaceBundle[{}]", namespaceBundle);
                            } else {
                                LOG.warn("The current leader broker {} isn't active. Handling load manager decisions in a decentralized way. NamespaceBundle[{}]", currentLeader.get(), namespaceBundle);
                            }
                        }
                    }
                    if (z) {
                        Optional<String> leastLoadedFromLoadManager = getLeastLoadedFromLoadManager(namespaceBundle);
                        if (leastLoadedFromLoadManager.isEmpty()) {
                            LOG.warn("Load manager didn't return any available broker. Returning empty result to lookup. NamespaceBundle[{}]", namespaceBundle);
                            completableFuture.complete(Optional.empty());
                            return;
                        } else {
                            str = leastLoadedFromLoadManager.get();
                            isLeader = true;
                        }
                    } else {
                        // Only reached when currentLeader is present and active (checked above).
                        str = currentLeader.get().getBrokerId();
                    }
                }
            }
            try {
                Objects.requireNonNull(str);
                if (str.equals(this.pulsar.getBrokerId())) {
                    // This broker is the candidate: acquire ownership locally.
                    this.ownershipCache.tryAcquiringOwnership(namespaceBundle).thenAccept(namespaceEphemeralData -> {
                        if (namespaceEphemeralData.isDisabled()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Namespace bundle {} is currently being unloaded", namespaceBundle);
                            }
                            completableFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is currently being unloaded", namespaceBundle)));
                            return;
                        }
                        if (lookupOptions.isLoadTopicsInBundle()) {
                            this.pulsar.loadNamespaceTopics(namespaceBundle);
                        }
                        if (!lookupOptions.hasAdvertisedListenerName()) {
                            completableFuture.complete(Optional.of(new LookupResult(namespaceEphemeralData)));
                            return;
                        }
                        // A specific advertised listener was requested: answer with its service URLs.
                        AdvertisedListener advertisedListener = namespaceEphemeralData.getAdvertisedListeners().get(lookupOptions.getAdvertisedListenerName());
                        if (advertisedListener == null) {
                            completableFuture.completeExceptionally(new PulsarServerException("the broker do not have " + lookupOptions.getAdvertisedListenerName() + " listener"));
                            return;
                        }
                        URI brokerServiceUrl = advertisedListener.getBrokerServiceUrl();
                        URI brokerServiceUrlTls = advertisedListener.getBrokerServiceUrlTls();
                        completableFuture.complete(Optional.of(new LookupResult(namespaceEphemeralData, brokerServiceUrl == null ? null : brokerServiceUrl.toString(), brokerServiceUrlTls == null ? null : brokerServiceUrlTls.toString())));
                    }).exceptionally(th -> {
                        // NOTE(review): the message has two "{}" placeholders but only one
                        // non-throwable argument; confirm the intended log output.
                        LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", namespaceBundle, th);
                        completableFuture.completeExceptionally(new PulsarServerException("Failed to acquire ownership for namespace bundle " + namespaceBundle, th));
                        return null;
                    });
                } else {
                    // Another broker is the candidate: answer with a lookup result built from its broker data.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Redirecting to broker {} to acquire ownership of bundle {}", str, namespaceBundle);
                    }
                    createLookupResult(str, isLeader, lookupOptions.getAdvertisedListenerName()).thenAccept(lookupResult -> {
                        completableFuture.complete(Optional.of(lookupResult));
                    }).exceptionally(th2 -> {
                        completableFuture.completeExceptionally(th2);
                        return null;
                    });
                }
            } catch (Exception e) {
                LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", new Object[]{namespaceBundle, e.getMessage(), e});
                completableFuture.completeExceptionally(e);
            }
        } catch (Exception e2) {
            // Includes timeout / interruption of the blocking get() above and load manager failures.
            LOG.warn("Error when searching for candidate broker to acquire {}: {}", new Object[]{namespaceBundle, e2.getMessage(), e2});
            completableFuture.completeExceptionally(e2);
        }
    }

    /**
     * Builds a lookup result that points at the given broker, using the broker data stored
     * under {@code /loadbalance/brokers/<str>} in the local broker data cache.
     *
     * <p>Without a listener name the broker's own web and Pulsar service URLs are used. With a
     * listener name, the Pulsar service URLs are taken from that advertised listener instead.
     * Failures are never thrown; they complete the returned future exceptionally.
     *
     * @param str  id of the target broker; must not be blank
     * @param z    flag passed through unchanged as the last {@link LookupResult} constructor argument
     * @param str2 advertised listener name, or blank / {@code null} to use the broker's default URLs
     * @return future holding the lookup result; failed when the broker id is blank, the broker
     *         data is missing, or the requested listener does not exist
     */
    public CompletableFuture<LookupResult> createLookupResult(String str, boolean z, String str2) {
        final CompletableFuture<LookupResult> resultFuture = new CompletableFuture<>();
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(str), "Lookup broker can't be null %s", str);
            final String brokerDataPath = "/loadbalance/brokers/" + str;
            this.localBrokerDataCache.get(brokerDataPath).thenAccept(cached -> {
                if (cached.isEmpty()) {
                    resultFuture.completeExceptionally(new MetadataStoreException.NotFoundException(brokerDataPath));
                    return;
                }
                final LocalBrokerData brokerData = cached.get();

                if (StringUtils.isBlank(str2)) {
                    // No listener requested: use the broker's own service URLs.
                    resultFuture.complete(new LookupResult(brokerData.getWebServiceUrl(), brokerData.getWebServiceUrlTls(), brokerData.getPulsarServiceUrl(), brokerData.getPulsarServiceUrlTls(), z));
                    return;
                }

                final AdvertisedListener listener = (AdvertisedListener) brokerData.getAdvertisedListeners().get(str2);
                if (listener == null) {
                    resultFuture.completeExceptionally(new PulsarServerException("the broker do not have " + str2 + " listener"));
                    return;
                }
                final URI listenerUrl = listener.getBrokerServiceUrl();
                final URI listenerUrlTls = listener.getBrokerServiceUrlTls();
                // Either listener URL may be absent; pass null through in that case.
                resultFuture.complete(new LookupResult(brokerData.getWebServiceUrl(), brokerData.getWebServiceUrlTls(), listenerUrl == null ? null : listenerUrl.toString(), listenerUrlTls == null ? null : listenerUrlTls.toString(), z));
            }).exceptionally(failure -> {
                resultFuture.completeExceptionally(failure);
                return null;
            });
        } catch (Exception e) {
            resultFuture.completeExceptionally(e);
        }
        return resultFuture;
    }

    /**
     * Checks whether a broker is in the set of currently available brokers.
     *
     * @param str id of the broker to check
     * @return {@code true} if the broker is listed as available, {@code false} otherwise
     */
    public boolean isBrokerActive(String str) {
        Set<String> activeBrokers = getAvailableBrokers();
        if (activeBrokers.contains(str)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Broker {} is available for.", str);
            }
            return true;
        }
        LOG.warn("Broker {} couldn't be found in available brokers {}", str, String.join(",", activeBrokers));
        return false;
    }

    /**
     * Returns the brokers the current load manager reports as available.
     *
     * @return set of available broker ids
     * @throws RuntimeException wrapping any exception raised while querying the load manager
     */
    private Set<String> getAvailableBrokers() {
        try {
            LoadManager currentLoadManager = this.loadManager.get();
            return currentLoadManager.getAvailableBrokers();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asks the load manager for the least loaded broker for a service unit.
     *
     * @param serviceUnitId service unit that needs a broker
     * @return the resource id of the selected broker, or an empty Optional when the load
     *         manager offers none
     * @throws Exception declared by the signature; propagated from the load manager
     */
    private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnitId) throws Exception {
        Optional<ResourceUnit> candidate = this.loadManager.get().getLeastLoaded(serviceUnitId);
        if (!candidate.isPresent()) {
            LOG.warn("No broker is available for {}", serviceUnitId);
            return Optional.empty();
        }

        String lookupAddress = candidate.get().getResourceId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", this.pulsar.getBrokerId(), lookupAddress);
        }
        return Optional.of(lookupAddress);
    }

    /**
     * Unloads the bundle without naming a destination broker; delegates to the two-argument overload
     * with an empty optional.
     *
     * @param namespaceBundle the bundle to unload
     * @return a future completed when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle namespaceBundle) {
        Optional<String> noDestination = Optional.empty();
        return unloadNamespaceBundle(namespaceBundle, noDestination);
    }

    /**
     * Unloads the bundle using the configured namespace-bundle unloading timeout (in milliseconds).
     *
     * @param namespaceBundle the bundle to unload
     * @param optional        optional value forwarded unchanged to the full overload
     * @return a future completed when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle namespaceBundle, Optional<String> optional) {
        long timeoutMs = this.config.getNamespaceBundleUnloadingTimeoutMs();
        return unloadNamespaceBundle(namespaceBundle, optional, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Unloads the bundle with an explicit timeout, passing {@code true} for the boolean flag of the
     * full overload.
     *
     * @param namespaceBundle the bundle to unload
     * @param optional        optional value forwarded unchanged to the full overload
     * @param j               timeout value
     * @param timeUnit        unit of {@code j}
     * @return a future completed when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle namespaceBundle, Optional<String> optional, long j, TimeUnit timeUnit) {
        final boolean flag = true;
        return unloadNamespaceBundle(namespaceBundle, optional, j, timeUnit, flag);
    }

    /**
     * Unloads the bundle with an explicit timeout, an empty optional and {@code true} for the boolean
     * flag of the full overload.
     *
     * @param namespaceBundle the bundle to unload
     * @param j               timeout value
     * @param timeUnit        unit of {@code j}
     * @return a future completed when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle namespaceBundle, long j, TimeUnit timeUnit) {
        Optional<String> noDestination = Optional.empty();
        return unloadNamespaceBundle(namespaceBundle, noDestination, j, timeUnit, true);
    }

    /**
     * Unloads the bundle with an explicit timeout and boolean flag, supplying an empty optional to the
     * full overload.
     *
     * @param namespaceBundle the bundle to unload
     * @param j               timeout value
     * @param timeUnit        unit of {@code j}
     * @param z               flag forwarded unchanged to the full overload
     * @return a future completed when the unload finishes
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle namespaceBundle, long j, TimeUnit timeUnit, boolean z) {
        Optional<String> noDestination = Optional.empty();
        return unloadNamespaceBundle(namespaceBundle, noDestination, j, timeUnit, z);
    }

    /**
     * Unloads a namespace bundle.
     *
     * <p>When the load manager extension is enabled the request is handed to
     * {@code ExtensibleLoadManagerImpl} (which receives {@code optional} but not {@code z}). Otherwise the
     * bundle must be present in the local ownership cache and is asked to handle the unload itself
     * (which receives {@code z} but not {@code optional}).
     *
     * @param namespaceBundle the bundle to unload
     * @param optional        optional value passed to the extensible load manager
     * @param j               timeout value
     * @param timeUnit        unit of {@code j}
     * @param z               flag passed to {@code OwnedBundle.handleUnloadRequest}
     * @return a future completed when the unload finishes; failed with {@link IllegalStateException}
     *         when the bundle is not found in the local ownership cache
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle namespaceBundle, Optional<String> optional, long j, TimeUnit timeUnit, boolean z) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            OwnedBundle owned = this.ownershipCache.getOwnedBundle(namespaceBundle);
            if (owned == null) {
                return FutureUtil.failedFuture(new IllegalStateException("Bundle " + namespaceBundle + " is not currently owned"));
            }
            return owned.handleUnloadRequest(this.pulsar, j, timeUnit, z);
        }
        return ExtensibleLoadManagerImpl.get(this.loadManager.get())
                .unloadNamespaceBundleAsync(namespaceBundle, optional, false, j, timeUnit);
    }

    /**
     * Reports whether the bundle currently has an owner.
     *
     * <p>With the load manager extension enabled, the answer is whether the extensible load manager
     * returns a present ownership value; otherwise it is whether the bundle's service-unit path exists
     * in the local metadata store.
     *
     * @param namespaceBundle the bundle to check
     * @return a future holding {@code true} when an owner is recorded
     */
    public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle namespaceBundle) {
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return ExtensibleLoadManagerImpl.get(this.loadManager.get())
                    .getOwnershipAsync(Optional.empty(), namespaceBundle)
                    .thenApply(Optional::isPresent);
        }
        return this.pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(namespaceBundle));
    }

    /**
     * Builds a map from bundle name to ownership status for every bundle owned by this broker.
     *
     * <p>The isolation policies of the local cluster are loaded first (an empty policy set is used when
     * none are stored). With the load manager extension enabled, the owned service units come from the
     * extensible load manager and are all reported as active; otherwise they come from the ownership
     * cache and carry each bundle's own active flag.
     *
     * @return a future holding the status map keyed by the bundle's string form
     */
    public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpacesStatusAsync() {
        return this.pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
                .getIsolationDataPoliciesAsync(this.pulsar.getConfiguration().getClusterName())
                .thenApply(stored -> stored.orElseGet(NamespaceIsolationPolicies::new))
                .thenCompose(isolationPolicies -> {
                    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                        return ExtensibleLoadManagerImpl.get(this.loadManager.get())
                                .getOwnedServiceUnitsAsync()
                                .thenApply(ownedUnits -> ownedUnits.stream().collect(Collectors.toMap(
                                        NamespaceBundle::toString,
                                        unit -> getNamespaceOwnershipStatus(true,
                                                isolationPolicies.getPolicyByNamespace(unit.getNamespaceObject())))));
                    }
                    Collection<CompletableFuture<OwnedBundle>> pending = this.ownershipCache.getOwnedBundlesAsync().values();
                    // join() cannot block here: it is only reached after every future has completed.
                    return FutureUtil.waitForAll(pending).thenApply(ignored -> pending.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toMap(
                                    owned -> owned.getNamespaceBundle().toString(),
                                    owned -> getNamespaceOwnershipStatus(owned.isActive(),
                                            isolationPolicies.getPolicyByNamespace(
                                                    owned.getNamespaceBundle().getNamespaceObject())))));
                });
    }

    /**
     * Creates the ownership status for a bundle given its isolation policy.
     *
     * <p>Without a policy the status is {@code shared} and not controlled. With a policy the status is
     * marked controlled, and the assignment becomes {@code primary} or {@code secondary} when the policy
     * lists this broker's advertised address accordingly; otherwise it stays {@code shared}.
     *
     * @param z                        value stored as the third constructor argument of the status
     * @param namespaceIsolationPolicy the policy that applies to the namespace, or {@code null}
     * @return the populated status object
     */
    private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean z, NamespaceIsolationPolicy namespaceIsolationPolicy) {
        NamespaceOwnershipStatus status = new NamespaceOwnershipStatus(BrokerAssignment.shared, false, z);
        if (namespaceIsolationPolicy != null) {
            status.is_controlled = true;
            if (namespaceIsolationPolicy.isPrimaryBroker(this.pulsar.getAdvertisedAddress())) {
                status.broker_assignment = BrokerAssignment.primary;
            } else if (namespaceIsolationPolicy.isSecondaryBroker(this.pulsar.getAdvertisedAddress())) {
                status.broker_assignment = BrokerAssignment.secondary;
            }
        }
        return status;
    }

    /**
     * Reports whether the owner record of the bundle is flagged as disabled, without blocking.
     *
     * <p>Only an already-completed owner lookup is consulted ({@code getNow(null)}); a missing future,
     * a not-yet-completed future or an empty owner all yield {@code false}. Any exception is logged and
     * also yields {@code false}.
     *
     * @param namespaceBundle the bundle to inspect
     * @return {@code true} only when a completed owner record exists and is disabled
     * @throws Exception declared for callers; the body itself catches {@link Exception}
     */
    public boolean isNamespaceBundleDisabled(NamespaceBundle namespaceBundle) throws Exception {
        try {
            CompletableFuture<Optional<NamespaceEphemeralData>> ownerFuture = this.ownershipCache.getOwnerAsync(namespaceBundle);
            if (ownerFuture == null) {
                return false;
            }
            Optional<NamespaceEphemeralData> owner = ownerFuture.getNow(null);
            if (owner == null || owner.isEmpty()) {
                return false;
            }
            return owner.get().isDisabled();
        } catch (Exception failure) {
            LOG.warn("Exception in getting ownership info for service unit {}: {}", namespaceBundle, failure.getMessage(), failure);
            return false;
        }
    }

    /**
     * Splits a bundle and takes ownership of the resulting bundles.
     *
     * <p>With the load manager extension enabled the split is delegated to the extensible load manager
     * (which does not receive {@code z}). Otherwise the split is driven locally by
     * {@code splitAndOwnBundleOnceAndRetry} with a retry counter initialised to 7.
     *
     * @param namespaceBundle               the bundle to split
     * @param z                             flag forwarded to the local split routine
     * @param namespaceBundleSplitAlgorithm algorithm that chooses the split boundaries
     * @param list                          boundary values forwarded to the algorithm
     * @return a future completed when the split finishes
     */
    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle namespaceBundle, boolean z, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm, List<Long> list) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            CompletableFuture<Void> splitResult = new CompletableFuture<>();
            AtomicInteger retriesLeft = new AtomicInteger(7);
            splitAndOwnBundleOnceAndRetry(namespaceBundle, z, retriesLeft, splitResult, namespaceBundleSplitAlgorithm, list);
            return splitResult;
        }
        return ExtensibleLoadManagerImpl.get(this.loadManager.get())
                .splitNamespaceBundleAsync(namespaceBundle, namespaceBundleSplitAlgorithm, list);
    }

    /**
     * Performs one attempt at splitting {@code namespaceBundle} and owning the resulting bundles,
     * re-scheduling itself when the attempt fails because of a metadata version conflict.
     *
     * <p>Steps, as implemented below:
     * <ol>
     *   <li>ask {@code namespaceBundleSplitAlgorithm} for the split boundaries; when none are returned
     *       the caller's future is completed normally and nothing is split;</li>
     *   <li>split the bundle through the bundle factory, try to acquire ownership of every new bundle,
     *       then persist the new layout via {@code updateNamespaceBundles} followed by
     *       {@code updateNamespaceBundlesForPolicies};</li>
     *   <li>on success, call {@code updateBundleState(bundle, false)} on the original bundle, refresh
     *       topic stats, force a load report, remove the original ownership, complete
     *       {@code completableFuture} and, when {@code z} is set, unload each new bundle;</li>
     *   <li>on a failure whose cause is a {@code BadVersionException}, retry after 100 ms while
     *       {@code atomicInteger} still has attempts; otherwise fail {@code completableFuture}.</li>
     * </ol>
     *
     * @param namespaceBundle               the bundle to split
     * @param z                             whether to unload the new bundles after a successful split
     * @param atomicInteger                 remaining retry attempts; decremented on each version conflict
     * @param completableFuture             future completed with the final outcome of the split
     * @param namespaceBundleSplitAlgorithm algorithm that chooses the split boundaries
     * @param list                          boundary values forwarded to the algorithm
     */
    void splitAndOwnBundleOnceAndRetry(NamespaceBundle namespaceBundle, boolean z, AtomicInteger atomicInteger, CompletableFuture<Void> completableFuture, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm, List<Long> list) {
        namespaceBundleSplitAlgorithm.getSplitBoundary(getBundleSplitOption(namespaceBundle, list, this.config)).whenComplete((boundaries, boundaryError) -> {
            CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
            if (boundaryError != null) {
                updateFuture.completeExceptionally(boundaryError);
            } else if (boundaries == null || boundaries.isEmpty()) {
                LOG.info("[{}] No valid boundary found in {} to split bundle {}", namespaceBundle.getNamespaceObject().toString(), list, namespaceBundle.getBundleRange());
                completableFuture.complete(null);
                return;
            } else {
                try {
                    this.bundleFactory.splitBundles(namespaceBundle, boundaries.size() + 1, boundaries).thenAccept(pair -> {
                        if (pair == null) {
                            String format = String.format("bundle %s not found under namespace", namespaceBundle.toString());
                            LOG.warn(format);
                            updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(format));
                            return;
                        }
                        Objects.requireNonNull(pair.getLeft());
                        Objects.requireNonNull(pair.getRight());
                        Preconditions.checkArgument(pair.getRight().size() == boundaries.size() + 1, "bundle has to be split in " + (boundaries.size() + 1) + " bundles");
                        NamespaceName namespaceObject = namespaceBundle.getNamespaceObject();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, bundles: {}", namespaceObject.toString(), namespaceBundle.getBundleRange(), atomicInteger.get(), pair.getRight());
                        }
                        try {
                            // Take ownership of every new bundle before publishing the new layout.
                            for (NamespaceBundle splitBundle : pair.getRight()) {
                                Objects.requireNonNull(this.ownershipCache.tryAcquiringOwnership(splitBundle));
                            }
                            updateNamespaceBundles(namespaceObject, pair.getLeft()).thenCompose(ignored -> {
                                return updateNamespaceBundlesForPolicies(namespaceObject, pair.getLeft());
                            }).thenRun(() -> {
                                this.bundleFactory.invalidateBundleCache(namespaceBundle.getNamespaceObject());
                                updateFuture.complete(pair.getRight());
                            }).exceptionally(updateError -> {
                                String format2 = String.format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", namespaceObject.toString(), namespaceBundle.getBundleRange(), updateError.getMessage());
                                LOG.warn(format2);
                                // The cause is kept so the retry check below can detect a BadVersionException.
                                updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(format2, updateError.getCause()));
                                return null;
                            });
                        } catch (Exception e) {
                            String format2 = String.format("failed to acquire ownership of split bundle for namespace [%s], %s", namespaceObject.toString(), e.getMessage());
                            LOG.warn(format2, e);
                            updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(format2, e));
                        }
                    }).exceptionally(splitError -> {
                        // Propagate a failed splitBundles future, or an exception thrown by the callback
                        // above (e.g. a failed precondition); without this the caller's future would never
                        // complete. This is a no-op when updateFuture has already been completed.
                        updateFuture.completeExceptionally(splitError);
                        return null;
                    });
                } catch (Exception e) {
                    updateFuture.completeExceptionally(e);
                }
            }
            updateFuture.whenCompleteAsync((splitBundles, splitFailure) -> {
                if (splitFailure == null) {
                    getOwnershipCache().updateBundleState(namespaceBundle, false).thenRun(() -> {
                        this.pulsar.getBrokerService().refreshTopicToStatsMaps(namespaceBundle);
                        this.loadManager.get().setLoadReportForceUpdateFlag();
                        this.pulsar.getNamespaceService().getOwnershipCache().removeOwnership(namespaceBundle);
                        completableFuture.complete(null);
                        if (z) {
                            splitBundles.forEach(this::unloadNamespaceBundle);
                        }
                    }).exceptionally(disableError -> {
                        // Arguments follow the placeholders: bundle first, then namespace.
                        String format = String.format("failed to disable bundle %s under namespace [%s] with error %s", namespaceBundle, namespaceBundle.getNamespaceObject().toString(), disableError.getMessage());
                        LOG.warn(format, disableError);
                        completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(format));
                        return null;
                    });
                    return;
                }
                if ((splitFailure.getCause() instanceof MetadataStoreException.BadVersionException) && atomicInteger.decrementAndGet() >= 0) {
                    // Version conflict on the metadata update: retry shortly on the ordered executor.
                    this.pulsar.getExecutor().schedule(() -> {
                        this.pulsar.getOrderedExecutor().execute(() -> {
                            splitAndOwnBundleOnceAndRetry(namespaceBundle, z, atomicInteger, completableFuture, namespaceBundleSplitAlgorithm, list);
                        });
                    }, 100L, TimeUnit.MILLISECONDS);
                } else {
                    if (splitFailure instanceof IllegalArgumentException) {
                        completableFuture.completeExceptionally(splitFailure);
                        return;
                    }
                    String format = String.format(" %s not success update nsBundles, counter %d, reason %s", namespaceBundle.toString(), atomicInteger.get(), splitFailure.getMessage());
                    LOG.warn(format);
                    completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(format));
                }
            }, (Executor) this.pulsar.getOrderedExecutor());
        });
    }

    /**
     * Computes the split boundaries for a bundle and, when any are found, asks the bundle factory to
     * split the bundle along them.
     *
     * @param namespaceBundle               the bundle to split
     * @param namespaceBundleSplitAlgorithm algorithm that chooses the split boundaries
     * @param list                          boundary values forwarded to the algorithm
     * @return a future holding the result of {@code splitBundles}, or {@code null} when the algorithm
     *         returned no boundary
     */
    public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(NamespaceBundle namespaceBundle, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm, List<Long> list) {
        return getSplitBoundary(namespaceBundle, list, namespaceBundleSplitAlgorithm).thenCompose(boundaries -> {
            if (boundaries == null || boundaries.isEmpty()) {
                LOG.info("[{}] No valid boundary found in {} to split bundle {}", namespaceBundle.getNamespaceObject().toString(), list, namespaceBundle.getBundleRange());
                return CompletableFuture.completedFuture(null);
            }
            return this.pulsar.getNamespaceService().getNamespaceBundleFactory()
                    .splitBundles(namespaceBundle, boundaries.size() + 1, boundaries);
        });
    }

    /**
     * Asks the given algorithm for the split boundaries of a bundle.
     *
     * @param namespaceBundle               the bundle to split
     * @param list                          boundary values placed into the split option
     * @param namespaceBundleSplitAlgorithm algorithm that chooses the split boundaries
     * @return a future holding the boundaries produced by the algorithm
     */
    public CompletableFuture<List<Long>> getSplitBoundary(NamespaceBundle namespaceBundle, List<Long> list, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm) {
        BundleSplitOption splitOption = getBundleSplitOption(namespaceBundle, list, this.config);
        return namespaceBundleSplitAlgorithm.getSplitBoundary(splitOption);
    }

    /**
     * Builds the option object handed to a bundle split algorithm.
     *
     * <p>When the configured default algorithm equals {@code FLOW_OR_QPS_EQUALLY_DIVIDE}, the option
     * additionally carries the bundle's topic stats and the configured message-rate, bandwidth and
     * difference-threshold limits; otherwise a plain {@code BundleSplitOption} is returned.
     *
     * @param namespaceBundle      the bundle to split
     * @param list                 boundary values stored in the option
     * @param serviceConfiguration configuration to read the algorithm name and limits from
     * @return the split option for the bundle
     */
    private BundleSplitOption getBundleSplitOption(NamespaceBundle namespaceBundle, List<Long> list, ServiceConfiguration serviceConfiguration) {
        boolean flowOrQps = serviceConfiguration.getDefaultNamespaceBundleSplitAlgorithm()
                .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE);
        if (!flowOrQps) {
            return new BundleSplitOption(this, namespaceBundle, list);
        }
        return new FlowOrQpsEquallyDivideBundleSplitOption(
                this,
                namespaceBundle,
                list,
                this.pulsar.getBrokerService().getTopicStats(namespaceBundle),
                serviceConfiguration.getLoadBalancerNamespaceBundleMaxMsgRate(),
                serviceConfiguration.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
                serviceConfiguration.getFlowOrQpsDifferenceThresholdPercentage());
    }

    /**
     * Resolves a split algorithm by name with two fallbacks: the algorithm named in the broker
     * configuration, then {@code RANGE_EQUALLY_DIVIDE_ALGO}.
     *
     * @param str requested algorithm name
     * @return the first algorithm that resolves to a non-null value
     */
    public NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String str) {
        NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(str);
        if (algorithm != null) {
            return algorithm;
        }
        algorithm = NamespaceBundleSplitAlgorithm.of(this.pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
        if (algorithm != null) {
            return algorithm;
        }
        return NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
    }

    /**
     * Writes the bundle layout into the namespace policies.
     *
     * <p>Existing policies are updated in place; when no policies are stored for the namespace an
     * error is logged and a new {@code Policies} object containing only the bundle data is created.
     *
     * @param namespaceName    the namespace whose policies are updated; must not be {@code null}
     * @param namespaceBundles source of the bundle data; must not be {@code null}
     * @return a future completed when the policies have been written
     */
    public CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName namespaceName, NamespaceBundles namespaceBundles) {
        Objects.requireNonNull(namespaceName);
        Objects.requireNonNull(namespaceBundles);
        return this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenCompose(existing -> {
            if (existing.isEmpty()) {
                LOG.error("Policies of namespace {} is not exist!", namespaceName);
                Policies fresh = new Policies();
                fresh.bundles = namespaceBundles.getBundlesData();
                return this.pulsar.getPulsarResources().getNamespaceResources().createPoliciesAsync(namespaceName, fresh);
            }
            return this.pulsar.getPulsarResources().getNamespaceResources().setPoliciesAsync(namespaceName, current -> {
                current.bundles = namespaceBundles.getBundlesData();
                return current;
            });
        });
    }

    /**
     * Stores the bundle layout as the namespace's local policies, passing along the version carried by
     * {@code namespaceBundles}.
     *
     * @param namespaceName    the namespace to update; must not be {@code null}
     * @param namespaceBundles bundle layout to persist; must not be {@code null}
     * @return a future completed when the local policies have been written
     */
    public CompletableFuture<Void> updateNamespaceBundles(NamespaceName namespaceName, NamespaceBundles namespaceBundles) {
        Objects.requireNonNull(namespaceName);
        Objects.requireNonNull(namespaceBundles);
        return this.pulsar.getPulsarResources()
                .getLocalPolicies()
                .setLocalPoliciesWithVersion(
                        namespaceName,
                        namespaceBundles.toLocalPolicies(),
                        namespaceBundles.getVersion());
    }

    /**
     * @return the ownership cache held by this service
     */
    public OwnershipCache getOwnershipCache() {
        return ownershipCache;
    }

    /**
     * Returns the bundles currently owned by this broker.
     *
     * <p>Without the load manager extension the bundles are read from the ownership cache. With it,
     * the extensible load manager is queried and the call blocks for at most the configured metadata
     * store operation timeout (seconds).
     *
     * @return the set of owned bundles
     * @throws RuntimeException wrapping any failure of the blocking query; if the failure is an
     *                          {@link InterruptedException} the thread's interrupt flag is restored
     */
    public Set<NamespaceBundle> getOwnedServiceUnits() {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.ownershipCache.getOwnedBundles().values().stream()
                    .map(OwnedBundle::getNamespaceBundle)
                    .collect(Collectors.toSet());
        }
        try {
            return ExtensibleLoadManagerImpl.get(this.loadManager.get()).getOwnedServiceUnitsAsync().get(this.config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Blocking variant of {@code isServiceUnitOwnedAsync}, waiting at most the configured metadata
     * store operation timeout (seconds).
     *
     * @param serviceUnitId the service unit to check
     * @return whether the service unit is owned
     * @throws Exception if the asynchronous check fails, times out or the wait is interrupted
     */
    public boolean isServiceUnitOwned(ServiceUnitId serviceUnitId) throws Exception {
        CompletableFuture<Boolean> owned = isServiceUnitOwnedAsync(serviceUnitId);
        return owned.get(this.config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
    }

    /**
     * Checks ownership of a service unit, dispatching on its concrete type.
     *
     * <p>Topics and namespaces go to their dedicated helpers. Bundles are checked through the load
     * manager when the extension is enabled, or through the ownership cache otherwise. Any other type
     * yields a future failed with {@link IllegalArgumentException}.
     *
     * @param serviceUnitId the service unit to check
     * @return a future holding whether the service unit is owned
     */
    public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId serviceUnitId) {
        if (serviceUnitId instanceof TopicName) {
            return isTopicOwnedAsync((TopicName) serviceUnitId);
        }
        if (serviceUnitId instanceof NamespaceName) {
            return isNamespaceOwnedAsync((NamespaceName) serviceUnitId);
        }
        if (serviceUnitId instanceof NamespaceBundle) {
            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
                return this.loadManager.get().checkOwnershipAsync(Optional.empty(), serviceUnitId);
            }
            boolean owned = this.ownershipCache.isNamespaceBundleOwned((NamespaceBundle) serviceUnitId);
            return CompletableFuture.completedFuture(Boolean.valueOf(owned));
        }
        return FutureUtil.failedFuture(new IllegalArgumentException("Invalid class of NamespaceBundle: " + serviceUnitId.getClass().getName()));
    }

    /**
     * Blocking variant of {@code isServiceUnitActiveAsync}, waiting at most the configured metadata
     * store operation timeout (seconds).
     *
     * @param topicName the topic whose bundle is checked
     * @return whether the bundle serving the topic is active
     * @throws RuntimeException wrapping an interruption, execution failure or timeout; on interruption
     *                          the thread's interrupt flag is restored before rethrowing
     */
    @Deprecated
    public boolean isServiceUnitActive(TopicName topicName) {
        try {
            return isServiceUnitActiveAsync(topicName).get(this.pulsar.getConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS).booleanValue();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Checks whether the bundle serving a topic is active on this broker.
     *
     * <p>With the load manager extension enabled this is the load manager's ownership check.
     * Otherwise the ownership cache is consulted: no cached entry means {@code false}; a cached entry
     * counts only when the owned bundle is non-null and reports itself active.
     *
     * @param topicName the topic whose bundle is checked
     * @return a future holding the result
     */
    public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return getBundleAsync(topicName).thenCompose(
                    bundle -> this.loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
        }
        return getBundleAsync(topicName).thenCompose(bundle -> {
            Optional<CompletableFuture<OwnedBundle>> cached = this.ownershipCache.getOwnedBundleAsync(bundle);
            if (cached.isEmpty()) {
                return CompletableFuture.completedFuture(false);
            }
            return cached.get().thenApply(owned -> Boolean.valueOf(owned != null && owned.isActive()));
        });
    }

    /**
     * Checks whether the full bundle of a namespace is owned, through the load manager when the
     * extension is enabled or through the ownership cache otherwise.
     *
     * @param namespaceName the namespace to check
     * @return a future holding whether the namespace's full bundle is owned
     */
    private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName namespaceName) {
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return getFullBundleAsync(namespaceName).thenCompose(
                    fullBundle -> this.loadManager.get().checkOwnershipAsync(Optional.empty(), fullBundle));
        }
        return getFullBundleAsync(namespaceName).thenApply(
                fullBundle -> Boolean.valueOf(this.ownershipCache.getOwnedBundle(fullBundle) != null));
    }

    /**
     * Checks whether the bundle serving a topic is owned, through the load manager when the extension
     * is enabled or through the ownership cache's synchronous check otherwise.
     *
     * @param topicName the topic to check
     * @return a future holding whether the topic's bundle is owned
     */
    private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topicName) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return getBundleAsync(topicName).thenApply(this.ownershipCache::isNamespaceBundleOwned);
        }
        return getBundleAsync(topicName).thenCompose(
                bundle -> this.loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
    }

    /**
     * Checks ownership of the bundle serving a topic, through the load manager when the extension is
     * enabled or through the ownership cache's asynchronous check otherwise.
     *
     * @param topicName the topic to check
     * @return a future holding whether the topic's bundle is owned
     */
    public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return getBundleAsync(topicName).thenCompose(this.ownershipCache::checkOwnershipAsync);
        }
        return getBundleAsync(topicName).thenCompose(
                bundle -> this.loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
    }

    /**
     * Gives up ownership of a bundle and then invalidates the bundle cache of its namespace.
     *
     * <p>With the load manager extension enabled the bundle is unloaded through the extensible load
     * manager using the configured unloading timeout; otherwise its ownership is removed from the
     * ownership cache.
     *
     * @param namespaceBundle the bundle to release
     * @return a future completed after the bundle cache has been invalidated
     */
    public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle namespaceBundle) {
        Runnable invalidateCache = () -> this.bundleFactory.invalidateBundleCache(namespaceBundle.getNamespaceObject());
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return ExtensibleLoadManagerImpl.get(this.loadManager.get())
                    .unloadNamespaceBundleAsync(namespaceBundle, Optional.empty(), true,
                            this.pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS)
                    .thenRun(invalidateCache);
        }
        return this.ownershipCache.removeOwnership(namespaceBundle).thenRun(invalidateCache);
    }

    /**
     * Notifies every registered ownership listener that a bundle has been acquired.
     *
     * @param namespaceBundle the bundle that is now owned
     */
    public void onNamespaceBundleOwned(NamespaceBundle namespaceBundle) {
        for (NamespaceBundleOwnershipListener listener : this.bundleOwnershipListeners) {
            notifyNamespaceBundleOwnershipListener(namespaceBundle, listener);
        }
    }

    /**
     * Invokes {@code unLoad} on every registered ownership listener whose {@code test} accepts the
     * bundle. A failure in one listener is logged and does not stop the remaining listeners.
     *
     * @param namespaceBundle the bundle being unloaded
     */
    public void onNamespaceBundleUnload(NamespaceBundle namespaceBundle) {
        this.bundleOwnershipListeners.forEach(listener -> {
            try {
                if (listener.test(namespaceBundle)) {
                    listener.unLoad(namespaceBundle);
                }
            } catch (Throwable failure) {
                LOG.error("Call bundle {} ownership listener error", namespaceBundle, failure);
            }
        });
    }

    /**
     * Registers ownership listeners and, once the broker is ready for incoming requests, replays the
     * currently owned bundles to the supplied listeners.
     *
     * @param namespaceBundleOwnershipListenerArr listeners to register; {@code null} elements are not
     *                                            added to the registry, the array itself must not be
     *                                            {@code null}
     */
    public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... namespaceBundleOwnershipListenerArr) {
        Objects.requireNonNull(namespaceBundleOwnershipListenerArr);
        for (NamespaceBundleOwnershipListener listener : namespaceBundleOwnershipListenerArr) {
            if (listener == null) {
                continue;
            }
            this.bundleOwnershipListeners.add(listener);
        }
        this.pulsar.runWhenReadyForIncomingRequests(() -> {
            for (NamespaceBundle owned : getOwnedServiceUnits()) {
                notifyNamespaceBundleOwnershipListener(owned, namespaceBundleOwnershipListenerArr);
            }
        });
    }

    /**
     * Invokes {@code onLoad} on each given listener whose {@code test} accepts the bundle.
     *
     * <p>A {@code null} array is ignored. {@code null} elements are skipped, so they do not produce a
     * spurious "listener error" log entry. A failure in one listener is logged and does not stop the
     * remaining listeners.
     *
     * @param namespaceBundle                     the bundle that has been acquired
     * @param namespaceBundleOwnershipListenerArr listeners to notify; may be {@code null} or contain
     *                                            {@code null} elements
     */
    private void notifyNamespaceBundleOwnershipListener(NamespaceBundle namespaceBundle, NamespaceBundleOwnershipListener... namespaceBundleOwnershipListenerArr) {
        if (namespaceBundleOwnershipListenerArr == null) {
            return;
        }
        for (NamespaceBundleOwnershipListener listener : namespaceBundleOwnershipListenerArr) {
            if (listener == null) {
                continue;
            }
            try {
                if (listener.test(namespaceBundle)) {
                    listener.onLoad(namespaceBundle);
                }
            } catch (Throwable th) {
                LOG.error("Call bundle {} ownership listener error", namespaceBundle, th);
            }
        }
    }

    /**
     * @return the bundle factory held by this service
     */
    public NamespaceBundleFactory getNamespaceBundleFactory() {
        return bundleFactory;
    }

    /**
     * Returns the service unit for a topic, which is the result of {@code getBundle(topicName)}.
     *
     * @param topicName the topic to resolve
     * @return the bundle that contains the topic
     * @throws Exception if the bundle lookup fails
     */
    public ServiceUnitId getServiceUnitId(TopicName topicName) throws Exception {
        ServiceUnitId unit = getBundle(topicName);
        return unit;
    }

    /**
     * Lists all topics of a namespace: the persistent topics followed by the non-persistent ones.
     *
     * @param namespaceName the namespace to list
     * @return a future holding the union of both topic lists
     */
    public CompletableFuture<List<String>> getFullListOfTopics(NamespaceName namespaceName) {
        CompletableFuture<List<String>> persistent = getListOfPersistentTopics(namespaceName);
        CompletableFuture<List<String>> nonPersistent = getListOfNonPersistentTopics(namespaceName);
        return persistent.thenCombine(nonPersistent, ListUtils::union);
    }

    /**
     * Lists all partitioned topics of a namespace: the persistent ones followed by the non-persistent
     * ones.
     *
     * @param namespaceName the namespace to list
     * @return a future holding the union of both partitioned-topic lists
     */
    public CompletableFuture<List<String>> getFullListOfPartitionedTopic(NamespaceName namespaceName) {
        NamespaceResources.PartitionedTopicResources resources =
                this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
        CompletableFuture<List<String>> persistent =
                resources.listPartitionedTopicsAsync(namespaceName, TopicDomain.persistent);
        CompletableFuture<List<String>> nonPersistent =
                resources.listPartitionedTopicsAsync(namespaceName, TopicDomain.non_persistent);
        return persistent.thenCombine(nonPersistent, ListUtils::union);
    }

    /**
     * Lists the topics that fall inside a bundle.
     *
     * <p>The result starts with the namespace's topics that the bundle includes, followed by every
     * partition name (from {@code getAllPartitions}) that the bundle includes and that is not already
     * in the list.
     *
     * @param namespaceBundle the bundle whose topics are listed
     * @return a future holding the merged topic list
     */
    public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(NamespaceBundle namespaceBundle) {
        // Keeps only the names that hash into this bundle.
        Function<List<String>, List<String>> keepIncluded = names -> names.stream()
                .filter(name -> namespaceBundle.includes(TopicName.get(name)))
                .collect(Collectors.toList());
        return getFullListOfTopics(namespaceBundle.getNamespaceObject())
                .thenApply(keepIncluded)
                .thenCombine(getAllPartitions(namespaceBundle.getNamespaceObject()).thenApply(keepIncluded),
                        (topics, partitions) -> {
                            // A hash set makes the "already present" check O(1) instead of scanning the
                            // list for every partition; the resulting list content and order are the same.
                            Set<String> seen = new java.util.HashSet<>(topics);
                            for (String partition : partitions) {
                                if (seen.add(partition)) {
                                    topics.add(partition);
                                }
                            }
                            return topics;
                        });
    }

    /**
     * Determines whether a topic exists and in which form.
     *
     * <p>If the partitioned-topic metadata reports more than zero partitions the topic exists as a
     * partitioned topic; otherwise the result depends on {@code checkNonPartitionedTopicExists}.
     *
     * @param topicName the topic to check
     * @return a future holding the existence information
     */
    public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topicName) {
        return this.pulsar.getBrokerService()
                .fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.toString()))
                .thenCompose(metadata -> {
                    if (metadata.partitions > 0) {
                        return CompletableFuture.completedFuture(
                                TopicExistsInfo.newPartitionedTopicExists(Integer.valueOf(metadata.partitions)));
                    }
                    return checkNonPartitionedTopicExists(topicName).thenApply(exists -> {
                        if (exists.booleanValue()) {
                            return TopicExistsInfo.newNonPartitionedTopicExists();
                        }
                        return TopicExistsInfo.newTopicNotExists();
                    });
                });
    }

    /**
     * Checks whether a non-partitioned topic exists: persistent topics are looked up in the topic
     * resources, non-persistent ones via {@code checkNonPersistentNonPartitionedTopicExists}.
     *
     * @param topicName the topic to check
     * @return a future holding whether the topic exists
     */
    public CompletableFuture<Boolean> checkNonPartitionedTopicExists(TopicName topicName) {
        if (topicName.isPersistent()) {
            return this.pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName);
        }
        return checkNonPersistentNonPartitionedTopicExists(topicName.toString());
    }

    /**
     * Checks whether a non-persistent, non-partitioned topic exists.
     *
     * <p>If this broker owns the topic (per {@code checkTopicOwnership}) the local broker service is
     * asked for it. Otherwise the owner broker is looked up and queried through the internal client:
     * a successful metadata response means the topic exists, a "not found"-style error means it does
     * not, and any other error fails the returned future.
     *
     * @param str the topic name in string form
     * @return a future holding whether the topic exists
     */
    public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(String str) {
        TopicName topicName = TopicName.get(str);
        return checkTopicOwnership(TopicName.get(str)).thenCompose(bool -> {
            if (bool.booleanValue()) {
                // This broker is the owner: answer from the local broker service.
                // NOTE(review): the `false` argument presumably means "do not create if missing" — confirm.
                CompletableFuture<Optional<Topic>> topic = this.pulsar.getBrokerService().getTopic(str, false);
                return topic != null ? topic.thenApply((v0) -> {
                    return v0.isPresent();
                }) : CompletableFuture.completedFuture(false);
            }
            try {
                // Another broker owns the topic: find it and ask it through the internal client.
                PulsarClientImpl client = this.pulsar.getClient();
                return getBrokerServiceUrlAsync(TopicName.get(str), LookupOptions.builder().readOnly(false).authoritative(true).build()).thenCompose(optional -> {
                    if (optional.isPresent()) {
                        LookupData lookupData = ((LookupResult) optional.get()).getLookupData();
                        // Use the TLS URL only when broker-client TLS is enabled and a TLS URL is advertised.
                        return client.getLookup((this.pulsar.getConfiguration().isBrokerClientTlsEnabled() && StringUtils.isNotEmpty(lookupData.getBrokerUrlTls())) ? lookupData.getBrokerUrlTls() : lookupData.getBrokerUrl()).getPartitionedTopicMetadata(topicName, false).thenApply(partitionedTopicMetadata -> {
                            // Any successful metadata response is treated as "topic exists".
                            return true;
                        }).exceptionallyCompose(th -> {
                            Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                            // "Not found"-style errors are mapped to a normal `false` result.
                            if ((unwrapCompletionException instanceof PulsarClientException.NotFoundException) || (unwrapCompletionException instanceof PulsarClientException.TopicDoesNotExistException) || (unwrapCompletionException instanceof PulsarAdminException.NotFoundException)) {
                                return CompletableFuture.completedFuture(false);
                            }
                            log.error("{} Failed to get partition metadata due to redirecting fails", str, th);
                            return CompletableFuture.failedFuture(th);
                        });
                    }
                    // The lookup returned no owner broker for the topic.
                    log.error("{} Failed to get partition metadata due can not find the owner broker", str);
                    return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("No broker was available to own " + topicName));
                });
            } catch (Exception e) {
                // Covers failures obtaining the client or building the lookup chain synchronously.
                log.error("{} Failed to get partition metadata due to create internal admin client fails", str, e);
                return FutureUtil.failedFuture(e);
            }
        });
    }

    /**
     * Lists the topics of a namespace according to the requested mode.
     *
     * @param namespaceName the namespace to list
     * @param mode          selects the full list, the non-persistent list or the persistent list
     * @return a future holding the selected topic list; unmatched modes fall back to the persistent list
     */
    public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, CommandGetTopicsOfNamespace.Mode mode) {
        // The switch goes through a compiler-generated ordinal lookup table.
        // NOTE(review): the table is not visible here; presumably 1 = ALL, 2 = NON_PERSISTENT,
        // 3 = PERSISTENT, judging by the methods each case calls — confirm against the Mode enum.
        switch (AnonymousClass1.$SwitchMap$org$apache$pulsar$common$api$proto$CommandGetTopicsOfNamespace$Mode[mode.ordinal()]) {
            case 1:
                return getFullListOfTopics(namespaceName);
            case 2:
                return getListOfNonPersistentTopics(namespaceName);
            case 3:
            default:
                return getListOfPersistentTopics(namespaceName);
        }
    }

    /**
     * Lists the topics of a namespace with system topics filtered out, sharing one in-flight query
     * among concurrent callers that use the same mode and namespace.
     *
     * <p>The query is keyed by {@code "<mode>://<namespace>"} in {@code inProgressQueryUserTopics}.
     * Only the caller that actually created the entry registers the cleanup that removes it once the
     * query completes.
     *
     * @param namespaceName the namespace to list
     * @param mode          which topic domains to include
     * @return a future holding the filtered topic list
     */
    public CompletableFuture<List<String>> getListOfUserTopics(NamespaceName namespaceName, CommandGetTopicsOfNamespace.Mode mode) {
        String queryKey = String.format("%s://%s", mode, namespaceName);
        MutableBoolean createdHere = new MutableBoolean();
        CompletableFuture<List<String>> pending = this.inProgressQueryUserTopics.computeIfAbsent(queryKey, key -> {
            createdHere.setTrue();
            return getListOfTopics(namespaceName, mode).thenApplyAsync(
                    topics -> TopicList.filterSystemTopic(topics),
                    (Executor) this.pulsar.getExecutor());
        });
        if (createdHere.isTrue()) {
            pending.whenComplete((topics, error) -> this.inProgressQueryUserTopics.remove(queryKey, pending));
        }
        return pending;
    }

    /**
     * Lists the partition names of all partitioned topics in a namespace: persistent partitions
     * followed by non-persistent ones.
     *
     * @param namespaceName the namespace to list
     * @return a future holding the union of both partition lists
     */
    public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
        CompletableFuture<List<String>> persistent = getPartitions(namespaceName, TopicDomain.persistent);
        CompletableFuture<List<String>> nonPersistent = getPartitions(namespaceName, TopicDomain.non_persistent);
        return persistent.thenCombine(nonPersistent, ListUtils::union);
    }

    /**
     * Lists the partition names of every partitioned topic of a namespace in one topic domain.
     *
     * <p>The partitioned topics are listed first, then the partitions of each topic are resolved
     * concurrently. The result is assembled only after all lookups have completed, in the order the
     * partitioned topics were listed, so it never depends on the relative ordering of completion
     * callbacks.
     *
     * @param namespaceName the namespace to list
     * @param topicDomain   the topic domain to list
     * @return a future holding a mutable list of partition names (empty when the namespace has no
     *         partitioned topics); the future fails if any lookup fails
     */
    public CompletableFuture<List<String>> getPartitions(NamespaceName namespaceName, TopicDomain topicDomain) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting children from partitioned-topics now: {} - {}", namespaceName, topicDomain);
        }
        return this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().listPartitionedTopicsAsync(namespaceName, topicDomain).thenCompose(topics -> {
            if (CollectionUtils.isEmpty(topics)) {
                return CompletableFuture.<List<String>>completedFuture(new ArrayList<>());
            }
            List<CompletableFuture<List<String>>> lookups = new ArrayList<>(topics.size());
            for (String topic : topics) {
                lookups.add(getPartitionsForTopic(TopicName.get(topic)));
            }
            return FutureUtil.waitForAll(lookups).thenApply(ignored -> {
                List<String> partitions = new ArrayList<>();
                for (CompletableFuture<List<String>> lookup : lookups) {
                    // join() cannot block: it is only reached after every lookup has completed.
                    partitions.addAll(lookup.join());
                }
                return partitions;
            });
        });
    }

    /**
     * Resolves the names of all partitions of a topic from its partitioned-topic metadata.
     *
     * @param topicName topic whose partitions are listed
     * @return future holding one name per partition index, from 0 up to the partition count
     *         reported by the metadata (empty when that count is not positive)
     */
    private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicName) {
        return this.pulsar.getBrokerService()
                .fetchPartitionedTopicMetadataAsync(topicName)
                .thenApply(metadata -> {
                    final List<String> partitionNames = new ArrayList<>();
                    for (int index = 0; index < metadata.partitions; index++) {
                        partitionNames.add(topicName.getPartition(index).toString());
                    }
                    return partitionNames;
                });
    }

    /**
     * Lists the persistent topics of a namespace by delegating to the topic resources.
     *
     * @param namespaceName namespace whose persistent topics are listed
     * @return future holding the topic names returned by {@code listPersistentTopicsAsync}
     */
    public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
        return this.pulsar
                .getPulsarResources()
                .getTopicResources()
                .listPersistentTopicsAsync(namespaceName);
    }

    /**
     * Lists the active non-persistent topics of a namespace.
     *
     * <p>When {@code checkLocalOrGetPeerReplicationCluster} yields a peer cluster, the list is
     * fetched from that cluster. Otherwise it is built from the broker's in-memory multi-layer
     * topic map, keeping only entries that are {@code NonPersistentTopic} instances reporting
     * {@code isActive()}.
     *
     * @param namespaceName namespace whose non-persistent topics are listed
     * @return future holding the topic names; the locally built list is sorted in natural order
     */
    public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) {
        return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(this.pulsar, namespaceName, true).thenCompose(peerCluster -> {
            if (peerCluster != null) {
                return getNonPersistentTopicsFromPeerCluster(peerCluster, namespaceName);
            }
            final List<String> activeTopics = new ArrayList<>();
            final String namespaceKey = namespaceName.toString();
            // The containsKey check and the traversal run under the map's monitor.
            synchronized (this.pulsar.getBrokerService().getMultiLayerTopicMap()) {
                if (this.pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceKey)) {
                    // Second-level keys are not needed; only the innermost key/topic pairs are inspected.
                    this.pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceKey).forEach((ignoredKey, topicsByName) -> {
                        topicsByName.forEach((topicName, topic) -> {
                            if ((topic instanceof NonPersistentTopic) && ((NonPersistentTopic) topic).isActive()) {
                                activeTopics.add(topicName);
                            }
                        });
                    });
                }
            }
            // A null comparator sorts by the elements' natural ordering.
            activeTopics.sort(null);
            return CompletableFuture.completedFuture(activeTopics);
        });
    }

    /**
     * Asks a peer cluster for the non-persistent topics of a namespace.
     *
     * @param clusterDataImpl peer cluster to query; its client comes from {@link #getNamespaceClient}
     * @param namespaceName namespace whose topics are listed
     * @return future holding the topics carried by the lookup result
     */
    private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterDataImpl clusterDataImpl, NamespaceName namespaceName) {
        return getNamespaceClient(clusterDataImpl)
                .getLookup()
                .getTopicsUnderNamespace(namespaceName, CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT, (String) null, (String) null)
                .thenApply(lookupResult -> lookupResult.getTopics());
    }

    /**
     * Returns the client used to talk to the given cluster, creating and caching it on first use.
     *
     * <p>A new client is built with a zero memory limit, TCP no-delay disabled and a zero stats
     * interval. It then receives the broker properties selected by the {@code "brokerClient_"}
     * prefix, a connection max-idle setting of -1, the broker client authentication settings when
     * authentication is enabled, and either the TLS or the plain service URL of the cluster.
     *
     * @param clusterDataImpl cluster the client connects to; also the cache key
     * @return the cached or newly created client
     * @throws RuntimeException wrapping any exception raised while building the client
     */
    public PulsarClientImpl getNamespaceClient(ClusterDataImpl clusterDataImpl) {
        final PulsarClientImpl cachedClient = (PulsarClientImpl) this.namespaceClients.get(clusterDataImpl);
        if (cachedClient != null) {
            return cachedClient;
        }
        return (PulsarClientImpl) this.namespaceClients.computeIfAbsent(clusterDataImpl, cacheKey -> {
            try {
                ClientBuilderImpl builder = PulsarClient.builder().memoryLimit(0L, SizeUnit.BYTES).enableTcpNoDelay(false).statsInterval(0L, TimeUnit.SECONDS);
                // NOTE(review): the generic properties are loaded before the explicit settings
                // below; keep this order unless it is confirmed not to matter.
                builder.loadConf(PropertiesUtils.filterAndMapProperties(this.config.getProperties(), "brokerClient_"));
                builder.connectionMaxIdleSeconds(-1);
                if (this.pulsar.getConfiguration().isAuthenticationEnabled()) {
                    builder.authentication(
                            this.pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                            this.pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                }
                if (this.pulsar.getConfiguration().isTlsEnabled()) {
                    // Prefer the broker service URL and fall back to the generic service URL when it is blank.
                    final String tlsServiceUrl = StringUtils.isNotBlank(clusterDataImpl.getBrokerServiceUrlTls())
                            ? clusterDataImpl.getBrokerServiceUrlTls()
                            : clusterDataImpl.getServiceUrlTls();
                    builder.serviceUrl(tlsServiceUrl)
                            .enableTls(true)
                            .tlsTrustCertsFilePath(this.pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
                            .allowTlsInsecureConnection(this.pulsar.getConfiguration().isTlsAllowInsecureConnection())
                            .enableTlsHostnameVerification(this.pulsar.getConfiguration().isTlsHostnameVerificationEnabled());
                } else {
                    final String plainServiceUrl = StringUtils.isNotBlank(clusterDataImpl.getBrokerServiceUrl())
                            ? clusterDataImpl.getBrokerServiceUrl()
                            : clusterDataImpl.getServiceUrl();
                    builder.serviceUrl(plainServiceUrl);
                }
                return this.pulsar.createClientImpl(builder.getClientConfigurationData());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Looks up the current owner of a namespace bundle.
     *
     * <p>With the load manager extension enabled, the owner comes from the extensible load
     * manager's lookup data converted to {@code NamespaceEphemeralData}. Otherwise it comes from
     * the ownership cache.
     *
     * @param namespaceBundle bundle whose owner is requested
     * @return future holding the owner data, or an empty optional when no owner is reported
     */
    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle namespaceBundle) {
        if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return this.ownershipCache.getOwnerAsync(namespaceBundle);
        }
        return ExtensibleLoadManagerImpl.get(this.loadManager.get())
                .getOwnershipWithLookupDataAsync(namespaceBundle)
                .thenCompose(lookupData -> {
                    if (lookupData.isPresent()) {
                        return CompletableFuture.completedFuture(
                                Optional.of(lookupData.get().toNamespaceEphemeralData()));
                    }
                    return CompletableFuture.completedFuture(Optional.empty());
                });
    }

    /**
     * Blocking variant of {@link #checkOwnershipPresentAsync(NamespaceBundle)}.
     *
     * @param namespaceBundle bundle to check
     * @return {@code true} when an owner is present for the bundle
     * @throws Exception when the asynchronous check fails, is interrupted, or does not finish
     *         within the configured metadata-store operation timeout (in seconds)
     */
    public boolean checkOwnershipPresent(NamespaceBundle namespaceBundle) throws Exception {
        final CompletableFuture<Boolean> ownershipCheck = checkOwnershipPresentAsync(namespaceBundle);
        final long timeoutSeconds = this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        return ownershipCheck.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Checks whether a namespace bundle currently has an owner.
     *
     * <p>With the load manager extension enabled, the extensible load manager is queried
     * directly. Otherwise the answer is derived from {@link #getOwnerAsync(NamespaceBundle)}.
     *
     * @param namespaceBundle bundle to check
     * @return future holding {@code true} when the ownership lookup returned a value
     */
    public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle namespaceBundle) {
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this.pulsar)) {
            return ExtensibleLoadManagerImpl.get(this.loadManager.get())
                    .getOwnershipAsync(Optional.empty(), namespaceBundle)
                    .thenApply(Optional::isPresent);
        }
        return getOwnerAsync(namespaceBundle).thenApply(Optional::isPresent);
    }

    /**
     * Unloads this broker's SLA monitor namespace through the admin client, but only when the
     * namespace's full bundle currently has an owner.
     *
     * @throws Exception when the ownership check or the unload request fails
     */
    public void unloadSLANamespace() throws Exception {
        final NamespaceName slaNamespace = getSLAMonitorNamespace(this.pulsar.getBrokerId(), this.config);
        LOG.info("Checking owner for SLA namespace {}", slaNamespace);
        if (!checkOwnershipPresent(getFullBundle(slaNamespace))) {
            // Nobody owns the bundle, so there is nothing to unload.
            return;
        }
        LOG.info("Trying to unload SLA namespace {}", slaNamespace);
        this.pulsar.getAdminClient().namespaces().unload(slaNamespace.toString());
        LOG.info("Namespace {} unloaded successfully", slaNamespace);
    }

    /**
     * Builds the heartbeat namespace name from {@code HEARTBEAT_NAMESPACE_FMT}.
     *
     * @param str value substituted after the cluster name (presumably the broker id; confirm with callers)
     * @param serviceConfiguration configuration supplying the cluster name
     * @return the namespace name for the formatted string
     */
    public static NamespaceName getHeartbeatNamespace(String str, ServiceConfiguration serviceConfiguration) {
        final String formatted = String.format(HEARTBEAT_NAMESPACE_FMT, serviceConfiguration.getClusterName(), str);
        return NamespaceName.get(formatted);
    }

    /**
     * Builds the V2 heartbeat namespace name from {@code HEARTBEAT_NAMESPACE_FMT_V2}.
     *
     * @param str the only value substituted into the format (presumably the broker id; confirm with callers)
     * @param serviceConfiguration not used by this method
     * @return the namespace name for the formatted string
     */
    public static NamespaceName getHeartbeatNamespaceV2(String str, ServiceConfiguration serviceConfiguration) {
        final String formatted = String.format(HEARTBEAT_NAMESPACE_FMT_V2, str);
        return NamespaceName.get(formatted);
    }

    /**
     * Builds the SLA monitor namespace name from {@code SLA_NAMESPACE_FMT}.
     *
     * @param str value substituted after the cluster name; callers in this class pass the broker id
     * @param serviceConfiguration configuration supplying the cluster name
     * @return the namespace name for the formatted string
     */
    public static NamespaceName getSLAMonitorNamespace(String str, ServiceConfiguration serviceConfiguration) {
        final String formatted = String.format(SLA_NAMESPACE_FMT, serviceConfiguration.getClusterName(), str);
        return NamespaceName.get(formatted);
    }

    /**
     * Tests whether a service unit's namespace matches {@code HEARTBEAT_NAMESPACE_PATTERN}.
     *
     * @param serviceUnitId service unit whose namespace is tested
     * @return the pattern's first capture group on a match, otherwise {@code null}
     */
    public static String checkHeartbeatNamespace(ServiceUnitId serviceUnitId) {
        final String namespace = serviceUnitId.getNamespaceObject().toString();
        final Matcher heartbeatMatch = HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace);
        if (heartbeatMatch.matches()) {
            LOG.debug("Heartbeat namespace matched the lookup namespace {}", namespace);
            return heartbeatMatch.group(1);
        }
        return null;
    }

    /**
     * Tests whether a service unit's namespace matches {@code HEARTBEAT_NAMESPACE_PATTERN_V2}.
     *
     * @param serviceUnitId service unit whose namespace is tested
     * @return the pattern's first capture group on a match, otherwise {@code null}
     */
    public static String checkHeartbeatNamespaceV2(ServiceUnitId serviceUnitId) {
        final String namespace = serviceUnitId.getNamespaceObject().toString();
        final Matcher heartbeatMatch = HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace);
        if (heartbeatMatch.matches()) {
            LOG.debug("Heartbeat namespace v2 matched the lookup namespace {}", namespace);
            return heartbeatMatch.group(1);
        }
        return null;
    }

    /**
     * Extracts the first capture group of {@code SLA_NAMESPACE_PATTERN} from a service unit's namespace.
     *
     * @param serviceUnitId service unit whose namespace is tested
     * @return the captured group when the namespace matches the SLA pattern, otherwise {@code null}
     */
    public static String getSLAMonitorBrokerName(ServiceUnitId serviceUnitId) {
        final String namespace = serviceUnitId.getNamespaceObject().toString();
        final Matcher slaMatch = SLA_NAMESPACE_PATTERN.matcher(namespace);
        return slaMatch.matches() ? slaMatch.group(1) : null;
    }

    /**
     * Tells whether a namespace string is the system namespace, an SLA monitor namespace, or a
     * heartbeat namespace (either format).
     *
     * @param str namespace string to test
     * @return {@code true} when it equals the system namespace or matches one of the patterns
     */
    public static boolean isSystemServiceNamespace(String str) {
        if (NamespaceName.SYSTEM_NAMESPACE.toString().equals(str)) {
            return true;
        }
        // The remaining checks are exactly the SLA / heartbeat pattern tests, in the same order.
        return isSLAOrHeartbeatNamespace(str);
    }

    /**
     * Tells whether a namespace string matches the SLA monitor pattern or either heartbeat pattern.
     *
     * @param str namespace string to test
     * @return {@code true} on the first pattern that matches, {@code false} when none does
     */
    public static boolean isSLAOrHeartbeatNamespace(String str) {
        if (SLA_NAMESPACE_PATTERN.matcher(str).matches()) {
            return true;
        }
        if (HEARTBEAT_NAMESPACE_PATTERN.matcher(str).matches()) {
            return true;
        }
        return HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(str).matches();
    }

    /**
     * Tells whether a service unit's namespace matches either heartbeat namespace pattern.
     *
     * @param serviceUnitId service unit whose namespace is tested
     * @return {@code true} when the namespace matches the original or the V2 heartbeat pattern
     */
    public static boolean isHeartbeatNamespace(ServiceUnitId serviceUnitId) {
        final String namespace = serviceUnitId.getNamespaceObject().toString();
        if (HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()) {
            return true;
        }
        return HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
    }

    /**
     * Registers this broker's SLA monitor namespace and logs the outcome at debug level.
     *
     * @return the result of {@code registerNamespace} for the SLA monitor namespace
     * @throws PulsarServerException propagated from {@code registerNamespace}
     */
    public boolean registerSLANamespace() throws PulsarServerException {
        final NamespaceName slaNamespace = getSLAMonitorNamespace(this.pulsar.getBrokerId(), this.config);
        final boolean registered = registerNamespace(slaNamespace, false);
        if (LOG.isDebugEnabled()) {
            if (registered) {
                LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}", slaNamespace);
            } else {
                LOG.debug("SLA Monitoring not owned by the broker: ns={}", slaNamespace);
            }
        }
        return registered;
    }

    /**
     * Shuts down every cached namespace client. A failure to shut down one client is logged as a
     * warning and does not stop the remaining clients from being shut down.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.namespaceClients.forEach((cluster, client) -> {
            try {
                client.shutdown();
            } catch (PulsarClientException shutdownFailure) {
                LOG.warn("Error shutting down namespace client for cluster {}", cluster, shutdownFailure);
            }
        });
    }
}
