package org.apache.pulsar.broker.service;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.class */
public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
    private static final Logger log = LoggerFactory.getLogger(BatchMessageWithBatchIndexLevelTest.class);

    /**
     * Enables acknowledgment at batch-index level on the test configuration and then
     * delegates to the inherited {@code baseSetup()}.
     *
     * @throws Exception if the inherited setup fails
     */
    @BeforeClass
    @Override
    protected void setup() throws Exception {
        // The flag is set before baseSetup() is invoked.
        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        super.baseSetup();
    }

    /**
     * Publishes 40 messages with batching capped at 20 messages per batch and checks the
     * dispatcher's unacked-message count of the first broker-side consumer after each step:
     * 18 after two acks, 16 after four acks, 0 after a negative ack issued while the
     * client consumer is paused, and 16 again after resuming and receiving one message.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testBatchMessageAck() throws Exception {
        final String topicName = "persistent://prop/ns-abc/batchMessageAck-" + UUID.randomUUID();
        final String subscriptionName = "sub-batch-1";
        final int messageCount = 40;

        // try-with-resources closes the producer first, then the consumer, on every exit path.
        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subscriptionName)
                     .receiverQueueSize(10)
                     .subscriptionType(SubscriptionType.Shared)
                     .enableBatchIndexAcknowledgment(true)
                     .negativeAckRedeliveryDelay(100L, TimeUnit.MILLISECONDS)
                     .subscribe();
             Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic(topicName)
                     .batchingMaxMessages(20)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .enableBatching(true)
                     .create()) {

            List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(messageCount);
            for (int i = 0; i < messageCount; i++) {
                byte[] payload = ("batch-message-" + i).getBytes(StandardCharsets.UTF_8);
                sendFutures.add(producer.newMessage().value(payload).sendAsync());
            }
            FutureUtil.waitForAll(sendFutures).get();

            PersistentTopic topic =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
            PersistentDispatcherMultipleConsumers dispatcher =
                    (PersistentDispatcherMultipleConsumers) topic.getSubscription(subscriptionName).getDispatcher();

            // Acknowledge the first two messages.
            Message<byte[]> first = consumer.receive();
            Message<byte[]> second = consumer.receive();
            consumer.acknowledge(first);
            consumer.acknowledge(second);
            // getConsumers() yields broker-side consumers, so no cast to the client Consumer type.
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18));

            // Acknowledge two more.
            Message<byte[]> third = consumer.receive();
            Message<byte[]> fourth = consumer.receive();
            consumer.acknowledge(third);
            consumer.acknowledge(fourth);
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16));

            // Negative-ack one message while paused; the count is polled every millisecond
            // until it reads 0.
            consumer.pause();
            consumer.negativeAcknowledge(consumer.receive());
            Awaitility.await().pollInterval(1L, TimeUnit.MILLISECONDS).untilAsserted(() ->
                    Assert.assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0));

            // After resuming and receiving one message the count is expected back at 16.
            consumer.resume();
            consumer.receive();
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16));
        }
    }

    /**
     * Runs two rounds of "ack even-indexed, negative-ack odd-indexed" over a 20-message
     * batch and, after each round, waits for subscription {@code sub-negtive-1} on the
     * first topic to report 10 unacked messages. The second round uses a separate topic,
     * subscription, consumer and producer and sleeps 100 ms after every negative ack.
     *
     * @throws Exception if any client, admin or broker call fails
     */
    @Test
    public void testBatchMessageMultiNegtiveAck() throws Exception {
        final int messageCount = 20;
        final String topicName = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();
        final String subscriptionName = "sub-negtive-1";

        // The first consumer/producer stay open for the whole test and are closed on every
        // exit path; previously they were overwritten by the second pair and never closed.
        try (Consumer<String> firstConsumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Shared)
                     .receiverQueueSize(10)
                     .enableBatchIndexAcknowledgment(true)
                     .negativeAckRedeliveryDelay(100L, TimeUnit.MILLISECONDS)
                     .subscribe();
             Producer<String> firstProducer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topicName)
                     .batchingMaxMessages(20)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .enableBatching(true)
                     .create()) {

            for (int i = 0; i < messageCount; i++) {
                firstProducer.sendAsync("test-" + i);
            }
            firstProducer.flush();

            for (int i = 0; i < messageCount; i++) {
                Message<String> message = firstConsumer.receive();
                if (i % 2 == 0) {
                    firstConsumer.acknowledgeAsync(message);
                } else {
                    firstConsumer.negativeAcknowledge(message);
                }
            }
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(admin.topics().getStats(topicName).getSubscriptions()
                            .get(subscriptionName).getUnackedMessages(), 10L));

            // Second round: negative acks followed by a sleep.
            final String topicName2 = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck2-" + UUID.randomUUID();
            final String subscriptionName2 = "sub-negtive-2";

            try (Consumer<String> secondConsumer = pulsarClient.newConsumer(Schema.STRING)
                         .topic(topicName2)
                         .subscriptionName(subscriptionName2)
                         .subscriptionType(SubscriptionType.Shared)
                         .receiverQueueSize(10)
                         .enableBatchIndexAcknowledgment(true)
                         .negativeAckRedeliveryDelay(100L, TimeUnit.MILLISECONDS)
                         .subscribe();
                 Producer<String> secondProducer = pulsarClient.newProducer(Schema.STRING)
                         .topic(topicName2)
                         .batchingMaxMessages(20)
                         .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                         .enableBatching(true)
                         .create()) {

                // The producer must outlive the whole loop; closing it after the first
                // sendAsync left the consumer below waiting for messages that were never sent.
                for (int i = 0; i < messageCount; i++) {
                    secondProducer.sendAsync("test-" + i);
                }
                secondProducer.flush();

                for (int i = 0; i < messageCount; i++) {
                    Message<String> message = secondConsumer.receive();
                    if (i % 2 == 0) {
                        secondConsumer.acknowledgeAsync(message);
                    } else {
                        secondConsumer.negativeAcknowledge(message);
                        Thread.sleep(100L);
                    }
                }

                // NOTE(review): this assertion still inspects the FIRST topic/subscription,
                // exactly as before this cleanup - confirm whether topicName2 /
                // subscriptionName2 was the intended target.
                Awaitility.await().untilAsserted(() ->
                        Assert.assertEquals(admin.topics().getStats(topicName).getSubscriptions()
                                .get(subscriptionName).getUnackedMessages(), 10L));
            }
        }
    }

    /**
     * Exercises acknowledgments and negative acknowledgments issued by two consumers that
     * share subscription {@code test}, including a negative ack sent by consumer-2 for a
     * message id that consumer-1 received, and finally waits until the first broker-side
     * consumer left on the subscription reports zero unacked messages.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Exception {
        final String topicName =
                "persistent://prop/ns-abc/testAckMessageWithNotOwnerConsumerUnAckMessageCount-" + UUID.randomUUID();
        final String subscriptionName = "test";
        final int messageCount = 5;

        // Resources are closed in reverse order (consumer-2, consumer-1, producer) on every
        // exit path. consumer-1 is also closed explicitly mid-test, so its close runs twice.
        try (Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic(topicName)
                     .batchingMaxPublishDelay(1L, TimeUnit.SECONDS)
                     .enableBatching(true)
                     .create();
             Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                     .topic(topicName)
                     .consumerName("consumer-1")
                     .negativeAckRedeliveryDelay(1L, TimeUnit.SECONDS)
                     .isAckReceiptEnabled(true)
                     .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Shared)
                     .enableBatchIndexAcknowledgment(true)
                     .subscribe();
             Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
                     .topic(topicName)
                     .consumerName("consumer-2")
                     .negativeAckRedeliveryDelay(1L, TimeUnit.SECONDS)
                     .isAckReceiptEnabled(true)
                     .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Shared)
                     .enableBatchIndexAcknowledgment(true)
                     .subscribe()) {

            for (int i = 0; i < messageCount; i++) {
                byte[] payload = ("Hello Pulsar - " + i).getBytes(StandardCharsets.UTF_8);
                producer.newMessage().value(payload).sendAsync();
            }

            // consumer-1 takes the first five deliveries.
            List<MessageId> consumer1MessageIds = new ArrayList<>(messageCount);
            for (int i = 0; i < messageCount; i++) {
                consumer1MessageIds.add(consumer1.receive().getMessageId());
            }

            consumer1.negativeAcknowledge(consumer1MessageIds.get(0));

            // consumer-2 then receives five messages.
            for (int i = 0; i < messageCount; i++) {
                consumer2.receive();
            }

            consumer1.acknowledge(consumer1MessageIds.get(1));
            consumer1.acknowledge(consumer1MessageIds.get(2));

            // consumer-2 negative-acks an id that was received by consumer-1.
            consumer2.negativeAcknowledge(consumer1MessageIds.get(1));

            consumer1.close();

            for (int i = 0; i < 3; i++) {
                consumer2.acknowledge(consumer2.receive().getMessageId());
            }

            // Nothing further should arrive within one second.
            Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));

            // The subscription's consumers are broker-side objects; read the counter
            // directly instead of casting to the client-API Consumer type.
            Awaitility.await().until(() -> {
                return getPulsar().getBrokerService().getTopic(topicName, false).get().get()
                        .getSubscription(subscriptionName).getConsumers().get(0)
                        .getUnackedMessages() == 0;
            });
        }
    }

    /**
     * Sends ten string messages, negative-acks the first received message whose batch
     * index is 1, acknowledges everything else (with a one-hour acknowledgment group time
     * configured on the consumer), and asserts that the number of acknowledged messages
     * equals the number sent, i.e. no message is consumed twice.
     *
     * @throws Exception if any client or admin call fails
     */
    @Test
    public void testNegativeAckAndLongAckDelayWillNotLeadRepeatConsume() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp_");
        final int messageCount = 10;
        final List<String> messagesSent = new ArrayList<>(messageCount);
        final List<String> messagesReceived = new ArrayList<>(messageCount);

        // Producer and consumer are now closed even when a receive or the assertion fails.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topicName)
                     .enableBatching(true)
                     .batchingMaxMessages(1000)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .create();
             Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .subscriptionName("s1")
                     .subscriptionType(SubscriptionType.Shared)
                     .negativeAckRedeliveryDelay(2L, TimeUnit.SECONDS)
                     .enableBatchIndexAcknowledgment(true)
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .acknowledgmentGroupTime(1L, TimeUnit.HOURS)
                     .subscribe()) {

            List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(messageCount);
            for (int i = 0; i < messageCount; i++) {
                String value = String.valueOf(i);
                sendFutures.add(producer.sendAsync(value));
                messagesSent.add(value);
            }
            producer.flush();
            FutureUtil.waitForAll(sendFutures).join();

            // First pass: negative-ack exactly one message (the first one seen with batch
            // index 1) and acknowledge all others, until a 2 s receive times out.
            boolean nackSent = false;
            Message<String> message;
            while ((message = consumer.receive(2, TimeUnit.SECONDS)) != null) {
                if (!nackSent && message.getMessageId().getBatchIndex() == 1) {
                    consumer.negativeAcknowledge(message);
                    nackSent = true;
                    continue;
                }
                messagesReceived.add(message.getValue());
                consumer.acknowledge(message);
            }

            // Second pass: acknowledge whatever is delivered until a 6 s receive times out.
            while ((message = consumer.receive(6, TimeUnit.SECONDS)) != null) {
                messagesReceived.add(message.getValue());
                consumer.acknowledge(message);
            }

            log.info("messagesSent: {}, messagesReceived: {}", messagesSent, messagesReceived);
            Assert.assertEquals(messagesReceived.size(), messagesSent.size());
        }

        // The topic is deleted only after a successful run, once both clients are closed.
        admin.topics().delete(topicName);
    }

    /**
     * Sends three messages (two synchronously, one asynchronously), acknowledges all three
     * with a mix of {@code acknowledgeAsync} and {@code acknowledge}, and waits for
     * subscription {@code sub} to report zero unacked messages.
     *
     * @throws Exception if any client or admin call fails
     */
    @Test
    public void testMixIndexAndNonIndexUnAckMessageCount() throws Exception {
        // Unique suffix, like the other tests in this class, so a rerun against the same
        // broker does not reuse the topic and its "sub" subscription.
        final String topicName =
                "persistent://prop/ns-abc/testMixIndexAndNonIndexUnAckMessageCount-" + UUID.randomUUID();
        final String subscriptionName = "sub";

        // Closed in reverse order (consumer, then producer) on every exit path.
        try (Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic(topicName)
                     .enableBatching(true)
                     .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                     .create();
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Shared)
                     .acknowledgmentGroupTime(100L, TimeUnit.MILLISECONDS)
                     .enableBatchIndexAcknowledgment(true)
                     .isAckReceiptEnabled(true)
                     .subscribe()) {

            producer.send("1".getBytes(StandardCharsets.UTF_8));
            producer.sendAsync("2".getBytes(StandardCharsets.UTF_8));
            producer.send("3".getBytes(StandardCharsets.UTF_8));

            Message<byte[]> first = consumer.receive();
            Message<byte[]> second = consumer.receive();
            Message<byte[]> third = consumer.receive();

            consumer.acknowledgeAsync(first);
            consumer.acknowledge(second);
            consumer.acknowledge(third);

            // The first ack is asynchronous, so poll the stats instead of reading them once.
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(admin.topics().getStats(topicName).getSubscriptions()
                            .get(subscriptionName).getUnackedMessages(), 0L));
        }
    }
}
