package org.apache.pulsar.broker.namespace;

import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.hash.Hashing;
import org.apache.pulsar.shade.io.prometheus.client.Counter;
import org.apache.pulsar.shade.org.apache.commons.cli.HelpFormatter;
import org.apache.pulsar.shade.org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.shade.org.apache.commons.collections4.ListUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.shade.org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/namespace/NamespaceService.class */
public class NamespaceService implements AutoCloseable {
    // Broker configuration, taken from the PulsarService in the constructor.
    private final ServiceConfiguration config;
    // Metadata cache for LocalPolicies, obtained from the local metadata store in the constructor.
    private final MetadataCache<LocalPolicies> localPoliciesCache;
    // Shared reference to the load manager, obtained from the PulsarService.
    private final AtomicReference<LoadManager> loadManager;
    private final PulsarService pulsar;
    // Tracks bundle ownership; constructed with this service and the bundle factory.
    private final OwnershipCache ownershipCache;
    // Metadata cache for LocalBrokerData; read by createLookupResult under "/loadbalance/brokers/<host:port>".
    private final MetadataCache<LocalBrokerData> localBrokerDataCache;
    private final NamespaceBundleFactory bundleFactory;
    // Incremented once when the heartbeat namespace is registered in registerBootstrapNamespaces().
    private int uncountedNamespaces;
    // Advertised address of this broker, as reported by the PulsarService.
    private final String host;
    // NOTE(review): splitAndOwnBundle passes the literal 7 as its initial retry counter; presumably this constant.
    private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
    public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
    // Format strings for the heartbeat / SLA-monitor namespace names; they line up with the patterns below.
    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
    public static final String SLA_NAMESPACE_FMT = "sla-monitor/%s/%s:%s";
    // Last path segment used when reading the cluster's namespace isolation policies.
    public static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies";
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);
    // Group 1 of both patterns captures the trailing "<host>:<port>" part of the namespace name.
    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
    public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile("sla-monitor/[^/]+/([^:]+:\\d+)");
    // Lookup metrics updated by getBrokerServiceUrlAsync: redirects, failures, direct answers and latency.
    private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", HelpFormatter.DEFAULT_OPT_PREFIX).register();
    private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", HelpFormatter.DEFAULT_OPT_PREFIX).register();
    private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", HelpFormatter.DEFAULT_OPT_PREFIX).register();
    private static final Summary lookupLatency = Summary.build("pulsar_broker_lookup", HelpFormatter.DEFAULT_OPT_PREFIX).quantile(0.5d).quantile(0.99d).quantile(0.999d).quantile(1.0d).register();
    // In-flight lookup futures keyed by bundle. findBrokerServiceUrl picks one of the two maps based on
    // LookupOptions.isAuthoritative() and removes the entry once the future completes.
    private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> findingBundlesAuthoritative = new ConcurrentOpenHashMap<>();
    private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> findingBundlesNotAuthoritative = new ConcurrentOpenHashMap<>();
    // NOTE(review): presumably per-cluster clients created lazily; the code that fills this map is not in view.
    private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients = new ConcurrentOpenHashMap<>();
    // NOTE(review): registration and notification of these listeners happen outside the visible part of the file.
    private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners = new CopyOnWriteArrayList();

    /* loaded from: input_file:org/apache/pulsar/broker/namespace/NamespaceService$AddressType.class */
    /**
     * Distinguishes between a broker URL and a lookup URL.
     *
     * <p>NOTE(review): the code that consumes this enum is not part of the visible section of the
     * file; confirm the exact meaning of each constant against its callers.
     */
    public enum AddressType {
        BROKER_URL,
        LOOKUP_URL
    }

    /**
     * Creates the namespace service for the given broker instance.
     *
     * <p>Reads the advertised address, configuration and load manager reference from the
     * {@code PulsarService}, builds the bundle factory (CRC32 hashing) and the ownership cache,
     * and obtains the metadata caches for {@code LocalBrokerData} and {@code LocalPolicies} from
     * the local metadata store.
     *
     * @param pulsarService the broker service this namespace service belongs to
     */
    public NamespaceService(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.host = pulsarService.getAdvertisedAddress();
        this.config = pulsarService.getConfiguration();
        this.loadManager = pulsarService.getLoadManager();
        this.bundleFactory = new NamespaceBundleFactory(pulsarService, Hashing.crc32());
        // `this` is handed to OwnershipCache before construction finishes, so the fields assigned
        // above must stay ahead of this line.
        this.ownershipCache = new OwnershipCache(pulsarService, this.bundleFactory, this);
        this.localBrokerDataCache = pulsarService.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
        this.localPoliciesCache = pulsarService.getLocalMetadataStore().getMetadataCache(LocalPolicies.class);
    }

    /**
     * Refreshes this broker's own owner info in the ownership cache.
     *
     * @throws RuntimeException if the ownership cache reports that the refresh did not succeed
     */
    public void initialize() {
        boolean refreshed = getOwnershipCache().refreshSelfOwnerInfo();
        if (refreshed) {
            return;
        }
        throw new RuntimeException("Failed to refresh self owner info.");
    }

    /**
     * Looks up the broker that serves the given topic and records lookup metrics.
     *
     * <p>The topic is first resolved to its bundle, then the bundle's broker is searched with
     * {@link #findBrokerServiceUrl}. When the lookup completes normally the latency is observed and,
     * if a result is present, either the redirect or the answer counter is incremented. When it
     * completes exceptionally the failure counter is incremented.
     *
     * @param topicName     the topic to look up
     * @param lookupOptions options forwarded to the bundle lookup
     * @return the lookup future itself (not the metrics stage), so callers still observe failures
     */
    public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topicName, LookupOptions lookupOptions) {
        final long startNanos = System.nanoTime();
        // Fully parameterized: the raw type previously erased the element type and required casts.
        CompletableFuture<Optional<LookupResult>> lookupFuture = getBundleAsync(topicName)
                .thenCompose(bundle -> findBrokerServiceUrl(bundle, lookupOptions));
        lookupFuture.thenAccept(result -> {
            lookupLatency.observe(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
            if (result.isPresent()) {
                if (result.get().isRedirect()) {
                    lookupRedirects.inc();
                } else {
                    lookupAnswers.inc();
                }
            }
        }).exceptionally(th -> {
            lookupFailures.inc();
            return null;
        });
        return lookupFuture;
    }

    /**
     * Asynchronously resolves the bundle that contains the given topic.
     *
     * @param topicName the topic whose bundle is wanted
     * @return a future with the bundle found among the bundles of the topic's namespace
     */
    public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topicName) {
        return this.bundleFactory
                .getBundlesAsync(topicName.getNamespaceObject())
                .thenApply(bundles -> bundles.findBundle(topicName));
    }

    /**
     * Resolves the bundle of the given topic if the bundle factory reports the namespace's bundles
     * as present.
     *
     * @param topicName the topic whose bundle is wanted
     * @return the bundle, or an empty optional when {@code getBundlesIfPresent} yields nothing
     */
    public Optional<NamespaceBundle> getBundleIfPresent(TopicName topicName) {
        return this.bundleFactory
                .getBundlesIfPresent(topicName.getNamespaceObject())
                .map(bundles -> bundles.findBundle(topicName));
    }

    /**
     * Synchronously resolves the bundle that contains the given topic.
     *
     * @param topicName the topic whose bundle is wanted
     * @return the bundle found among the bundles of the topic's namespace
     * @throws Exception if the bundle factory fails to provide the namespace's bundles
     */
    public NamespaceBundle getBundle(TopicName topicName) throws Exception {
        NamespaceBundles bundles = this.bundleFactory.getBundles(topicName.getNamespaceObject());
        return bundles.findBundle(topicName);
    }

    /**
     * Returns the number of bundles of the given namespace.
     *
     * @param namespaceName the namespace to inspect
     * @return the size reported by the namespace's bundles
     * @throws Exception if the bundle factory fails to provide the namespace's bundles
     */
    public int getBundleCount(NamespaceName namespaceName) throws Exception {
        NamespaceBundles bundles = this.bundleFactory.getBundles(namespaceName);
        return bundles.size();
    }

    /**
     * Delegates to the bundle factory to obtain the full bundle of a namespace.
     *
     * @param namespaceName the namespace whose full bundle is wanted
     * @return the full bundle as returned by the bundle factory
     * @throws Exception if the bundle factory fails
     */
    private NamespaceBundle getFullBundle(NamespaceName namespaceName) throws Exception {
        NamespaceBundle fullBundle = this.bundleFactory.getFullBundle(namespaceName);
        return fullBundle;
    }

    /**
     * Asynchronous counterpart of {@code getFullBundle}; delegates to the bundle factory.
     *
     * @param namespaceName the namespace whose full bundle is wanted
     * @return the future returned by the bundle factory
     */
    private CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName namespaceName) {
        CompletableFuture<NamespaceBundle> fullBundleFuture = this.bundleFactory.getFullBundleAsync(namespaceName);
        return fullBundleFuture;
    }

    /**
     * Resolves the web service URL of the broker responsible for a service unit.
     *
     * <p>A {@code TopicName} is resolved through its bundle, a {@code NamespaceName} through the
     * namespace's full bundle, and a {@code NamespaceBundle} is used directly.
     *
     * @param serviceUnitId the topic, namespace or bundle to look up
     * @param lookupOptions options forwarded to the lookup
     * @return a future with the URL, or an empty optional when no URL could be produced
     * @throws IllegalArgumentException if {@code serviceUnitId} is none of the three supported types
     */
    public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId serviceUnitId, LookupOptions lookupOptions) {
        if (serviceUnitId instanceof TopicName) {
            final TopicName topic = (TopicName) serviceUnitId;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Getting web service URL of topic: {} - options: {}", topic, lookupOptions);
            }
            return getBundleAsync(topic)
                    .thenCompose(bundle -> internalGetWebServiceUrl(bundle, lookupOptions));
        } else if (serviceUnitId instanceof NamespaceName) {
            final NamespaceName namespace = (NamespaceName) serviceUnitId;
            return getFullBundleAsync(namespace)
                    .thenCompose(fullBundle -> internalGetWebServiceUrl(fullBundle, lookupOptions));
        } else if (serviceUnitId instanceof NamespaceBundle) {
            final NamespaceBundle bundle = (NamespaceBundle) serviceUnitId;
            return internalGetWebServiceUrl(bundle, lookupOptions);
        } else {
            throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + serviceUnitId.getClass().getName());
        }
    }

    /**
     * Blocking variant of {@link #getWebServiceUrlAsync}.
     *
     * <p>Waits for the asynchronous lookup for at most the configured ZooKeeper operation timeout
     * (in seconds).
     *
     * @param serviceUnitId the topic, namespace or bundle to look up
     * @param lookupOptions options forwarded to the lookup
     * @return the URL, or an empty optional when no URL could be produced
     * @throws Exception if the lookup fails, is interrupted or times out
     */
    public Optional<URL> getWebServiceUrl(ServiceUnitId serviceUnitId, LookupOptions lookupOptions) throws Exception {
        // Start the lookup before reading the timeout, matching the original evaluation order.
        CompletableFuture<Optional<URL>> urlFuture = getWebServiceUrlAsync(serviceUnitId, lookupOptions);
        long timeoutSeconds = this.pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds();
        return urlFuture.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Finds the broker for a bundle and converts its lookup data into a web service URL.
     *
     * <p>The TLS HTTP URL is used when the options request HTTPS, otherwise the plain HTTP URL.
     * Any exception while building the URL is logged and turned into an empty result.
     *
     * @param namespaceBundle the bundle to look up
     * @param lookupOptions   options forwarded to the lookup; also selects HTTP vs. HTTPS
     * @return a future with the URL, or an empty optional if no broker was found or the URL is unusable
     */
    private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle namespaceBundle, LookupOptions lookupOptions) {
        return findBrokerServiceUrl(namespaceBundle, lookupOptions).thenApply(lookupResult -> {
            if (!lookupResult.isPresent()) {
                return Optional.empty();
            }
            try {
                LookupData data = lookupResult.get().getLookupData();
                String address;
                if (lookupOptions.isRequestHttps()) {
                    address = data.getHttpUrlTls();
                } else {
                    address = data.getHttpUrl();
                }
                return Optional.of(new URL(address));
            } catch (Exception e) {
                LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
                return Optional.empty();
            }
        });
    }

    /**
     * Registers the heartbeat namespace of this broker and every configured bootstrap namespace.
     *
     * <p>The heartbeat namespace must end up owned by this broker (ownership is enforced); the
     * bootstrap namespaces are registered on a best-effort basis.
     *
     * @throws PulsarServerException if registering any of the namespaces fails
     */
    public void registerBootstrapNamespaces() throws PulsarServerException {
        boolean heartbeatRegistered = registerNamespace(getHeartbeatNamespace(this.host, this.config), true);
        if (heartbeatRegistered) {
            this.uncountedNamespaces += 1;
            LOG.info("added heartbeat namespace name in local cache: ns={}", getHeartbeatNamespace(this.host, this.config));
        }
        for (String bootstrapNamespace : this.config.getBootstrapNamespaces()) {
            boolean registered = registerNamespace(bootstrapNamespace, false);
            if (registered) {
                LOG.info("added bootstrap namespace name in local cache: ns={}", bootstrapNamespace);
            }
        }
    }

    /**
     * Tries to acquire ownership of the full bundle of a namespace for this broker.
     *
     * @param str the namespace name
     * @param z   when {@code true}, ownership by another broker is treated as an error instead of
     *            being logged and reported as {@code false}
     * @return {@code true} if this broker owns the namespace, {@code false} if another broker owns
     *         it and {@code z} is {@code false}
     * @throws PulsarServerException wrapping any failure, including the {@code IllegalStateException}
     *         raised when {@code z} is {@code true} and another broker owns the namespace
     */
    public boolean registerNamespace(String str, boolean z) throws PulsarServerException {
        final String localUrl = this.pulsar.getSafeBrokerServiceUrl();
        try {
            NamespaceBundle fullBundle = this.bundleFactory.getFullBundle(NamespaceName.get(str));
            String ownerUrl = this.ownershipCache.tryAcquiringOwnership(fullBundle).get().getNativeUrl();
            if (!localUrl.equals(ownerUrl)) {
                String message = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s", str, localUrl, ownerUrl);
                if (!z) {
                    LOG.info(message);
                    return false;
                }
                throw new IllegalStateException(message);
            }
            if (fullBundle != null) {
                this.pulsar.loadNamespaceTopics(fullBundle);
            }
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new PulsarServerException(e);
        }
    }

    /**
     * Finds the broker responsible for a bundle, sharing one in-flight future per bundle.
     *
     * <p>The future is stored in one of two maps, chosen by {@code lookupOptions.isAuthoritative()},
     * so that lookups for a bundle that already has a pending future in that map receive the same
     * future. The entry is removed again once the future completes.
     *
     * @param namespaceBundle the bundle to look up
     * @param lookupOptions   lookup options (authoritative, read-only, advertised listener name)
     * @return a future with the lookup result, or an empty optional for a read-only lookup of an
     *         unowned bundle; it fails if the bundle is being unloaded, if the requested advertised
     *         listener is unknown, or if the owner check itself fails
     */
    private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle namespaceBundle, LookupOptions lookupOptions) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("findBrokerServiceUrl: {} - options: {}", namespaceBundle, lookupOptions);
        }
        // Authoritative and non-authoritative lookups are tracked in separate maps.
        ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> concurrentOpenHashMap = lookupOptions.isAuthoritative() ? this.findingBundlesAuthoritative : this.findingBundlesNotAuthoritative;
        ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> concurrentOpenHashMap2 = concurrentOpenHashMap;
        return concurrentOpenHashMap.computeIfAbsent(namespaceBundle, namespaceBundle2 -> {
            CompletableFuture completableFuture = new CompletableFuture();
            this.ownershipCache.getOwnerAsync(namespaceBundle).thenAccept(optional -> {
                if (!optional.isPresent()) {
                    // Nobody owns the bundle yet.
                    if (lookupOptions.isReadOnly()) {
                        // Read-only lookups must not trigger an ownership acquisition.
                        completableFuture.complete(Optional.empty());
                        return;
                    } else {
                        // Hand the candidate search to the broker executor instead of running it in this callback.
                        this.pulsar.getExecutor().execute(() -> {
                            searchForCandidateBroker(namespaceBundle, completableFuture, lookupOptions);
                        });
                        return;
                    }
                }
                if (((NamespaceEphemeralData) optional.get()).isDisabled()) {
                    // The owner entry is flagged as disabled; the lookup is failed rather than answered.
                    completableFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is being unloaded", namespaceBundle)));
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Namespace bundle {} already owned by {} ", namespaceBundle, optional);
                }
                if (!lookupOptions.hasAdvertisedListenerName()) {
                    // No specific listener requested: answer with the owner's default data.
                    completableFuture.complete(Optional.of(new LookupResult((NamespaceEphemeralData) optional.get())));
                    return;
                }
                AdvertisedListener advertisedListener = ((NamespaceEphemeralData) optional.get()).getAdvertisedListeners().get(lookupOptions.getAdvertisedListenerName());
                if (advertisedListener == null) {
                    completableFuture.completeExceptionally(new PulsarServerException("the broker do not have " + lookupOptions.getAdvertisedListenerName() + " listener"));
                } else {
                    // The TLS URL of a listener may be absent; the plain URL is dereferenced unconditionally.
                    URI brokerServiceUrlTls = advertisedListener.getBrokerServiceUrlTls();
                    completableFuture.complete(Optional.of(new LookupResult((NamespaceEphemeralData) optional.get(), advertisedListener.getBrokerServiceUrl().toString(), brokerServiceUrlTls == null ? null : brokerServiceUrlTls.toString())));
                }
            }).exceptionally(th -> {
                LOG.warn("Failed to check owner for bundle {}: {}", new Object[]{namespaceBundle, th.getMessage(), th});
                completableFuture.completeExceptionally(th);
                return null;
            });
            // Drop the in-flight entry once the lookup is done, whatever the outcome.
            // NOTE(review): the removal is dispatched to the executor, presumably so the map is not
            // modified from inside computeIfAbsent when the future is already complete — confirm.
            completableFuture.whenComplete((optional2, th2) -> {
                this.pulsar.getExecutor().execute(() -> {
                    concurrentOpenHashMap2.remove(namespaceBundle);
                });
            });
            return completableFuture;
        });
    }

    /**
     * Picks a candidate broker for an unowned bundle and completes the lookup future accordingly.
     *
     * <p>Candidate selection, in order:
     * <ol>
     *   <li>the broker named by the heartbeat namespace check, if any;</li>
     *   <li>the SLA-monitor broker, if the bundle names one and it is active;</li>
     *   <li>this broker, when the lookup is authoritative;</li>
     *   <li>the least loaded broker according to the local load manager, when the load manager is
     *       not centralized, this broker is the leader, or the leader is unknown/inactive;</li>
     *   <li>otherwise the current leader broker.</li>
     * </ol>
     * If the candidate is this broker, ownership of the bundle is acquired and the result is the
     * acquired owner data; otherwise the result redirects to the candidate.
     *
     * <p>The leader election service is fetched exactly once. Previously it was fetched and
     * null-checked twice with contradictory handling (the second branch could effectively never
     * run) and fetched a third time to read the current leader.
     *
     * @param namespaceBundle   the bundle that needs an owner
     * @param completableFuture the lookup future to complete; it is always completed by this method
     *                          or by one of the asynchronous stages it starts
     * @param lookupOptions     lookup options (authoritative, advertised listener, topic loading)
     */
    private void searchForCandidateBroker(NamespaceBundle namespaceBundle, CompletableFuture<Optional<LookupResult>> completableFuture, LookupOptions lookupOptions) {
        final LeaderElectionService leaderElectionService = this.pulsar.getLeaderElectionService();
        if (leaderElectionService == null) {
            LOG.warn("The leader election has not yet been completed! NamespaceBundle[{}]", namespaceBundle);
            completableFuture.completeExceptionally(new IllegalStateException("The leader election has not yet been completed!"));
            return;
        }
        boolean authoritativeRedirect = leaderElectionService.isLeader();
        try {
            // Heartbeat and SLA-monitor namespaces are pinned to a specific broker.
            String candidateBroker = checkHeartbeatNamespace(namespaceBundle);
            if (candidateBroker == null) {
                String slaMonitorBroker = getSLAMonitorBrokerName(namespaceBundle);
                if (slaMonitorBroker != null && isBrokerActive(slaMonitorBroker)) {
                    candidateBroker = slaMonitorBroker;
                }
            }
            if (candidateBroker == null) {
                Optional<LeaderBroker> currentLeader = leaderElectionService.getCurrentLeader();
                if (lookupOptions.isAuthoritative()) {
                    // An authoritative lookup means this broker was already chosen as the owner.
                    candidateBroker = this.pulsar.getSafeWebServiceAddress();
                } else {
                    boolean decideLocally = !this.loadManager.get().isCentralized() || leaderElectionService.isLeader();
                    if (!decideLocally) {
                        boolean leaderActive = currentLeader.isPresent() && isBrokerActive(currentLeader.get().getServiceUrl());
                        if (!leaderActive) {
                            // Without a usable leader, fall back to a local load manager decision.
                            decideLocally = true;
                            if (currentLeader.isPresent()) {
                                LOG.warn("The current leader broker {} isn't active. Handling load manager decisions in a decentralized way. NamespaceBundle[{}]", currentLeader.get(), namespaceBundle);
                            } else {
                                LOG.warn("The information about the current leader broker wasn't available. Handling load manager decisions in a decentralized way. NamespaceBundle[{}]", namespaceBundle);
                            }
                        }
                    }
                    if (decideLocally) {
                        Optional<String> leastLoaded = getLeastLoadedFromLoadManager(namespaceBundle);
                        if (!leastLoaded.isPresent()) {
                            LOG.warn("Load manager didn't return any available broker. Returning empty result to lookup. NamespaceBundle[{}]", namespaceBundle);
                            completableFuture.complete(Optional.empty());
                            return;
                        }
                        candidateBroker = leastLoaded.get();
                        authoritativeRedirect = true;
                    } else {
                        // Not deciding locally implies the leader is present and active.
                        candidateBroker = currentLeader.get().getServiceUrl();
                    }
                }
            }
            try {
                Preconditions.checkNotNull(candidateBroker);
                if (candidateBroker.equals(this.pulsar.getSafeWebServiceAddress())) {
                    // This broker is the candidate: drop cached policies and acquire ownership.
                    this.pulsar.getConfigurationCache().policiesCache().invalidate(AdminResource.path("policies", namespaceBundle.getNamespaceObject().toString()));
                    this.ownershipCache.tryAcquiringOwnership(namespaceBundle).thenAccept(ownerInfo -> {
                        if (ownerInfo.isDisabled()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Namespace bundle {} is currently being unloaded", namespaceBundle);
                            }
                            completableFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is currently being unloaded", namespaceBundle)));
                            return;
                        }
                        if (lookupOptions.isLoadTopicsInBundle()) {
                            this.pulsar.loadNamespaceTopics(namespaceBundle);
                        }
                        if (!lookupOptions.hasAdvertisedListenerName()) {
                            completableFuture.complete(Optional.of(new LookupResult(ownerInfo)));
                            return;
                        }
                        AdvertisedListener listener = ownerInfo.getAdvertisedListeners().get(lookupOptions.getAdvertisedListenerName());
                        if (listener == null) {
                            completableFuture.completeExceptionally(new PulsarServerException("the broker do not have " + lookupOptions.getAdvertisedListenerName() + " listener"));
                        } else {
                            URI tlsUrl = listener.getBrokerServiceUrlTls();
                            completableFuture.complete(Optional.of(new LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(), tlsUrl == null ? null : tlsUrl.toString())));
                        }
                    }).exceptionally(th -> {
                        // Pass the message for the second placeholder; the trailing throwable is
                        // consumed by SLF4J as the exception, not as a format argument.
                        LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", namespaceBundle, th.getMessage(), th);
                        completableFuture.completeExceptionally(new PulsarServerException("Failed to acquire ownership for namespace bundle " + namespaceBundle, th));
                        return null;
                    });
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Redirecting to broker {} to acquire ownership of bundle {}", candidateBroker, namespaceBundle);
                    }
                    createLookupResult(candidateBroker, authoritativeRedirect, lookupOptions.getAdvertisedListenerName()).thenAccept(lookupResult -> {
                        completableFuture.complete(Optional.of(lookupResult));
                    }).exceptionally(th -> {
                        completableFuture.completeExceptionally(th);
                        return null;
                    });
                }
            } catch (Exception e) {
                LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", namespaceBundle, e.getMessage(), e);
                completableFuture.completeExceptionally(e);
            }
        } catch (Exception e) {
            LOG.warn("Error when searching for candidate broker to acquire {}: {}", namespaceBundle, e.getMessage(), e);
            completableFuture.completeExceptionally(e);
        }
    }

    /**
     * Builds a lookup result that points at another broker, using that broker's registered data.
     *
     * <p>The broker data is read from the local broker data cache under
     * {@code /loadbalance/brokers/<host:port>}. When a listener name is given, the broker service
     * URLs of that advertised listener replace the broker's default Pulsar service URLs.
     *
     * @param str  URL of the target broker; must not be blank and must contain {@code "://"}
     * @param z    flag passed through as the last argument of the {@code LookupResult}
     * @param str2 advertised listener name, or blank/null to use the broker's default URLs
     * @return a future with the lookup result; it fails if the argument is invalid, the broker data
     *         is missing, the listener is unknown, or the cache read fails
     */
    protected CompletableFuture<LookupResult> createLookupResult(String str, boolean z, String str2) {
        final CompletableFuture<LookupResult> resultFuture = new CompletableFuture<>();
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(str), "Lookup broker can't be null %s", str);
            final String brokerDataPath = "/loadbalance/brokers/" + parseHostAndPort(str);
            this.localBrokerDataCache.get(brokerDataPath).thenAccept(cached -> {
                if (!cached.isPresent()) {
                    resultFuture.completeExceptionally(new MetadataStoreException.NotFoundException(brokerDataPath));
                    return;
                }
                LocalBrokerData brokerData = cached.get();
                if (StringUtils.isNotBlank(str2)) {
                    AdvertisedListener listener = brokerData.getAdvertisedListeners().get(str2);
                    if (listener != null) {
                        URI tlsUrl = listener.getBrokerServiceUrlTls();
                        String tlsUrlText = tlsUrl == null ? null : tlsUrl.toString();
                        resultFuture.complete(new LookupResult(brokerData.getWebServiceUrl(), brokerData.getWebServiceUrlTls(), listener.getBrokerServiceUrl().toString(), tlsUrlText, z));
                    } else {
                        resultFuture.completeExceptionally(new PulsarServerException("the broker do not have " + str2 + " listener"));
                    }
                } else {
                    resultFuture.complete(new LookupResult(brokerData.getWebServiceUrl(), brokerData.getWebServiceUrlTls(), brokerData.getPulsarServiceUrl(), brokerData.getPulsarServiceUrlTls(), z));
                }
            }).exceptionally(failure -> {
                resultFuture.completeExceptionally(failure);
                return null;
            });
        } catch (Exception e) {
            resultFuture.completeExceptionally(e);
        }
        return resultFuture;
    }

    /**
     * Checks whether the broker behind the given URL is among the currently available brokers.
     *
     * @param str broker URL; only the part after {@code "://"} is compared
     * @return {@code true} if the host-and-port part is in the available broker set
     * @throws IllegalArgumentException if {@code str} does not contain {@code "://"}
     */
    private boolean isBrokerActive(String str) {
        final String hostAndPort = parseHostAndPort(str);
        final Set<String> brokers = getAvailableBrokers();
        if (brokers.contains(hostAndPort)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Broker {} ({}) is available for.", str, hostAndPort);
            }
            return true;
        }
        LOG.warn("Broker {} ({}) couldn't be found in available brokers {}", str, hostAndPort, String.join(",", brokers));
        return false;
    }

    /**
     * Strips the scheme from a broker URL.
     *
     * @param str a URL-like string containing {@code "://"}
     * @return everything after the first {@code "://"}
     * @throws IllegalArgumentException if {@code str} contains no {@code "://"}
     */
    private static String parseHostAndPort(String str) {
        final String schemeSeparator = "://";
        final int separatorAt = str.indexOf(schemeSeparator);
        if (separatorAt >= 0) {
            return str.substring(separatorAt + schemeSeparator.length());
        }
        throw new IllegalArgumentException("'" + str + "' isn't an URI.");
    }

    /**
     * Returns the set of available brokers as reported by the current load manager.
     *
     * @return the available brokers
     * @throws RuntimeException wrapping any exception raised while querying the load manager
     */
    private Set<String> getAvailableBrokers() {
        final Set<String> brokers;
        try {
            LoadManager manager = this.loadManager.get();
            brokers = manager.getAvailableBrokers();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return brokers;
    }

    /**
     * Asks the load manager for the least loaded broker for a service unit.
     *
     * @param serviceUnitId the service unit that needs a broker
     * @return the resource id of the least loaded broker, or an empty optional if the load manager
     *         offers none
     * @throws Exception if the load manager fails
     */
    private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnitId) throws Exception {
        Optional<ResourceUnit> candidate = this.loadManager.get().getLeastLoaded(serviceUnitId);
        if (candidate.isPresent()) {
            String lookupAddress = candidate.get().getResourceId();
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", this.pulsar.getSafeWebServiceAddress(), lookupAddress);
            }
            return Optional.of(lookupAddress);
        }
        LOG.warn("No broker is available for {}", serviceUnitId);
        return Optional.empty();
    }

    /**
     * Unloads a bundle owned by this broker using a timeout of five minutes.
     *
     * @param namespaceBundle the bundle to unload
     * @return the future of the unload request
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle namespaceBundle) {
        final long timeout = 5L;
        final TimeUnit timeoutUnit = TimeUnit.MINUTES;
        return unloadNamespaceBundle(namespaceBundle, timeout, timeoutUnit);
    }

    /**
     * Unloads a bundle owned by this broker.
     *
     * @param namespaceBundle the bundle to unload
     * @param j               timeout value passed to the unload request
     * @param timeUnit        unit of the timeout
     * @return the future of the unload request, or a future failed with
     *         {@code IllegalStateException} if the ownership cache has no owned bundle for it
     */
    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle namespaceBundle, long j, TimeUnit timeUnit) {
        OwnedBundle owned = this.ownershipCache.getOwnedBundle(namespaceBundle);
        if (owned != null) {
            return owned.handleUnloadRequest(this.pulsar, j, timeUnit);
        }
        return FutureUtil.failedFuture(new IllegalStateException("Bundle " + namespaceBundle + " is not currently owned"));
    }

    /**
     * Checks whether the bundle's service unit path exists in the local metadata store.
     *
     * @param namespaceBundle the bundle to check
     * @return a future with the result of the existence check
     */
    public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle namespaceBundle) {
        CompletableFuture<Boolean> exists = this.pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(namespaceBundle));
        return exists;
    }

    /**
     * Reports the ownership status of every bundle currently owned by this broker.
     *
     * <p>For each owned bundle the isolation policy of its namespace (if any) is looked up in the
     * cluster's namespace isolation policies and folded into the status.
     *
     * @return a map from the bundle's string form to its ownership status
     * @throws Exception if the namespace isolation policies cannot be read
     */
    public Map<String, NamespaceOwnershipStatus> getOwnedNameSpacesStatus() throws Exception {
        NamespaceIsolationPolicies isolationPolicies = getLocalNamespaceIsolationPolicies();
        // Typed map instead of a raw HashMap, so the return needs no unchecked conversion.
        Map<String, NamespaceOwnershipStatus> statusByBundle = new HashMap<>();
        for (OwnedBundle owned : this.ownershipCache.getOwnedBundles().values()) {
            NamespaceBundle bundle = owned.getNamespaceBundle();
            NamespaceIsolationPolicy policy = isolationPolicies.getPolicyByNamespace(bundle.getNamespaceObject());
            statusByBundle.put(bundle.toString(), getNamespaceOwnershipStatus(owned, policy));
        }
        return statusByBundle;
    }

    /**
     * Builds the ownership status of one owned bundle.
     *
     * <p>Without an isolation policy the status is "shared, not controlled". With a policy the
     * status is marked as controlled, and the assignment becomes primary or secondary if this
     * broker's advertised address matches the policy; otherwise it stays shared.
     *
     * @param ownedBundle              the owned bundle; supplies the active flag
     * @param namespaceIsolationPolicy the policy of the bundle's namespace, or {@code null}
     * @return the ownership status
     */
    private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle ownedBundle, NamespaceIsolationPolicy namespaceIsolationPolicy) {
        final NamespaceOwnershipStatus status = new NamespaceOwnershipStatus(BrokerAssignment.shared, false, ownedBundle.isActive());
        if (namespaceIsolationPolicy != null) {
            status.is_controlled = true;
            if (namespaceIsolationPolicy.isPrimaryBroker(this.pulsar.getAdvertisedAddress())) {
                status.broker_assignment = BrokerAssignment.primary;
            } else if (namespaceIsolationPolicy.isSecondaryBroker(this.pulsar.getAdvertisedAddress())) {
                status.broker_assignment = BrokerAssignment.secondary;
            }
        }
        return status;
    }

    /**
     * Reads the namespace isolation policies of this broker's cluster from the configuration cache.
     *
     * @return the cached policies, or a new empty {@code NamespaceIsolationPolicies} if none are stored
     * @throws Exception if the cache read fails
     */
    private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception {
        String clusterName = this.pulsar.getConfiguration().getClusterName();
        String policiesPath = AdminResource.path("clusters", clusterName, NAMESPACE_ISOLATION_POLICIES);
        return this.pulsar.getConfigurationCache()
                .namespaceIsolationPoliciesCache()
                .get(policiesPath)
                .orElseGet(NamespaceIsolationPolicies::new);
    }

    /**
     * Tells whether the owner entry of a bundle is currently flagged as disabled.
     *
     * <p>This never waits: the owner future is only inspected with {@code getNow}. A missing or
     * incomplete future, an absent owner, or any exception all yield {@code false}.
     *
     * @param namespaceBundle the bundle to check
     * @return {@code true} only if owner data is already available and reports disabled
     * @throws Exception declared for compatibility; exceptions raised inside are logged and swallowed
     */
    public boolean isNamespaceBundleDisabled(NamespaceBundle namespaceBundle) throws Exception {
        try {
            CompletableFuture<Optional<NamespaceEphemeralData>> ownerFuture = this.ownershipCache.getOwnerAsync(namespaceBundle);
            if (ownerFuture == null) {
                return false;
            }
            Optional<NamespaceEphemeralData> owner = ownerFuture.getNow(null);
            return owner != null && owner.isPresent() && owner.get().isDisabled();
        } catch (Exception e) {
            LOG.warn("Exception in getting ownership info for service unit {}: {}", namespaceBundle, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Splits a bundle and takes ownership of the resulting bundles, retrying on failure.
     *
     * <p>The work is delegated to {@code splitAndOwnBundleOnceAndRetry} with a counter that starts
     * at {@link #BUNDLE_SPLIT_RETRY_LIMIT}.
     *
     * @param namespaceBundle               the bundle to split
     * @param z                             flag forwarded unchanged to the retry routine
     * @param namespaceBundleSplitAlgorithm algorithm that chooses the split boundary
     * @return a future completed by the retry routine once the split has finished or failed
     */
    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle namespaceBundle, boolean z, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm) {
        CompletableFuture<Void> splitFuture = new CompletableFuture<>();
        // Use the declared limit rather than repeating its value as a magic number.
        AtomicInteger retriesLeft = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
        splitAndOwnBundleOnceAndRetry(namespaceBundle, z, retriesLeft, splitFuture, namespaceBundleSplitAlgorithm);
        return splitFuture;
    }

    /**
     * Runs one split attempt for {@code namespaceBundle} and completes {@code completableFuture} with the
     * outcome, re-submitting itself to the ordered executor when the metadata update failed because of a
     * {@code BadVersionException} and the retry counter is not exhausted.
     *
     * <p>Sequence of one attempt: obtain the split boundary from the algorithm, split the bundle in two, try to
     * acquire ownership of both new bundles, persist the new bundle set, invalidate the bundle cache for the
     * namespace, then call {@code updateBundleState(namespaceBundle, false)} on the ownership cache, refresh the
     * broker's topic-to-stats maps, force a load report and, if requested, unload the new bundles.
     *
     * @param namespaceBundle bundle to split
     * @param z when {@code true}, every newly created bundle is unloaded after a successful split
     * @param atomicInteger remaining retry budget; decremented whenever a version conflict is observed
     * @param completableFuture future completed with {@code null} on success or exceptionally on failure
     * @param namespaceBundleSplitAlgorithm algorithm used to pick the split boundary
     */
    void splitAndOwnBundleOnceAndRetry(NamespaceBundle namespaceBundle, boolean z, AtomicInteger atomicInteger, CompletableFuture<Void> completableFuture, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm) {
        namespaceBundleSplitAlgorithm.getSplitBoundary(this, namespaceBundle).whenComplete((splitBoundary, boundaryError) -> {
            // Carries the two new bundles from the "split + persist" phase to the "finalise" phase below.
            CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
            if (boundaryError == null) {
                try {
                    this.bundleFactory.splitBundles(namespaceBundle, 2, splitBoundary).thenAccept(pair -> {
                        if (pair == null) {
                            String notFoundMsg = String.format("bundle %s not found under namespace", namespaceBundle.toString());
                            LOG.warn(notFoundMsg);
                            updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(notFoundMsg));
                            return;
                        }
                        Preconditions.checkNotNull(pair.getLeft());
                        Preconditions.checkNotNull(pair.getRight());
                        Preconditions.checkArgument(pair.getRight().size() == 2, "bundle has to be split in two bundles");
                        NamespaceName namespaceObject = namespaceBundle.getNamespaceObject();
                        List<NamespaceBundle> splitBundles = pair.getRight();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {},  2 bundles: {}, {}", namespaceObject.toString(), namespaceBundle.getBundleRange(), atomicInteger.get(), splitBundles.get(0).getBundleRange(), splitBundles.get(1).getBundleRange());
                        }
                        try {
                            // Own both halves before the new layout is persisted.
                            for (NamespaceBundle splitBundle : splitBundles) {
                                Preconditions.checkNotNull(this.ownershipCache.tryAcquiringOwnership(splitBundle));
                            }
                            updateNamespaceBundles(namespaceObject, pair.getLeft()).thenRun(() -> {
                                this.bundleFactory.invalidateBundleCache(namespaceBundle.getNamespaceObject());
                                updateFuture.complete(splitBundles);
                            }).exceptionally(persistError -> {
                                String persistMsg = String.format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", namespaceObject.toString(), namespaceBundle.getBundleRange(), persistError.getMessage());
                                LOG.warn(persistMsg);
                                // The cause is forwarded so the retry check below can detect a version conflict.
                                updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(persistMsg, persistError.getCause()));
                                return null;
                            });
                        } catch (Exception e) {
                            String ownershipMsg = String.format("failed to acquire ownership of split bundle for namespace [%s], %s", namespaceObject.toString(), e.getMessage());
                            LOG.warn(ownershipMsg, e);
                            updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(ownershipMsg, e));
                        }
                    }).exceptionally(splitError -> {
                        // Reached when splitBundles itself fails or a precondition above throws. Without this
                        // the caller's future would never complete. No-op if updateFuture is already completed.
                        updateFuture.completeExceptionally(splitError);
                        return null;
                    });
                } catch (Exception e) {
                    updateFuture.completeExceptionally(e);
                }
            } else {
                updateFuture.completeExceptionally(boundaryError);
            }
            updateFuture.whenCompleteAsync((newBundles, splitFailure) -> {
                if (splitFailure == null) {
                    getOwnershipCache().updateBundleState(namespaceBundle, false).thenRun(() -> {
                        this.pulsar.getBrokerService().refreshTopicToStatsMaps(namespaceBundle);
                        this.loadManager.get().setLoadReportForceUpdateFlag();
                        completableFuture.complete(null);
                        if (z) {
                            newBundles.forEach(this::unloadNamespaceBundle);
                        }
                    }).exceptionally(stateError -> {
                        // Arguments follow the placeholders: bundle first, then namespace.
                        String disableMsg = String.format("failed to disable bundle %s under namespace [%s] with error %s", namespaceBundle.toString(), namespaceBundle.getNamespaceObject().toString(), stateError.getMessage());
                        LOG.warn(disableMsg, stateError);
                        completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(disableMsg));
                        return null;
                    });
                    return;
                }
                if ((splitFailure.getCause() instanceof MetadataStoreException.BadVersionException) && atomicInteger.decrementAndGet() >= 0) {
                    // Version conflict on the metadata write: try the whole attempt again.
                    this.pulsar.getOrderedExecutor().execute(() -> {
                        splitAndOwnBundleOnceAndRetry(namespaceBundle, z, atomicInteger, completableFuture, namespaceBundleSplitAlgorithm);
                    });
                } else {
                    if (splitFailure instanceof IllegalArgumentException) {
                        completableFuture.completeExceptionally(splitFailure);
                        return;
                    }
                    String failureMsg = String.format(" %s not success update nsBundles, counter %d, reason %s", namespaceBundle.toString(), atomicInteger.get(), splitFailure.getMessage());
                    LOG.warn(failureMsg);
                    completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(failureMsg));
                }
            }, this.pulsar.getOrderedExecutor());
        });
    }

    /**
     * Writes the local policies derived from {@code namespaceBundles} to the local metadata store, using the
     * version carried by {@code namespaceBundles} for the put.
     *
     * @param namespaceName namespace whose local policies node is written; must not be {@code null}
     * @param namespaceBundles bundle set to persist; must not be {@code null}
     * @return future completed with {@code null} once the put succeeds, or a failed future when serialising
     *         the policies raises an {@link IOException}
     */
    private CompletableFuture<Void> updateNamespaceBundles(NamespaceName namespaceName, NamespaceBundles namespaceBundles) {
        Preconditions.checkNotNull(namespaceName);
        Preconditions.checkNotNull(namespaceBundles);
        try {
            final String policiesPath = PulsarWebResource.joinPath(LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT, namespaceName.toString());
            final byte[] serializedPolicies = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(namespaceBundles.toLocalPolicies());
            return this.pulsar.getLocalMetadataStore()
                    .put(policiesPath, serializedPolicies, namespaceBundles.getVersion())
                    .thenApply(ignoredStat -> null);
        } catch (IOException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /**
     * @return the ownership cache held by this service
     */
    public OwnershipCache getOwnershipCache() {
        return ownershipCache;
    }

    /**
     * @return the number of bundles in the ownership cache minus the {@code uncountedNamespaces} counter
     */
    public int getTotalServiceUnitsLoaded() {
        final int ownedBundleCount = this.ownershipCache.getOwnedBundles().size();
        return ownedBundleCount - this.uncountedNamespaces;
    }

    /**
     * @return a new set holding the {@link NamespaceBundle} of every entry currently in the ownership cache
     */
    public Set<NamespaceBundle> getOwnedServiceUnits() {
        return this.ownershipCache.getOwnedBundles().values().stream()
                .map(owned -> owned.getNamespaceBundle())
                .collect(Collectors.toSet());
    }

    /**
     * Checks ownership of a service unit, dispatching on its concrete type. For a topic this blocks on the
     * asynchronous ownership lookup.
     *
     * @param serviceUnitId a {@link TopicName}, {@link NamespaceName} or {@link NamespaceBundle}
     * @return whether this broker owns the service unit
     * @throws IllegalArgumentException if {@code serviceUnitId} is of any other type
     * @throws Exception if the underlying lookup fails
     */
    public boolean isServiceUnitOwned(ServiceUnitId serviceUnitId) throws Exception {
        final boolean owned;
        if (serviceUnitId instanceof TopicName) {
            owned = isTopicOwnedAsync((TopicName) serviceUnitId).get();
        } else if (serviceUnitId instanceof NamespaceName) {
            owned = isNamespaceOwned((NamespaceName) serviceUnitId);
        } else if (serviceUnitId instanceof NamespaceBundle) {
            owned = this.ownershipCache.isNamespaceBundleOwned((NamespaceBundle) serviceUnitId);
        } else {
            throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + serviceUnitId.getClass().getName());
        }
        return owned;
    }

    /**
     * Asynchronous counterpart of {@link #isServiceUnitOwned}: dispatches on the concrete type of the service
     * unit and reports an unsupported type through a failed future.
     *
     * @param serviceUnitId a {@link TopicName}, {@link NamespaceName} or {@link NamespaceBundle}
     * @return future holding the ownership flag, or failed with {@link IllegalArgumentException} for other types
     */
    public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId serviceUnitId) {
        if (serviceUnitId instanceof TopicName) {
            return isTopicOwnedAsync((TopicName) serviceUnitId);
        }
        if (serviceUnitId instanceof NamespaceName) {
            return isNamespaceOwnedAsync((NamespaceName) serviceUnitId);
        }
        if (serviceUnitId instanceof NamespaceBundle) {
            boolean owned = this.ownershipCache.isNamespaceBundleOwned((NamespaceBundle) serviceUnitId);
            return CompletableFuture.completedFuture(owned);
        }
        return FutureUtil.failedFuture(new IllegalArgumentException("Invalid class of NamespaceBundle: " + serviceUnitId.getClass().getName()));
    }

    /**
     * Reports whether the bundle that contains {@code topicName} is owned by this broker and active.
     *
     * @param topicName topic whose bundle is checked
     * @return the owned bundle's active flag; {@code false} when the ownership cache has no owned bundle for
     *         the topic or when the lookup throws
     */
    public boolean isServiceUnitActive(TopicName topicName) {
        try {
            OwnedBundle ownedBundle = this.ownershipCache.getOwnedBundle(getBundle(topicName));
            if (ownedBundle == null) {
                // Expected outcome for a bundle this broker does not own; handled explicitly instead of
                // relying on a NullPointerException being caught below.
                LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName);
                return false;
            }
            return ownedBundle.isActive();
        } catch (Exception e) {
            // Keep the cause in the log so real lookup failures can be told apart from "not owned".
            LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName, e);
            return false;
        }
    }

    /**
     * @param namespaceName namespace to check
     * @return {@code true} when the ownership cache holds an owned bundle for the namespace's full bundle
     * @throws Exception if resolving the full bundle fails
     */
    private boolean isNamespaceOwned(NamespaceName namespaceName) throws Exception {
        NamespaceBundle fullBundle = getFullBundle(namespaceName);
        return this.ownershipCache.getOwnedBundle(fullBundle) != null;
    }

    /**
     * Asynchronously resolves the namespace's full bundle and checks whether the ownership cache holds an
     * owned bundle for it.
     *
     * @param namespaceName namespace to check
     * @return future holding {@code true} when an owned bundle is present
     */
    private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName namespaceName) {
        return getFullBundleAsync(namespaceName)
                .thenApply(fullBundle -> this.ownershipCache.getOwnedBundle(fullBundle) != null);
    }

    /**
     * Asynchronously resolves the topic's bundle and asks the ownership cache whether that bundle is owned.
     *
     * @param topicName topic to check
     * @return future holding the ownership flag of the topic's bundle
     */
    private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topicName) {
        return getBundleAsync(topicName)
                .thenApply(topicBundle -> this.ownershipCache.isNamespaceBundleOwned(topicBundle));
    }

    /**
     * Non-blocking ownership check for a topic.
     *
     * <p>When the topic's bundle is not immediately available, an asynchronous bundle lookup is started (its
     * result is only logged) and {@code false} is returned for this call.
     *
     * @param topicName topic to check
     * @return {@code true} when the bundle is available and the ownership cache holds an owned bundle for it
     */
    private boolean isTopicOwned(TopicName topicName) {
        Optional<NamespaceBundle> cachedBundle = getBundleIfPresent(topicName);
        if (!cachedBundle.isPresent()) {
            getBundleAsync(topicName)
                    .thenAccept(resolved -> LOG.info("Succeeded in getting bundle {} for topic - [{}]", resolved, topicName))
                    .exceptionally(lookupError -> {
                        LOG.warn("Failed to get bundle for topic - [{}] {}", topicName, lookupError.getMessage());
                        return null;
                    });
            return false;
        }
        return this.ownershipCache.getOwnedBundle(cachedBundle.get()) != null;
    }

    /**
     * Resolves the topic's bundle and delegates to the ownership cache's {@code checkOwnership}.
     *
     * @param topicName topic to check
     * @return future holding the result of the ownership check for the topic's bundle
     */
    public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
        return getBundleAsync(topicName)
                .thenCompose(topicBundle -> this.ownershipCache.checkOwnership(topicBundle));
    }

    /**
     * Removes ownership of the bundle, waiting up to the configured ZooKeeper operation timeout, and then
     * invalidates the bundle cache of the bundle's namespace.
     *
     * @param namespaceBundle bundle to release
     * @throws Exception if the removal fails or does not finish within the timeout
     */
    public void removeOwnedServiceUnit(NamespaceBundle namespaceBundle) throws Exception {
        CompletableFuture<Void> removal = this.ownershipCache.removeOwnership(namespaceBundle);
        long timeoutSeconds = this.pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds();
        removal.get(timeoutSeconds, TimeUnit.SECONDS);
        this.bundleFactory.invalidateBundleCache(namespaceBundle.getNamespaceObject());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Notifies every registered ownership listener about {@code namespaceBundle}.
     *
     * @param namespaceBundle bundle passed to each listener
     */
    public void onNamespaceBundleOwned(NamespaceBundle namespaceBundle) {
        for (NamespaceBundleOwnershipListener listener : this.bundleOwnershipListeners) {
            notifyNamespaceBundleOwnershipListener(namespaceBundle, listener);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Calls {@code unLoad} on every registered listener whose {@code test} accepts the bundle. A failure in
     * one listener is logged and does not stop the remaining listeners from being called.
     *
     * @param namespaceBundle bundle passed to each listener
     */
    public void onNamespaceBundleUnload(NamespaceBundle namespaceBundle) {
        this.bundleOwnershipListeners.forEach(listener -> {
            try {
                if (!listener.test(namespaceBundle)) {
                    return;
                }
                listener.unLoad(namespaceBundle);
            } catch (Throwable failure) {
                LOG.error("Call bundle {} ownership lister error", namespaceBundle, failure);
            }
        });
    }

    /**
     * Registers the non-null listeners and then notifies the passed listeners about every bundle this broker
     * currently owns.
     *
     * @param namespaceBundleOwnershipListenerArr listeners to add; the array itself must not be {@code null},
     *        {@code null} elements are not registered
     */
    public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... namespaceBundleOwnershipListenerArr) {
        Preconditions.checkNotNull(namespaceBundleOwnershipListenerArr);
        for (int i = 0; i < namespaceBundleOwnershipListenerArr.length; i++) {
            NamespaceBundleOwnershipListener candidate = namespaceBundleOwnershipListenerArr[i];
            if (candidate == null) {
                continue;
            }
            this.bundleOwnershipListeners.add(candidate);
        }
        for (NamespaceBundle ownedBundle : getOwnedServiceUnits()) {
            notifyNamespaceBundleOwnershipListener(ownedBundle, namespaceBundleOwnershipListenerArr);
        }
    }

    /**
     * Calls {@code onLoad} on each given listener whose {@code test} accepts the bundle.
     *
     * <p>A {@code null} array and {@code null} elements are skipped. A failure in one listener is logged and
     * does not prevent the remaining listeners from being called.
     *
     * @param namespaceBundle bundle passed to each listener
     * @param namespaceBundleOwnershipListenerArr listeners to notify; may be {@code null} or contain
     *        {@code null} elements
     */
    private void notifyNamespaceBundleOwnershipListener(NamespaceBundle namespaceBundle, NamespaceBundleOwnershipListener... namespaceBundleOwnershipListenerArr) {
        if (namespaceBundleOwnershipListenerArr == null) {
            return;
        }
        for (NamespaceBundleOwnershipListener namespaceBundleOwnershipListener : namespaceBundleOwnershipListenerArr) {
            if (namespaceBundleOwnershipListener == null) {
                // addNamespaceBundleOwnershipListener tolerates null elements and forwards the whole array
                // here; skip them instead of triggering an NPE that would be logged as a listener error.
                continue;
            }
            try {
                if (namespaceBundleOwnershipListener.test(namespaceBundle)) {
                    namespaceBundleOwnershipListener.onLoad(namespaceBundle);
                }
            } catch (Throwable th) {
                LOG.error("Call bundle {} ownership lister error", namespaceBundle, th);
            }
        }
    }

    /**
     * @return the bundle factory held by this service
     */
    public NamespaceBundleFactory getNamespaceBundleFactory() {
        return bundleFactory;
    }

    /**
     * @param topicName topic to resolve
     * @return the bundle returned by {@code getBundle} for the topic, exposed as a {@link ServiceUnitId}
     * @throws Exception if the bundle lookup fails
     */
    public ServiceUnitId getServiceUnitId(TopicName topicName) throws Exception {
        final ServiceUnitId topicBundle = getBundle(topicName);
        return topicBundle;
    }

    /**
     * Lists the persistent and the non-persistent topics of a namespace and combines both lists with
     * {@code ListUtils.union}.
     *
     * @param namespaceName namespace to list
     * @return future holding the union of both topic lists
     */
    public CompletableFuture<List<String>> getFullListOfTopics(NamespaceName namespaceName) {
        CompletableFuture<List<String>> persistentTopics = getListOfPersistentTopics(namespaceName);
        CompletableFuture<List<String>> nonPersistentTopics = getListOfNonPersistentTopics(namespaceName);
        return persistentTopics.thenCombine(nonPersistentTopics, ListUtils::union);
    }

    /**
     * Lists the topic names of the bundle's namespace that fall inside {@code namespaceBundle}.
     *
     * <p>Two sources are filtered by {@code namespaceBundle.includes(...)}: the full topic list and the list
     * of all partitions. Partition names not already present are appended to the filtered topic list.
     *
     * @param namespaceBundle bundle used as the filter
     * @return future holding the merged list, without duplicates introduced by the partition list
     */
    public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(NamespaceBundle namespaceBundle) {
        CompletableFuture<List<String>> topicsInBundle = getFullListOfTopics(namespaceBundle.getNamespaceObject())
                .thenApply(topics -> topics.stream()
                        .filter(topic -> namespaceBundle.includes(TopicName.get(topic)))
                        .collect(Collectors.toList()));
        CompletableFuture<List<String>> partitionsInBundle = getAllPartitions(namespaceBundle.getNamespaceObject())
                .thenApply(partitions -> partitions.stream()
                        .filter(partition -> namespaceBundle.includes(TopicName.get(partition)))
                        .collect(Collectors.toList()));
        return topicsInBundle.thenCombine(partitionsInBundle, (merged, partitions) -> {
            for (String partition : partitions) {
                if (!merged.contains(partition)) {
                    merged.add(partition);
                }
            }
            return merged;
        });
    }

    /**
     * Checks whether a topic exists. A persistent topic is checked through the local ZK cache service's
     * {@code managedLedgerExists}; any other topic through the broker service's {@code getTopicIfExists}.
     *
     * @param topicName topic to check
     * @return future holding the existence flag
     */
    public CompletableFuture<Boolean> checkTopicExists(TopicName topicName) {
        if (topicName.isPersistent()) {
            return this.pulsar.getLocalZkCacheService().managedLedgerExists(topicName.getPersistenceNamingEncoding());
        }
        return this.pulsar.getBrokerService()
                .getTopicIfExists(topicName.toString())
                .thenApply(existingTopic -> existingTopic.isPresent());
    }

    /**
     * Lists the topics of a namespace for the requested mode.
     *
     * @param namespaceName namespace to list
     * @param mode {@code ALL} for both kinds, {@code NON_PERSISTENT} for non-persistent topics only; any other
     *        value (including {@code PERSISTENT}) lists persistent topics
     * @return future holding the topic names
     */
    public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, CommandGetTopicsOfNamespace.Mode mode) {
        final CompletableFuture<List<String>> topics;
        switch (mode) {
            case ALL:
                topics = getFullListOfTopics(namespaceName);
                break;
            case NON_PERSISTENT:
                topics = getListOfNonPersistentTopics(namespaceName);
                break;
            case PERSISTENT:
            default:
                topics = getListOfPersistentTopics(namespaceName);
                break;
        }
        return topics;
    }

    /**
     * Collects the partitions of the persistent and the non-persistent domain of a namespace and combines
     * both lists with {@code ListUtils.union}.
     *
     * @param namespaceName namespace to inspect
     * @return future holding the union of both partition lists
     */
    public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
        CompletableFuture<List<String>> persistentPartitions = getPartitions(namespaceName, TopicDomain.persistent);
        CompletableFuture<List<String>> nonPersistentPartitions = getPartitions(namespaceName, TopicDomain.non_persistent);
        return persistentPartitions.thenCombine(nonPersistentPartitions, ListUtils::union);
    }

    /**
     * Lists the partition names of every partitioned topic registered under the namespace for one domain.
     *
     * <p>The children of the {@code partitioned-topics} path are read, each child is decoded into a topic
     * name, and the partitions of each topic are fetched and appended to one synchronized result list.
     *
     * @param namespaceName namespace to inspect
     * @param topicDomain domain (persistent or non-persistent) to inspect
     * @return future holding all partition names; an empty list when the path has no children; failed when
     *         any per-topic lookup fails
     */
    public CompletableFuture<List<String>> getPartitions(NamespaceName namespaceName, TopicDomain topicDomain) {
        String path = PulsarWebResource.path("partitioned-topics", namespaceName.toString(), topicDomain.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting children from partitioned-topics now: {}", path);
        }
        return this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().getChildrenAsync(path).thenCompose(topics -> {
            CompletableFuture<List<String>> result = new CompletableFuture<>();
            // Synchronized because the per-topic callbacks may append from different threads.
            List<String> resultPartitions = Collections.synchronizedList(new ArrayList<>());
            if (CollectionUtils.isNotEmpty(topics)) {
                List<CompletableFuture<List<String>>> futures = new ArrayList<>();
                for (String topic : topics) {
                    String topicUrl = String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), Codec.decode(topic));
                    CompletableFuture<List<String>> partitionsForTopic = getPartitionsForTopic(TopicName.get(topicUrl));
                    futures.add(partitionsForTopic);
                    partitionsForTopic.thenAccept(resultPartitions::addAll);
                }
                FutureUtil.waitForAll(futures).whenComplete((ignored, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(resultPartitions);
                    }
                });
            } else {
                result.complete(resultPartitions);
            }
            return result;
        });
    }

    /**
     * Fetches the partitioned-topic metadata of {@code topicName} and expands it into one partition name per
     * index from {@code 0} to {@code partitions - 1}.
     *
     * @param topicName partitioned topic to expand
     * @return future holding the partition names in index order
     */
    private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicName) {
        return this.pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenApply(metadata -> {
            List<String> partitionNames = new ArrayList<>();
            int index = 0;
            while (index < metadata.partitions) {
                partitionNames.add(topicName.getPartition(index).toString());
                index++;
            }
            return partitionNames;
        });
    }

    /**
     * Lists the persistent topics of a namespace from the managed-ledger list cache.
     *
     * <p>Each cached child name is decoded and prefixed with {@code persistent://<namespace>/}; the result is
     * sorted in natural order.
     *
     * @param namespaceName namespace to list
     * @return future holding the sorted persistent topic names
     */
    public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
        final String ledgersPath = String.format("/managed-ledgers/%s/persistent", namespaceName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting children from managed-ledgers now: {}", ledgersPath);
        }
        return this.pulsar.getLocalZkCacheService().managedLedgerListCache().getAsync(ledgersPath).thenApply(encodedNames -> {
            List<String> topics = new ArrayList<>();
            for (String encodedName : encodedNames) {
                topics.add(String.format("persistent://%s/%s", namespaceName, Codec.decode(encodedName)));
            }
            Collections.sort(topics);
            return topics;
        });
    }

    /**
     * Lists the active non-persistent topics of a namespace.
     *
     * <p>When {@code checkLocalOrGetPeerReplicationCluster} yields a cluster, the list is fetched from that
     * cluster. Otherwise the broker's multi-layer topic map is scanned, under a lock on the map, for
     * {@link NonPersistentTopic} instances that report {@code isActive()}. The local result is sorted in
     * natural order.
     *
     * @param namespaceName namespace to list
     * @return future holding the non-persistent topic names
     */
    public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) {
        return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(this.pulsar, namespaceName).thenCompose(clusterDataImpl -> {
            if (clusterDataImpl != null) {
                return getNonPersistentTopicsFromPeerCluster(clusterDataImpl, namespaceName);
            }
            List<String> topics = new ArrayList<>();
            synchronized (this.pulsar.getBrokerService().getMultiLayerTopicMap()) {
                if (this.pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) {
                    // Outer map: one entry per key under the namespace; inner map: topic name -> topic.
                    this.pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).forEach((bundleKey, topicsByName) -> {
                        topicsByName.forEach((topicName, topic) -> {
                            if ((topic instanceof NonPersistentTopic) && ((NonPersistentTopic) topic).isActive()) {
                                topics.add(topicName);
                            }
                        });
                    });
                }
            }
            topics.sort(null);
            return CompletableFuture.completedFuture(topics);
        });
    }

    /**
     * Asks the given cluster, through its namespace client's lookup service, for the non-persistent topics of
     * a namespace.
     *
     * @param clusterDataImpl cluster to query
     * @param namespaceName namespace to list
     * @return future holding the topic names reported by that cluster
     */
    private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterDataImpl clusterDataImpl, NamespaceName namespaceName) {
        PulsarClientImpl peerClient = getNamespaceClient(clusterDataImpl);
        return peerClient.getLookup()
                .getTopicsUnderNamespace(namespaceName, CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT);
    }

    /**
     * Returns the client used to talk to the given cluster, creating and caching it on first use.
     *
     * <p>A new client is built with TCP no-delay disabled and a stats interval of 0 seconds, with broker
     * client authentication when authentication is enabled, and with the TLS service URL plus trust settings
     * when TLS is enabled. In both the TLS and plain case the broker service URL is preferred over the
     * service URL when it is not blank.
     *
     * @param clusterDataImpl cluster the client should connect to
     * @return cached or newly created client
     * @throws RuntimeException wrapping any exception raised while building the client
     */
    public PulsarClientImpl getNamespaceClient(ClusterDataImpl clusterDataImpl) {
        PulsarClientImpl cachedClient = this.namespaceClients.get(clusterDataImpl);
        if (cachedClient != null) {
            return cachedClient;
        }
        return this.namespaceClients.computeIfAbsent(clusterDataImpl, unusedKey -> {
            try {
                ClientBuilder clientBuilder = PulsarClient.builder().enableTcpNoDelay(false).statsInterval(0L, TimeUnit.SECONDS);
                if (this.pulsar.getConfiguration().isAuthenticationEnabled()) {
                    clientBuilder.authentication(this.pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), this.pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                }
                if (this.pulsar.getConfiguration().isTlsEnabled()) {
                    String tlsUrl;
                    if (StringUtils.isNotBlank(clusterDataImpl.getBrokerServiceUrlTls())) {
                        tlsUrl = clusterDataImpl.getBrokerServiceUrlTls();
                    } else {
                        tlsUrl = clusterDataImpl.getServiceUrlTls();
                    }
                    clientBuilder.serviceUrl(tlsUrl).enableTls(true).tlsTrustCertsFilePath(this.pulsar.getConfiguration().getBrokerClientTrustCertsFilePath()).allowTlsInsecureConnection(this.pulsar.getConfiguration().isTlsAllowInsecureConnection());
                } else {
                    String plainUrl;
                    if (StringUtils.isNotBlank(clusterDataImpl.getBrokerServiceUrl())) {
                        plainUrl = clusterDataImpl.getBrokerServiceUrl();
                    } else {
                        plainUrl = clusterDataImpl.getServiceUrl();
                    }
                    clientBuilder.serviceUrl(plainUrl);
                }
                return new PulsarClientImpl(((ClientBuilderImpl) clientBuilder).getClientConfigurationData(), this.pulsar.getBrokerService().executor());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Blocking variant of {@link #getOwnerAsync}: waits up to the configured ZooKeeper operation timeout.
     *
     * @param namespaceBundle bundle whose owner is requested
     * @return the owner record, if any
     * @throws Exception if the lookup fails or does not finish within the timeout
     */
    public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle namespaceBundle) throws Exception {
        CompletableFuture<Optional<NamespaceEphemeralData>> ownerFuture = getOwnerAsync(namespaceBundle);
        long timeoutSeconds = this.pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds();
        return ownerFuture.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * @param namespaceBundle bundle whose owner is requested
     * @return the future returned by the ownership cache's {@code getOwnerAsync} for the bundle
     */
    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle namespaceBundle) {
        return ownershipCache.getOwnerAsync(namespaceBundle);
    }

    /**
     * Unloads this broker's SLA monitor namespace through the admin client, but only when the namespace's
     * full bundle currently has an owner.
     *
     * @throws Exception if the owner lookup or the unload call fails
     */
    public void unloadSLANamespace() throws Exception {
        final String slaNamespace = getSLAMonitorNamespace(this.host, this.config);
        LOG.info("Checking owner for SLA namespace {}", slaNamespace);
        NamespaceBundle slaBundle = getFullBundle(NamespaceName.get(slaNamespace));
        if (!getOwner(slaBundle).isPresent()) {
            return;
        }
        LOG.info("Trying to unload SLA namespace {}", slaNamespace);
        this.pulsar.getAdminClient().namespaces().unload(slaNamespace);
        LOG.info("Namespace {} unloaded successfully", slaNamespace);
    }

    /**
     * Builds the heartbeat namespace name from {@code HEARTBEAT_NAMESPACE_FMT}, the cluster name, the given
     * host string and the web service port.
     *
     * @param str host part inserted into the name
     * @param serviceConfiguration source of the cluster name and ports; the plain web service port is used
     *        when present, otherwise the TLS port, otherwise {@code null} is passed to the format
     * @return the formatted namespace name
     */
    public static String getHeartbeatNamespace(String str, ServiceConfiguration serviceConfiguration) {
        Optional<Integer> plainPort = serviceConfiguration.getWebServicePort();
        Optional<Integer> chosenPort = plainPort.isPresent() ? plainPort : serviceConfiguration.getWebServicePortTls();
        Integer port = chosenPort.orElse(null);
        return String.format(HEARTBEAT_NAMESPACE_FMT, serviceConfiguration.getClusterName(), str, port);
    }

    /**
     * Builds the SLA monitor namespace name from {@code SLA_NAMESPACE_FMT}, the cluster name, the given host
     * string and the web service port.
     *
     * @param str host part inserted into the name
     * @param serviceConfiguration source of the cluster name and ports; the plain web service port is used
     *        when present, otherwise the TLS port, otherwise {@code null} is passed to the format
     * @return the formatted namespace name
     */
    public static String getSLAMonitorNamespace(String str, ServiceConfiguration serviceConfiguration) {
        Optional<Integer> plainPort = serviceConfiguration.getWebServicePort();
        Optional<Integer> chosenPort = plainPort.isPresent() ? plainPort : serviceConfiguration.getWebServicePortTls();
        Integer port = chosenPort.orElse(null);
        return String.format(SLA_NAMESPACE_FMT, serviceConfiguration.getClusterName(), str, port);
    }

    /**
     * Matches the service unit's namespace against {@code HEARTBEAT_NAMESPACE_PATTERN}.
     *
     * @param serviceUnitId service unit whose namespace is tested
     * @return {@code "http://"} followed by the pattern's first capture group on a match, otherwise
     *         {@code null}
     */
    public static String checkHeartbeatNamespace(ServiceUnitId serviceUnitId) {
        String namespace = serviceUnitId.getNamespaceObject().toString();
        Matcher heartbeatMatch = HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace);
        if (heartbeatMatch.matches()) {
            LOG.debug("SLAMonitoring namespace matched the lookup namespace {}", serviceUnitId.getNamespaceObject().toString());
            return String.format("http://%s", heartbeatMatch.group(1));
        }
        return null;
    }

    /**
     * Matches the service unit's namespace against {@code SLA_NAMESPACE_PATTERN}.
     *
     * @param serviceUnitId service unit whose namespace is tested
     * @return {@code "http://"} followed by the pattern's first capture group on a match, otherwise
     *         {@code null}
     */
    public static String getSLAMonitorBrokerName(ServiceUnitId serviceUnitId) {
        String namespace = serviceUnitId.getNamespaceObject().toString();
        Matcher slaMatch = SLA_NAMESPACE_PATTERN.matcher(namespace);
        if (!slaMatch.matches()) {
            return null;
        }
        return String.format("http://%s", slaMatch.group(1));
    }

    /**
     * Registers this broker's SLA monitor namespace. On success the {@code uncountedNamespaces} counter is
     * incremented.
     *
     * @return the result of {@code registerNamespace} for the SLA monitor namespace
     * @throws PulsarServerException if registration fails
     */
    public boolean registerSLANamespace() throws PulsarServerException {
        final boolean registered = registerNamespace(getSLAMonitorNamespace(this.host, this.config), false);
        if (registered) {
            this.uncountedNamespaces++;
        }
        if (LOG.isDebugEnabled()) {
            if (registered) {
                LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}", getSLAMonitorNamespace(this.host, this.config));
            } else {
                LOG.debug("SLA Monitoring not owned by the broker: ns={}", getSLAMonitorNamespace(this.host, this.config));
            }
        }
        return registered;
    }

    /**
     * Re-registers every bundle that was in the ownership cache when this method was called.
     *
     * <p>The owned bundles are copied first, then the local owner cache is invalidated. For each copied
     * bundle, {@code checkRegNodeAndWaitExpired} is called on the bundle's path and, when it returns
     * {@code false}, ownership is acquired again. If that fails, the bundle is asked to unload with a
     * 5 minute timeout; an unexpected failure of the unload shuts the broker down with exit code -1.
     */
    public void registerOwnedBundles() {
        // Snapshot before invalidating, otherwise there would be nothing left to iterate over.
        List<OwnedBundle> ownedBundles = new ArrayList<>(this.ownershipCache.getOwnedBundles().values());
        this.ownershipCache.invalidateLocalOwnerCache();
        ownedBundles.forEach(ownedBundle -> {
            try {
                if (!this.pulsar.getLocalZkCache().checkRegNodeAndWaitExpired(ServiceUnitUtils.path(ownedBundle.getNamespaceBundle()))) {
                    this.ownershipCache.tryAcquiringOwnership(ownedBundle.getNamespaceBundle());
                }
            } catch (Exception e) {
                LOG.warn("Failed to re-register owned bundle {}, unloading it", ownedBundle.getNamespaceBundle(), e);
                try {
                    ownedBundle.handleUnloadRequest(this.pulsar, 5L, TimeUnit.MINUTES);
                } catch (IllegalStateException ignored) {
                    // Deliberately ignored. NOTE(review): presumably raised when the bundle is not in a
                    // state that can be unloaded - confirm against OwnedBundle.handleUnloadRequest.
                } catch (Exception e3) {
                    LOG.error("Unexpected exception occur when register owned bundle {}. Shutdown broker now !!!", ownedBundle.getNamespaceBundle(), e3);
                    this.pulsar.getShutdownService().shutdown(-1);
                }
            }
        });
    }

    /**
     * Shuts down every cached namespace client. A client that fails to shut down is logged and the remaining
     * clients are still processed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.namespaceClients.forEach((cluster, client) -> {
            try {
                client.shutdown();
            } catch (PulsarClientException shutdownFailure) {
                LOG.warn("Error shutting down namespace client for cluster {}", cluster, shutdownFailure);
            }
        });
    }
}
