package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ProducerMemoryLimitTest.class */
public class ProducerMemoryLimitTest extends ProducerConsumerBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut = 10000)
    public void testProducerInvalidMessageMemoryRelease() throws Exception {
        initClientWithMemoryLimit();
        ProducerImpl create = this.pulsarClient.newProducer().topic("testProducerMemoryLimit").sendTimeout(5, TimeUnit.SECONDS).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxBytes(10240).enableBatching(true).create();
        try {
            stopBroker();
            try {
                ConnectionHandler connectionHandler = (ConnectionHandler) Mockito.spy(create.getConnectionHandler());
                Field declaredField = create.getClass().getDeclaredField("connectionHandler");
                declaredField.setAccessible(true);
                declaredField.set(create, connectionHandler);
                Mockito.when(Integer.valueOf(connectionHandler.getMaxMessageSize())).thenReturn(8);
                create.send("memory-test".getBytes(StandardCharsets.UTF_8));
                throw new IllegalStateException("can not reach here");
            } catch (PulsarClientException.InvalidMessageException e) {
                Assert.assertEquals(this.pulsarClient.getMemoryLimitController().currentUsage(), 0L);
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void testProducerTimeoutMemoryRelease() throws Exception {
        initClientWithMemoryLimit();
        ProducerImpl create = this.pulsarClient.newProducer().topic("testProducerMemoryLimit").sendTimeout(5, TimeUnit.SECONDS).maxPendingMessages(0).enableBatching(false).create();
        try {
            stopBroker();
            try {
                create.send("memory-test".getBytes(StandardCharsets.UTF_8));
                throw new IllegalStateException("can not reach here");
            } catch (PulsarClientException.TimeoutException e) {
                Assert.assertEquals(this.pulsarClient.getMemoryLimitController().currentUsage(), 0L);
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void testProducerBatchSendTimeoutMemoryRelease() throws Exception {
        initClientWithMemoryLimit();
        ProducerImpl create = this.pulsarClient.newProducer().topic("testProducerMemoryLimit").sendTimeout(5, TimeUnit.SECONDS).maxPendingMessages(0).enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxBytes(12).create();
        try {
            stopBroker();
            try {
                create.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
                try {
                    create.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get();
                    throw new IllegalStateException("can not reach here");
                } catch (Exception e) {
                    throw PulsarClientException.unwrap(e);
                }
            } catch (PulsarClientException.TimeoutException e2) {
                Assert.assertEquals(this.pulsarClient.getMemoryLimitController().currentUsage(), 0L);
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void testBatchMessageOOMMemoryRelease() throws Exception {
        initClientWithMemoryLimit();
        ProducerImpl create = this.pulsarClient.newProducer().topic("testProducerMemoryLimit").sendTimeout(5, TimeUnit.SECONDS).maxPendingMessages(0).enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxBytes(12).create();
        try {
            stopBroker();
            try {
                ProducerImpl producerImpl = (ProducerImpl) Mockito.spy(create);
                ByteBufAllocator byteBufAllocator = (ByteBufAllocator) Mockito.mock(ByteBufAllocator.class);
                ((ByteBufAllocator) Mockito.doAnswer(invocationOnMock -> {
                    throw new OutOfMemoryError("memory-test");
                }).when(byteBufAllocator)).buffer(ArgumentMatchers.anyInt());
                BatchMessageContainerImpl batchMessageContainerImpl = new BatchMessageContainerImpl(byteBufAllocator);
                batchMessageContainerImpl.setProducer(create);
                Field declaredField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
                declaredField.setAccessible(true);
                declaredField.set(producerImpl, batchMessageContainerImpl);
                producerImpl.send("memory-test".getBytes(StandardCharsets.UTF_8));
                Assert.fail("can not reach here");
            } catch (PulsarClientException e) {
                Assert.assertEquals(this.pulsarClient.getMemoryLimitController().currentUsage(), 0L);
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test(timeOut = 10000)
    public void testProducerCloseMemoryRelease() throws Exception {
        initClientWithMemoryLimit();
        ProducerImpl create = this.pulsarClient.newProducer().topic("testProducerMemoryLimit").sendTimeout(5, TimeUnit.SECONDS).maxPendingMessages(0).enableBatching(false).create();
        try {
            stopBroker();
            create.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8));
            create.close();
            Assert.assertEquals(this.pulsarClient.getMemoryLimitController().currentUsage(), 0L);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void testProducerBlockReserveMemory() throws Exception {
        replacePulsarClient(PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).memoryLimit(1L, SizeUnit.KILO_BYTES));
        ProducerImpl create = this.pulsarClient.newProducer().topic("testProducerMemoryLimit").sendTimeout(5, TimeUnit.SECONDS).compressionType(CompressionType.SNAPPY).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).maxPendingMessages(0).blockIfQueueFull(true).enableBatching(true).batchingMaxMessages(100).batchingMaxBytes(65536).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).create();
        try {
            CountDownLatch countDownLatch = new CountDownLatch(5);
            for (int i = 0; i < 5; i++) {
                create.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8)).whenComplete((messageId, th) -> {
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            create.close();
            Assert.assertEquals(this.pulsarClient.getMemoryLimitController().currentUsage(), 0L);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th2;
        }
    }

    private void initClientWithMemoryLimit() throws PulsarClientException {
        replacePulsarClient(PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).memoryLimit(50L, SizeUnit.KILO_BYTES));
    }
}
