package org.apache.pulsar.client.api.v1;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.class */
public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
    private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1;

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "batch")
    public Object[][] codecProvider() {
        return new Object[]{new Object[]{0}, new Object[]{1000}};
    }

    @Test(dataProvider = "batch")
    public void testSyncProducerAndConsumer(int i) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/use/my-ns/my-topic1");
        if (i != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay(i, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer create = producerBuilder.create();
        for (int i2 = 0; i2 < 10; i2++) {
            create.send("my-message-" + i2);
        }
        Message message = null;
        HashSet newHashSet = Sets.newHashSet();
        for (int i3 = 0; i3 < 10; i3++) {
            message = subscribe.receive(5, TimeUnit.SECONDS);
            String str = (String) message.getValue();
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i3);
        }
        subscribe.acknowledgeCumulative(message);
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(dataProvider = "batch")
    public void testAsyncProducerAndAsyncAck(int i) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").batchingMaxMessages(5).batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS).enableBatching(i != 0).create();
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < 10; i2++) {
            newArrayList.add(create.sendAsync(("my-message-" + i2).getBytes()));
        }
        log.info("Waiting for async publish to complete");
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        Message message = null;
        HashSet newHashSet = Sets.newHashSet();
        for (int i3 = 0; i3 < 10; i3++) {
            message = subscribe.receive(5, TimeUnit.SECONDS);
            String str = new String(message.getData());
            log.info("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i3);
        }
        CompletableFuture acknowledgeCumulativeAsync = subscribe.acknowledgeCumulativeAsync(message);
        log.info("Waiting for async ack to complete");
        acknowledgeCumulativeAsync.get();
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(dataProvider = "batch", timeOut = 100000)
    public void testMessageListener(int i) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        CountDownLatch countDownLatch = new CountDownLatch(100);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic3"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).messageListener((consumer, message) -> {
            Assert.assertNotNull(message, "Message cannot be null");
            log.debug("Received message [{}] in the listener", new String(message.getData()));
            consumer.acknowledgeAsync(message);
            countDownLatch.countDown();
        }).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic3").batchingMaxMessages(5).batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS).enableBatching(i != 0).create();
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < 100; i2++) {
            newArrayList.add(create.sendAsync(("my-message-" + i2).getBytes()));
        }
        log.info("Waiting for async publish to complete");
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        log.info("Waiting for message listener to ack all messages");
        Assert.assertTrue(countDownLatch.await(100, TimeUnit.SECONDS), "Timed out waiting for message listener acks");
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(dataProvider = "batch")
    public void testBackoffAndReconnect(int i) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic4"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).startMessageIdInclusive().subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic4").batchingMaxMessages(5).batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS).enableBatching(i != 0).create();
        CompletableFuture completableFuture = null;
        for (int i2 = 0; i2 < 10; i2++) {
            completableFuture = create.sendAsync(("my-message-" + i2).getBytes()).thenApply(messageId -> {
                log.info("Published message id: {}", messageId);
                return messageId;
            });
        }
        completableFuture.get();
        for (int i3 = 0; i3 < 10; i3++) {
            log.info("Received: [{}]", new String(subscribe.receive(5, TimeUnit.SECONDS).getData()));
        }
        log.info("-- Restarting broker --");
        restartBroker();
        Message message = null;
        log.info("Receiving duplicate messages..");
        for (int i4 = 0; i4 < 10; i4++) {
            message = subscribe.receive(5, TimeUnit.SECONDS);
            log.info("Received: [{}]", new String(message.getData()));
            Assert.assertNotNull(message, "Message cannot be null");
        }
        subscribe.acknowledgeCumulative(message);
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(dataProvider = "batch")
    public void testSendTimeout(int i) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic5"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic5").batchingMaxMessages(5).batchingMaxPublishDelay(2L, TimeUnit.MILLISECONDS).enableBatching(i != 0).sendTimeout(1, TimeUnit.SECONDS).create();
        stopBroker();
        try {
            create.sendAsync("my-message".getBytes()).get();
            Assert.fail("Send operation should have failed");
        } catch (ExecutionException e) {
        }
        startBroker();
        Assert.assertNull(subscribe.receive(3, TimeUnit.SECONDS));
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test
    public void testInvalidSequence() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        PulsarClient build = PulsarClient.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build();
        build.close();
        try {
            build.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic6"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
            Assert.fail("Should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
        }
        try {
            build.newProducer().topic("persistent://my-property/use/my-ns/my-topic6").create();
            Assert.fail("Should fail");
        } catch (PulsarClientException e2) {
            Assert.assertTrue(e2 instanceof PulsarClientException.AlreadyClosedException);
        }
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic6"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic6").create();
        try {
            subscribe.acknowledge(create.newMessage().value("InvalidMessage".getBytes()).getMessage());
        } catch (PulsarClientException.InvalidMessageException e3) {
        }
        subscribe.close();
        try {
            subscribe.receive();
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException e4) {
        }
        try {
            subscribe.unsubscribe();
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException e5) {
        }
        create.close();
        try {
            create.send("message".getBytes());
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException e6) {
        }
    }

    @Test
    public void testSillyUser() throws Exception {
        try {
            PulsarClient.builder().serviceUrl("invalid://url").build();
            Assert.fail("should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.InvalidServiceURL);
        }
        try {
            this.pulsarClient.newProducer().sendTimeout(-1, TimeUnit.SECONDS);
            Assert.fail("should fail");
        } catch (IllegalArgumentException e2) {
        }
        try {
            this.pulsarClient.newProducer().topic("invalid://topic").create();
            Assert.fail("should fail");
        } catch (PulsarClientException e3) {
            Assert.assertTrue(e3 instanceof PulsarClientException.InvalidTopicNameException);
        }
        try {
            this.pulsarClient.newConsumer().messageListener((MessageListener) null);
            Assert.fail("should fail");
        } catch (NullPointerException e4) {
        }
        try {
            this.pulsarClient.newConsumer().subscriptionType((SubscriptionType) null);
            Assert.fail("should fail");
        } catch (NullPointerException e5) {
        }
        try {
            this.pulsarClient.newConsumer().receiverQueueSize(-1);
            Assert.fail("should fail");
        } catch (IllegalArgumentException e6) {
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic7"}).subscriptionName((String) null).subscribe();
            Assert.fail("Should fail");
        } catch (IllegalArgumentException e7) {
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic7"}).subscriptionName("").subscribe();
            Assert.fail("Should fail");
        } catch (IllegalArgumentException e8) {
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"invalid://topic7"}).subscriptionName((String) null).subscribe();
            Assert.fail("Should fail");
        } catch (IllegalArgumentException e9) {
        }
    }

    @Test(dataProvider = "batch")
    public void testConcurrentConsumerReceiveWhileReconnect(int i) throws Exception {
        String str = "persistent://my-property/use/my-ns/my-topic-" + UUID.randomUUID().toString();
        final ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName(UUID.randomUUID().toString()).startMessageIdInclusive().receiverQueueSize(100).subscribe();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            final CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
            for (int i2 = 0; i2 < 10; i2++) {
                newCachedThreadPool.submit(new Callable<Void>() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        cyclicBarrier.await();
                        subscribe.receive();
                        return null;
                    }
                });
            }
            cyclicBarrier.await();
            Thread.sleep(100L);
            restartBroker();
            Thread.sleep(2000L);
            Producer create = this.pulsarClient.newProducer().topic(str).batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS).batchingMaxMessages(5).enableBatching(i != 0).create();
            for (int i3 = 0; i3 < 100; i3++) {
                create.send(("my-message-" + i3).getBytes());
            }
            ConsumerImpl consumerImpl = subscribe;
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(consumerImpl.getAvailablePermits(), 10);
            });
            Assert.assertEquals(consumerImpl.numMessagesInQueue(), 90);
            cyclicBarrier.reset();
            for (int i4 = 0; i4 < 10; i4++) {
                newCachedThreadPool.submit(new Callable<Void>() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.2
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        cyclicBarrier.await();
                        subscribe.receive();
                        return null;
                    }
                });
            }
            cyclicBarrier.await();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(consumerImpl.getAvailablePermits(), 20);
            });
            Assert.assertEquals(consumerImpl.numMessagesInQueue(), 80);
            do {
            } while (subscribe.receive(1, TimeUnit.SECONDS) != null);
            Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
            Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
            cyclicBarrier.reset();
            for (int i5 = 0; i5 < 10; i5++) {
                newCachedThreadPool.submit(new Callable<Void>() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.3
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        cyclicBarrier.await();
                        subscribe.receive();
                        return null;
                    }
                });
            }
            cyclicBarrier.await();
            Thread.sleep(100L);
            restartBroker();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(consumerImpl.getAvailablePermits(), 10);
            });
            Assert.assertEquals(consumerImpl.numMessagesInQueue(), 90);
            subscribe.close();
            if (Collections.singletonList(newCachedThreadPool).get(0) != null) {
                newCachedThreadPool.shutdownNow();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newCachedThreadPool).get(0) != null) {
                newCachedThreadPool.shutdownNow();
            }
            throw th;
        }
    }

    @Test
    public void testSendBigMessageSize() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/bigMsg").create();
        create.newMessage().value(new byte[5242880]);
        try {
            create.send(new byte[5242881]);
            Assert.fail("Should have thrown exception");
        } catch (PulsarClientException.InvalidMessageException e) {
        }
    }

    @Test(groups = {"quarantine"})
    public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        String str = "cache-topic-" + UUID.randomUUID().toString();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/" + str}).subscriptionName("faster-sub1").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).subscribe();
        String str2 = "persistent://my-property/use/my-ns/" + str;
        Producer create = this.pulsarClient.newProducer().topic(str2).enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxMessages(5).create();
        ManagedLedgerImpl managedLedger = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(str2).get()).getManagedLedger();
        Field declaredField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        declaredField.setAccessible(true);
        Field declaredField2 = Field.class.getDeclaredField("modifiers");
        declaredField2.setAccessible(true);
        declaredField2.setInt(declaredField, declaredField.getModifiers() & (-17));
        EntryCacheImpl entryCacheImpl = (EntryCacheImpl) Mockito.spy((EntryCacheImpl) declaredField.get(managedLedger));
        declaredField.set(managedLedger, entryCacheImpl);
        for (int i = 0; i < 30; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        for (int i2 = 0; i2 < 30; i2++) {
            subscribe.acknowledge(subscribe.receive(5, TimeUnit.SECONDS));
        }
        ((EntryCacheImpl) Mockito.verify(entryCacheImpl, Mockito.atLeastOnce())).invalidateEntries((PositionImpl) Mockito.any());
        Thread.sleep(1000L);
        create.send("message".getBytes());
        subscribe.receive(5, TimeUnit.SECONDS);
        Consumer subscribe2 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/" + str}).subscriptionName("slower-sub2").subscriptionType(SubscriptionType.Shared).subscribe();
        for (int i3 = 0; i3 < 20; i3++) {
            create.send(("my-message-" + i3).getBytes());
        }
        for (int i4 = 0; i4 < 20; i4++) {
            subscribe.acknowledge(subscribe.receive(5, TimeUnit.SECONDS));
        }
        Thread.sleep(1000L);
        create.send("message".getBytes());
        subscribe.receive(5, TimeUnit.SECONDS);
        retryStrategically(r6 -> {
            return entryCacheImpl.getSize() > 0;
        }, 10, 100L);
        Assert.assertTrue(entryCacheImpl.getSize() != 0);
        subscribe2.close();
        retryStrategically(r62 -> {
            return entryCacheImpl.getSize() == 0;
        }, 5, 100L);
        Assert.assertEquals(entryCacheImpl.getSize(), 0L);
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(timeOut = 2000)
    public void testAsyncProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet newHashSet = Sets.newHashSet();
        HashSet newHashSet2 = Sets.newHashSet();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 100; i++) {
            String str = "my-message-" + i;
            create.send(str.getBytes());
            newHashSet.add(str);
        }
        log.info(" start receiving messages :");
        CountDownLatch countDownLatch = new CountDownLatch(100);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        try {
            receiveAsync(subscribe, 100, 0, countDownLatch, newHashSet2, newFixedThreadPool);
            countDownLatch.await();
            Assert.assertEquals(newHashSet.size(), 100);
            newHashSet.removeAll(newHashSet2);
            Assert.assertTrue(newHashSet.isEmpty());
            create.close();
            subscribe.close();
            log.info("-- Exiting {} test --", this.methodName);
            if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                newFixedThreadPool.shutdownNow();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                newFixedThreadPool.shutdownNow();
            }
            throw th;
        }
    }

    @Test(timeOut = 2000)
    public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet newHashSet = Sets.newHashSet();
        HashSet newHashSet2 = Sets.newHashSet();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 100; i++) {
            String str = "my-message-" + i;
            create.send(str.getBytes());
            newHashSet.add(str);
        }
        log.info(" start receiving messages :");
        CountDownLatch countDownLatch = new CountDownLatch(100);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        try {
            receiveAsync(subscribe, 100, 0, countDownLatch, newHashSet2, newFixedThreadPool);
            countDownLatch.await();
            Assert.assertEquals(newHashSet.size(), 100);
            newHashSet.removeAll(newHashSet2);
            Assert.assertTrue(newHashSet.isEmpty());
            create.close();
            subscribe.close();
            log.info("-- Exiting {} test --", this.methodName);
            if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                newFixedThreadPool.shutdownNow();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                newFixedThreadPool.shutdownNow();
            }
            throw th;
        }
    }

    @Test
    public void testSendCallBack() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(false).create();
        for (int i = 0; i < 100; i++) {
            String str = "my-message-" + i;
            AtomicInteger atomicInteger = new AtomicInteger();
            create.sendAsync(str.getBytes()).handle((messageId, th) -> {
                if (th != null) {
                    log.error("Message send failed:", th);
                    return null;
                }
                atomicInteger.set(str.length());
                return null;
            }).get();
            Assert.assertEquals(str.getBytes().length, atomicInteger.get());
        }
    }

    @Test(timeOut = 30000)
    public void testSharedConsumerAckDifferentConsumer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        ConsumerBuilder receiverQueueSize = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).receiverQueueSize(1);
        Consumer subscribe = receiverQueueSize.subscribe();
        Consumer subscribe2 = receiverQueueSize.subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        HashSet newHashSet = Sets.newHashSet();
        HashSet newHashSet2 = Sets.newHashSet();
        for (int i2 = 0; i2 < 5; i2++) {
            newHashSet.add(subscribe.receive());
            newHashSet2.add(subscribe2.receive());
        }
        newHashSet.forEach(message -> {
            try {
                subscribe2.acknowledge(message);
            } catch (PulsarClientException e) {
                Assert.fail();
            }
        });
        newHashSet2.forEach(message2 -> {
            try {
                subscribe.acknowledge(message2);
            } catch (PulsarClientException e) {
                Assert.fail();
            }
        });
        subscribe.redeliverUnacknowledgedMessages();
        subscribe2.redeliverUnacknowledgedMessages();
        try {
            if (subscribe.receive(100, TimeUnit.MILLISECONDS) != null || subscribe2.receive(100, TimeUnit.MILLISECONDS) != null) {
                Assert.fail();
            }
            log.info("-- Exiting {} test --", this.methodName);
        } finally {
            subscribe.close();
            subscribe2.close();
        }
    }

    private void receiveAsync(Consumer<byte[]> consumer, int i, int i2, CountDownLatch countDownLatch, Set<String> set, ExecutorService executorService) throws PulsarClientException {
        if (i2 < i) {
            consumer.receiveAsync().handle((message, th) -> {
                if (th != null) {
                    return null;
                }
                set.add(new String(message.getData()));
                try {
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    Assert.fail("message acknowledge failed", e);
                }
                executorService.execute(() -> {
                    try {
                        receiveAsync(consumer, i, i2 + 1, countDownLatch, set, executorService);
                    } catch (PulsarClientException e2) {
                        Assert.fail("message receive failed", e2);
                    }
                });
                countDownLatch.countDown();
                return null;
            });
        }
    }

    @Test
    public void testConsumerBlockingWithUnAckedMessages() throws Exception {
        Message receive;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            try {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(500);
                Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
                for (int i = 0; i < 600; i++) {
                    create.send(("my-message-" + i).getBytes());
                }
                ArrayList newArrayList = Lists.newArrayList();
                for (int i2 = 0; i2 < 600 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i2++) {
                    newArrayList.add(receive);
                    log.info("Received message: " + new String(receive.getData()));
                }
                Assert.assertEquals(newArrayList.size(), 500);
                newArrayList.forEach(message -> {
                    try {
                        subscribe.acknowledge(message);
                    } catch (PulsarClientException e) {
                        Assert.fail("ack failed", e);
                    }
                });
                int size = 600 - newArrayList.size();
                for (int i3 = 0; i3 < size; i3++) {
                    Message receive2 = subscribe.receive(1, TimeUnit.SECONDS);
                    if (receive2 != null) {
                        newArrayList.add(receive2);
                        log.info("Received message: " + new String(receive2.getData()));
                    }
                }
                Assert.assertEquals(600, newArrayList.size());
                create.close();
                subscribe.close();
                log.info("-- Exiting {} test --", this.methodName);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            } catch (Exception e) {
                Assert.fail();
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            }
        } catch (Throwable th) {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            throw th;
        }
    }

    @Test
    public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Exception {
        Message receive;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            try {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(500);
                Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
                for (int i = 0; i < 1500; i++) {
                    create.send(("my-message-" + i).getBytes());
                }
                int i2 = 0;
                for (int i3 = 0; i3 < 3; i3++) {
                    ArrayList newArrayList = Lists.newArrayList();
                    for (int i4 = 0; i4 < 1500 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i4++) {
                        newArrayList.add(receive);
                        log.info("Received message: " + new String(receive.getData()));
                    }
                    Assert.assertEquals(newArrayList.size(), 500);
                    newArrayList.forEach(message -> {
                        try {
                            subscribe.acknowledge(message);
                        } catch (PulsarClientException e) {
                            Assert.fail("ack failed", e);
                        }
                    });
                    i2 += newArrayList.size();
                }
                Assert.assertEquals(i2, 1500);
                create.close();
                subscribe.close();
                log.info("-- Exiting {} test --", this.methodName);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            } catch (Exception e) {
                Assert.fail();
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            }
        } catch (Throwable th) {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            throw th;
        }
    }

    @Test
    public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Exception {
        Message receive;
        Message receive2;
        Message receive3;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            try {
                int i = 0;
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
                Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    Consumer subscribe2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                    Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
                    for (int i2 = 0; i2 < 100; i2++) {
                        create.send(("my-message-" + i2).getBytes());
                    }
                    ArrayList newArrayList = Lists.newArrayList();
                    for (int i3 = 0; i3 < 100 && (receive3 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i3++) {
                        newArrayList.add(receive3);
                        i++;
                        log.info("Received message: " + new String(receive3.getData()));
                    }
                    Assert.assertEquals(newArrayList.size(), 20);
                    newArrayList.clear();
                    for (int i4 = 0; i4 < 80 && (receive2 = subscribe2.receive(1, TimeUnit.SECONDS)) != null; i4++) {
                        newArrayList.add(receive2);
                        i++;
                        log.info("Received message: " + new String(receive2.getData()));
                    }
                    Assert.assertEquals(newArrayList.size(), 20);
                    newArrayList.forEach(message -> {
                        try {
                            subscribe2.acknowledge(message);
                        } catch (PulsarClientException e) {
                            Assert.fail("shouldn't have failed ", e);
                        }
                    });
                    newArrayList.clear();
                    for (int i5 = 0; i5 < 60 && (receive = subscribe2.receive(1, TimeUnit.SECONDS)) != null; i5++) {
                        newArrayList.add(receive);
                        i++;
                        subscribe2.acknowledge(receive);
                        log.info("Received message: " + new String(receive.getData()));
                    }
                    Assert.assertEquals(100, i);
                    create.close();
                    subscribe.close();
                    subscribe2.close();
                    log.info("-- Exiting {} test --", this.methodName);
                    if (Collections.singletonList(newPulsarClient).get(0) != null) {
                        newPulsarClient.close();
                    }
                    this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
                } catch (Throwable th) {
                    if (Collections.singletonList(newPulsarClient).get(0) != null) {
                        newPulsarClient.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
                throw th2;
            }
        } catch (Exception e) {
            Assert.fail();
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
        }
    }

    @Test
    public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exception {
        Message receive;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        int i = 0;
        try {
            try {
                ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(20).ackTimeout(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Shared).subscribe();
                Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").enableBatching(false).create();
                for (int i2 = 0; i2 < 100; i2++) {
                    create.send(("my-message-" + i2).getBytes());
                }
                Thread.sleep(1000L);
                Assert.assertEquals(subscribe.numMessagesInQueue(), 20);
                Thread.sleep(2000L);
                Assert.assertEquals(subscribe.numMessagesInQueue(), 20);
                for (int i3 = 0; i3 < 100 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i3++) {
                    subscribe.acknowledge(receive);
                    i++;
                    log.info("Received message: " + new String(receive.getData()));
                }
                Assert.assertEquals(100, i);
                create.close();
                subscribe.close();
                log.info("-- Exiting {} test --", this.methodName);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            } catch (Exception e) {
                Assert.fail();
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            }
        } catch (Throwable th) {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            throw th;
        }
    }

    @Test
    public void testUnackBlockRedeliverMessages() throws Exception {
        Message receive;
        Message receive2;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        int i = 0;
        try {
            try {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
                ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
                for (int i2 = 0; i2 < 100; i2++) {
                    create.send(("my-message-" + i2).getBytes());
                }
                ArrayList newArrayList = Lists.newArrayList();
                for (int i3 = 0; i3 < 100 && (receive2 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i3++) {
                    newArrayList.add(receive2);
                    i++;
                    log.info("Received message: " + new String(receive2.getData()));
                }
                subscribe.redeliverUnacknowledgedMessages();
                Thread.sleep(1000L);
                int size = newArrayList.size();
                newArrayList.clear();
                for (int i4 = 0; i4 < 100 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i4++) {
                    subscribe.acknowledge(receive);
                    i++;
                    log.info("Received message: " + new String(receive.getData()));
                }
                Assert.assertEquals(100 + size, i);
                create.close();
                subscribe.close();
                log.info("-- Exiting {} test --", this.methodName);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            } catch (Exception e) {
                Assert.fail();
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            }
        } catch (Throwable th) {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            throw th;
        }
    }

    @Test(dataProvider = "batch")
    public void testUnackedBlockAtBatch(int i) throws Exception {
        Message receive;
        Message receive2;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            try {
                int i2 = 0;
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
                Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).subscribe();
                Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").enableBatching(i != 0).batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS).batchingMaxMessages(5).create();
                ArrayList newArrayList = Lists.newArrayList();
                for (int i3 = 0; i3 < 100; i3++) {
                    newArrayList.add(create.sendAsync(("my-message-" + i3).getBytes()));
                }
                FutureUtil.waitForAll(newArrayList).get();
                ArrayList newArrayList2 = Lists.newArrayList();
                for (int i4 = 0; i4 < 100 && (receive2 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i4++) {
                    newArrayList2.add(receive2);
                    i2++;
                    log.info("Received message: " + new String(receive2.getData()));
                }
                Assert.assertNotEquals(Integer.valueOf(newArrayList2.size()), 100);
                newArrayList2.forEach(message -> {
                    try {
                        subscribe.acknowledge(message);
                    } catch (PulsarClientException e) {
                        Assert.fail("shouldn't have failed ", e);
                    }
                });
                newArrayList2.clear();
                for (int i5 = 0; i5 < 100 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i5++) {
                    newArrayList2.add(receive);
                    i2++;
                    subscribe.acknowledge(receive);
                    log.info("Received message: " + new String(receive.getData()));
                }
                Assert.assertEquals(100, i2);
                create.close();
                subscribe.close();
                log.info("-- Exiting {} test --", this.methodName);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            } catch (Exception e) {
                Assert.fail();
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            }
        } catch (Throwable th) {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            throw th;
        }
    }

    @Test
    public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
        Message receive;
        Message receive2;
        Message receive3;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            try {
                int i = 0;
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
                ConsumerBuilder subscriptionType = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
                Consumer subscribe = subscriptionType.subscribe();
                Consumer subscribe2 = subscriptionType.subscribe();
                Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
                for (int i2 = 0; i2 < 100; i2++) {
                    create.send(("my-message-" + i2).getBytes());
                }
                ArrayList newArrayList = Lists.newArrayList();
                for (int i3 = 0; i3 < 100 && (receive3 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i3++) {
                    newArrayList.add(receive3);
                    i++;
                    log.info("Received message: " + new String(receive3.getData()));
                }
                Assert.assertEquals(newArrayList.size(), 20);
                newArrayList.forEach(message -> {
                    try {
                        subscribe2.acknowledge(message);
                    } catch (PulsarClientException e) {
                        Assert.fail("shouldn't have failed ", e);
                    }
                });
                for (int i4 = 0; i4 < 100 && (receive2 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i4++) {
                    i++;
                    subscribe2.acknowledge(receive2);
                    log.info("Received message: " + new String(receive2.getData()));
                }
                for (int i5 = 0; i5 < 100 && (receive = subscribe2.receive(1, TimeUnit.SECONDS)) != null; i5++) {
                    i++;
                    log.info("Received message: " + new String(receive.getData()));
                }
                Assert.assertEquals(100, i);
                create.close();
                subscribe.close();
                subscribe2.close();
                log.info("-- Exiting {} test --", this.methodName);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            } catch (Exception e) {
                Assert.fail();
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            }
        } catch (Throwable th) {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            throw th;
        }
    }

    @Test
    public void testEnabledChecksumClient() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(true).batchingMaxPublishDelay(300L, TimeUnit.MILLISECONDS).batchingMaxMessages(5).create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        Message message = null;
        HashSet newHashSet = Sets.newHashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            message = subscribe.receive(5, TimeUnit.SECONDS);
            String str = new String(message.getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i2);
        }
        subscribe.acknowledgeCumulative(message);
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test
    public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() throws Exception {
        Message receive;
        Message receive2;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            try {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(10);
                ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").subscriptionType(SubscriptionType.Shared).receiverQueueSize(20).subscribe();
                Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
                for (int i = 0; i < 20; i++) {
                    create.send(("my-message-" + i).getBytes());
                    Thread.sleep(10L);
                }
                ArrayList newArrayList = Lists.newArrayList();
                for (int i2 = 0; i2 < 20 && (receive2 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i2++) {
                    newArrayList.add(receive2);
                    log.info("Received message: " + new String(receive2.getData()));
                }
                Assert.assertEquals(newArrayList.size(), 10);
                Set set = (Set) newArrayList.stream().map(message -> {
                    return message.getMessageId();
                }).collect(Collectors.toSet());
                subscribe.redeliverUnacknowledgedMessages(Sets.newHashSet(set));
                Thread.sleep(1000L);
                HashSet newHashSet = Sets.newHashSet();
                for (int i3 = 0; i3 < 20 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i3++) {
                    newHashSet.add(receive.getMessageId());
                    log.info("Received message: " + new String(receive.getData()));
                }
                Assert.assertEquals(newArrayList.size(), newHashSet.size());
                newHashSet.removeAll(set);
                Assert.assertEquals(newHashSet.size(), 0);
                create.close();
                subscribe.close();
                log.info("-- Exiting {} test --", this.methodName);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            } catch (Exception e) {
                Assert.fail();
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            }
        } catch (Throwable th) {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            throw th;
        }
    }

    @Test
    public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() throws Exception {
        Message receive;
        Message receive2;
        log.info("-- Starting {} test --", this.methodName);
        int maxUnackedMessagesPerConsumer = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            try {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(10);
                this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(20).subscriptionType(SubscriptionType.Shared).subscribe().close();
                Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
                for (int i = 0; i < 50; i++) {
                    create.send(("my-message-" + i).getBytes());
                    Thread.sleep(10L);
                }
                ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).receiverQueueSize(20).subscriptionName("subscriber-1").subscriptionType(SubscriptionType.Shared).subscribe();
                ArrayList newArrayList = Lists.newArrayList();
                for (int i2 = 0; i2 < 50 && (receive2 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i2++) {
                    newArrayList.add(receive2);
                    log.info("Received message: " + new String(receive2.getData()));
                }
                Assert.assertEquals(newArrayList.size(), 20);
                Set set = (Set) newArrayList.stream().map(message -> {
                    return message.getMessageId();
                }).collect(Collectors.toSet());
                subscribe.redeliverUnacknowledgedMessages(Sets.newHashSet(set));
                Thread.sleep(1000L);
                HashSet newHashSet = Sets.newHashSet();
                for (int i3 = 0; i3 < 50 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i3++) {
                    newHashSet.add(receive.getMessageId());
                    log.info("Received message: " + new String(receive.getData()));
                }
                Assert.assertEquals(newArrayList.size(), newHashSet.size());
                newHashSet.removeAll(set);
                Assert.assertEquals(newHashSet.size(), 0);
                create.close();
                subscribe.close();
                log.info("-- Exiting {} test --", this.methodName);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            } catch (Exception e) {
                Assert.fail();
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            }
        } catch (Throwable th) {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
            throw th;
        }
    }

    @Test
    public void testPriorityConsumer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer subscribe = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).priorityLevel(1).receiverQueueSize(5).subscribe();
            newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer subscribe2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).priorityLevel(1).receiverQueueSize(5).subscribe();
                PulsarClient newPulsarClient2 = newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    Consumer subscribe3 = newPulsarClient2.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).priorityLevel(1).receiverQueueSize(5).subscribe();
                    PulsarClient newPulsarClient3 = newPulsarClient(this.lookupUrl.toString(), 0);
                    try {
                        Consumer subscribe4 = newPulsarClient3.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).priorityLevel(2).receiverQueueSize(5).subscribe();
                        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
                        ArrayList newArrayList = Lists.newArrayList();
                        for (int i = 0; i < 15; i++) {
                            newArrayList.add(create.sendAsync(("my-message-" + i).getBytes()));
                        }
                        log.info("Waiting for async publish to complete");
                        Iterator it = newArrayList.iterator();
                        while (it.hasNext()) {
                            ((Future) it.next()).get();
                        }
                        for (int i2 = 0; i2 < 20; i2++) {
                            subscribe.receive(100, TimeUnit.MILLISECONDS);
                            subscribe2.receive(100, TimeUnit.MILLISECONDS);
                        }
                        for (int i3 = 0; i3 < 5; i3++) {
                            newArrayList.add(create.sendAsync(("my-message-" + i3).getBytes()));
                        }
                        Assert.assertNull(subscribe4.receive(100, TimeUnit.MILLISECONDS));
                        create.close();
                        subscribe.close();
                        subscribe2.close();
                        subscribe3.close();
                        subscribe4.close();
                        log.info("-- Exiting {} test --", this.methodName);
                        if (Collections.singletonList(newPulsarClient3).get(0) != null) {
                            newPulsarClient3.close();
                        }
                        if (Collections.singletonList(newPulsarClient2).get(0) != null) {
                            newPulsarClient2.close();
                        }
                        if (Collections.singletonList(newPulsarClient).get(0) != null) {
                            newPulsarClient.close();
                        }
                    } finally {
                        if (Collections.singletonList(newPulsarClient3).get(0) != null) {
                            newPulsarClient3.close();
                        }
                    }
                } finally {
                    if (Collections.singletonList(newPulsarClient2).get(0) != null) {
                        newPulsarClient2.close();
                    }
                }
            } finally {
                if (Collections.singletonList(newPulsarClient).get(0) != null) {
                    newPulsarClient.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
            throw th;
        }
    }

    @Test
    public void testSharedSamePriorityConsumer() throws Exception {
        Message receive;
        Message receive2;
        Message receive3;
        Message receive4;
        Message receive5;
        log.info("-- Starting {} test --", this.methodName);
        int maxConcurrentLookupRequest = this.pulsar.getConfiguration().getMaxConcurrentLookupRequest();
        this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(5);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer subscribe2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
            Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").enableBatching(false).create();
            ArrayList newArrayList = Lists.newArrayList();
            for (int i = 0; i < 500; i++) {
                newArrayList.add(create.sendAsync(("my-message-" + i).getBytes()));
            }
            log.info("Waiting for async publish to complete");
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get();
            }
            ArrayList newArrayList2 = Lists.newArrayList();
            for (int i2 = 0; i2 < 500 && (receive5 = subscribe.receive(500, TimeUnit.MILLISECONDS)) != null; i2++) {
                newArrayList2.add(receive5);
            }
            for (int i3 = 0; i3 < 500 && (receive4 = subscribe2.receive(500, TimeUnit.MILLISECONDS)) != null; i3++) {
                newArrayList2.add(receive4);
            }
            Assert.assertEquals(10, newArrayList2.size());
            newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer subscribe3 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
                PulsarClient newPulsarClient2 = newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    Consumer subscribe4 = newPulsarClient2.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
                    PulsarClient newPulsarClient3 = newPulsarClient(this.lookupUrl.toString(), 0);
                    try {
                        Consumer subscribe5 = newPulsarClient3.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
                        for (int i4 = 0; i4 < 500 && (receive3 = subscribe4.receive(500, TimeUnit.MILLISECONDS)) != null; i4++) {
                            newArrayList2.add(receive3);
                        }
                        for (int i5 = 0; i5 < 500 && (receive2 = subscribe5.receive(500, TimeUnit.MILLISECONDS)) != null; i5++) {
                            newArrayList2.add(receive2);
                        }
                        for (int i6 = 0; i6 < 500 && (receive = subscribe3.receive(500, TimeUnit.MILLISECONDS)) != null; i6++) {
                            newArrayList2.add(receive);
                            subscribe3.acknowledge(receive);
                        }
                        Assert.assertEquals(newArrayList2.size(), 500);
                        create.close();
                        subscribe.close();
                        subscribe2.close();
                        subscribe3.close();
                        subscribe4.close();
                        subscribe5.close();
                        this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxConcurrentLookupRequest);
                        log.info("-- Exiting {} test --", this.methodName);
                        if (Collections.singletonList(newPulsarClient3).get(0) != null) {
                            newPulsarClient3.close();
                        }
                        if (Collections.singletonList(newPulsarClient2).get(0) != null) {
                            newPulsarClient2.close();
                        }
                        if (Collections.singletonList(newPulsarClient).get(0) != null) {
                            newPulsarClient.close();
                        }
                    } finally {
                        if (Collections.singletonList(newPulsarClient3).get(0) != null) {
                            newPulsarClient3.close();
                        }
                    }
                } finally {
                    if (Collections.singletonList(newPulsarClient2).get(0) != null) {
                        newPulsarClient2.close();
                    }
                }
            } finally {
                if (Collections.singletonList(newPulsarClient).get(0) != null) {
                    newPulsarClient.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
            throw th;
        }
    }

    @Test
    public void testRedeliveryFailOverConsumer() throws Exception {
        Message receive;
        Message receive2;
        Message receive3;
        log.info("-- Starting {} test --", this.methodName);
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Failover).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
            Thread.sleep(10L);
        }
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < 4 && (receive3 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i2++) {
            newArrayList.add(receive3);
            subscribe.acknowledge(receive3);
            log.info("Received message: " + new String(receive3.getData()));
        }
        Assert.assertEquals(newArrayList.size(), 4);
        subscribe.redeliverUnacknowledgedMessages();
        newArrayList.clear();
        for (int i3 = 0; i3 < 4 && (receive2 = subscribe.receive(1, TimeUnit.SECONDS)) != null; i3++) {
            newArrayList.add(receive2);
            subscribe.acknowledge(receive2);
            log.info("Received message: " + new String(receive2.getData()));
        }
        Assert.assertEquals(newArrayList.size(), 4);
        subscribe.redeliverUnacknowledgedMessages();
        for (int i4 = 0; i4 < 10; i4++) {
            create.send(("my-message-" + i4).getBytes());
            Thread.sleep(100L);
        }
        int i5 = 20 - (2 * 4);
        newArrayList.clear();
        for (int i6 = 0; i6 < i5 && (receive = subscribe.receive(1, TimeUnit.SECONDS)) != null; i6++) {
            newArrayList.add(receive);
            subscribe.acknowledge(receive);
            log.info("Received message: " + new String(receive.getData()));
        }
        Assert.assertEquals(newArrayList.size(), i5);
        create.close();
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(timeOut = 5000)
    public void testFailReceiveAsyncOnConsumerClose() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/failAsyncReceive-1"}).subscriptionName("my-subscriber-name").subscribe();
        subscribe.close();
        try {
            subscribe.receiveAsync().get(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.SECONDS);
            Assert.fail("it should have failed because consumer is already closed");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException);
        }
        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive-2");
        this.admin.topics().createPartitionedTopic(topicName.toString(), 4);
        Consumer subscribe2 = this.pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").subscribe();
        subscribe2.close();
        try {
            subscribe2.receiveAsync().get(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.SECONDS);
            Assert.fail("it should have failed because consumer is already closed");
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof PulsarClientException.AlreadyClosedException);
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test
    public void testECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet newHashSet = Sets.newHashSet();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/myecdsa-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.1EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myecdsa-topic1").addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.1EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        Message message = null;
        for (int i2 = 0; i2 < 10; i2++) {
            message = subscribe.receive(5, TimeUnit.SECONDS);
            String str = new String(message.getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i2);
        }
        subscribe.acknowledgeCumulative(message);
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test
    public void testRSAEncryption() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet newHashSet = Sets.newHashSet();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/myrsa-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.2EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myrsa-topic1").addEncryptionKey("client-rsa.pem").cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.2EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myrsa-topic1").addEncryptionKey("client-rsa.pem").cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.2EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        for (int i2 = 10; i2 < 20; i2++) {
            create2.send(("my-message-" + i2).getBytes());
        }
        Message message = null;
        for (int i3 = 0; i3 < 20; i3++) {
            message = subscribe.receive(5, TimeUnit.SECONDS);
            String str = new String(message.getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i3);
        }
        subscribe.acknowledgeCumulative(message);
        subscribe.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test
    public void testEncryptionFailure() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet newHashSet = Sets.newHashSet();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        try {
            this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/use/myenc-ns/myenc-topic1").enableBatching(false).addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.3EncKeyReader
                EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

                public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                    String str2 = "./src/test/resources/certificate/public-key." + str;
                    if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                        return null;
                    }
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                        return this.keyInfo;
                    } catch (IOException e) {
                        V1_ProducerConsumerTest.log.error("Failed to read certificate from {}", str2);
                        return null;
                    }
                }

                public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                    String str2 = "./src/test/resources/certificate/private-key." + str;
                    if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                        return null;
                    }
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                        return this.keyInfo;
                    } catch (IOException e) {
                        V1_ProducerConsumerTest.log.error("Failed to read certificate from {}", str2);
                        return null;
                    }
                }
            }).create();
            Assert.fail("Producer creation should not suceed if failing to read key");
        } catch (Exception e) {
        }
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/use/my-ns/myenc-topic1").enableBatching(false).addEncryptionKey("client-rsa.pem").cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.3EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e2) {
                    V1_ProducerConsumerTest.log.error("Failed to read certificate from {}", str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e2) {
                    V1_ProducerConsumerTest.log.error("Failed to read certificate from {}", str2);
                    return null;
                }
            }
        }).create();
        for (int i = 0; i < 10; i++) {
            create.send("my-message-" + i);
        }
        Message receive = subscribe.receive(5, TimeUnit.SECONDS);
        Assert.assertNull(receive, "Receive should have failed with no keyreader");
        subscribe.close();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME).subscribe();
        int i2 = 0;
        try {
            receive = subscribe2.receive(5, TimeUnit.SECONDS);
            String str = (String) receive.getValue();
            i2 = 0 + 1;
            String str2 = "my-message-0";
            Assert.assertNotEquals(str, str2, "Received encrypted message " + str + " should not match the expected message " + str2);
            subscribe2.acknowledgeCumulative(receive);
        } catch (Exception e2) {
            Assert.fail("Failed to receive message even aftet ConsumerCryptoFailureAction.CONSUME is set.");
        }
        subscribe2.close();
        Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.v1.V1_ProducerConsumerTest.3EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str3, Map<String, String> map) {
                String str22 = "./src/test/resources/certificate/public-key." + str3;
                if (!Files.isReadable(Paths.get(str22, new String[0]))) {
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str22, new String[0])));
                    return this.keyInfo;
                } catch (IOException e22) {
                    V1_ProducerConsumerTest.log.error("Failed to read certificate from {}", str22);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str3, Map<String, String> map) {
                String str22 = "./src/test/resources/certificate/private-key." + str3;
                if (!Files.isReadable(Paths.get(str22, new String[0]))) {
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str22, new String[0])));
                    return this.keyInfo;
                } catch (IOException e22) {
                    V1_ProducerConsumerTest.log.error("Failed to read certificate from {}", str22);
                    return null;
                }
            }
        }).cryptoFailureAction(ConsumerCryptoFailureAction.FAIL).subscribe();
        for (int i3 = i2; i3 < 9; i3++) {
            receive = subscribe3.receive(5, TimeUnit.SECONDS);
            String str3 = (String) receive.getValue();
            log.debug("Received message: [{}]", str3);
            testMessageOrderAndDuplicates(newHashSet, str3, "my-message-" + i3);
        }
        subscribe3.acknowledgeCumulative(receive);
        subscribe3.close();
        subscribe3.close();
        Assert.assertNull(this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD).subscribe().receive(5, TimeUnit.SECONDS), "Message received even after ConsumerCryptoFailureAction.DISCARD is set.");
        log.info("-- Exiting {} test --", this.methodName);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -705350206:
                if (implMethodName.equals("lambda$testMessageListener$a07a3e3e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message.getData()));
                        consumer.acknowledgeAsync(message);
                        countDownLatch.countDown();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
