package org.apache.pulsar.broker.service;

import java.util.Collections;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/TopicDispatchRateLimiterTest.class */
public class TopicDispatchRateLimiterTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setDispatchThrottlingRatePerTopicInMsg(0);
        this.conf.setDispatchThrottlingRatePerTopicInByte(0L);
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testTopicDispatchRateLimiterPerTopicInMsgOnlyBrokerLevel() throws Exception {
        String str = "persistent://" + newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(str).get();
            Assert.assertNotNull(persistentTopic);
            Assert.assertFalse(persistentTopic.getDispatchRateLimiter().isPresent());
            this.admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", "100");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.pulsar.getConfig().getDispatchThrottlingRatePerTopicInMsg(), 100);
            });
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(persistentTopic.getDispatchRateLimiter().isPresent());
            });
            Assert.assertEquals(((DispatchRateLimiter) persistentTopic.getDispatchRateLimiter().get()).getAvailableDispatchRateLimitOnMsg(), 100L);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testTopicDispatchRateLimiterPerTopicInByteOnlyBrokerLevel() throws Exception {
        String str = "persistent://" + newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(str).get();
            Assert.assertNotNull(persistentTopic);
            Assert.assertFalse(persistentTopic.getDispatchRateLimiter().isPresent());
            this.admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte", "1000");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.pulsar.getConfig().getDispatchThrottlingRatePerTopicInByte(), 1000L);
            });
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(persistentTopic.getDispatchRateLimiter().isPresent());
            });
            Assert.assertEquals(((DispatchRateLimiter) persistentTopic.getDispatchRateLimiter().get()).getAvailableDispatchRateLimitOnByte(), 1000L);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testTopicDispatchRateLimiterOnlyNamespaceLevel() throws Exception {
        String str = "persistent://" + newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(str).get();
            Assert.assertNotNull(persistentTopic);
            Assert.assertFalse(persistentTopic.getDispatchRateLimiter().isPresent());
            this.admin.namespaces().setDispatchRate("prop/ns-abc", DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(1000L).build());
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNotNull(this.admin.namespaces().getDispatchRate("prop/ns-abc"));
                Assert.assertEquals(this.admin.namespaces().getDispatchRate("prop/ns-abc").getDispatchThrottlingRateInMsg(), 100);
                Assert.assertEquals(this.admin.namespaces().getDispatchRate("prop/ns-abc").getDispatchThrottlingRateInByte(), 1000L);
            });
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(persistentTopic.getDispatchRateLimiter().isPresent());
            });
            Assert.assertEquals(((DispatchRateLimiter) persistentTopic.getDispatchRateLimiter().get()).getAvailableDispatchRateLimitOnMsg(), 100L);
            Assert.assertEquals(((DispatchRateLimiter) persistentTopic.getDispatchRateLimiter().get()).getAvailableDispatchRateLimitOnByte(), 1000L);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testTopicDispatchRateLimiterOnlyTopicLevel() throws Exception {
        String str = "persistent://" + newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(str).get();
            Assert.assertNotNull(persistentTopic);
            Assert.assertFalse(persistentTopic.getDispatchRateLimiter().isPresent());
            this.admin.topicPolicies().setDispatchRate(str, DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(1000L).build());
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNotNull(this.admin.topicPolicies().getDispatchRate(str));
                Assert.assertEquals(this.admin.topicPolicies().getDispatchRate(str).getDispatchThrottlingRateInMsg(), 100);
                Assert.assertEquals(this.admin.topicPolicies().getDispatchRate(str).getDispatchThrottlingRateInByte(), 1000L);
            });
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(persistentTopic.getDispatchRateLimiter().isPresent());
            });
            Assert.assertEquals(((DispatchRateLimiter) persistentTopic.getDispatchRateLimiter().get()).getAvailableDispatchRateLimitOnMsg(), 100L);
            Assert.assertEquals(((DispatchRateLimiter) persistentTopic.getDispatchRateLimiter().get()).getAvailableDispatchRateLimitOnByte(), 1000L);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }
}
