package org.apache.pulsar.client.impl;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/NegativeAcksTest.class */
public class NegativeAcksTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(NegativeAcksTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "variations")
    public static Object[][] variations() {
        return new Object[]{new Object[]{false, false, SubscriptionType.Shared, 100, 0}, new Object[]{false, false, SubscriptionType.Failover, 100, 0}, new Object[]{false, true, SubscriptionType.Shared, 100, 0}, new Object[]{false, true, SubscriptionType.Failover, 100, 0}, new Object[]{true, false, SubscriptionType.Shared, 100, 0}, new Object[]{true, false, SubscriptionType.Failover, 100, 0}, new Object[]{true, true, SubscriptionType.Shared, 100, 0}, new Object[]{true, true, SubscriptionType.Failover, 100, 0}, new Object[]{false, false, SubscriptionType.Shared, 0, 0}, new Object[]{false, false, SubscriptionType.Failover, 0, 0}, new Object[]{false, true, SubscriptionType.Shared, 0, 0}, new Object[]{false, true, SubscriptionType.Failover, 0, 0}, new Object[]{true, false, SubscriptionType.Shared, 0, 0}, new Object[]{true, false, SubscriptionType.Failover, 0, 0}, new Object[]{true, true, SubscriptionType.Shared, 0, 0}, new Object[]{true, true, SubscriptionType.Failover, 0, 0}, new Object[]{false, false, SubscriptionType.Shared, 100, 1000}, new Object[]{false, false, SubscriptionType.Failover, 100, 1000}, new Object[]{false, true, SubscriptionType.Shared, 100, 1000}, new Object[]{false, true, SubscriptionType.Failover, 100, 1000}, new Object[]{true, false, SubscriptionType.Shared, 100, 1000}, new Object[]{true, false, SubscriptionType.Failover, 100, 1000}, new Object[]{true, true, SubscriptionType.Shared, 100, 1000}, new Object[]{true, true, SubscriptionType.Failover, 100, 1000}, new Object[]{false, false, SubscriptionType.Shared, 0, 1000}, new Object[]{false, false, SubscriptionType.Failover, 0, 1000}, new Object[]{false, true, SubscriptionType.Shared, 0, 1000}, new Object[]{false, true, SubscriptionType.Failover, 0, 1000}, new Object[]{true, false, SubscriptionType.Shared, 0, 1000}, new Object[]{true, false, SubscriptionType.Failover, 0, 1000}, new Object[]{true, true, SubscriptionType.Shared, 0, 1000}, new Object[]{true, true, SubscriptionType.Failover, 0, 1000}};
    }

    @Test(dataProvider = "variations")
    public void testNegativeAcks(boolean z, boolean z2, SubscriptionType subscriptionType, int i, int i2) throws Exception {
        log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", new Object[]{Boolean.valueOf(z), Boolean.valueOf(z2), subscriptionType, Integer.valueOf(i)});
        String newUniqueName = BrokerTestUtil.newUniqueName("testNegativeAcks");
        if (z2) {
            this.admin.topics().createPartitionedTopic(newUniqueName, 2);
        }
        ConsumerBase subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{newUniqueName}).subscriptionName("sub1").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscriptionType(subscriptionType).negativeAckRedeliveryDelay(i, TimeUnit.MILLISECONDS).ackTimeout(i2, TimeUnit.MILLISECONDS).subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newUniqueName).enableBatching(z).create();
            try {
                HashSet hashSet = new HashSet();
                for (int i3 = 0; i3 < 10; i3++) {
                    String str = "test-" + i3;
                    create.sendAsync(str);
                    hashSet.add(str);
                }
                create.flush();
                for (int i4 = 0; i4 < 10; i4++) {
                    subscribe.negativeAcknowledge(subscribe.receive());
                }
                Assert.assertTrue(subscribe instanceof ConsumerBase);
                Assert.assertEquals(subscribe.getUnAckedMessageTracker().size(), 0L);
                HashSet hashSet2 = new HashSet();
                for (int i5 = 0; i5 < 10; i5++) {
                    Message receive = subscribe.receive();
                    hashSet2.add((String) receive.getValue());
                    subscribe.acknowledge(receive);
                }
                Assert.assertEquals(hashSet2, hashSet);
                Assert.assertNull(subscribe.receive(100, TimeUnit.MILLISECONDS));
                subscribe.close();
                create.close();
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "variationsBackoff")
    public static Object[][] variationsBackoff() {
        return new Object[]{new Object[]{false, false, SubscriptionType.Shared, 100, 1000}, new Object[]{false, false, SubscriptionType.Failover, 100, 1000}, new Object[]{false, true, SubscriptionType.Shared, 100, 1000}, new Object[]{false, true, SubscriptionType.Failover, 100, 1000}, new Object[]{true, false, SubscriptionType.Shared, 100, 1000}, new Object[]{true, false, SubscriptionType.Failover, 100, 1000}, new Object[]{true, true, SubscriptionType.Shared, 100, 1000}, new Object[]{true, true, SubscriptionType.Failover, 100, 1000}, new Object[]{false, false, SubscriptionType.Shared, 0, 1000}, new Object[]{false, false, SubscriptionType.Failover, 0, 1000}, new Object[]{false, true, SubscriptionType.Shared, 0, 1000}, new Object[]{false, true, SubscriptionType.Failover, 0, 1000}, new Object[]{true, false, SubscriptionType.Shared, 0, 1000}, new Object[]{true, false, SubscriptionType.Failover, 0, 1000}, new Object[]{true, true, SubscriptionType.Shared, 0, 1000}, new Object[]{true, true, SubscriptionType.Failover, 0, 1000}, new Object[]{false, false, SubscriptionType.Shared, 100, 1000}, new Object[]{false, false, SubscriptionType.Failover, 100, 1000}, new Object[]{false, true, SubscriptionType.Shared, 100, 1000}, new Object[]{false, true, SubscriptionType.Failover, 100, 1000}, new Object[]{true, false, SubscriptionType.Shared, 100, 1000}, new Object[]{true, false, SubscriptionType.Failover, 100, 1000}, new Object[]{true, true, SubscriptionType.Shared, 100, 1000}, new Object[]{true, true, SubscriptionType.Failover, 100, 1000}, new Object[]{false, false, SubscriptionType.Shared, 0, 1000}, new Object[]{false, false, SubscriptionType.Failover, 0, 1000}, new Object[]{false, true, SubscriptionType.Shared, 0, 1000}, new Object[]{false, true, SubscriptionType.Failover, 0, 1000}, new Object[]{true, false, SubscriptionType.Shared, 0, 1000}, new Object[]{true, false, SubscriptionType.Failover, 0, 1000}, new Object[]{true, true, SubscriptionType.Shared, 0, 1000}, new Object[]{true, true, SubscriptionType.Failover, 0, 1000}};
    }

    @Test(dataProvider = "variationsBackoff")
    public void testNegativeAcksWithBackoff(boolean z, boolean z2, SubscriptionType subscriptionType, int i, int i2) throws Exception {
        log.info("Test negative acks with back off batching={} partitions={} subType={} minNackTimeMs={}, maxNackTimeMs={}", new Object[]{Boolean.valueOf(z), Boolean.valueOf(z2), subscriptionType, Integer.valueOf(i), Integer.valueOf(i2)});
        String newUniqueName = BrokerTestUtil.newUniqueName("testNegativeAcksWithBackoff");
        MultiplierRedeliveryBackoff build = MultiplierRedeliveryBackoff.builder().minDelayMs(i).maxDelayMs(i2).build();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{newUniqueName}).subscriptionName("sub1").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscriptionType(subscriptionType).negativeAckRedeliveryBackoff(build).subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newUniqueName).enableBatching(z).create();
            try {
                HashSet hashSet = new HashSet();
                for (int i3 = 0; i3 < 10; i3++) {
                    String str = "test-" + i3;
                    create.sendAsync(str);
                    hashSet.add(str);
                }
                create.flush();
                long currentTimeMillis = System.currentTimeMillis();
                long j = 0;
                for (int i4 = 0; i4 < 5; i4++) {
                    for (int i5 = 0; i5 < 10; i5++) {
                        Message receive = subscribe.receive();
                        log.info("Received message {}", receive.getValue());
                        subscribe.negativeAcknowledge(receive);
                    }
                    j += build.next(i4);
                }
                HashSet hashSet2 = new HashSet();
                for (int i6 = 0; i6 < 10; i6++) {
                    Message receive2 = subscribe.receive();
                    hashSet2.add((String) receive2.getValue());
                    subscribe.acknowledge(receive2);
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                log.info("Total redelivery delay: {} ms", Long.valueOf(currentTimeMillis2 - currentTimeMillis));
                Assert.assertEquals(hashSet2, hashSet);
                if (SubscriptionType.Shared == subscriptionType) {
                    log.info("Total expected redelivery delay {} ms", Long.valueOf(j));
                    Assert.assertTrue(currentTimeMillis2 - currentTimeMillis >= j);
                }
                Assert.assertNull(subscribe.receive(100, TimeUnit.MILLISECONDS));
                subscribe.close();
                create.close();
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test(timeOut = 10000)
    public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
        ConsumerImpl subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{BrokerTestUtil.newUniqueName("testNegativeAcksDeleteFromUnackedTracker")}).subscriptionName("sub1").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Shared).ackTimeout(100L, TimeUnit.SECONDS).negativeAckRedeliveryDelay(100L, TimeUnit.SECONDS).subscribe();
        try {
            TopicMessageId create = TopicMessageId.create("topic-1", new MessageIdImpl(3L, 1L, 0));
            BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(3L, 1L, 0, 0);
            BatchMessageIdImpl batchMessageIdImpl2 = new BatchMessageIdImpl(3L, 1L, 0, 1);
            BatchMessageIdImpl batchMessageIdImpl3 = new BatchMessageIdImpl(3L, 1L, 0, 2);
            UnAckedMessageTracker unAckedMessageTracker = subscribe.getUnAckedMessageTracker();
            unAckedMessageTracker.add(create);
            subscribe.negativeAcknowledge(create);
            NegativeAcksTracker negativeAcksTracker = subscribe.getNegativeAcksTracker();
            Assert.assertEquals(((Integer) negativeAcksTracker.getNackedMessagesCount().orElse(-1)).intValue(), 1);
            Assert.assertEquals(unAckedMessageTracker.size(), 0L);
            negativeAcksTracker.close();
            unAckedMessageTracker.add(batchMessageIdImpl);
            unAckedMessageTracker.add(batchMessageIdImpl2);
            unAckedMessageTracker.add(batchMessageIdImpl3);
            subscribe.negativeAcknowledge(batchMessageIdImpl);
            subscribe.negativeAcknowledge(batchMessageIdImpl2);
            subscribe.negativeAcknowledge(batchMessageIdImpl3);
            Assert.assertEquals(((Integer) negativeAcksTracker.getNackedMessagesCount().orElse(-1)).intValue(), 1);
            Assert.assertEquals(unAckedMessageTracker.size(), 0L);
            negativeAcksTracker.close();
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
            throw th;
        }
    }

    @Test
    public void testNegativeAcksWithBatchAckEnabled() throws Exception {
        cleanup();
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        setup();
        String newUniqueName = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatchAckEnabled");
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{newUniqueName}).subscriptionName("sub1").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Shared).enableBatchIndexAcknowledgment(true).negativeAckRedeliveryDelay(1L, TimeUnit.SECONDS).subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newUniqueName).create();
            try {
                HashSet hashSet = new HashSet();
                for (int i = 0; i < 10; i++) {
                    String str = "test-" + i;
                    create.sendAsync(str);
                    hashSet.add(str);
                }
                create.flush();
                for (int i2 = 0; i2 < 10; i2++) {
                    subscribe.negativeAcknowledge(subscribe.receive());
                }
                HashSet hashSet2 = new HashSet();
                for (int i3 = 0; i3 < 10; i3++) {
                    Message receive = subscribe.receive();
                    hashSet2.add((String) receive.getValue());
                    subscribe.acknowledge(receive);
                }
                Assert.assertEquals(hashSet2, hashSet);
                Assert.assertNull(subscribe.receive(100, TimeUnit.MILLISECONDS));
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testFailoverConsumerBatchCumulateAck() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("my-topic");
        this.admin.topics().createPartitionedTopic(newUniqueName, 2);
        final Consumer subscribe = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).subscriptionName("sub").subscriptionType(SubscriptionType.Failover).enableBatchIndexAcknowledgment(true).acknowledgmentGroupTime(100L, TimeUnit.MILLISECONDS).receiverQueueSize(10).subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.INT32).topic(newUniqueName).batchingMaxMessages(10).batchingMaxPublishDelay(3L, TimeUnit.SECONDS).blockIfQueueFull(true).create();
            try {
                final HashSet hashSet = new HashSet();
                CountDownLatch countDownLatch = new CountDownLatch(10);
                for (int i = 0; i < 10; i++) {
                    hashSet.add(Integer.valueOf(i));
                    create.sendAsync(Integer.valueOf(i)).whenComplete((messageId, th) -> {
                        countDownLatch.countDown();
                    });
                }
                countDownLatch.await();
                final CountDownLatch countDownLatch2 = new CountDownLatch(1);
                new Thread(new Runnable() { // from class: org.apache.pulsar.client.impl.NegativeAcksTest.1
                    @Override // java.lang.Runnable
                    public void run() {
                        CompletableFuture receiveAsync = subscribe.receiveAsync();
                        Set set = hashSet;
                        Consumer consumer = subscribe;
                        CompletableFuture thenCompose = receiveAsync.thenCompose(message -> {
                            NegativeAcksTest.log.info("received one msg : {}", message.getMessageId());
                            set.remove(message.getValue());
                            return consumer.acknowledgeCumulativeAsync(message);
                        });
                        Consumer consumer2 = subscribe;
                        CompletableFuture<Void> thenAccept = thenCompose.thenAccept(r5 -> {
                            try {
                                Thread.sleep(500L);
                                consumer2.redeliverUnacknowledgedMessages();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
                        CountDownLatch countDownLatch3 = countDownLatch2;
                        thenAccept.whenComplete((r3, th2) -> {
                            countDownLatch3.countDown();
                        });
                    }
                }).start();
                countDownLatch2.await();
                Thread.sleep(500L);
                int i2 = 0;
                while (true) {
                    Message receive = subscribe.receive(5, TimeUnit.SECONDS);
                    if (receive == null) {
                        break;
                    }
                    subscribe.acknowledgeCumulative(receive);
                    Thread.sleep(200L);
                    hashSet.remove(receive.getValue());
                    log.info("received msg : {}", receive.getMessageId());
                    i2++;
                }
                Assert.assertEquals(i2, 9);
                Assert.assertEquals(0, hashSet.size());
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test(invocationCount = Test.TestMessage.NESTEDFIELD_FIELD_NUMBER)
    public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("my-topic");
        this.admin.topics().createPartitionedTopic(newUniqueName, 2);
        MultiTopicsConsumerImpl subscribe = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).subscriptionName("sub").receiverQueueSize(10).subscribe();
        try {
            ExecutorService executorService = (ExecutorService) WhiteboxImpl.getInternalState(subscribe, "internalPinnedExecutor");
            Producer create = this.pulsarClient.newProducer(Schema.INT32).topic(newUniqueName).enableBatching(false).create();
            for (int i = 0; i < 10; i++) {
                try {
                    create.send(Integer.valueOf(i));
                } catch (Throwable th) {
                    if (Collections.singletonList(create).get(0) != null) {
                        create.close();
                    }
                    throw th;
                }
            }
            Awaitility.await().until(() -> {
                return Boolean.valueOf(subscribe.incomingMessages.size() == 10);
            });
            Thread.sleep(1000L);
            executorService.submit(() -> {
                subscribe.redeliverUnacknowledgedMessages();
            }).get();
            executorService.submit(() -> {
            }).get();
            HashSet hashSet = new HashSet();
            while (true) {
                Message receive = subscribe.receive(2, TimeUnit.SECONDS);
                if (receive == null) {
                    break;
                } else {
                    hashSet.add((Integer) receive.getValue());
                }
            }
            Assert.assertEquals(hashSet.size(), 10);
            create.close();
            subscribe.close();
            this.admin.topics().deletePartitionedTopic("persistent://public/default/" + newUniqueName);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }
}
