package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.client.api.Producer;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.class */
public class MessagePublishBufferThrottleTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void setup() throws Exception {
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        resetConfig();
    }

    @Test
    public void testMessagePublishBufferThrottleDisabled() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(-1);
        super.baseSetup();
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled").producerName("producer-name").create();
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
        this.mockBookKeeper.addEntryDelay(1L, TimeUnit.SECONDS);
        byte[] bArr = new byte[1048576];
        for (int i = 0; i < 10; i++) {
            create.sendAsync(bArr);
        }
        create.flush();
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
    }

    @Test
    public void testMessagePublishBufferThrottleEnable() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(1);
        super.baseSetup();
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable").producerName("producer-name").create();
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
        this.mockBookKeeper.addEntryDelay(1L, TimeUnit.SECONDS);
        byte[] bArr = new byte[1048576];
        for (int i = 0; i < 10; i++) {
            create.sendAsync(bArr);
        }
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 1L);
        });
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 1L);
        create.flush();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
        });
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
    }

    @Test
    public void testBlockByPublishRateLimiting() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(1);
        super.baseSetup();
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testBlockByPublishRateLimiting").producerName("producer-name").create();
        AbstractTopic abstractTopic = (Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testBlockByPublishRateLimiting").get();
        Assert.assertNotNull(abstractTopic);
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
        this.mockBookKeeper.addEntryDelay(5L, TimeUnit.SECONDS);
        byte[] bArr = new byte[1048576];
        for (int i = 0; i < 10; i++) {
            create.sendAsync(bArr);
        }
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 1L);
        });
        CompletableFuture flushAsync = create.flushAsync();
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 1L);
        try {
            flushAsync.get(2L, TimeUnit.SECONDS);
            Assert.fail("Should have timed out");
        } catch (TimeoutException e) {
        }
        flushAsync.join();
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().enableCnxAutoRead();
        flushAsync.get();
        Assert.assertEquals(this.pulsar.getBrokerService().getPausedConnections(), 0L);
    }
}
