package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageBrokerData;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.class */
public class ModularLoadManagerImpl implements ModularLoadManager {
    // Metadata-store path prefix for per-bundle data; getBundleDataPath(...) builds keys under the same prefix.
    public static final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
    // Same values initialize() assigns to defaultStats (message rate and throughput, in and out).
    public static final double DEFAULT_MESSAGE_RATE = 50.0d;
    public static final double DEFAULT_MESSAGE_THROUGHPUT = 50000.0d;
    // Same sample counts getBundleDataOrDefault(...) passes when it builds a new BundleData (short = 10, long = 1000).
    public static final int NUM_LONG_SAMPLES = 1000;
    public static final int NUM_SHORT_SAMPLES = 10;
    // Prefix of the resource-quota key looked up by getBundleDataOrDefault(...).
    public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";
    // Same prefix start() uses for this broker's TimeAverageBrokerData entry.
    public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
    // Scratch set, cleared and refilled under synchronized(this.brokerCandidateCache) in shouldAntiAffinityNamespaceUnload.
    private final Set<String> brokerCandidateCache;
    // Lock manager obtained from the coordination service in initialize(); used to list, read and acquire broker locks.
    private LockManager<LocalBrokerData> brokersData;
    // Lock acquired for this broker in start() and released in disableBroker().
    private ResourceLock<LocalBrokerData> brokerDataLock;
    // Metadata caches obtained from the local metadata store in initialize().
    private MetadataCache<BundleData> bundlesCache;
    private MetadataCache<ResourceQuota> resourceQuotaCache;
    private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;
    // Linux or generic host-usage implementation, chosen in initialize() from SystemUtils.IS_OS_LINUX.
    private BrokerHostUsage brokerHostUsage;
    // Keyed by broker in updateBundleData(); presumably broker -> namespace -> bundle ranges, as the name suggests.
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
    // Lock path of this broker; assigned in start().
    private String brokerZnodePath;
    private BundleSplitStrategy bundleSplitStrategy;
    private ServiceConfiguration conf;
    // Fallback stats used when neither bundle data nor a resource quota is found for a bundle.
    private final NamespaceBundleStats defaultStats;
    private final List<BrokerFilter> filterPipeline;
    private long lastBundleDataUpdate;
    // lastData is compared against localData in needBrokerDataUpdate() to decide whether a new report is needed.
    private LocalBrokerData lastData;
    private final List<LoadSheddingStrategy> loadSheddingPipeline;
    private LocalBrokerData localData;
    // Broker and bundle data that the updateAll* methods keep refreshed.
    private final LoadData loadData;
    // bundle -> broker for preallocated bundles; entries pointing at missing brokers are removed by reapDeadBrokerPreallocations.
    private final Map<String, String> preallocatedBundleToBroker;
    private ModularLoadManagerStrategy placementStrategy;
    private SimpleResourceAllocationPolicies policies;
    private PulsarService pulsar;
    // Single-thread scheduler created in the constructor; shut down in stop().
    private final ScheduledExecutorService scheduler;
    private final LoadManagerShared.BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
    private Map<String, String> brokerToFailureDomainMap;
    // Most recent session event delivered to handleMetadataSessionEvent.
    private SessionEvent lastMetadataSessionEvent;
    // Latest metric snapshots; the unload and split ones are replaced wholesale by the update*Metrics methods.
    private AtomicReference<List<Metrics>> loadBalancingMetrics;
    private AtomicReference<List<Metrics>> bundleUnloadMetrics;
    private AtomicReference<List<Metrics>> bundleSplitMetrics;
    private AtomicReference<List<Metrics>> bundleMetrics;
    // Running totals reported through the metric snapshots above.
    private long bundleSplitCount;
    private long unloadBrokerCount;
    private long unloadBundleCount;
    // Held by updateLocalBrokerData() while it refreshes localData.
    private final Lock lock;
    private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);
    // Summary with 0.5 / 0.99 / 0.999 / 1.0 quantiles; the metric name is runtime-visible and must stay as spelled.
    private static final Summary selectBrokerForAssignment = (Summary) Summary.build("pulsar_broker_load_manager_bundle_assigment", "-").quantile(0.5d).quantile(0.99d).quantile(0.999d).quantile(1.0d).register();

    /**
     * Creates an uninitialized load manager: allocates the in-memory collections, the metric holders,
     * the single-thread scheduler and the topic-loading predicate. {@link #initialize(PulsarService)}
     * must be called before the instance is used.
     */
    public ModularLoadManagerImpl() {
        this.lastMetadataSessionEvent = SessionEvent.Reconnected;
        this.loadBalancingMetrics = new AtomicReference<>();
        this.bundleUnloadMetrics = new AtomicReference<>();
        this.bundleSplitMetrics = new AtomicReference<>();
        this.bundleMetrics = new AtomicReference<>();
        this.bundleSplitCount = 0L;
        this.unloadBrokerCount = 0L;
        this.unloadBundleCount = 0L;
        this.lock = new ReentrantLock();
        this.brokerCandidateCache = new HashSet<>();
        // Explicit type witness: without it the builder is inferred as <Object, Object> and the
        // result is not assignable to the typed field.
        this.brokerToNamespaceToBundleRange = ConcurrentOpenHashMap
                .<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
                .build();
        this.defaultStats = new NamespaceBundleStats();
        this.filterPipeline = new ArrayList<>();
        this.loadData = new LoadData();
        this.loadSheddingPipeline = new ArrayList<>();
        this.preallocatedBundleToBroker = new ConcurrentHashMap<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
        this.brokerToFailureDomainMap = new HashMap<>();
        this.brokerTopicLoadingPredicate = new LoadManagerShared.BrokerTopicLoadingPredicate() {
            /** True only when the broker is known, has local data, and reports persistent topics as enabled. */
            @Override // org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate
            public boolean isEnablePersistentTopics(String str) {
                // Broker data is keyed without the "http://" scheme prefix.
                BrokerData brokerData = ModularLoadManagerImpl.this.loadData.getBrokerData().get(str.replace("http://", ""));
                return brokerData != null
                        && brokerData.getLocalData() != null
                        && brokerData.getLocalData().isPersistentTopicsEnabled();
            }

            /** True only when the broker is known, has local data, and reports non-persistent topics as enabled. */
            @Override // org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate
            public boolean isEnableNonPersistentTopics(String str) {
                BrokerData brokerData = ModularLoadManagerImpl.this.loadData.getBrokerData().get(str.replace("http://", ""));
                return brokerData != null
                        && brokerData.getLocalData() != null
                        && brokerData.getLocalData().isNonPersistentTopicsEnabled();
            }
        };
    }

    /**
     * Wires this load manager to the given Pulsar service: obtains the lock manager and metadata
     * caches, registers the data and session listeners, selects the host-usage implementation,
     * seeds the default bundle stats, and builds the filter and load-shedding pipelines.
     *
     * @param pulsarService the service this load manager operates on
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void initialize(PulsarService pulsarService) {
        this.pulsar = pulsarService;

        // Coordination and metadata access.
        this.brokersData = pulsarService.getCoordinationService().getLockManager(LocalBrokerData.class);
        this.bundlesCache = pulsarService.getLocalMetadataStore().getMetadataCache(BundleData.class);
        this.resourceQuotaCache = pulsarService.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
        this.timeAverageBrokerDataCache = pulsarService.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
        pulsarService.getLocalMetadataStore().registerListener(this::handleDataNotification);
        pulsarService.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);

        // Host usage is collected by an OS-specific implementation.
        this.brokerHostUsage = SystemUtils.IS_OS_LINUX
                ? new LinuxBrokerHostUsageImpl(pulsarService)
                : new GenericBrokerHostUsageImpl(pulsarService);

        this.bundleSplitStrategy = new BundleSplitterTask();
        this.conf = pulsarService.getConfiguration();

        // Stats assumed for bundles that have neither stored data nor a resource quota.
        this.defaultStats.msgThroughputIn = 50000.0d;
        this.defaultStats.msgThroughputOut = 50000.0d;
        this.defaultStats.msgRateIn = 50.0d;
        this.defaultStats.msgRateOut = 50.0d;

        this.placementStrategy = ModularLoadManagerStrategy.create(this.conf);
        this.policies = new SimpleResourceAllocationPolicies(pulsarService);
        this.filterPipeline.add(new BrokerVersionFilter());

        // Load the failure-domain map now, then refresh it on the scheduler whenever it changes.
        refreshBrokerToFailureDomainMap();
        pulsarService.getPulsarResources().getClusterResources().getFailureDomainResources()
                .registerListener(notification -> this.scheduler.execute(this::refreshBrokerToFailureDomainMap));

        this.loadSheddingPipeline.add(createLoadSheddingStrategy());
    }

    /**
     * Reacts to metadata-store notifications under the brokers root: asynchronously drops
     * preallocations that point at brokers no longer holding a lock, and schedules a full
     * refresh of broker and bundle data. Notifications for other paths are ignored.
     *
     * @param notification the metadata-store notification
     */
    public void handleDataNotification(Notification notification) {
        if (!notification.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
            return;
        }
        this.brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT)
                .thenAccept(this::reapDeadBrokerPreallocations)
                .exceptionally(ex -> {
                    // Nobody joins this future, so without this handler a failure would go unseen.
                    log.warn("Error when trying to reap preallocations of dead brokers", ex);
                    return null;
                });
        try {
            this.scheduler.submit(this::updateAll);
        } catch (RejectedExecutionException ignored) {
            // Best effort: the scheduler rejects tasks once stop() has shut it down.
            if (log.isDebugEnabled()) {
                log.debug("Skipping load data update for {}: scheduler is no longer accepting tasks",
                        notification.getPath());
            }
        }
    }

    /**
     * Session listener registered in {@code initialize}: remembers the most recent
     * metadata-store session event.
     */
    private void handleMetadataSessionEvent(SessionEvent latestEvent) {
        lastMetadataSessionEvent = latestEvent;
    }

    /**
     * Reflectively instantiates the load-shedding strategy class named in the configuration.
     *
     * @return the configured strategy, or an {@code OverloadShedder} when the class cannot be
     *         instantiated or does not implement {@code LoadSheddingStrategy}
     */
    private LoadSheddingStrategy createLoadSheddingStrategy() {
        try {
            final Class<?> strategyClass = Class.forName(this.conf.getLoadBalancerLoadSheddingStrategy());
            final Object candidate = strategyClass.getDeclaredConstructor().newInstance();
            if (!(candidate instanceof LoadSheddingStrategy)) {
                log.error("create load shedding strategy failed. using OverloadShedder instead.");
                return new OverloadShedder();
            }
            return (LoadSheddingStrategy) candidate;
        } catch (Exception creationFailure) {
            log.error("Error when trying to create load shedding strategy: ", creationFailure);
            return new OverloadShedder();
        }
    }

    /**
     * Convenience constructor: builds the manager and immediately initializes it
     * against the given Pulsar service.
     *
     * @param pulsarService the service passed on to {@link #initialize(PulsarService)}
     */
    public ModularLoadManagerImpl(PulsarService pulsarService) {
        this();
        this.initialize(pulsarService);
    }

    /**
     * Removes bundle preallocations that point at brokers which are still present in the load
     * data but are missing from the given list of currently locked brokers.
     *
     * @param list brokers that currently hold a lock under the brokers root
     */
    private void reapDeadBrokerPreallocations(List<String> list) {
        // Hash lookups instead of List.contains, which rescanned the list for every known broker.
        final Set<String> liveBrokers = new HashSet<>(list);
        for (String broker : this.loadData.getBrokerData().keySet()) {
            if (liveBrokers.contains(broker)) {
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug("Broker {} appears to have stopped; now reclaiming any preallocations", broker);
            }
            this.preallocatedBundleToBroker.entrySet().removeIf(preallocation -> {
                if (!broker.equals(preallocation.getValue())) {
                    return false;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Removing old preallocation on dead broker {} for bundle {}",
                            preallocation.getValue(), preallocation.getKey());
                }
                return true;
            });
        }
    }

    /**
     * Blocks until the brokers currently holding a lock under the brokers root are listed.
     *
     * @return a fresh set of those brokers; if listing fails, the key set of the locally known
     *         broker data is returned instead
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public Set<String> getAvailableBrokers() {
        try {
            final List<String> lockedBrokers =
                    this.brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
            return new HashSet<>(lockedBrokers);
        } catch (Exception listFailure) {
            log.warn("Error when trying to get active brokers", listFailure);
            return this.loadData.getBrokerData().keySet();
        }
    }

    /**
     * Asynchronous variant of {@link #getAvailableBrokers()}.
     *
     * @return a future that always completes normally: with the listed brokers on success, or
     *         with the key set of the locally known broker data when listing fails
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
        final CompletableFuture<Set<String>> result = new CompletableFuture<>();
        this.brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).whenComplete((lockedBrokers, failure) -> {
            if (failure != null) {
                log.warn("Error when trying to get active brokers", FutureUtil.unwrapCompletionException(failure));
                result.complete(this.loadData.getBrokerData().keySet());
            } else {
                result.complete(Sets.newHashSet(lockedBrokers));
            }
        });
        return result;
    }

    /**
     * Resolves the load data for a bundle, in order of preference:
     * <ol>
     *   <li>the bundle data stored in the metadata store;</li>
     *   <li>data derived from the bundle's stored resource quota;</li>
     *   <li>data built from the default stats.</li>
     * </ol>
     * Any failure while reading the metadata store is logged and falls through to the defaults.
     *
     * @param str the bundle name
     * @return the bundle data; never {@code null}
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public BundleData getBundleDataOrDefault(String str) {
        BundleData bundleData = null;
        // Both lookups stay inside the try so a failure in either one degrades to the defaults,
        // and no Optional is read on a path where it was never assigned.
        try {
            Optional<BundleData> storedBundleData = this.bundlesCache.get(getBundleDataPath(str)).join();
            if (storedBundleData.isPresent()) {
                return storedBundleData.get();
            }
            Optional<ResourceQuota> storedQuota =
                    this.resourceQuotaCache.get(String.format("%s/%s", RESOURCE_QUOTA_ZPATH, str)).join();
            if (storedQuota.isPresent()) {
                ResourceQuota resourceQuota = storedQuota.get();
                // 10 short-term and 1000 long-term samples.
                bundleData = new BundleData(10, 1000);
                TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                TimeAverageMessageData longTermData = bundleData.getLongTermData();
                shortTermData.setMsgRateIn(resourceQuota.getMsgRateIn());
                shortTermData.setMsgRateOut(resourceQuota.getMsgRateOut());
                shortTermData.setMsgThroughputIn(resourceQuota.getBandwidthIn());
                shortTermData.setMsgThroughputOut(resourceQuota.getBandwidthOut());
                longTermData.setMsgRateIn(resourceQuota.getMsgRateIn());
                longTermData.setMsgRateOut(resourceQuota.getMsgRateOut());
                longTermData.setMsgThroughputIn(resourceQuota.getBandwidthIn());
                longTermData.setMsgThroughputOut(resourceQuota.getBandwidthOut());
                // Mark both windows as already holding their full number of samples.
                shortTermData.setNumSamples(10);
                longTermData.setNumSamples(1000);
            }
        } catch (Exception e) {
            log.warn("Error when trying to find bundle {} on metadata store: {}", str, e);
        }
        if (bundleData == null) {
            bundleData = new BundleData(10, 1000, this.defaultStats);
        }
        return bundleData;
    }

    /**
     * Builds the metadata-store path of a bundle's data.
     *
     * @param str the bundle name
     * @return {@code "/loadbalance/bundle-data/"} followed by the bundle name
     */
    public static String getBundleDataPath(String str) {
        final String bundleDataRoot = "/loadbalance/bundle-data";
        return bundleDataRoot + "/" + str;
    }

    /** Returns the per-bundle stats reported by this broker's broker service. */
    private Map<String, NamespaceBundleStats> getBundleStats() {
        final Map<String, NamespaceBundleStats> currentStats = this.pulsar.getBrokerService().getBundleStats();
        return currentStats;
    }

    /**
     * Absolute change from {@code d} to {@code d2}, as a percentage of {@code d}.
     *
     * @param d  the baseline value
     * @param d2 the new value
     * @return {@code 100 * |d - d2| / |d|}; when the baseline is zero, {@code 0} if the new value
     *         is also zero and positive infinity otherwise
     */
    private double percentChange(double d, double d2) {
        if (d != 0.0d) {
            return 100.0d * Math.abs((d - d2) / d);
        }
        if (d2 == 0.0d) {
            return 0.0d;
        }
        return Double.POSITIVE_INFINITY;
    }

    /**
     * Decides whether the local broker data should be written to the metadata store again.
     *
     * @return {@code true} when the time since the last update exceeds the configured maximum
     *         interval, or when the largest change among resource usage, message rate,
     *         throughput and bundle count is not within the configured threshold percentage
     */
    private boolean needBrokerDataUpdate() {
        final long maxIntervalMs = TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
        final long sinceLastUpdateMs = System.currentTimeMillis() - this.localData.getLastUpdate();
        if (sinceLastUpdateMs > maxIntervalMs) {
            log.info("Writing local data to metadata store because time since last update exceeded threshold of {} minutes",
                    this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
            return true;
        }

        // Each figure is a percentage change between the last written report and the current data.
        final double resourceChange = 100.0d
                * Math.abs(this.lastData.getMaxResourceUsage() - this.localData.getMaxResourceUsage());
        final double msgRateChange = percentChange(
                this.lastData.getMsgRateIn() + this.lastData.getMsgRateOut(),
                this.localData.getMsgRateIn() + this.localData.getMsgRateOut());
        final double throughputChange = percentChange(
                this.lastData.getMsgThroughputIn() + this.lastData.getMsgThroughputOut(),
                this.localData.getMsgThroughputIn() + this.localData.getMsgThroughputOut());
        final double bundleCountChange = percentChange(this.lastData.getNumBundles(), this.localData.getNumBundles());
        final double largestChange = Math.max(resourceChange,
                Math.max(msgRateChange, Math.max(throughputChange, bundleCountChange)));

        final boolean withinThreshold = largestChange <= this.conf.getLoadBalancerReportUpdateThresholdPercentage();
        if (withinThreshold) {
            return false;
        }
        log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; time since last report written is {} seconds",
                largestChange,
                this.conf.getLoadBalancerReportUpdateThresholdPercentage(),
                sinceLastUpdateMs / 1000.0d);
        return true;
    }

    /**
     * Runs one full refresh cycle: broker data first, then bundle data, then the
     * namespace bundle split check.
     */
    public void updateAll() {
        final boolean debugEnabled = log.isDebugEnabled();
        if (debugEnabled) {
            log.debug("Updating broker and bundle data for loadreport");
        }
        this.updateAllBrokerData();
        this.updateBundleData();
        this.checkNamespaceBundleSplit();
    }

    /**
     * Refreshes the locally held broker data from the lock data of every available broker and
     * drops entries for brokers that are no longer available. A broker whose lock data cannot be
     * read keeps its previous entry; a broker with no lock data has its entry removed.
     */
    private void updateAllBrokerData() {
        // Snapshot: getAvailableBrokers() may hand back the live key set of the broker-data map
        // (its fallback path), which must not be iterated while entries are removed below.
        final Set<String> activeBrokers = new HashSet<>(getAvailableBrokers());
        final Map<String, BrokerData> brokerDataMap = this.loadData.getBrokerData();
        for (String broker : activeBrokers) {
            try {
                final String lockPath = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
                final Optional<LocalBrokerData> reported = this.brokersData.readLock(lockPath).get();
                if (!reported.isPresent()) {
                    brokerDataMap.remove(broker);
                    log.info("[{}] Broker load report is not present", broker);
                    continue;
                }
                final BrokerData known = brokerDataMap.get(broker);
                if (known != null) {
                    // Keep the existing entry object and only swap in the fresh local data.
                    known.setLocalData(reported.get());
                } else {
                    brokerDataMap.put(broker, new BrokerData(reported.get()));
                }
            } catch (Exception e) {
                log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
            }
        }
        // Drop obsolete brokers through the key-set view rather than removing from the map while
        // iterating it, which is only safe for concurrent map implementations.
        brokerDataMap.keySet().retainAll(activeBrokers);
    }

    /**
     * Rebuilds the per-bundle load data from every known broker's last reported stats and, for
     * each broker, prunes its preallocated bundles, resets its time-average data and rebuilds its
     * entry in brokerToNamespaceToBundleRange.
     */
    private void updateBundleData() {
        Map<String, BundleData> bundleData = this.loadData.getBundleData();
        // Bundles seen in the stats of the brokers visited so far.
        HashSet hashSet = new HashSet();
        for (Map.Entry<String, BrokerData> entry : this.loadData.getBrokerData().entrySet()) {
            String key = entry.getKey();
            BrokerData value = entry.getValue();
            Map lastStats = value.getLocalData().getLastStats();
            // Fold this broker's latest stats into the bundle data, creating entries for new bundles.
            for (Map.Entry entry2 : lastStats.entrySet()) {
                String str = (String) entry2.getKey();
                NamespaceBundleStats namespaceBundleStats = (NamespaceBundleStats) entry2.getValue();
                hashSet.add(str);
                if (bundleData.containsKey(str)) {
                    bundleData.get(str).update(namespaceBundleStats);
                } else {
                    BundleData bundleDataOrDefault = getBundleDataOrDefault(str);
                    bundleDataOrDefault.update(namespaceBundleStats);
                    bundleData.put(str, bundleDataOrDefault);
                }
            }
            // Remove bundle data that no visited broker reported; the leader also deletes it from the metadata store.
            // NOTE(review): this pruning runs inside the per-broker loop, while hashSet only holds bundles of
            // brokers visited so far, so bundles of later brokers look inactive here — confirm this is intended.
            // NOTE(review): removes from bundleData while iterating its key set; presumably the map is a
            // concurrent implementation — verify against LoadData.
            for (String str2 : bundleData.keySet()) {
                if (!hashSet.contains(str2)) {
                    bundleData.remove(str2);
                    if (this.pulsar.getLeaderElectionService().isLeader()) {
                        deleteBundleDataFromMetadataStore(str2);
                    }
                }
            }
            Map<String, BundleData> preallocatedBundleData = value.getPreallocatedBundleData();
            // Names of the service units currently owned according to the namespace service.
            Set set = (Set) this.pulsar.getNamespaceService().getOwnedServiceUnits().stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.toSet());
            synchronized (preallocatedBundleData) {
                this.preallocatedBundleToBroker.keySet().removeAll(preallocatedBundleData.keySet());
                Iterator<Map.Entry<String, BundleData>> it = preallocatedBundleData.entrySet().iterator();
                while (it.hasNext()) {
                    String key2 = it.next().getKey();
                    // Drop a preallocation once the bundle is not owned, or once the broker lists it and bundle data exists.
                    if (!set.contains(key2) || (value.getLocalData().getBundles().contains(key2) && bundleData.containsKey(key2))) {
                        it.remove();
                    }
                }
            }
            value.getTimeAverageData().reset(lastStats.keySet(), bundleData, this.defaultStats);
            ConcurrentOpenHashMap concurrentOpenHashMap = (ConcurrentOpenHashMap) this.brokerToNamespaceToBundleRange.computeIfAbsent(key, str3 -> {
                return ConcurrentOpenHashMap.newBuilder().build();
            });
            // Rebuild this broker's map from its reported and its remaining preallocated bundles.
            synchronized (concurrentOpenHashMap) {
                concurrentOpenHashMap.clear();
                LoadManagerShared.fillNamespaceToBundlesMap(lastStats.keySet(), concurrentOpenHashMap);
                LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), concurrentOpenHashMap);
            }
        }
    }

    /**
     * Releases this broker's lock, provided a lock path has been set.
     *
     * @throws PulsarServerException if releasing the lock fails; a
     *         {@code PulsarServerException.NotFoundException} when the failure cause is a
     *         metadata-store not-found error
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void disableBroker() throws PulsarServerException {
        if (StringUtils.isEmpty(this.brokerZnodePath)) {
            return;
        }
        try {
            this.brokerDataLock.release().join();
        } catch (CompletionException releaseFailure) {
            if (releaseFailure.getCause() instanceof MetadataStoreException.NotFoundException) {
                throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(releaseFailure));
            }
            throw new PulsarServerException(MetadataStoreException.unwrap(releaseFailure));
        }
    }

    /**
     * Runs every strategy in the load-shedding pipeline and unloads the bundles each one selects,
     * unless load shedding is disabled or at most one broker is available. Entries older than the
     * configured grace period are first dropped from the recently-unloaded map. A bundle is
     * skipped when the anti-affinity check rejects it; a failed unload is logged and does not
     * stop the remaining unloads.
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public synchronized void doLoadShedding() {
        if (!LoadManagerShared.isLoadSheddingEnabled(this.pulsar)) {
            return;
        }
        if (getAvailableBrokers().size() <= 1) {
            log.info("Only 1 broker available: no load shedding will be performed");
            return;
        }
        // Forget bundles whose unload happened before the start of the grace period.
        final long gracePeriodStart = System.currentTimeMillis()
                - TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerSheddingGracePeriodMinutes());
        final Map<String, Long> recentlyUnloadedBundles = this.loadData.getRecentlyUnloadedBundles();
        recentlyUnloadedBundles.values().removeIf(unloadedAt -> unloadedAt < gracePeriodStart);

        for (LoadSheddingStrategy loadSheddingStrategy : this.loadSheddingPipeline) {
            final Multimap<String, String> bundlesToUnload =
                    loadSheddingStrategy.findBundlesForUnloading(this.loadData, this.conf);
            // The multimap is keyed by broker; each value is a bundle selected on that broker.
            // Broker and bundle need distinct names so the check and the logs use the right one.
            bundlesToUnload.asMap().forEach((broker, bundles) -> {
                bundles.forEach(bundle -> {
                    final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                    final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
                    if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                        return;
                    }
                    log.info("[{}] Unloading bundle: {} from broker {}",
                            loadSheddingStrategy.getClass().getSimpleName(), bundle, broker);
                    try {
                        this.pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
                        this.loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
                    } catch (PulsarServerException | PulsarAdminException e) {
                        log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
                    }
                });
            });
            updateBundleUnloadingMetrics(bundlesToUnload);
        }
    }

    /**
     * Adds the brokers and bundles of one unloading round to the running totals and publishes
     * a fresh single-element metrics list carrying both totals.
     */
    private void updateBundleUnloadingMetrics(Multimap<String, String> multimap) {
        this.unloadBrokerCount += multimap.keySet().size();
        this.unloadBundleCount += multimap.values().size();

        final Map<String, String> dimensions = new HashMap<>();
        dimensions.put("metric", "bundleUnloading");

        final Metrics unloadMetrics = Metrics.create(dimensions);
        unloadMetrics.put("brk_lb_unload_broker_count", this.unloadBrokerCount);
        unloadMetrics.put("brk_lb_unload_bundle_count", this.unloadBundleCount);

        final List<Metrics> published = new ArrayList<>();
        published.add(unloadMetrics);
        this.bundleUnloadMetrics.set(published);
    }

    /**
     * Decides whether a bundle may be unloaded given its namespace's anti-affinity group.
     *
     * @param str  the namespace name
     * @param str2 the bundle range
     * @param str3 the broker the bundle would be unloaded from
     * @return {@code true} when the namespace has no local policies or no anti-affinity group,
     *         or when the check itself fails; otherwise the result of the shared anti-affinity
     *         check over the current candidate brokers
     */
    public boolean shouldAntiAffinityNamespaceUnload(String str, String str2, String str3) {
        try {
            final Optional<LocalPolicies> nsPolicies =
                    this.pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(NamespaceName.get(str));
            final boolean inAntiAffinityGroup = nsPolicies.isPresent()
                    && StringUtils.isNotBlank(nsPolicies.get().namespaceAntiAffinityGroup);
            if (!inAntiAffinityGroup) {
                return true;
            }
            // The candidate cache is a shared scratch set, so it is rebuilt and read under its own monitor.
            synchronized (this.brokerCandidateCache) {
                this.brokerCandidateCache.clear();
                LoadManagerShared.applyNamespacePolicies(
                        this.pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(str, str2),
                        this.policies,
                        this.brokerCandidateCache,
                        getAvailableBrokers(),
                        this.brokerTopicLoadingPredicate);
                return LoadManagerShared.shouldAntiAffinityNamespaceUnload(
                        str, str2, str3, this.pulsar, this.brokerToNamespaceToBundleRange, this.brokerCandidateCache);
            }
        } catch (Exception checkFailure) {
            log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}",
                    str, str2, str3, checkFailure.getMessage());
            return true;
        }
    }

    /**
     * Splits the bundles selected by the bundle split strategy. Does nothing unless automatic
     * bundle splitting is enabled, this broker is the leader, and more than one broker is known.
     * Each bundle is handled independently: a failure is logged and the remaining bundles are
     * still processed. The split-count metric is updated with the number of successful splits.
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void checkNamespaceBundleSplit() {
        if (!this.conf.isLoadBalancerAutoBundleSplitEnabled() || this.pulsar.getLeaderElectionService() == null || !this.pulsar.getLeaderElectionService().isLeader() || this.loadData.getBrokerData().size() <= 1) {
            return;
        }
        final boolean unloadSplitBundles = this.pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
        synchronized (this.bundleSplitStrategy) {
            final Set<String> bundlesToBeSplit = this.bundleSplitStrategy.findBundlesToSplit(this.loadData, this.pulsar);
            final NamespaceBundleFactory namespaceBundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
            int splitCount = 0;
            for (String bundleName : bundlesToBeSplit) {
                // The whole per-bundle sequence is inside the try so one failed split cannot
                // abort the others.
                try {
                    final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
                    final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
                    if (!namespaceBundleFactory.canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
                        continue;
                    }
                    // Drop the bundle from the in-memory data, the bundle cache and the metadata
                    // store before splitting it.
                    this.loadData.getBundleData().remove(bundleName);
                    this.localData.getLastStats().remove(bundleName);
                    this.pulsar.getNamespaceService().getNamespaceBundleFactory().invalidateBundleCache(NamespaceName.get(namespaceName));
                    deleteBundleDataFromMetadataStore(bundleName);
                    log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles);
                    this.pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, unloadSplitBundles, (String) null);
                    splitCount++;
                    log.info("Successfully split namespace bundle {}", bundleName);
                } catch (Exception e) {
                    log.error("Failed to split namespace bundle {}", bundleName, e);
                }
            }
            updateBundleSplitMetrics(splitCount);
        }
    }

    /**
     * Adds the given number of splits to the running total and publishes a fresh
     * single-element metrics list carrying that total.
     */
    private void updateBundleSplitMetrics(int i) {
        this.bundleSplitCount += i;

        final Map<String, String> dimensions = new HashMap<>();
        dimensions.put("metric", "bundlesSplit");

        final Metrics splitMetrics = Metrics.create(dimensions);
        splitMetrics.put("brk_lb_bundles_split_count", this.bundleSplitCount);

        final List<Metrics> published = new ArrayList<>();
        published.add(splitMetrics);
        this.bundleSplitMetrics.set(published);
    }

    /*  JADX ERROR: NullPointerException in pass: AttachTryCatchVisitor
        java.lang.NullPointerException: Cannot invoke "String.charAt(int)" because "obj" is null
        	at jadx.core.utils.Utils.cleanObjectName(Utils.java:38)
        	at jadx.core.dex.instructions.args.ArgType.object(ArgType.java:86)
        	at jadx.core.dex.info.ClassInfo.fromName(ClassInfo.java:42)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.convertToHandlers(AttachTryCatchVisitor.java:113)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.initTryCatches(AttachTryCatchVisitor.java:54)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.visit(AttachTryCatchVisitor.java:42)
        */
    /**
     * Placeholder left by the decompiler: the original method body could not be recovered, so
     * this stub always throws.
     * NOTE(review): judging by the name and signature, the real method presumably picks a broker
     * for the given service unit — restore it from the upstream source before using this class.
     *
     * @param r7 the service unit (not used by this stub)
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public java.util.Optional<java.lang.String> selectBrokerForAssignment(org.apache.pulsar.common.naming.ServiceUnitId r7) {
        /*
            Method dump skipped, instructions count: 704
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.selectBrokerForAssignment(org.apache.pulsar.common.naming.ServiceUnitId):java.util.Optional");
    }

    /**
     * Starts the load manager: builds the last and local broker data from the service's
     * addresses, advertised protocols and topic settings, acquires this broker's lock with the
     * local data, ensures its time-average entry exists, and runs a first full update.
     *
     * @throws PulsarServerException wrapping any failure raised during startup
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void start() throws PulsarServerException {
        try {
            final Map<String, String> advertisedProtocols = this.pulsar.getProtocolDataToAdvertise();

            // Baseline that needBrokerDataUpdate() compares the current data against.
            this.lastData = new LocalBrokerData(
                    this.pulsar.getSafeWebServiceAddress(),
                    this.pulsar.getWebServiceAddressTls(),
                    this.pulsar.getBrokerServiceUrl(),
                    this.pulsar.getBrokerServiceUrlTls(),
                    this.pulsar.getAdvertisedListeners());
            this.lastData.setProtocols(advertisedProtocols);
            this.lastData.setPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnablePersistentTopics());
            this.lastData.setNonPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnableNonPersistentTopics());

            // Data that is stored with this broker's lock.
            this.localData = new LocalBrokerData(
                    this.pulsar.getSafeWebServiceAddress(),
                    this.pulsar.getWebServiceAddressTls(),
                    this.pulsar.getBrokerServiceUrl(),
                    this.pulsar.getBrokerServiceUrlTls(),
                    this.pulsar.getAdvertisedListeners());
            this.localData.setProtocols(advertisedProtocols);
            this.localData.setBrokerVersionString(this.pulsar.getBrokerVersion());
            this.localData.setPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnablePersistentTopics());
            this.localData.setNonPersistentTopicsEnabled(this.pulsar.getConfiguration().isEnableNonPersistentTopics());

            // The broker is identified as <advertised address>:<web port>, falling back to the TLS web port.
            final String advertisedAddress = this.pulsar.getAdvertisedAddress();
            final Integer webPort;
            if (this.conf.getWebServicePort().isPresent()) {
                webPort = (Integer) this.conf.getWebServicePort().get();
            } else {
                webPort = (Integer) this.conf.getWebServicePortTls().get();
            }
            final String brokerId = advertisedAddress + ":" + webPort;

            this.brokerZnodePath = "/loadbalance/brokers/" + brokerId;
            updateLocalBrokerData();
            this.brokerDataLock = (ResourceLock) this.brokersData.acquireLock(this.brokerZnodePath, this.localData).join();
            this.timeAverageBrokerDataCache
                    .readModifyUpdateOrCreate("/loadbalance/broker-time-average/" + brokerId,
                            existing -> new TimeAverageBrokerData())
                    .join();
            updateAll();
            this.lastBundleDataUpdate = System.currentTimeMillis();
        } catch (Exception startFailure) {
            log.error("Unable to acquire lock for broker: [{}]", this.brokerZnodePath, startFailure);
            throw new PulsarServerException(startFailure);
        }
    }

    /**
     * Stops the load manager: shuts down the scheduler immediately and closes the broker data
     * lock manager.
     *
     * <p>A failure while closing is logged and not propagated, so shutdown always continues.
     *
     * @throws PulsarServerException declared by the interface; not thrown by this implementation
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void stop() throws PulsarServerException {
        this.scheduler.shutdownNow();
        try {
            this.brokersData.close();
        } catch (Exception e) {
            // Pass the throwable as the last argument so the stack trace is kept in the log;
            // the message alone is rarely enough to diagnose a failed release.
            log.warn("Failed to release broker lock: {}", e.getMessage(), e);
        }
    }

    /**
     * Refreshes {@code localData} from the current system resource usage and bundle stats, and
     * updates the load-balancing metrics (plus per-bundle metrics when enabled in the
     * configuration). Runs while holding {@code lock}.
     *
     * <p>Any exception is logged; only {@link ConcurrentModificationException} is rethrown to
     * the caller, all others are swallowed after logging.
     *
     * @return the (possibly refreshed) local broker data
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public LocalBrokerData updateLocalBrokerData() {
        this.lock.lock();
        try {
            final SystemResourceUsage usage = LoadManagerShared.getSystemResourceUsage(this.brokerHostUsage);
            this.localData.update(usage, getBundleStats());
            updateLoadBalancingMetrics(usage);
            if (this.conf.isExposeBundlesMetricsInPrometheus()) {
                updateLoadBalancingBundlesMetrics(getBundleStats());
            }
        } catch (ConcurrentModificationException concurrentModification) {
            log.warn("Error when attempting to update local broker data", concurrentModification);
            throw concurrentModification;
        } catch (Exception otherFailure) {
            log.warn("Error when attempting to update local broker data", otherFailure);
        } finally {
            this.lock.unlock();
        }
        return this.localData;
    }

    /**
     * Builds one {@code Metrics} entry per bundle and publishes the resulting list through
     * {@code bundleMetrics}.
     *
     * <p>Each entry is tagged with the broker's advertised address, the bundle name and the
     * fixed metric name {@code "bundle"}, and carries the bundle's message rates, throughput,
     * topic count and producer/consumer counts.
     *
     * @param map bundle name to its current stats
     */
    private void updateLoadBalancingBundlesMetrics(Map<String, NamespaceBundleStats> map) {
        final List<Metrics> perBundleMetrics = new ArrayList<>();
        for (Map.Entry<String, NamespaceBundleStats> bundleEntry : map.entrySet()) {
            final String bundleName = bundleEntry.getKey();
            final NamespaceBundleStats stats = bundleEntry.getValue();

            final Map<String, String> dimensions = new HashMap<>();
            dimensions.put("broker", this.pulsar.getAdvertisedAddress());
            dimensions.put("bundle", bundleName);
            dimensions.put("metric", "bundle");

            final Metrics bundleMetric = Metrics.create(dimensions);
            bundleMetric.put("brk_bundle_msg_rate_in", Double.valueOf(stats.msgRateIn));
            bundleMetric.put("brk_bundle_msg_rate_out", Double.valueOf(stats.msgRateOut));
            bundleMetric.put("brk_bundle_topics_count", Long.valueOf(stats.topics));
            bundleMetric.put("brk_bundle_consumer_count", Integer.valueOf(stats.consumerCount));
            bundleMetric.put("brk_bundle_producer_count", Integer.valueOf(stats.producerCount));
            bundleMetric.put("brk_bundle_msg_throughput_in", Double.valueOf(stats.msgThroughputIn));
            bundleMetric.put("brk_bundle_msg_throughput_out", Double.valueOf(stats.msgThroughputOut));
            perBundleMetrics.add(bundleMetric);
        }
        this.bundleMetrics.set(perBundleMetrics);
    }

    /**
     * Publishes a single-element metrics list describing this broker's resource usage through
     * {@code loadBalancingMetrics}.
     *
     * <p>The entry is tagged with the broker's advertised address and the metric name
     * {@code "loadBalancing"}, and records the percent usage of CPU, memory, direct memory and
     * inbound/outbound bandwidth.
     *
     * @param systemResourceUsage the resource usage sample to report
     */
    private void updateLoadBalancingMetrics(SystemResourceUsage systemResourceUsage) {
        final Map<String, String> dimensions = new HashMap<>();
        dimensions.put("broker", this.pulsar.getAdvertisedAddress());
        dimensions.put("metric", "loadBalancing");

        final Metrics usageMetric = Metrics.create(dimensions);
        usageMetric.put("brk_lb_cpu_usage", Float.valueOf(systemResourceUsage.getCpu().percentUsage()));
        usageMetric.put("brk_lb_memory_usage", Float.valueOf(systemResourceUsage.getMemory().percentUsage()));
        usageMetric.put("brk_lb_directMemory_usage", Float.valueOf(systemResourceUsage.getDirectMemory().percentUsage()));
        usageMetric.put("brk_lb_bandwidth_in_usage", Float.valueOf(systemResourceUsage.getBandwidthIn().percentUsage()));
        usageMetric.put("brk_lb_bandwidth_out_usage", Float.valueOf(systemResourceUsage.getBandwidthOut().percentUsage()));

        final List<Metrics> published = new ArrayList<>();
        published.add(usageMetric);
        this.loadBalancingMetrics.set(published);
    }

    /**
     * Writes the local broker data without forcing the update; equivalent to calling
     * {@code writeBrokerDataOnZooKeeper(false)}.
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void writeBrokerDataOnZooKeeper() {
        writeBrokerDataOnZooKeeper(false);
    }

    /**
     * Refreshes the local broker data and, when appropriate, writes it through the broker data
     * lock. Runs while holding {@code lock}.
     *
     * <p>The write happens only if a metadata session event has been recorded and reports a
     * connected session, and either {@code needBrokerDataUpdate()} returns true or the caller
     * forces it. After a successful write the deltas on {@code localData} are cleared and
     * {@code lastData} is updated from it.
     *
     * <p>Any exception is logged; only {@link ConcurrentModificationException} is rethrown.
     *
     * @param z when {@code true}, write even if {@code needBrokerDataUpdate()} returns false
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void writeBrokerDataOnZooKeeper(boolean z) {
        this.lock.lock();
        try {
            updateLocalBrokerData();
            final boolean sessionConnected =
                    this.lastMetadataSessionEvent != null && this.lastMetadataSessionEvent.isConnected();
            // needBrokerDataUpdate() is evaluated before the force flag, as in the original condition.
            if (sessionConnected && (needBrokerDataUpdate() || z)) {
                this.localData.setLastUpdate(System.currentTimeMillis());
                this.brokerDataLock.updateValue(this.localData).join();
                this.localData.cleanDeltas();
                this.lastData.update(this.localData);
            }
        } catch (ConcurrentModificationException concurrentModification) {
            log.warn("Error writing broker data on metadata store", concurrentModification);
            throw concurrentModification;
        } catch (Exception otherFailure) {
            log.warn("Error writing broker data on metadata store", otherFailure);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Refreshes the bundle data and writes every bundle's data and every broker's time-average
     * data to their caches, then waits for all writes to finish.
     *
     * <p>Each write replaces (or creates) the stored value with the in-memory one. A failure
     * while waiting for the writes is logged and not propagated.
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void writeBundleDataOnZooKeeper() {
        updateBundleData();

        final List<CompletableFuture<Void>> pendingWrites = new ArrayList<>();

        for (Map.Entry<String, BundleData> bundleEntry : this.loadData.getBundleData().entrySet()) {
            final String bundleName = bundleEntry.getKey();
            final BundleData latestBundleData = bundleEntry.getValue();
            pendingWrites.add(this.bundlesCache
                    .readModifyUpdateOrCreate(getBundleDataPath(bundleName), existing -> latestBundleData)
                    .thenApply(written -> null));
        }

        for (Map.Entry<String, BrokerData> brokerEntry : this.loadData.getBrokerData().entrySet()) {
            final String brokerName = brokerEntry.getKey();
            final TimeAverageBrokerData latestTimeAverage = brokerEntry.getValue().getTimeAverageData();
            pendingWrites.add(this.timeAverageBrokerDataCache
                    .readModifyUpdateOrCreate("/loadbalance/broker-time-average/" + brokerName,
                            existing -> latestTimeAverage)
                    .thenApply(written -> null));
        }

        try {
            FutureUtil.waitForAll(pendingWrites).join();
        } catch (Exception writeFailure) {
            log.warn("Error when writing metadata data to store", writeFailure);
        }
    }

    /**
     * Deletes the stored bundle data for the given bundle and waits for the deletion to finish.
     *
     * <p>A failure caused by {@code MetadataStoreException.NotFoundException} is ignored
     * silently; any other failure is logged as a warning. Nothing is propagated to the caller.
     *
     * @param str the bundle whose stored data should be removed
     */
    private void deleteBundleDataFromMetadataStore(String str) {
        try {
            this.bundlesCache.delete(getBundleDataPath(str)).join();
        } catch (Exception deleteFailure) {
            final boolean alreadyGone = deleteFailure.getCause() instanceof MetadataStoreException.NotFoundException;
            if (!alreadyGone) {
                log.warn("Failed to delete bundle-data {} from metadata store", str, deleteFailure);
            }
        }
    }

    /**
     * Rebuilds the broker-to-failure-domain map from the cluster's failure-domain resources.
     *
     * <p>Does nothing when failure domains are disabled in the broker configuration. Otherwise
     * every failure domain of the configured cluster is read and each of its brokers is mapped
     * to the domain name; the freshly built map then replaces {@code brokerToFailureDomainMap}.
     * A failure while reading a single domain is logged and that domain is skipped; a failure
     * while listing the domains is logged and leaves the current map untouched.
     */
    private void refreshBrokerToFailureDomainMap() {
        if (!this.pulsar.getConfiguration().isFailureDomainsEnabled()) {
            return;
        }
        ClusterResources.FailureDomainResources failureDomainResources =
                this.pulsar.getPulsarResources().getClusterResources().getFailureDomainResources();
        String clusterName = this.pulsar.getConfiguration().getClusterName();
        try {
            // NOTE(review): the monitor is the map instance that gets replaced below, so a thread
            // entering after the swap locks a different object. A dedicated final lock field
            // would be safer, but needs a change outside this method.
            synchronized (this.brokerToFailureDomainMap) {
                HashMap<String, String> refreshed = Maps.newHashMap();
                for (String domainName : failureDomainResources.listFailureDomains(clusterName)) {
                    try {
                        Optional<?> failureDomain = failureDomainResources.getFailureDomain(clusterName, domainName);
                        if (failureDomain.isPresent()) {
                            for (Object broker : ((FailureDomainImpl) failureDomain.get()).brokers) {
                                refreshed.put((String) broker, domainName);
                            }
                        }
                    } catch (Exception e) {
                        log.warn("Failed to get domain {}", domainName, e);
                    }
                }
                this.brokerToFailureDomainMap = refreshed;
            }
            log.info("Cluster domain refreshed {}", this.brokerToFailureDomainMap);
        } catch (Exception e) {
            // The placeholder is for the cluster name; previously it was filled with the
            // exception message, so the cluster was never logged and the stack trace was lost.
            log.warn("Failed to get domain-list for cluster {}", clusterName, e);
        }
    }

    /**
     * Reads the local broker data stored for the given broker under
     * {@code LoadManager.LOADBALANCE_BROKERS_ROOT}.
     *
     * @param str the broker name, used as the last path segment
     * @return the stored data, or {@code null} when nothing is stored or the read fails
     *         (a failure is logged as a warning)
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public LocalBrokerData getBrokerLocalData(String str) {
        try {
            final String brokerPath = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, str);
            final Optional<?> stored = (Optional<?>) this.brokersData.readLock(brokerPath).join();
            return (LocalBrokerData) stored.orElse(null);
        } catch (Exception readFailure) {
            log.warn("Failed to get local-broker data for {}", str, readFailure);
            return null;
        }
    }

    /**
     * Collects the currently published load-balancing, bundle-unload, bundle-split and
     * per-bundle metrics into one new list, in that order.
     *
     * <p>Each source is read exactly once, so the list that is null-checked is the same list
     * that is copied even if another thread publishes a new value in between.
     *
     * @return a new mutable list with all available metrics; empty when nothing is published
     */
    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public List<Metrics> getLoadBalancingMetrics() {
        List<Metrics> combined = new ArrayList<>();

        List<Metrics> loadBalancing = this.loadBalancingMetrics.get();
        if (loadBalancing != null) {
            combined.addAll(loadBalancing);
        }

        List<Metrics> bundleUnload = this.bundleUnloadMetrics.get();
        if (bundleUnload != null) {
            combined.addAll(bundleUnload);
        }

        List<Metrics> bundleSplit = this.bundleSplitMetrics.get();
        if (bundleSplit != null) {
            combined.addAll(bundleSplit);
        }

        List<Metrics> perBundle = this.bundleMetrics.get();
        if (perBundle != null) {
            combined.addAll(perBundle);
        }

        return combined;
    }
}
