package org.apache.pulsar.broker.resourcegroup;

import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.Runnables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/resourcegroup/ResourceGroupService.class */
public class ResourceGroupService implements AutoCloseable {
    private final PulsarService pulsar;
    protected final ResourceQuotaCalculator quotaCalculator;
    private ResourceUsageTransportManager resourceUsageTransportManagerMgr;
    private final ResourceGroupConfigListener rgConfigListener;
    private ConcurrentHashMap<String, ResourceGroup> resourceGroupsMap;
    private ConcurrentHashMap<String, ResourceGroup> tenantToRGsMap;
    private ConcurrentHashMap<NamespaceName, ResourceGroup> namespaceToRGsMap;
    private ConcurrentHashMap<String, ResourceGroup.BytesAndMessagesCount> topicProduceStats;
    private ConcurrentHashMap<String, ResourceGroup.BytesAndMessagesCount> topicConsumeStats;
    private ScheduledFuture<?> aggregateLocalUsagePeriodicTask;
    private long aggregateLocalUsagePeriodInSeconds;
    private ScheduledFuture<?> calculateQuotaPeriodicTask;
    private long resourceUsagePublishPeriodInSeconds;
    private TimeUnit timeUnitScale;
    protected static final int MaxUsageReportSuppressRounds = 5;
    protected static long maxIntervalForSuppressingReportsMSecs;
    protected static final float UsageReportSuppressionTolerancePercentage = 5.0f;
    private static final Logger log = LoggerFactory.getLogger(ResourceGroupService.class);
    private static final String[] resourceGroupLabel = {"ResourceGroup"};
    private static final String[] resourceGroupMonitoringclassLabels = {"ResourceGroup", "MonitoringClass"};
    private static final Counter rgCalculatedQuotaBytes = Counter.build().name("pulsar_resource_group_calculated_bytes_quota").help("Bytes quota calculated for resource group").labelNames(resourceGroupMonitoringclassLabels).register();
    private static final Counter rgCalculatedQuotaMessages = Counter.build().name("pulsar_resource_group_calculated_messages_quota").help("Messages quota calculated for resource group").labelNames(resourceGroupMonitoringclassLabels).register();
    private static final Counter rgLocalUsageBytes = Counter.build().name("pulsar_resource_group_bytes_used").help("Bytes locally used within this resource group during the last aggregation interval").labelNames(resourceGroupMonitoringclassLabels).register();
    private static final Counter rgLocalUsageMessages = Counter.build().name("pulsar_resource_group_messages_used").help("Messages locally used within this resource group during the last aggregation interval").labelNames(resourceGroupMonitoringclassLabels).register();
    private static final Counter rgUpdates = Counter.build().name("pulsar_resource_group_updates").help("Number of update operations on the given resource group").labelNames(resourceGroupLabel).register();
    private static final Counter rgTenantRegisters = Counter.build().name("pulsar_resource_group_tenant_registers").help("Number of registrations of tenants").labelNames(resourceGroupLabel).register();
    private static final Counter rgTenantUnRegisters = Counter.build().name("pulsar_resource_group_tenant_unregisters").help("Number of un-registrations of tenants").labelNames(resourceGroupLabel).register();
    private static final Counter rgNamespaceRegisters = Counter.build().name("pulsar_resource_group_namespace_registers").help("Number of registrations of namespaces").labelNames(resourceGroupLabel).register();
    private static final Counter rgNamespaceUnRegisters = Counter.build().name("pulsar_resource_group_namespace_unregisters").help("Number of un-registrations of namespaces").labelNames(resourceGroupLabel).register();
    private static final Summary rgUsageAggregationLatency = Summary.build().quantile(0.5d, 0.05d).quantile(0.9d, 0.01d).name("pulsar_resource_group_aggregate_usage_secs").help("Time required to aggregate usage of all resource groups, in seconds.").register();
    private static final Summary rgQuotaCalculationLatency = Summary.build().quantile(0.5d, 0.05d).quantile(0.9d, 0.01d).name("pulsar_resource_group_calculate_quota_secs").help("Time required to calculate quota of all resource groups, in seconds.").register();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/pulsar/broker/resourcegroup/ResourceGroupService$ResourceGroupOpStatus.class */
    public enum ResourceGroupOpStatus {
        OK,
        Exists,
        DoesNotExist,
        NotSupported
    }

    /* loaded from: input_file:org/apache/pulsar/broker/resourcegroup/ResourceGroupService$ResourceGroupUsageStatsType.class */
    protected enum ResourceGroupUsageStatsType {
        Cumulative,
        LocalSinceLastReported,
        ReportFromTransportMgr
    }

    public ResourceGroupService(PulsarService pulsarService) {
        this.resourceGroupsMap = new ConcurrentHashMap<>();
        this.tenantToRGsMap = new ConcurrentHashMap<>();
        this.namespaceToRGsMap = new ConcurrentHashMap<>();
        this.topicProduceStats = new ConcurrentHashMap<>();
        this.topicConsumeStats = new ConcurrentHashMap<>();
        this.pulsar = pulsarService;
        this.timeUnitScale = TimeUnit.SECONDS;
        this.quotaCalculator = new ResourceQuotaCalculatorImpl();
        this.resourceUsageTransportManagerMgr = pulsarService.getResourceUsageTransportManager();
        this.rgConfigListener = new ResourceGroupConfigListener(this, pulsarService);
        initialize();
    }

    public ResourceGroupService(PulsarService pulsarService, TimeUnit timeUnit, ResourceUsageTopicTransportManager resourceUsageTopicTransportManager, ResourceQuotaCalculator resourceQuotaCalculator) {
        this.resourceGroupsMap = new ConcurrentHashMap<>();
        this.tenantToRGsMap = new ConcurrentHashMap<>();
        this.namespaceToRGsMap = new ConcurrentHashMap<>();
        this.topicProduceStats = new ConcurrentHashMap<>();
        this.topicConsumeStats = new ConcurrentHashMap<>();
        this.pulsar = pulsarService;
        this.timeUnitScale = timeUnit;
        this.resourceUsageTransportManagerMgr = resourceUsageTopicTransportManager;
        this.quotaCalculator = resourceQuotaCalculator;
        this.rgConfigListener = new ResourceGroupConfigListener(this, pulsarService);
        initialize();
    }

    public void resourceGroupCreate(String str, org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup) throws PulsarAdminException {
        checkRGCreateParams(str, resourceGroup);
        this.resourceGroupsMap.put(str, new ResourceGroup(this, str, resourceGroup));
    }

    public void resourceGroupCreate(String str, org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup, ResourceUsagePublisher resourceUsagePublisher, ResourceUsageConsumer resourceUsageConsumer) throws PulsarAdminException {
        checkRGCreateParams(str, resourceGroup);
        this.resourceGroupsMap.put(str, new ResourceGroup(this, str, resourceGroup, resourceUsagePublisher, resourceUsageConsumer));
    }

    public ResourceGroup resourceGroupGet(String str) {
        ResourceGroup resourceGroupInternal = getResourceGroupInternal(str);
        if (resourceGroupInternal == null) {
            return null;
        }
        return new ResourceGroup(resourceGroupInternal);
    }

    public void resourceGroupUpdate(String str, org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup) throws PulsarAdminException {
        if (resourceGroup == null) {
            throw new IllegalArgumentException("ResourceGroupUpdate: Invalid null ResourceGroup config");
        }
        ResourceGroup resourceGroupInternal = getResourceGroupInternal(str);
        if (resourceGroupInternal == null) {
            throw new PulsarAdminException("Resource group does not exist: " + str);
        }
        resourceGroupInternal.updateResourceGroup(resourceGroup);
        ((Counter.Child) rgUpdates.labels(new String[]{str})).inc();
    }

    public Set<String> resourceGroupGetAll() {
        return this.resourceGroupsMap.keySet();
    }

    public void resourceGroupDelete(String str) throws PulsarAdminException {
        ResourceGroup resourceGroupInternal = getResourceGroupInternal(str);
        if (resourceGroupInternal == null) {
            throw new PulsarAdminException("Resource group does not exist: " + str);
        }
        long resourceGroupNumOfTenantRefs = resourceGroupInternal.getResourceGroupNumOfTenantRefs();
        long resourceGroupNumOfNSRefs = resourceGroupInternal.getResourceGroupNumOfNSRefs();
        if (resourceGroupNumOfTenantRefs + resourceGroupNumOfNSRefs > 0) {
            throw new PulsarAdminException(("Resource group " + str + " still has " + resourceGroupNumOfTenantRefs + " tenant refs") + " and " + resourceGroupNumOfNSRefs + " namespace refs on it");
        }
        resourceGroupInternal.resourceGroupPublishLimiter.close();
        resourceGroupInternal.resourceGroupPublishLimiter = null;
        this.resourceGroupsMap.remove(str);
    }

    protected long getNumResourceGroups() {
        return this.resourceGroupsMap.mappingCount();
    }

    public void registerTenant(String str, String str2) throws PulsarAdminException {
        ResourceGroup checkResourceGroupExists = checkResourceGroupExists(str);
        ResourceGroup resourceGroup = this.tenantToRGsMap.get(str2);
        if (resourceGroup != null) {
            throw new PulsarAdminException("Tenant " + str2 + " already references a resource group: " + resourceGroup.getID());
        }
        if (checkResourceGroupExists.registerUsage(str2, ResourceGroup.ResourceGroupRefTypes.Tenants, true, this.resourceUsageTransportManagerMgr) == ResourceGroupOpStatus.Exists) {
            throw new PulsarAdminException(("Tenant " + str2 + " already references the resource group " + str) + "; this is unexpected");
        }
        this.tenantToRGsMap.put(str2, checkResourceGroupExists);
        ((Counter.Child) rgTenantRegisters.labels(new String[]{str})).inc();
    }

    public void unRegisterTenant(String str, String str2) throws PulsarAdminException {
        ResourceGroup checkResourceGroupExists = checkResourceGroupExists(str);
        if (checkResourceGroupExists.registerUsage(str2, ResourceGroup.ResourceGroupRefTypes.Tenants, false, this.resourceUsageTransportManagerMgr) == ResourceGroupOpStatus.DoesNotExist) {
            throw new PulsarAdminException("Tenant " + str2 + " does not yet reference resource group " + str);
        }
        this.tenantToRGsMap.remove(str2, checkResourceGroupExists);
        ((Counter.Child) rgTenantUnRegisters.labels(new String[]{str})).inc();
    }

    public void registerNameSpace(String str, NamespaceName namespaceName) throws PulsarAdminException {
        ResourceGroup checkResourceGroupExists = checkResourceGroupExists(str);
        ResourceGroup resourceGroup = this.namespaceToRGsMap.get(namespaceName);
        if (resourceGroup != null) {
            throw new PulsarAdminException("Namespace " + namespaceName + " already references a resource group: " + resourceGroup.getID());
        }
        if (checkResourceGroupExists.registerUsage(namespaceName.toString(), ResourceGroup.ResourceGroupRefTypes.Namespaces, true, this.resourceUsageTransportManagerMgr) == ResourceGroupOpStatus.Exists) {
            throw new PulsarAdminException(String.format("Namespace %s already references the target resource group %s", namespaceName, str));
        }
        this.namespaceToRGsMap.put(namespaceName, checkResourceGroupExists);
        ((Counter.Child) rgNamespaceRegisters.labels(new String[]{str})).inc();
    }

    public void unRegisterNameSpace(String str, NamespaceName namespaceName) throws PulsarAdminException {
        ResourceGroup checkResourceGroupExists = checkResourceGroupExists(str);
        if (checkResourceGroupExists.registerUsage(namespaceName.toString(), ResourceGroup.ResourceGroupRefTypes.Namespaces, false, this.resourceUsageTransportManagerMgr) == ResourceGroupOpStatus.DoesNotExist) {
            throw new PulsarAdminException(String.format("Namespace %s does not yet reference resource group %s", namespaceName, str));
        }
        this.namespaceToRGsMap.remove(namespaceName, checkResourceGroupExists);
        ((Counter.Child) rgNamespaceUnRegisters.labels(new String[]{str})).inc();
    }

    public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) {
        return this.namespaceToRGsMap.get(namespaceName);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.aggregateLocalUsagePeriodicTask != null) {
            this.aggregateLocalUsagePeriodicTask.cancel(true);
        }
        if (this.calculateQuotaPeriodicTask != null) {
            this.calculateQuotaPeriodicTask.cancel(true);
        }
        this.resourceGroupsMap.clear();
        this.tenantToRGsMap.clear();
        this.namespaceToRGsMap.clear();
        this.topicProduceStats.clear();
        this.topicConsumeStats.clear();
    }

    protected boolean incrementUsage(String str, String str2, ResourceGroup.ResourceGroupMonitoringClass resourceGroupMonitoringClass, ResourceGroup.BytesAndMessagesCount bytesAndMessagesCount) throws PulsarAdminException {
        ResourceGroup resourceGroup = this.namespaceToRGsMap.get(NamespaceName.get(str, str2));
        ResourceGroup resourceGroup2 = this.tenantToRGsMap.get(str);
        if (resourceGroup2 == null && resourceGroup == null) {
            return false;
        }
        if (bytesAndMessagesCount.bytes < 0 || bytesAndMessagesCount.messages < 0) {
            throw new PulsarAdminException(String.format("incrementUsage on tenant=%s, NS=%s: bytes (%s) or mesgs (%s) is negative", str, str2, Long.valueOf(bytesAndMessagesCount.bytes), Long.valueOf(bytesAndMessagesCount.messages)));
        }
        if (resourceGroup == resourceGroup2) {
            resourceGroup.incrementLocalUsageStats(resourceGroupMonitoringClass, bytesAndMessagesCount);
            ((Counter.Child) rgLocalUsageMessages.labels(new String[]{resourceGroup.resourceGroupName, resourceGroupMonitoringClass.name()})).inc(bytesAndMessagesCount.messages);
            ((Counter.Child) rgLocalUsageBytes.labels(new String[]{resourceGroup.resourceGroupName, resourceGroupMonitoringClass.name()})).inc(bytesAndMessagesCount.bytes);
            return true;
        }
        if (resourceGroup2 != null) {
            resourceGroup2.incrementLocalUsageStats(resourceGroupMonitoringClass, bytesAndMessagesCount);
            ((Counter.Child) rgLocalUsageMessages.labels(new String[]{resourceGroup2.resourceGroupName, resourceGroupMonitoringClass.name()})).inc(bytesAndMessagesCount.messages);
            ((Counter.Child) rgLocalUsageBytes.labels(new String[]{resourceGroup2.resourceGroupName, resourceGroupMonitoringClass.name()})).inc(bytesAndMessagesCount.bytes);
        }
        if (resourceGroup == null) {
            return true;
        }
        resourceGroup.incrementLocalUsageStats(resourceGroupMonitoringClass, bytesAndMessagesCount);
        ((Counter.Child) rgLocalUsageMessages.labels(new String[]{resourceGroup.resourceGroupName, resourceGroupMonitoringClass.name()})).inc(bytesAndMessagesCount.messages);
        ((Counter.Child) rgLocalUsageBytes.labels(new String[]{resourceGroup.resourceGroupName, resourceGroupMonitoringClass.name()})).inc(bytesAndMessagesCount.bytes);
        return true;
    }

    protected ResourceGroup.BytesAndMessagesCount getRGUsage(String str, ResourceGroup.ResourceGroupMonitoringClass resourceGroupMonitoringClass, ResourceGroupUsageStatsType resourceGroupUsageStatsType) throws PulsarAdminException {
        ResourceGroup resourceGroupInternal = getResourceGroupInternal(str);
        if (resourceGroupInternal == null) {
            ResourceGroup.BytesAndMessagesCount bytesAndMessagesCount = new ResourceGroup.BytesAndMessagesCount();
            bytesAndMessagesCount.bytes = -1L;
            bytesAndMessagesCount.messages = -1L;
            return bytesAndMessagesCount;
        }
        switch (resourceGroupUsageStatsType) {
            case Cumulative:
                return resourceGroupInternal.getLocalUsageStatsCumulative(resourceGroupMonitoringClass);
            case LocalSinceLastReported:
                return resourceGroupInternal.getLocalUsageStats(resourceGroupMonitoringClass);
            case ReportFromTransportMgr:
                return resourceGroupInternal.getLocalUsageStatsFromBrokerReports(resourceGroupMonitoringClass);
            default:
                throw new PulsarAdminException("Unsupported statsType: " + resourceGroupUsageStatsType);
        }
    }

    private ResourceGroup getResourceGroupInternal(String str) {
        if (str == null) {
            throw new IllegalArgumentException("Invalid null resource group name: " + str);
        }
        return this.resourceGroupsMap.get(str);
    }

    private ResourceGroup checkResourceGroupExists(String str) throws PulsarAdminException {
        ResourceGroup resourceGroupInternal = getResourceGroupInternal(str);
        if (resourceGroupInternal == null) {
            throw new PulsarAdminException("Resource group does not exist: " + str);
        }
        return resourceGroupInternal;
    }

    private void updateStatsWithDiff(String str, String str2, String str3, long j, long j2, ResourceGroup.ResourceGroupMonitoringClass resourceGroupMonitoringClass) {
        ConcurrentHashMap<String, ResourceGroup.BytesAndMessagesCount> concurrentHashMap;
        switch (resourceGroupMonitoringClass) {
            case Publish:
                concurrentHashMap = this.topicProduceStats;
                break;
            case Dispatch:
                concurrentHashMap = this.topicConsumeStats;
                break;
            default:
                log.error("updateStatsWithDiff: Unknown monitoring class={}; ignoring", resourceGroupMonitoringClass);
                return;
        }
        ResourceGroup.BytesAndMessagesCount bytesAndMessagesCount = new ResourceGroup.BytesAndMessagesCount();
        ResourceGroup.BytesAndMessagesCount bytesAndMessagesCount2 = new ResourceGroup.BytesAndMessagesCount();
        bytesAndMessagesCount2.bytes = j;
        bytesAndMessagesCount2.messages = j2;
        ResourceGroup.BytesAndMessagesCount bytesAndMessagesCount3 = concurrentHashMap.get(str);
        if (bytesAndMessagesCount3 == null) {
            bytesAndMessagesCount.bytes = bytesAndMessagesCount2.bytes;
            bytesAndMessagesCount.messages = bytesAndMessagesCount2.messages;
        } else {
            bytesAndMessagesCount.bytes = bytesAndMessagesCount2.bytes - bytesAndMessagesCount3.bytes;
            bytesAndMessagesCount.messages = bytesAndMessagesCount2.messages - bytesAndMessagesCount3.messages;
        }
        if (bytesAndMessagesCount.bytes <= 0 || bytesAndMessagesCount.messages <= 0) {
            return;
        }
        try {
            boolean incrementUsage = incrementUsage(str2, str3, resourceGroupMonitoringClass, bytesAndMessagesCount);
            if (log.isDebugEnabled()) {
                log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; by {} bytes, {} mesgs", new Object[]{str, resourceGroupMonitoringClass, Boolean.valueOf(incrementUsage), str2, str3, Long.valueOf(bytesAndMessagesCount.bytes), Long.valueOf(bytesAndMessagesCount.messages)});
            }
            concurrentHashMap.put(str, bytesAndMessagesCount2);
        } catch (Throwable th) {
            log.error("updateStatsWithDiff: got ex={} while aggregating for {} side", th.getMessage(), resourceGroupMonitoringClass);
        }
    }

    protected ResourceGroup.BytesAndMessagesCount getPublishRateLimiters(String str) throws PulsarAdminException {
        ResourceGroup resourceGroupInternal = getResourceGroupInternal(str);
        if (resourceGroupInternal == null) {
            throw new PulsarAdminException("Resource group does not exist: " + str);
        }
        return resourceGroupInternal.getRgPublishRateLimiterValues();
    }

    protected static double getRgQuotaByteCount(String str, String str2) {
        return ((Counter.Child) rgCalculatedQuotaBytes.labels(new String[]{str, str2})).get();
    }

    protected static double getRgQuotaMessageCount(String str, String str2) {
        return ((Counter.Child) rgCalculatedQuotaMessages.labels(new String[]{str, str2})).get();
    }

    protected static double getRgLocalUsageByteCount(String str, String str2) {
        return ((Counter.Child) rgLocalUsageBytes.labels(new String[]{str, str2})).get();
    }

    protected static double getRgLocalUsageMessageCount(String str, String str2) {
        return ((Counter.Child) rgLocalUsageMessages.labels(new String[]{str, str2})).get();
    }

    protected static double getRgUpdatesCount(String str) {
        return ((Counter.Child) rgUpdates.labels(new String[]{str})).get();
    }

    protected static double getRgTenantRegistersCount(String str) {
        return ((Counter.Child) rgTenantRegisters.labels(new String[]{str})).get();
    }

    protected static double getRgTenantUnRegistersCount(String str) {
        return ((Counter.Child) rgTenantUnRegisters.labels(new String[]{str})).get();
    }

    protected static double getRgNamespaceRegistersCount(String str) {
        return ((Counter.Child) rgNamespaceRegisters.labels(new String[]{str})).get();
    }

    protected static double getRgNamespaceUnRegistersCount(String str) {
        return ((Counter.Child) rgNamespaceUnRegisters.labels(new String[]{str})).get();
    }

    protected static Summary.Child.Value getRgUsageAggregationLatency() {
        return rgUsageAggregationLatency.get();
    }

    protected static Summary.Child.Value getRgQuotaCalculationTime() {
        return rgQuotaCalculationLatency.get();
    }

    protected void aggregateResourceGroupLocalUsages() {
        Summary.Timer startTimer = rgUsageAggregationLatency.startTimer();
        for (Map.Entry<String, TopicStatsImpl> entry : this.pulsar.getBrokerService().getTopicStats().entrySet()) {
            String key = entry.getKey();
            TopicStats value = entry.getValue();
            TopicName topicName = TopicName.get(key);
            String tenant = topicName.getTenant();
            String namespacePortion = topicName.getNamespacePortion();
            NamespaceName namespaceObject = topicName.getNamespaceObject();
            ResourceGroup resourceGroup = this.tenantToRGsMap.get(tenant);
            ResourceGroup resourceGroup2 = this.namespaceToRGsMap.get(namespaceObject);
            if (resourceGroup != null || resourceGroup2 != null) {
                updateStatsWithDiff(key, tenant, namespacePortion, value.getBytesInCounter(), value.getMsgInCounter(), ResourceGroup.ResourceGroupMonitoringClass.Publish);
                updateStatsWithDiff(key, tenant, namespacePortion, value.getBytesOutCounter(), value.getMsgOutCounter(), ResourceGroup.ResourceGroupMonitoringClass.Dispatch);
            }
        }
        double observeDuration = startTimer.observeDuration();
        if (log.isDebugEnabled()) {
            log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", Double.valueOf(observeDuration * 1000.0d));
        }
        long resourceUsageTransportPublishIntervalInSecs = this.pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
        if (resourceUsageTransportPublishIntervalInSecs != this.aggregateLocalUsagePeriodInSeconds) {
            if (this.aggregateLocalUsagePeriodicTask == null) {
                log.error("aggregateResourceGroupLocalUsages: Unable to find running task to cancel when publish period changed from {} to {} {}", new Object[]{Long.valueOf(this.aggregateLocalUsagePeriodInSeconds), Long.valueOf(resourceUsageTransportPublishIntervalInSecs), this.timeUnitScale});
            } else {
                log.info("aggregateResourceGroupLocalUsages: Got status={} in cancel of periodic when publish period changed from {} to {} {}", new Object[]{Boolean.valueOf(this.aggregateLocalUsagePeriodicTask.cancel(true)), Long.valueOf(this.aggregateLocalUsagePeriodInSeconds), Long.valueOf(resourceUsageTransportPublishIntervalInSecs), this.timeUnitScale});
            }
            this.aggregateLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(Runnables.catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages), resourceUsageTransportPublishIntervalInSecs, resourceUsageTransportPublishIntervalInSecs, this.timeUnitScale);
            this.aggregateLocalUsagePeriodInSeconds = resourceUsageTransportPublishIntervalInSecs;
        }
    }

    protected void calculateQuotaForAllResourceGroups() {
        Summary.Timer startTimer = rgQuotaCalculationLatency.startTimer();
        ResourceGroup.BytesAndMessagesCount bytesAndMessagesCount = new ResourceGroup.BytesAndMessagesCount();
        this.resourceGroupsMap.forEach((str, resourceGroup) -> {
            for (ResourceGroup.ResourceGroupMonitoringClass resourceGroupMonitoringClass : ResourceGroup.ResourceGroupMonitoringClass.values()) {
                try {
                    ResourceGroup.BytesAndMessagesCount globalUsageStats = resourceGroup.getGlobalUsageStats(resourceGroupMonitoringClass);
                    ResourceGroup.BytesAndMessagesCount localUsageStatsFromBrokerReports = resourceGroup.getLocalUsageStatsFromBrokerReports(resourceGroupMonitoringClass);
                    ResourceGroup.BytesAndMessagesCount confLimits = resourceGroup.getConfLimits(resourceGroupMonitoringClass);
                    bytesAndMessagesCount.bytes = this.quotaCalculator.computeLocalQuota(confLimits.bytes, localUsageStatsFromBrokerReports.bytes, new long[]{globalUsageStats.bytes});
                    bytesAndMessagesCount.messages = this.quotaCalculator.computeLocalQuota(confLimits.messages, localUsageStatsFromBrokerReports.messages, new long[]{globalUsageStats.messages});
                    ResourceGroup.BytesAndMessagesCount updateLocalQuota = resourceGroup.updateLocalQuota(resourceGroupMonitoringClass, bytesAndMessagesCount);
                    if (bytesAndMessagesCount.messages >= 0) {
                        ((Counter.Child) rgCalculatedQuotaMessages.labels(new String[]{str, resourceGroupMonitoringClass.name()})).inc(bytesAndMessagesCount.messages);
                    }
                    if (bytesAndMessagesCount.bytes >= 0) {
                        ((Counter.Child) rgCalculatedQuotaBytes.labels(new String[]{str, resourceGroupMonitoringClass.name()})).inc(bytesAndMessagesCount.bytes);
                    }
                    if (updateLocalQuota != null) {
                        long j = bytesAndMessagesCount.messages - updateLocalQuota.messages;
                        long j2 = bytesAndMessagesCount.bytes - updateLocalQuota.bytes;
                        if (log.isDebugEnabled()) {
                            log.debug("calculateQuota for RG={} [class {}]: updatedlocalBytes={}, updatedlocalMesgs={}; old bytes={}, old mesgs={};  incremented bytes by {}, messages by {}", new Object[]{str, resourceGroupMonitoringClass, Long.valueOf(bytesAndMessagesCount.bytes), Long.valueOf(bytesAndMessagesCount.messages), Long.valueOf(updateLocalQuota.bytes), Long.valueOf(updateLocalQuota.messages), Long.valueOf(j2), Long.valueOf(j)});
                        }
                    } else if (log.isDebugEnabled()) {
                        log.debug("calculateQuota for RG={} [class {}]: got back null from updateLocalQuota", str, resourceGroupMonitoringClass);
                    }
                } catch (Throwable th) {
                    log.error("Got exception={} while calculating new quota for monitoring-class={} of RG={}", new Object[]{th.getMessage(), resourceGroupMonitoringClass, str});
                }
            }
        });
        double observeDuration = startTimer.observeDuration();
        if (log.isDebugEnabled()) {
            log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", Double.valueOf(observeDuration * 1000.0d));
        }
        long resourceUsageTransportPublishIntervalInSecs = this.pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
        if (resourceUsageTransportPublishIntervalInSecs != this.resourceUsagePublishPeriodInSeconds) {
            if (this.calculateQuotaPeriodicTask == null) {
                log.error("calculateQuotaForAllResourceGroups: Unable to find running task to cancel when publish period changed from {} to {} {}", new Object[]{Long.valueOf(this.resourceUsagePublishPeriodInSeconds), Long.valueOf(resourceUsageTransportPublishIntervalInSecs), this.timeUnitScale});
            } else {
                log.info("calculateQuotaForAllResourceGroups: Got status={} in cancel of periodic  when publish period changed from {} to {} {}", new Object[]{Boolean.valueOf(this.calculateQuotaPeriodicTask.cancel(true)), Long.valueOf(this.resourceUsagePublishPeriodInSeconds), Long.valueOf(resourceUsageTransportPublishIntervalInSecs), this.timeUnitScale});
            }
            this.calculateQuotaPeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(Runnables.catchingAndLoggingThrowables(this::calculateQuotaForAllResourceGroups), resourceUsageTransportPublishIntervalInSecs, resourceUsageTransportPublishIntervalInSecs, this.timeUnitScale);
            this.resourceUsagePublishPeriodInSeconds = resourceUsageTransportPublishIntervalInSecs;
            maxIntervalForSuppressingReportsMSecs = TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * 5;
        }
    }

    private void initialize() {
        long resourceUsageTransportPublishIntervalInSecs = this.pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
        this.resourceUsagePublishPeriodInSeconds = resourceUsageTransportPublishIntervalInSecs;
        this.aggregateLocalUsagePeriodInSeconds = resourceUsageTransportPublishIntervalInSecs;
        this.aggregateLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(Runnables.catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages), resourceUsageTransportPublishIntervalInSecs, resourceUsageTransportPublishIntervalInSecs, this.timeUnitScale);
        this.calculateQuotaPeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(Runnables.catchingAndLoggingThrowables(this::calculateQuotaForAllResourceGroups), resourceUsageTransportPublishIntervalInSecs, resourceUsageTransportPublishIntervalInSecs, this.timeUnitScale);
        maxIntervalForSuppressingReportsMSecs = TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * 5;
    }

    private void checkRGCreateParams(String str, org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup) throws PulsarAdminException {
        if (resourceGroup == null) {
            throw new IllegalArgumentException("ResourceGroupCreate: Invalid null ResourceGroup config");
        }
        if (str.isEmpty()) {
            throw new IllegalArgumentException("ResourceGroupCreate: can't create resource group with an empty name");
        }
        if (getResourceGroupInternal(str) != null) {
            throw new PulsarAdminException("Resource group already exists:" + str);
        }
    }

    @VisibleForTesting
    ConcurrentHashMap getTopicConsumeStats() {
        return this.topicConsumeStats;
    }

    @VisibleForTesting
    ConcurrentHashMap getTopicProduceStats() {
        return this.topicProduceStats;
    }

    @VisibleForTesting
    ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
        return this.aggregateLocalUsagePeriodicTask;
    }

    @VisibleForTesting
    ScheduledFuture<?> getCalculateQuotaPeriodicTask() {
        return this.calculateQuotaPeriodicTask;
    }

    public PulsarService getPulsar() {
        return this.pulsar;
    }
}
