package org.apache.pulsar.broker.admin;

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.class */
public class MaxUnackedMessagesTest extends ProducerConsumerBase {
    private final String testTenant = "my-property";
    private final String testNamespace = "my-ns";
    private final String myNamespace = "my-property/my-ns";
    private final String testTopic = "persistent://my-property/my-ns/max-unacked-";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        resetConfig();
    }

    @Test(timeOut = 10000)
    public void testMaxUnackedMessagesOnSubscriptionApi() throws Exception {
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        waitCacheInit(str);
        Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnSubscription(str));
        this.admin.topics().setMaxUnackedMessagesOnSubscription(str, 2048);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getMaxUnackedMessagesOnSubscription(str));
        });
        Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnSubscription(str).intValue(), 2048);
        this.admin.topics().removeMaxUnackedMessagesOnSubscription(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnSubscription(str));
        });
        Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnSubscription(str));
    }

    @Test(timeOut = 20000)
    public void testMaxUnackedMessagesOnSubscription() throws Exception {
        Message receive;
        String str = "persistent://my-property/my-ns/max-unacked-" + System.currentTimeMillis();
        String str2 = "test-sub" + System.currentTimeMillis();
        this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);
        ConsumerBuilder subscriptionType = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName(str2).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
        List newArrayList = Lists.newArrayList(new Consumer[]{subscriptionType.subscribe(), subscriptionType.subscribe(), subscriptionType.subscribe()});
        waitCacheInit(str);
        this.admin.topics().setMaxUnackedMessagesOnSubscription(str, 100);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getMaxUnackedMessagesOnSubscription(str));
        });
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        for (int i = 0; i < 200; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        HashMap newHashMap = Maps.newHashMap();
        for (int i2 = 0; i2 < 3; i2++) {
            for (int i3 = 0; i3 < 200 && (receive = ((Consumer) newArrayList.get(i2)).receive(500, TimeUnit.MILLISECONDS)) != null; i3++) {
                newHashMap.put(receive, (Consumer) newArrayList.get(i2));
            }
        }
        Assert.assertEquals(100.0f, newHashMap.size(), 30.0f);
        newHashMap.forEach((message, consumer) -> {
            try {
                consumer.acknowledge(message);
            } catch (PulsarClientException e) {
                Assert.fail("ack failed", e);
            }
        });
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        int size = 200 - newHashMap.size();
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i4 = 0; i4 < newArrayList.size(); i4++) {
            int i5 = i4;
            for (int i6 = 0; i6 < 200; i6++) {
                ((Consumer) newArrayList.get(i4)).receiveAsync().thenAccept(message2 -> {
                    newKeySet.add(message2.getMessageId());
                    try {
                        ((Consumer) newArrayList.get(i5)).acknowledge(message2);
                    } catch (PulsarClientException e) {
                        Assert.fail("failed to ack msg", e);
                    }
                    countDownLatch.countDown();
                });
            }
        }
        countDownLatch.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(newKeySet.size(), size);
        create.close();
        newArrayList.forEach(consumer2 -> {
            try {
                consumer2.close();
            } catch (PulsarClientException e) {
            }
        });
    }

    @Test(timeOut = 20000)
    public void testMaxUnackedMessagesOnConsumerApi() throws Exception {
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        waitCacheInit(str);
        Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnConsumer(str));
        this.admin.topics().setMaxUnackedMessagesOnConsumer(str, 2048);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getMaxUnackedMessagesOnConsumer(str));
        });
        Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnConsumer(str).intValue(), 2048);
        this.admin.topics().removeMaxUnackedMessagesOnConsumer(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnConsumer(str));
        });
        Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnConsumer(str));
    }

    @Test(timeOut = 20000)
    public void testMaxUnackedMessagesOnConsumerAppliedApi() throws Exception {
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        waitCacheInit(str);
        Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnConsumer(str, true).intValue(), this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer());
        this.admin.namespaces().setMaxUnackedMessagesPerConsumer("my-property/my-ns", 10);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.namespaces().getMaxUnackedMessagesPerConsumer("my-property/my-ns"));
        });
        Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnConsumer(str, true).intValue(), 10);
        this.admin.topics().setMaxUnackedMessagesOnConsumer(str, 20);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getMaxUnackedMessagesOnConsumer(str));
        });
        Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnConsumer(str, true).intValue(), 20);
    }

    @Test
    public void testMaxUnackedMessagesOnSubApplied() throws Exception {
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        waitCacheInit(str);
        Assert.assertNull(this.admin.namespaces().getMaxUnackedMessagesPerSubscription("my-property/my-ns"));
        Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnSubscription(str));
        Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnSubscription(str, true), Integer.valueOf(this.conf.getMaxUnackedMessagesPerSubscription()));
        this.admin.namespaces().setMaxUnackedMessagesPerSubscription("my-property/my-ns", 10);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.namespaces().getMaxUnackedMessagesPerSubscription("my-property/my-ns"), 10);
        });
        this.admin.topics().setMaxUnackedMessagesOnSubscription(str, 20);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnSubscription(str), 20);
        });
        this.admin.topics().removeMaxUnackedMessagesOnSubscription(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.namespaces().getMaxUnackedMessagesPerSubscription("my-property/my-ns"), 10);
        });
        this.admin.namespaces().removeMaxUnackedMessagesPerSubscription("my-property/my-ns");
        Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnSubscription(str, true), Integer.valueOf(this.conf.getMaxUnackedMessagesPerSubscription()));
    }

    /* JADX WARN: Finally extract failed */
    @Test(timeOut = 30000)
    public void testMaxUnackedMessagesOnConsumer() throws Exception {
        Message receive;
        String str = "persistent://my-property/my-ns/max-unacked-" + System.currentTimeMillis();
        ConsumerBuilder subscriptionType = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("test-sub" + System.currentTimeMillis()).receiverQueueSize(10).ackTimeout(1L, TimeUnit.MINUTES).subscriptionType(SubscriptionType.Shared);
        Consumer subscribe = subscriptionType.subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).create();
            for (int i = 0; i < 300; i++) {
                try {
                    create.send("my-message-" + i);
                } catch (Throwable th) {
                    if (Collections.singletonList(create).get(0) != null) {
                        create.close();
                    }
                    throw th;
                }
            }
            int i2 = 0;
            ArrayList arrayList = new ArrayList(300);
            for (int i3 = 0; i3 < 300 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i3++) {
                i2++;
                arrayList.add(receive);
            }
            Assert.assertEquals(i2, 300);
            arrayList.forEach(message -> {
                try {
                    subscribe.acknowledge(message);
                } catch (PulsarClientException e) {
                }
            });
            waitCacheInit(str);
            this.admin.topics().setMaxUnackedMessagesOnConsumer(str, 100);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNotNull(this.admin.topics().getMaxUnackedMessagesOnConsumer(str));
            });
            Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnConsumer(str).intValue(), 100);
            subscribe = subscriptionType.subscribe();
            try {
                Consumer<String> subscribe2 = subscriptionType.subscribe();
                for (int i4 = 0; i4 < 300; i4++) {
                    try {
                        create.send("my-message-" + i4);
                    } finally {
                        if (Collections.singletonList(subscribe2).get(0) != null) {
                            subscribe2.close();
                        }
                    }
                }
                AtomicInteger atomicInteger = new AtomicInteger(0);
                AtomicInteger atomicInteger2 = new AtomicInteger(0);
                CountDownLatch countDownLatch = new CountDownLatch(2);
                startConsumer(subscribe, atomicInteger, countDownLatch);
                startConsumer(subscribe2, atomicInteger2, countDownLatch);
                countDownLatch.await(10L, TimeUnit.SECONDS);
                Assert.assertEquals(atomicInteger.get(), 100);
                Assert.assertEquals(atomicInteger2.get(), 100);
                if (Collections.singletonList(subscribe2).get(0) != null) {
                    subscribe2.close();
                }
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } finally {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    private void startConsumer(Consumer<String> consumer, AtomicInteger atomicInteger, CountDownLatch countDownLatch) {
        new Thread(() -> {
            while (consumer.receive(500, TimeUnit.MILLISECONDS) != null) {
                try {
                    atomicInteger.incrementAndGet();
                } catch (PulsarClientException e) {
                    return;
                }
            }
            countDownLatch.countDown();
        }).start();
    }

    private void waitCacheInit(String str) throws Exception {
        this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").subscribe().close();
        TopicName topicName = TopicName.get(str);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTopicPoliciesService().cacheIsInitialized(topicName));
        });
    }
}
