package org.apache.pulsar.broker.resourcegroup;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.class */
/**
 * Produces and consumes a handful of messages on a topic whose tenant and namespace are
 * registered to a resource group, and checks that both the broker's per-topic counters and the
 * usage reported by {@link ResourceGroupService} reflect that traffic.
 *
 * <p>The scenario is run once against a persistent topic and once against a non-persistent one.
 */
public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
    private static final int PUBLISH_INTERVAL_SECS = 300;
    private static final int NUM_MESSAGES_TO_SEND = 10;
    private static final String CLUSTER_NAME = "test";
    private static final String RESOURCE_GROUP_NAME = "runProduceConsume";
    private static final String SUBSCRIPTION_NAME = "my-subscription";

    // Names and visibility of the fields below are kept as they were; only the initialisers are
    // spelled out so the relationship between the values is visible. The resulting strings are
    // identical to the previously hard-coded literals.
    final String TenantName = "pulsar-test";
    final String NsName = "test";
    final String TenantAndNsName = TenantName + "/" + NsName;
    final String TestProduceConsumeTopicName = "/test/prod-cons-topic";
    final String PRODUCE_CONSUME_PERSISTENT_TOPIC =
            "persistent://" + TenantAndNsName + TestProduceConsumeTopicName;
    final String PRODUCE_CONSUME_NON_PERSISTENT_TOPIC =
            "non-persistent://" + TenantAndNsName + TestProduceConsumeTopicName;

    private ResourceGroupService rgs;

    /**
     * Starts the test broker, creates the cluster/tenant/namespace used by the test, and builds
     * the {@link ResourceGroupService} under test with a stub quota calculator.
     *
     * @throws Exception if broker setup or admin provisioning fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        prepareData();

        // Stub calculator: never asks for a usage report and always computes a quota of zero.
        final ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() {
            @Override
            public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
                                                  long currentMessagesUsed, long lastReportedMessages,
                                                  long lastReportTimeMSecsSinceEpoch) {
                return false;
            }

            @Override
            public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
                return 0L;
            }
        };

        final ResourceUsageTransportManager transportManager = new ResourceUsageTransportManager(this.pulsar);
        this.rgs = new ResourceGroupService(this.pulsar, TimeUnit.MILLISECONDS, transportManager, dummyQuotaCalc);
    }

    /**
     * Tears down the test broker.
     *
     * @throws Exception if cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Runs the produce/consume usage scenario on the persistent topic, then on the
     * non-persistent topic.
     *
     * @throws Exception if either run fails
     */
    @Test
    public void testProduceConsumeUsageOnRG() throws Exception {
        testProduceConsumeUsageOnRG(PRODUCE_CONSUME_PERSISTENT_TOPIC);
        testProduceConsumeUsageOnRG(PRODUCE_CONSUME_NON_PERSISTENT_TOPIC);
    }

    /**
     * Creates a resource group, attaches the topic's tenant and namespace to it, sends
     * {@value #NUM_MESSAGES_TO_SEND} messages, receives them back, and verifies the counters
     * after every send and after the final receive. The resource group is unregistered and
     * deleted at the end so the method can be called again with another topic.
     *
     * @param topicString fully qualified name of the topic to exercise
     * @throws Exception if any broker, admin or client operation fails
     */
    private void testProduceConsumeUsageOnRG(String topicString) throws Exception {
        // No-op transport endpoints; this test does not exercise usage transport.
        final ResourceUsagePublisher dummyPublisher = new ResourceUsagePublisher() {
            @Override
            public String getID() {
                return "";
            }

            @Override
            public void fillResourceUsage(ResourceUsage resourceUsage) {
            }
        };
        final ResourceUsageConsumer dummyConsumer = new ResourceUsageConsumer() {
            @Override
            public String getID() {
                return "";
            }

            @Override
            public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
            }
        };

        final ResourceGroupConfigInfo rgConfig = new ResourceGroupConfigInfo();
        rgConfig.setName(RESOURCE_GROUP_NAME);
        rgConfig.setPublishBytesPerPeriod(1500L);
        rgConfig.setPublishMessagesPerPeriod(100L);
        rgConfig.setDispatchBytesPerPeriod(4000L);
        rgConfig.setDispatchMessagesPerPeriod(500L);
        this.rgs.resourceGroupCreate(rgConfig, dummyPublisher, dummyConsumer);
        final String rgName = rgConfig.getName();

        this.pulsar.getBrokerService().getOrCreateTopic(topicString);

        // Client failures are rethrown as AssertionError (what Assert.assertTrue(false, ...) used
        // to raise) but now carry the original exception as the cause.
        final Producer<byte[]> producer;
        try {
            producer = this.pulsarClient.newProducer().topic(topicString).create();
        } catch (PulsarClientException e) {
            throw new AssertionError(
                    String.format("Got exception while building producer: ex=%s", e.getMessage()), e);
        }

        final Consumer<byte[]> consumer;
        try {
            consumer = this.pulsarClient.newConsumer()
                    .topic(topicString)
                    .subscriptionName(SUBSCRIPTION_NAME)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
        } catch (PulsarClientException e) {
            throw new AssertionError(
                    String.format("Got exception while building consumer: ex=%s", e.getMessage()), e);
        }

        final TopicName topicName = TopicName.get(topicString);
        final String tenantString = topicName.getTenant();
        final String nsString = topicName.getNamespacePortion();
        this.rgs.registerTenant(rgName, tenantString);
        this.rgs.registerNameSpace(rgName, nsString);

        int sentNumBytes = 0;
        int sentNumMsgs = 0;
        for (int ix = 0; ix < NUM_MESSAGES_TO_SEND; ix++) {
            final byte[] mesg = String.format("Hi, ix=%d", ix).getBytes(StandardCharsets.UTF_8);
            try {
                producer.send(mesg);
            } catch (PulsarClientException e) {
                throw new AssertionError(
                        String.format("Got exception while sending %d-th time: ex=%s", ix, e.getMessage()), e);
            }
            sentNumBytes += mesg.length;
            sentNumMsgs++;
            // Nothing has been consumed yet, so only the produce side is checked.
            verifyStats(topicString, rgName, sentNumBytes, sentNumMsgs, 0, 0, true, false);
        }
        producer.close();
        verifyStats(topicString, rgName, sentNumBytes, sentNumMsgs, 0, 0, true, false);

        int recvdNumBytes = 0;
        int recvdNumMsgs = 0;
        while (recvdNumMsgs < sentNumMsgs) {
            try {
                recvdNumBytes += consumer.receive().getValue().length;
            } catch (PulsarClientException e) {
                throw new AssertionError(
                        String.format("Got exception in while receiving %d-th mesg at consumer: ex=%s",
                                recvdNumMsgs, e.getMessage()), e);
            }
            recvdNumMsgs++;
        }
        verifyStats(topicString, rgName, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true);
        consumer.close();

        this.rgs.unRegisterTenant(rgName, tenantString);
        this.rgs.unRegisterNameSpace(rgName, nsString);
        this.rgs.resourceGroupDelete(rgName);
    }

    /**
     * Checks the broker's counters for {@code topicString} and, when any traffic is expected,
     * the usage reported for the resource group {@code rgName}.
     *
     * <p>If the broker reports no stats entry for the topic, nothing is checked.
     *
     * @param topicString   topic whose stats entry is inspected
     * @param rgName        resource group whose usage is inspected
     * @param sentNumBytes  lower bound on bytes published
     * @param sentNumMsgs   exact number of messages published
     * @param recvdNumBytes lower bound on bytes dispatched
     * @param recvdNumMsgs  exact number of messages dispatched
     * @param checkProduce  whether to assert on the publish-side counters
     * @param checkConsume  whether to assert on the dispatch-side counters
     * @throws InterruptedException declared for compatibility with the previous signature
     * @throws PulsarAdminException declared for compatibility with the previous signature
     */
    private void verifyStats(String topicString, String rgName,
                             int sentNumBytes, int sentNumMsgs,
                             int recvdNumBytes, int recvdNumMsgs,
                             boolean checkProduce, boolean checkConsume)
            throws InterruptedException, PulsarAdminException {
        final Map<String, TopicStatsImpl> topicStatsMap = this.pulsar.getBrokerService().getTopicStats();
        final TopicStatsImpl stats = topicStatsMap.get(topicString);
        if (stats == null) {
            // Same outcome as the former loop over all entries: no entry, no checks.
            return;
        }

        if (checkProduce) {
            Assert.assertTrue(stats.bytesInCounter >= sentNumBytes,
                    "topic bytesInCounter=" + stats.bytesInCounter + " < sent bytes=" + sentNumBytes);
            Assert.assertEquals(stats.msgInCounter, (long) sentNumMsgs);
        }
        if (checkConsume) {
            Assert.assertTrue(stats.bytesOutCounter >= recvdNumBytes,
                    "topic bytesOutCounter=" + stats.bytesOutCounter + " < received bytes=" + recvdNumBytes);
            Assert.assertEquals(stats.msgOutCounter, (long) recvdNumMsgs);
        }

        if (sentNumMsgs <= 0 && recvdNumMsgs <= 0) {
            return;
        }

        // Run an aggregation pass explicitly before reading the resource-group usage.
        this.rgs.aggregateResourceGroupLocalUsages();
        final ResourceGroup.BytesAndMessagesCount prodCounts =
                this.rgs.getRGUsage(rgName, ResourceGroup.ResourceGroupMonitoringClass.Publish);
        final ResourceGroup.BytesAndMessagesCount consCounts =
                this.rgs.getRGUsage(rgName, ResourceGroup.ResourceGroupMonitoringClass.Dispatch);

        // Read the usage a second time on purpose: two consecutive reads must agree.
        final ResourceGroup.BytesAndMessagesCount prodCountsAgain =
                this.rgs.getRGUsage(rgName, ResourceGroup.ResourceGroupMonitoringClass.Publish);
        final ResourceGroup.BytesAndMessagesCount consCountsAgain =
                this.rgs.getRGUsage(rgName, ResourceGroup.ResourceGroupMonitoringClass.Dispatch);
        Assert.assertEquals(prodCounts.bytes, prodCountsAgain.bytes);
        Assert.assertEquals(prodCounts.messages, prodCountsAgain.messages);
        Assert.assertEquals(consCounts.bytes, consCountsAgain.bytes);
        Assert.assertEquals(consCounts.messages, consCountsAgain.messages);

        if (checkProduce) {
            Assert.assertTrue(prodCounts.bytes >= sentNumBytes,
                    "resource group publish bytes=" + prodCounts.bytes + " < sent bytes=" + sentNumBytes);
            Assert.assertEquals(prodCounts.messages, (long) sentNumMsgs);
        }
        if (checkConsume) {
            Assert.assertTrue(consCounts.bytes >= recvdNumBytes,
                    "resource group dispatch bytes=" + consCounts.bytes + " < received bytes=" + recvdNumBytes);
            Assert.assertEquals(consCounts.messages, (long) recvdNumMsgs);
        }
    }

    /**
     * Applies the broker configuration overrides used by this test and creates the cluster,
     * tenant and namespace that the test topics live in.
     *
     * @throws PulsarAdminException if any admin call fails
     */
    private void prepareData() throws PulsarAdminException {
        this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
        this.conf.setAllowAutoTopicCreation(true);
        this.admin.clusters().createCluster(CLUSTER_NAME,
                ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
        this.admin.tenants().createTenant(TenantName,
                new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(CLUSTER_NAME)));
        this.admin.namespaces().createNamespace(TenantAndNsName);
        this.admin.namespaces().setNamespaceReplicationClusters(TenantAndNsName, Sets.newHashSet(CLUSTER_NAME));
    }
}
