package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/ConsumerRedeliveryTest.class */
public class ConsumerRedeliveryTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerRedeliveryTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setManagedLedgerCacheEvictionFrequency(0.1d);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        return new Object[]{new Object[]{true}, new Object[]{false}};
    }

    /* JADX WARN: Finally extract failed */
    @Test(dataProvider = "ackReceiptEnabled")
    public void testOrderedRedelivery(boolean z) throws Exception {
        String str = "persistent://my-property/my-ns/redelivery-" + System.currentTimeMillis();
        this.conf.setManagedLedgerMaxEntriesPerLedger(2);
        this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        Producer create = this.pulsarClient.newProducer().topic(str).producerName("my-producer-name").create();
        try {
            ConsumerBuilder isAckReceiptEnabled = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("s1").subscriptionType(SubscriptionType.Shared).isAckReceiptEnabled(z);
            ConsumerImpl subscribe = isAckReceiptEnabled.subscribe();
            for (int i = 0; i < 100; i++) {
                create.send(("my-message-" + i).getBytes());
            }
            int i2 = 0;
            HashSet newHashSet = Sets.newHashSet();
            for (int i3 = 0; i3 < 100; i3++) {
                Message receive = subscribe.receive(5, TimeUnit.SECONDS);
                if (receive == null || i2 % 2 != 0) {
                    newHashSet.add(receive.getMessageId());
                } else {
                    subscribe.acknowledge(receive);
                }
                i2++;
            }
            Assert.assertEquals(100, i2);
            subscribe.redeliverUnacknowledgedMessages(newHashSet);
            MessageIdImpl messageIdImpl = null;
            for (int i4 = 0; i4 < 50; i4++) {
                MessageIdImpl messageId = subscribe.receive(5, TimeUnit.SECONDS).getMessageId();
                if (messageIdImpl != null) {
                    Assert.assertTrue(messageIdImpl.getLedgerId() <= messageId.getLedgerId(), "lastMsgId: " + messageIdImpl + " -- msgId: " + messageId);
                }
                messageIdImpl = messageId;
            }
            subscribe.close();
            Consumer subscribe2 = isAckReceiptEnabled.subscribe();
            MessageIdImpl messageIdImpl2 = null;
            for (int i5 = 0; i5 < 50; i5++) {
                try {
                    MessageIdImpl messageId2 = subscribe2.receive(5, TimeUnit.SECONDS).getMessageId();
                    if (messageIdImpl2 != null) {
                        Assert.assertTrue(messageIdImpl2.getLedgerId() <= messageId2.getLedgerId());
                    }
                    messageIdImpl2 = messageId2;
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    throw th;
                }
            }
            if (Collections.singletonList(subscribe2).get(0) != null) {
                subscribe2.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test(dataProvider = "ackReceiptEnabled")
    public void testUnAckMessageRedeliveryWithReceiveAsync(boolean z) throws PulsarClientException, ExecutionException, InterruptedException {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/async-unack-redelivery"}).subscriptionName("s1").isAckReceiptEnabled(z).enableBatchIndexAcknowledgment(z).ackTimeout(3L, TimeUnit.SECONDS).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/async-unack-redelivery").enableBatching(true).batchingMaxMessages(5).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).create();
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            arrayList.add(subscribe.receiveAsync());
        }
        for (int i2 = 0; i2 < 10; i2++) {
            create.sendAsync("my-message-" + i2);
        }
        int i3 = 0;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertNotNull((Message) ((CompletableFuture) it.next()).get());
            i3++;
        }
        Assert.assertEquals(10, i3);
        for (int i4 = 0; i4 < 10; i4++) {
            Message receive = subscribe.receive();
            Assert.assertNotNull(receive);
            i3++;
            subscribe.acknowledge(receive);
        }
        Assert.assertEquals(20, i3);
        create.close();
        subscribe.close();
    }

    @Test
    public void testConsumerWithPermitReceiveBatchMessages() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        int i = 100;
        String str = "permitReceiveBatchMessages" + UUID.randomUUID().toString();
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(str);
        producerBuilder.enableBatching(true);
        producerBuilder.batchingMaxPublishDelay(2000L, TimeUnit.MILLISECONDS);
        producerBuilder.batchingMaxMessages(100);
        Producer create = producerBuilder.create();
        for (int i2 = 0; i2 < 100; i2++) {
            create.sendAsync(("my-message-" + i2).getBytes());
        }
        create.flush();
        for (int i3 = 0; i3 < 10; i3++) {
            create.sendAsync(("my-message-" + i3).getBytes());
        }
        create.flush();
        retryStrategically(r5 -> {
            return subscribe.getTotalIncomingMessages() == i;
        }, 5, 2000L);
        Assert.assertEquals(subscribe.getTotalIncomingMessages(), 100);
        ConsumerImpl subscribe2 = this.pulsarClient.newConsumer().topic(new String[]{str}).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscriber-name").subscribe();
        retryStrategically(r4 -> {
            return subscribe2.getTotalIncomingMessages() == 10;
        }, 5, 2000L);
        Assert.assertEquals(subscribe2.getTotalIncomingMessages(), 10);
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(timeOut = 30000)
    public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exception {
        String str = "testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).create();
        ArrayList arrayList = new ArrayList(100);
        for (int i = 0; i < 100; i++) {
            arrayList.add(create.sendAsync("Hello - " + i));
        }
        create.flush();
        FutureUtil.waitForAll(arrayList).get();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscriber-name").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        ArrayList arrayList2 = new ArrayList(100);
        for (int i2 = 0; i2 < 100; i2++) {
            arrayList2.add(subscribe.receive());
        }
        Assert.assertEquals(arrayList2.size(), 100);
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        this.admin.topics().unload(str);
        arrayList2.clear();
        for (int i3 = 0; i3 < 100; i3++) {
            arrayList2.add(subscribe.receive());
        }
        Assert.assertEquals(arrayList2.size(), 100);
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        subscribe.close();
        create.close();
    }
}
