package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.springframework.util.backoff.ExponentialBackOff;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Checks that acknowledging a whole list of message ids in one call leaves nothing to be
 * redelivered, for every combination of producer batching and topic partitioning, and with
 * acknowledgment receipts both enabled and disabled.
 */
@Test(groups = {"broker-api"})
public class ConsumerAckListTest extends ProducerConsumerBase {

    /**
     * Runs the base-class setup followed by the producer/consumer base setup once before the
     * tests of this class. Overrides the hook declared in MockedPulsarServiceBaseTest.
     *
     * @throws Exception if either setup step fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Runs the base-class cleanup after the tests of this class; {@code alwaysRun} makes TestNG
     * invoke it even when earlier configuration or test methods failed.
     *
     * @throws Exception if the cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies the two values of the "ack receipt enabled" flag.
     *
     * @return one row per flag value: {@code true}, then {@code false}
     */
    @DataProvider(name = "ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        return new Object[][]{{true}, {false}};
    }

    /**
     * Exercises list acknowledgment for all four batching/partitioning combinations.
     *
     * @param ackReceiptEnabled value passed to the consumer's ack-receipt and
     *                          batch-index-acknowledgment settings
     * @throws Exception if any scenario fails
     */
    @Test(timeOut = 30000, dataProvider = "ackReceiptEnabled")
    public void testBatchListAck(boolean ackReceiptEnabled) throws Exception {
        ackListMessage(true, true, ackReceiptEnabled);
        ackListMessage(true, false, ackReceiptEnabled);
        ackListMessage(false, false, ackReceiptEnabled);
        ackListMessage(false, true, ackReceiptEnabled);
    }

    /**
     * Publishes a random number of messages (50 to 99) to a fresh topic, receives all of them,
     * acknowledges them with a single list acknowledgment and asserts that nothing is delivered
     * again after a redelivery request.
     *
     * @param enableBatching    whether the producer batches messages
     * @param partitioned       whether the topic is created up front with 3 partitions
     * @param ackReceiptEnabled value used for both {@code enableBatchIndexAcknowledgment} and
     *                          {@code isAckReceiptEnabled} on the consumer
     * @throws Exception if any client or admin operation fails
     */
    private void ackListMessage(boolean enableBatching, boolean partitioned, boolean ackReceiptEnabled)
            throws Exception {
        // Random suffixes keep the four scenarios of one test run independent of each other.
        final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID();
        final String subscription = "testBatchAck-sub" + UUID.randomUUID();
        final int messageCount = ThreadLocalRandom.current().nextInt(50, 100);

        if (partitioned) {
            this.admin.topics().createPartitionedTopic(topic, 3);
        }

        // Resources are closed in reverse order: the consumer first, then the producer.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(enableBatching)
                     .batchingMaxPublishDelay(50L, TimeUnit.MILLISECONDS)
                     .topic(topic)
                     .create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .subscriptionType(SubscriptionType.Shared)
                     .topic(topic)
                     .negativeAckRedeliveryDelay(1001L, TimeUnit.MILLISECONDS)
                     .subscriptionName(subscription)
                     .enableBatchIndexAcknowledgment(ackReceiptEnabled)
                     .isAckReceiptEnabled(ackReceiptEnabled)
                     .subscribe()) {
            sendMessagesAsyncAndWait(producer, messageCount);

            List<MessageId> receivedIds = new ArrayList<>(messageCount);
            for (int received = 0; received < messageCount; received++) {
                receivedIds.add(consumer.receive().getMessageId());
            }
            consumer.acknowledge(receivedIds);

            // NOTE(review): presumably this pause lets the acknowledgment take effect before
            // redelivery is requested - confirm whether it is still needed with ack receipts.
            Thread.sleep(1000L);
            consumer.redeliverUnacknowledgedMessages();

            // Every message was acknowledged, so nothing may arrive within the 2 second window.
            Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));
        }
    }

    /**
     * Sends {@code messages} string messages named {@code my-message-<index>} asynchronously and
     * blocks until each send has completed with a non-null message id.
     *
     * <p>The completion callback is attached with {@code thenAccept}, so it is not invoked for a
     * send that completes exceptionally; in that case the latch is never fully released and the
     * wait only ends through the timeout of the calling test.
     *
     * @param producer producer used to publish the messages
     * @param messages number of messages to send
     * @throws Exception if the waiting thread is interrupted
     */
    private void sendMessagesAsyncAndWait(Producer<String> producer, int messages) throws Exception {
        final CountDownLatch pendingSends = new CountDownLatch(messages);
        for (int index = 0; index < messages; index++) {
            producer.sendAsync("my-message-" + index).thenAccept(messageId -> {
                if (messageId != null) {
                    pendingSends.countDown();
                }
            });
        }
        pendingSends.await();
    }
}
