package org.apache.pulsar.broker.loadbalance.extensions.scheduler;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.class */
public class UnloadScheduler implements LoadManagerScheduler {
    private static final Logger log = LoggerFactory.getLogger(UnloadScheduler.class);
    private final NamespaceUnloadStrategy namespaceUnloadStrategy;
    private final ScheduledExecutorService loadManagerExecutor;
    private final PulsarService pulsar;
    private final UnloadManager unloadManager;
    private final LoadManagerContext context;
    private final ServiceUnitStateChannel channel;
    private final ServiceConfiguration conf;
    private final UnloadCounter counter;
    private final AtomicReference<List<Metrics>> unloadMetrics;
    private long counterLastUpdatedAt;
    private volatile ScheduledFuture<?> task;
    private final Set<String> unloadBrokers;
    private final Map<String, Long> recentlyUnloadedBundles;
    private final Map<String, Long> recentlyUnloadedBrokers;

    public UnloadScheduler(PulsarService pulsarService, ScheduledExecutorService scheduledExecutorService, UnloadManager unloadManager, LoadManagerContext loadManagerContext, ServiceUnitStateChannel serviceUnitStateChannel, UnloadCounter unloadCounter, AtomicReference<List<Metrics>> atomicReference) {
        this(pulsarService, scheduledExecutorService, unloadManager, loadManagerContext, serviceUnitStateChannel, createNamespaceUnloadStrategy(pulsarService), unloadCounter, atomicReference);
    }

    @VisibleForTesting
    protected UnloadScheduler(PulsarService pulsarService, ScheduledExecutorService scheduledExecutorService, UnloadManager unloadManager, LoadManagerContext loadManagerContext, ServiceUnitStateChannel serviceUnitStateChannel, NamespaceUnloadStrategy namespaceUnloadStrategy, UnloadCounter unloadCounter, AtomicReference<List<Metrics>> atomicReference) {
        this.counterLastUpdatedAt = 0L;
        this.pulsar = pulsarService;
        this.namespaceUnloadStrategy = namespaceUnloadStrategy;
        this.recentlyUnloadedBundles = new HashMap();
        this.recentlyUnloadedBrokers = new HashMap();
        this.unloadBrokers = new HashSet();
        this.loadManagerExecutor = scheduledExecutorService;
        this.counter = unloadCounter;
        this.unloadMetrics = atomicReference;
        this.unloadManager = unloadManager;
        this.context = loadManagerContext;
        this.conf = loadManagerContext.brokerConfiguration();
        this.channel = serviceUnitStateChannel;
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler
    public synchronized void execute() {
        boolean z = this.conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
        if (z) {
            log.info("Load balancer enabled: {}, Shedding enabled: {}.", Boolean.valueOf(this.conf.isLoadBalancerEnabled()), Boolean.valueOf(this.conf.isLoadBalancerSheddingEnabled()));
        }
        if (!isLoadBalancerSheddingEnabled()) {
            if (z) {
                log.info("The load balancer or load balancer shedding already disabled. Skipping.");
                return;
            }
            return;
        }
        long currentTimeMillis = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerSheddingGracePeriodMinutes());
        this.recentlyUnloadedBundles.keySet().removeIf(str -> {
            return this.recentlyUnloadedBundles.get(str).longValue() < currentTimeMillis;
        });
        long namespaceBundleUnloadingTimeoutMs = this.conf.getNamespaceBundleUnloadingTimeoutMs();
        synchronized (this.namespaceUnloadStrategy) {
            try {
                try {
                } catch (Exception e) {
                    log.error("[{}] Namespace unload has exception.", this.namespaceUnloadStrategy.getClass().getSimpleName(), e);
                    if (this.counter.updatedAt() > this.counterLastUpdatedAt) {
                        this.unloadMetrics.set(this.counter.toMetrics(this.pulsar.getAdvertisedAddress()));
                        this.counterLastUpdatedAt = this.counter.updatedAt();
                    }
                }
                if (!this.channel.isChannelOwnerAsync().get(namespaceBundleUnloadingTimeoutMs, TimeUnit.MILLISECONDS).booleanValue()) {
                    if (z) {
                        log.info("Current broker is not channel owner. Skipping.");
                    }
                    return;
                }
                List<String> list = this.context.brokerRegistry().getAvailableBrokersAsync().get(namespaceBundleUnloadingTimeoutMs, TimeUnit.MILLISECONDS);
                if (z) {
                    log.info("Available brokers: {}", list);
                }
                if (list.size() <= 1) {
                    log.info("Only 1 broker available: no load shedding will be performed. Skipping.");
                    if (this.counter.updatedAt() > this.counterLastUpdatedAt) {
                        this.unloadMetrics.set(this.counter.toMetrics(this.pulsar.getAdvertisedAddress()));
                        this.counterLastUpdatedAt = this.counter.updatedAt();
                    }
                    return;
                }
                Set<UnloadDecision> findBundlesForUnloading = this.namespaceUnloadStrategy.findBundlesForUnloading(this.context, this.recentlyUnloadedBundles, this.recentlyUnloadedBrokers);
                if (z) {
                    log.info("[{}] Unload decision result: {}", this.namespaceUnloadStrategy.getClass().getSimpleName(), findBundlesForUnloading);
                }
                if (findBundlesForUnloading.isEmpty()) {
                    if (z) {
                        log.info("[{}] Unload decision unloads is empty. Skipping.", this.namespaceUnloadStrategy.getClass().getSimpleName());
                    }
                    if (this.counter.updatedAt() > this.counterLastUpdatedAt) {
                        this.unloadMetrics.set(this.counter.toMetrics(this.pulsar.getAdvertisedAddress()));
                        this.counterLastUpdatedAt = this.counter.updatedAt();
                    }
                    return;
                }
                ArrayList arrayList = new ArrayList();
                this.unloadBrokers.clear();
                findBundlesForUnloading.forEach(unloadDecision -> {
                    if (unloadDecision.getLabel() == UnloadDecision.Label.Success) {
                        Unload unload = unloadDecision.getUnload();
                        log.info("[{}] Unloading bundle: {}", this.namespaceUnloadStrategy.getClass().getSimpleName(), unload);
                        arrayList.add(this.unloadManager.waitAsync(this.channel.publishUnloadEventAsync(unload), unload.serviceUnit(), unloadDecision, namespaceBundleUnloadingTimeoutMs, TimeUnit.MILLISECONDS).thenAccept(r7 -> {
                            this.unloadBrokers.add(unload.sourceBroker());
                            this.recentlyUnloadedBundles.put(unload.serviceUnit(), Long.valueOf(System.currentTimeMillis()));
                            this.recentlyUnloadedBrokers.put(unload.sourceBroker(), Long.valueOf(System.currentTimeMillis()));
                        }));
                    }
                });
                FutureUtil.waitForAll(arrayList).whenComplete((r4, th) -> {
                    this.counter.updateUnloadBrokerCount(this.unloadBrokers.size());
                }).get(namespaceBundleUnloadingTimeoutMs, TimeUnit.MILLISECONDS);
                if (this.counter.updatedAt() > this.counterLastUpdatedAt) {
                    this.unloadMetrics.set(this.counter.toMetrics(this.pulsar.getAdvertisedAddress()));
                    this.counterLastUpdatedAt = this.counter.updatedAt();
                }
            } finally {
                if (this.counter.updatedAt() > this.counterLastUpdatedAt) {
                    this.unloadMetrics.set(this.counter.toMetrics(this.pulsar.getAdvertisedAddress()));
                    this.counterLastUpdatedAt = this.counter.updatedAt();
                }
            }
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler
    public void start() {
        if (this.task == null) {
            long millis = TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerSheddingIntervalMinutes());
            this.task = this.loadManagerExecutor.scheduleAtFixedRate(this::execute, millis, millis, TimeUnit.MILLISECONDS);
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.task != null) {
            this.task.cancel(false);
            this.task = null;
        }
        this.recentlyUnloadedBundles.clear();
        this.recentlyUnloadedBrokers.clear();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v9, types: [org.apache.pulsar.broker.loadbalance.extensions.scheduler.NamespaceUnloadStrategy] */
    private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(PulsarService pulsarService) {
        TransferShedder transferShedder;
        ServiceConfiguration configuration = pulsarService.getConfiguration();
        try {
            transferShedder = (NamespaceUnloadStrategy) Reflections.createInstance(configuration.getLoadBalancerLoadSheddingStrategy(), NamespaceUnloadStrategy.class, Thread.currentThread().getContextClassLoader());
            log.info("Created namespace unload strategy:{}", transferShedder.getClass().getCanonicalName());
        } catch (Exception e) {
            log.error("Error when trying to create namespace unload strategy: {}. Using {} instead.", new Object[]{configuration.getLoadBalancerLoadSheddingStrategy(), TransferShedder.class.getCanonicalName(), e});
            transferShedder = new TransferShedder();
        }
        transferShedder.initialize(pulsarService);
        return transferShedder;
    }

    private boolean isLoadBalancerSheddingEnabled() {
        return this.conf.isLoadBalancerEnabled() && this.conf.isLoadBalancerSheddingEnabled();
    }
}
