package org.apache.pulsar.client.impl;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.class */
public class AutoScaledReceiverQueueSizeTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(AutoScaledReceiverQueueSizeTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        setupDefaultTenantAndNamespace();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConsumerImpl() throws PulsarClientException {
        String str = "persistent://public/default/testConsumerImpl" + System.currentTimeMillis();
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").receiverQueueSize(3).autoScaledReceiverQueueSizeEnabled(true).subscribe();
        try {
            Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 1);
            Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(false).create();
            try {
                byte[] bytes = "data".getBytes(StandardCharsets.UTF_8);
                create.send(bytes);
                ConditionFactory await = Awaitility.await();
                AtomicBoolean atomicBoolean = subscribe.scaleReceiverQueueHint;
                Objects.requireNonNull(atomicBoolean);
                await.until(atomicBoolean::get);
                Assert.assertNotNull(subscribe.receive());
                log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                Assert.assertNull(subscribe.receive(0, TimeUnit.MILLISECONDS));
                log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 2);
                Assert.assertFalse(subscribe.scaleReceiverQueueHint.get());
                for (int i = 0; i < 5; i++) {
                    create.send(bytes);
                    create.send(bytes);
                    ConditionFactory await2 = Awaitility.await();
                    AtomicBoolean atomicBoolean2 = subscribe.scaleReceiverQueueHint;
                    Objects.requireNonNull(atomicBoolean2);
                    await2.until(atomicBoolean2::get);
                    Assert.assertNotNull(subscribe.receive());
                    Assert.assertNotNull(subscribe.receive());
                    Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 2);
                }
                create.send(bytes);
                create.send(bytes);
                ConditionFactory await3 = Awaitility.await();
                AtomicBoolean atomicBoolean3 = subscribe.scaleReceiverQueueHint;
                Objects.requireNonNull(atomicBoolean3);
                await3.until(atomicBoolean3::get);
                Assert.assertNotNull(subscribe.receive());
                Assert.assertNotNull(subscribe.receive());
                Assert.assertNull(subscribe.receive(0, TimeUnit.MILLISECONDS));
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 3);
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testConsumerImplBatchReceive() throws PulsarClientException {
        String str = "persistent://public/default/testConsumerImplBatchReceive" + System.currentTimeMillis();
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5).build()).receiverQueueSize(20).autoScaledReceiverQueueSizeEnabled(true).subscribe();
        try {
            int i = 8;
            Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 8);
            Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(false).create();
            try {
                byte[] bytes = "data".getBytes(StandardCharsets.UTF_8);
                for (int i2 = 0; i2 < 10; i2++) {
                    for (int i3 = 0; i3 < 5; i3++) {
                        create.send(bytes);
                    }
                    Awaitility.await().until(() -> {
                        return Boolean.valueOf(subscribe.incomingMessages.size() == 5);
                    });
                    log.info("i={},expandReceiverQueueHint:{},local permits:{}", new Object[]{Integer.valueOf(i2), Boolean.valueOf(subscribe.scaleReceiverQueueHint.get()), Integer.valueOf(subscribe.getAvailablePermits())});
                    Assert.assertEquals(subscribe.batchReceive().size(), 5);
                    Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 8);
                    log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                }
                int availablePermits = (8 / 2) - subscribe.getAvailablePermits();
                for (int i4 = 0; i4 < availablePermits; i4++) {
                    create.send(bytes);
                    subscribe.receive();
                }
                Assert.assertEquals(subscribe.getAvailablePermits(), 0);
                for (int i5 = 0; i5 < 8; i5++) {
                    create.send(bytes);
                }
                ConditionFactory await = Awaitility.await();
                AtomicBoolean atomicBoolean = subscribe.scaleReceiverQueueHint;
                Objects.requireNonNull(atomicBoolean);
                await.until(atomicBoolean::get);
                Assert.assertEquals(subscribe.batchReceive().size(), 5);
                subscribe.batchReceiveAsync();
                Awaitility.await().until(() -> {
                    return Boolean.valueOf(subscribe.getCurrentReceiverQueueSize() == i * 2);
                });
                log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testMultiConsumerImpl() throws Exception {
        String str = "persistent://public/default/testMultiConsumerImpl" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(str, 3);
        MultiTopicsConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").receiverQueueSize(10).autoScaledReceiverQueueSizeEnabled(true).subscribe();
        try {
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 3);
            });
            Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(false).create();
            try {
                byte[] bytes = "data".getBytes(StandardCharsets.UTF_8);
                for (int i = 0; i < 3; i++) {
                    create.send(bytes);
                }
                ConditionFactory await = Awaitility.await();
                AtomicBoolean atomicBoolean = subscribe.scaleReceiverQueueHint;
                Objects.requireNonNull(atomicBoolean);
                await.until(atomicBoolean::get);
                for (int i2 = 0; i2 < 3; i2++) {
                    Assert.assertNotNull(subscribe.receive());
                }
                Assert.assertTrue(subscribe.scaleReceiverQueueHint.get());
                log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 3);
                Assert.assertNull(subscribe.receive(0, TimeUnit.MILLISECONDS));
                log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 6);
                Assert.assertFalse(subscribe.scaleReceiverQueueHint.get());
                for (int i3 = 0; i3 < 5; i3++) {
                    for (int i4 = 0; i4 < 6; i4++) {
                        create.send(bytes);
                    }
                    for (int i5 = 0; i5 < 6; i5++) {
                        Assert.assertNotNull(subscribe.receive());
                    }
                    log.info("i={},currentReceiverQueueSize={},expandReceiverQueueHint={}", new Object[]{Integer.valueOf(i3), Integer.valueOf(subscribe.getCurrentReceiverQueueSize()), subscribe.scaleReceiverQueueHint});
                    Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 6);
                }
                for (int i6 = 0; i6 < 6; i6++) {
                    create.send(bytes);
                }
                Awaitility.await().until(() -> {
                    return Boolean.valueOf(subscribe.scaleReceiverQueueHint.get());
                });
                for (int i7 = 0; i7 < 6; i7++) {
                    Assert.assertNotNull(subscribe.receive());
                }
                Assert.assertNull(subscribe.receive(0, TimeUnit.MILLISECONDS));
                log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 10);
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testMultiConsumerImplBatchReceive() throws PulsarClientException, PulsarAdminException {
        String str = "persistent://public/default/testMultiConsumerImplBatchReceive" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(str, 3);
        MultiTopicsConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5).build()).receiverQueueSize(20).autoScaledReceiverQueueSizeEnabled(true).subscribe();
        try {
            int i = 5;
            Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 5);
            Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(false).create();
            try {
                byte[] bytes = "data".getBytes(StandardCharsets.UTF_8);
                for (int i2 = 0; i2 < 10; i2++) {
                    for (int i3 = 0; i3 < 5; i3++) {
                        create.send(bytes);
                    }
                    log.info("i={},expandReceiverQueueHint:{},local permits:{}", new Object[]{Integer.valueOf(i2), Boolean.valueOf(subscribe.scaleReceiverQueueHint.get()), Integer.valueOf(subscribe.getAvailablePermits())});
                    ConditionFactory await = Awaitility.await();
                    Objects.requireNonNull(subscribe);
                    await.until(subscribe::hasEnoughMessagesForBatchReceive);
                    Assert.assertEquals(subscribe.batchReceive().size(), 5);
                    Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 5);
                    log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                }
                for (int i4 = 0; i4 < 5; i4++) {
                    create.send(bytes);
                }
                ConditionFactory await2 = Awaitility.await();
                AtomicBoolean atomicBoolean = subscribe.scaleReceiverQueueHint;
                Objects.requireNonNull(atomicBoolean);
                await2.until(atomicBoolean::get);
                Assert.assertEquals(subscribe.batchReceive().size(), 5);
                subscribe.batchReceiveAsync();
                Awaitility.await().until(() -> {
                    return Boolean.valueOf(subscribe.getCurrentReceiverQueueSize() == i * 2);
                });
                log.info("getCurrentReceiverQueueSize={}", Integer.valueOf(subscribe.getCurrentReceiverQueueSize()));
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testNegativeClientMemory() throws Exception {
        String str = "persistent://public/default/testMemory-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(false).create();
        try {
            ArrayList arrayList = new ArrayList(1000);
            for (int i = 0; i < 1000; i++) {
                arrayList.add(create.newMessage().key(i).value(("Message-" + i).getBytes()).sendAsync());
            }
            FutureUtil.waitForAll(arrayList).get();
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).autoScaledReceiverQueueSizeEnabled(true).subscribe();
            try {
                Awaitility.await().untilAsserted(() -> {
                    long incomingMessageSize = ((ConsumerBase) subscribe).getIncomingMessageSize();
                    log.info("Check the incoming message size should greater that 0, current size is {}", Long.valueOf(incomingMessageSize));
                    Assert.assertTrue(incomingMessageSize > 0);
                });
                for (int i2 = 0; i2 < 1000; i2++) {
                    subscribe.receive();
                }
                Awaitility.await().untilAsserted(() -> {
                    long incomingMessageSize = ((ConsumerBase) subscribe).getIncomingMessageSize();
                    log.info("Check the incoming message size should be 0, current size is {}", Long.valueOf(incomingMessageSize));
                    Assert.assertEquals(incomingMessageSize, 0L);
                });
                MemoryLimitController memoryLimitController = this.pulsarClient.getMemoryLimitController();
                Assert.assertEquals(memoryLimitController.currentUsage(), 0L);
                Assert.assertEquals(memoryLimitController.currentUsagePercent(), 0.0d);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }
}
