package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.math3.dfp.Dfp;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.CustomBooleanEditor;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ReaderTest.class */
public class ReaderTest extends MockedPulsarServiceBaseTest {
    /** Logger shared by all tests in this class. */
    private static final Logger log = LoggerFactory.getLogger(ReaderTest.class);

    /** Name handed to {@code readerName(...)} by the read tests below. */
    private static final String subscription = "reader-sub";

    /**
     * Runs the base-class setup, then creates the {@code test} cluster, the {@code my-property}
     * tenant and the {@code my-property/my-ns} namespace through the admin client.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();

        final String cluster = "test";
        final String tenant = "my-property";
        final String namespace = "my-property/my-ns";

        // Register the cluster under the web service address of the broker from the base class.
        admin.clusters().createCluster(
                cluster,
                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());

        // The tenant is created with two roles and is allowed on the single cluster.
        TenantInfoImpl tenantInfo =
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster));
        admin.tenants().createTenant(tenant, tenantInfo);

        admin.namespaces().createNamespace(namespace, Sets.newHashSet(cluster));
    }

    /**
     * Per-test teardown: delegates to the base-class {@code internalCleanup()}.
     * The method is registered with {@code @AfterMethod(alwaysRun = true)}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes {@code i} keyed messages ({@code key0..key<i-1>}, payload {@code my-message-<n>})
     * to a topic and returns the set of keys that were sent.
     *
     * <p>The producer is opened in a try-with-resources block so it is closed even when
     * {@code flush()} or waiting for the last send fails.
     *
     * @param str topic to publish to
     * @param i   number of messages to publish; also used as the producer's max pending messages
     *            and, when batching is on, as the max messages per batch
     * @param z   {@code true} to enable batching, {@code false} to disable it
     * @return the keys of all published messages
     * @throws Exception if the producer cannot be created or a send fails
     */
    private Set<String> publishMessages(String str, int i, boolean z) throws Exception {
        Set<String> keys = new HashSet<>();

        ProducerBuilder<byte[]> builder = this.pulsarClient.newProducer();
        builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
        builder.maxPendingMessages(i);
        // A one-day publish delay presumably keeps batches from being flushed on a timer;
        // the explicit flush() below pushes them out instead.
        builder.batchingMaxPublishDelay(1L, TimeUnit.DAYS);
        builder.topic(str);
        if (z) {
            builder.enableBatching(true);
            builder.batchingMaxMessages(i);
        } else {
            builder.enableBatching(false);
        }

        try (Producer<byte[]> producer = builder.create()) {
            CompletableFuture<MessageId> lastSend = null;
            for (int index = 0; index < i; index++) {
                String key = "key" + index;
                lastSend = producer.newMessage()
                        .key(key)
                        .value(("my-message-" + index).getBytes())
                        .sendAsync();
                keys.add(key);
            }
            producer.flush();
            // Nothing was sent when i == 0, so there is no future to wait on.
            if (lastSend != null) {
                lastSend.get();
            }
        }
        return keys;
    }

    /** Runs the shared read scenario against a topic populated without batching. */
    @Test
    public void testReadMessageWithoutBatching() throws Exception {
        final String topic = "persistent://my-property/my-ns/my-reader-topic";
        final boolean batchingEnabled = false;
        testReadMessages(topic, batchingEnabled);
    }

    /**
     * Publishes ten unbatched messages, then checks that a reader started inclusively at
     * {@code MessageId.latest} reports one available message and none after reading it.
     */
    @Test
    public void testReadMessageWithoutBatchingWithMessageInclusive() throws Exception {
        final String topic = "persistent://my-property/my-ns/my-reader-topic-inclusive";
        Set<String> pendingKeys = publishMessages(topic, 10, false);

        Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .readerName(subscription)
                .create();

        Assert.assertTrue(reader.hasMessageAvailable());
        Message<byte[]> message = reader.readNext();
        Assert.assertTrue(pendingKeys.remove(message.getKey()));
        Assert.assertFalse(reader.hasMessageAvailable());
    }

    /** Runs the shared read scenario against a topic populated with batching enabled. */
    @Test
    public void testReadMessageWithBatching() throws Exception {
        final String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching";
        final boolean batchingEnabled = true;
        testReadMessages(topic, batchingEnabled);
    }

    /**
     * Publishes ten batched messages, drains a reader started inclusively at
     * {@code MessageId.latest}, and expects that only {@code key9} was consumed.
     */
    @Test
    public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
        final String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
        Set<String> pendingKeys = publishMessages(topic, 10, true);

        Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .readerName(subscription)
                .create();

        // Every message the reader yields must be one of the published keys.
        while (reader.hasMessageAvailable()) {
            Message<byte[]> message = reader.readNext();
            Assert.assertTrue(pendingKeys.remove(message.getKey()));
        }

        // Nine keys remain unread; the one that was read is the last published key.
        Assert.assertEquals(pendingKeys.size(), 9);
        Assert.assertFalse(pendingKeys.contains("key9"));
        Assert.assertFalse(reader.hasMessageAvailable());
    }

    /**
     * Shared scenario: publishes ten messages, reads all of them from the earliest position,
     * and checks that a second reader started at {@code MessageId.latest} has nothing available.
     *
     * @param str topic to publish to and read from
     * @param z   whether the messages are published with batching enabled
     * @throws Exception if publishing or reading fails
     */
    private void testReadMessages(String str, boolean z) throws Exception {
        Set<String> unreadKeys = publishMessages(str, 10, z);

        Reader<byte[]> earliestReader = pulsarClient.newReader()
                .topic(str)
                .startMessageId(MessageId.earliest)
                .readerName(subscription)
                .create();
        while (earliestReader.hasMessageAvailable()) {
            Message<byte[]> message = earliestReader.readNext();
            Assert.assertTrue(unreadKeys.remove(message.getKey()));
        }
        Assert.assertTrue(unreadKeys.isEmpty());

        Reader<byte[]> latestReader = pulsarClient.newReader()
                .topic(str)
                .startMessageId(MessageId.latest)
                .readerName(subscription + "latest")
                .create();
        Assert.assertFalse(latestReader.hasMessageAvailable());
    }

    /**
     * Exercises {@code Reader.seek(Function)} on a single topic: seeking to "now" yields nothing,
     * seeking to an earlier timestamp yields all ten messages, and seeking to
     * {@code MessageId.earliest} yields all ten again.
     */
    @Test
    public void testMultiTopicSeekByFunction() throws Exception {
        final String topic = "persistent://my-property/my-ns/test" + UUID.randomUUID();
        publishMessages(topic, 10, false);

        Reader<byte[]> reader = pulsarClient.newReader()
                .startMessageIdInclusive()
                .startMessageId(MessageId.latest)
                .topic(topic)
                .subscriptionName("my-sub")
                .create();

        final long now = System.currentTimeMillis();

        // Seek to the current time: nothing should be readable.
        reader.seek(topicName -> Long.valueOf(now));
        Assert.assertNull(reader.readNext(1, TimeUnit.SECONDS));

        // Seek by timestamp to a point 999999 ms before "now".
        reader.seek(topicName -> {
            Assert.assertFalse(TopicName.get(topicName).isPartitioned());
            return Long.valueOf(now - 999999);
        });
        int readAfterTimeSeek = 0;
        for (Message<byte[]> m = reader.readNext(1, TimeUnit.SECONDS);
                m != null;
                m = reader.readNext(1, TimeUnit.SECONDS)) {
            readAfterTimeSeek++;
        }
        Assert.assertEquals(readAfterTimeSeek, 10);

        // Seek by message id back to the beginning.
        reader.seek(topicName -> {
            Assert.assertFalse(TopicName.get(topicName).isPartitioned());
            return MessageId.earliest;
        });
        int readAfterIdSeek = 0;
        for (Message<byte[]> m = reader.readNext(1, TimeUnit.SECONDS);
                m != null;
                m = reader.readNext(1, TimeUnit.SECONDS)) {
            readAfterIdSeek++;
        }
        Assert.assertEquals(readAfterIdSeek, 10);
    }

    /**
     * Creates a four-partition topic, publishes ten messages directly to its
     * {@code -partition-0} topic and reads them all back from that partition.
     */
    @Test
    public void testReadFromPartition() throws Exception {
        final String partitionedTopic = "persistent://my-property/my-ns/testReadFromPartition";
        final String partitionZero = "persistent://my-property/my-ns/testReadFromPartition-partition-0";

        admin.topics().createPartitionedTopic(partitionedTopic, 4);
        Set<String> unreadKeys = publishMessages(partitionZero, 10, false);

        Reader<byte[]> reader = pulsarClient.newReader()
                .topic(partitionZero)
                .startMessageId(MessageId.earliest)
                .create();

        while (reader.hasMessageAvailable()) {
            Message<byte[]> message = reader.readNext();
            Assert.assertTrue(unreadKeys.remove(message.getKey()));
        }
        Assert.assertTrue(unreadKeys.isEmpty());
    }

    /**
     * Publishes ten messages stamped five hours old and ten stamped one hour old, then checks
     * that a reader started with a two-hour rollback returns exactly ten messages beginning with
     * the first message of the newer batch, and has nothing available after a broker restart.
     */
    @Test
    public void testReaderWithTimeLong() throws Exception {
        final String namespace = "my-property/my-ns";
        final String topic = "persistent://my-property/my-ns/testReadFromPartition";
        final int messagesPerBatch = 10;

        // presumably (-1, -1) means unlimited retention — confirm against RetentionPolicies
        admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1));

        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer();
        producerBuilder.topic(topic);
        producerBuilder.enableBatching(false);
        Producer<byte[]> producer = producerBuilder.create();

        // (1) Ten messages whose publish time is five hours in the past.
        final long oldPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5L);
        for (int n = 0; n < messagesPerBatch; n++) {
            TypedMessageBuilderImpl<byte[]> oldMessage =
                    (TypedMessageBuilderImpl<byte[]>) producer.newMessage().value(("old" + n).getBytes());
            oldMessage.getMetadataBuilder()
                    .setPublishTime(oldPublishTime)
                    .setSequenceId(n)
                    .setProducerName(producer.getProducerName())
                    .setReplicatedFrom("us-west1");
            oldMessage.send();
        }

        // (2) Ten messages whose publish time is one hour in the past; remember the first id.
        final long newPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1L);
        MessageId firstNewMessageId = null;
        for (int n = 0; n < messagesPerBatch; n++) {
            TypedMessageBuilderImpl<byte[]> newMessage =
                    (TypedMessageBuilderImpl<byte[]>) producer.newMessage().value(("new" + n).getBytes());
            newMessage.getMetadataBuilder()
                    .setPublishTime(newPublishTime)
                    .setProducerName(producer.getProducerName())
                    .setReplicatedFrom("us-west1");
            MessageId sentId = newMessage.send();
            if (firstNewMessageId == null) {
                firstNewMessageId = sentId;
            }
        }

        // (3) A reader rolled back two hours should only see the batch from step (2).
        Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topic)
                .startMessageFromRollbackDuration(2L, TimeUnit.HOURS)
                .create();

        List<MessageId> receivedIds = Lists.newArrayList();
        while (reader.hasMessageAvailable()) {
            Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            System.out.println("msg.getMessageId()=" + message.getMessageId() + ", data=" + new String(message.getData()));
            receivedIds.add(message.getMessageId());
        }

        Assert.assertEquals(receivedIds.size(), 10);
        Assert.assertEquals(receivedIds.get(0), firstNewMessageId);

        restartBroker();
        Assert.assertFalse(reader.hasMessageAvailable());
    }

    /**
     * Opens two readers on the same topic and checks that the topic's subscription count and
     * cursor count drop from 2 to 1 to 0 as each reader is closed.
     *
     * <p>The two readers are held in separate variables so that each one is closed exactly where
     * the scenario requires it. The try-with-resources block closes both again on exit, which
     * also covers the case where an assertion fails midway.
     *
     * @throws IOException          if a reader cannot be created or closed
     * @throws PulsarAdminException if a stats lookup fails
     */
    @Test
    public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException, PulsarAdminException {
        final String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor";

        try (Reader<byte[]> reader1 = this.pulsarClient.newReader()
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .create();
             Reader<byte[]> reader2 = this.pulsarClient.newReader()
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .create()) {
            // Both readers connected: two subscriptions, two cursors.
            Assert.assertEquals(this.admin.topics().getStats(topic).getSubscriptions().size(), 2);
            Assert.assertEquals(this.admin.topics().getInternalStats(topic, false).cursors.size(), 2);

            reader1.close();
            Assert.assertEquals(this.admin.topics().getStats(topic).getSubscriptions().size(), 1);
            Assert.assertEquals(this.admin.topics().getInternalStats(topic, false).cursors.size(), 1);

            reader2.close();
            Assert.assertEquals(this.admin.topics().getStats(topic).getSubscriptions().size(), 0);
            Assert.assertEquals(this.admin.topics().getInternalStats(topic, false).cursors.size(), 0);
        }
    }

    /**
     * Checks that a reader started inclusively at {@code MessageId.latest} on a topic nothing has
     * been published to in this test reports no available message.
     *
     * <p>The reader is managed by try-with-resources, so a failure while closing is recorded as a
     * suppressed exception instead of replacing an assertion failure.
     */
    @Test
    public void testReaderHasMessageAvailable() throws Exception {
        // The timestamp suffix gives each run its own topic name.
        final String topic = "persistent://my-property/my-ns/testReaderHasMessageAvailable" + System.currentTimeMillis();

        try (Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                .topic(topic)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .create()) {
            Assert.assertFalse(reader.hasMessageAvailable());
        }
    }

    /**
     * Verifies key-hash-range readers: three invalid range configurations must be rejected with
     * {@link IllegalArgumentException}, and a reader restricted to {@code [0, 32768]} must receive
     * exactly the messages whose key hashes into that range.
     *
     * <p>Plain literals are used for the range bounds and keys; unrelated third-party constants
     * that merely share those values are not referenced.
     *
     * @throws IOException if a reader or producer cannot be created, used or closed
     */
    @Test
    public void testKeyHashRangeReader() throws IOException {
        final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader";
        final int rangeSize = 65536;
        final int rangeUpperBound = 32768;
        final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");

        // Range.of(...) stays inside each try block because constructing the range
        // may itself be what throws.

        // presumably rejected because the two ranges overlap — confirm
        try {
            this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .keyHashRange(Range.of(0, 10000), Range.of(8000, 12000))
                    .create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e) {
            log.error("Create key hash range failed", e);
        }

        // presumably rejected because start > end — confirm
        try {
            this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .keyHashRange(Range.of(30000, 20000))
                    .create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e) {
            log.error("Create key hash range failed", e);
        }

        // presumably rejected because the range lies beyond the 65536 hash slots — confirm
        try {
            this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .keyHashRange(Range.of(80000, 90000))
                    .create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e) {
            log.error("Create key hash range failed", e);
        }

        // Resources are closed in reverse order: producer first, then reader.
        try (Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .keyHashRange(Range.of(0, rangeUpperBound))
                     .create();
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .enableBatching(false)
                     .create()) {

            // Publish every key and count how many hash into the reader's range.
            int expectedMessages = 0;
            for (String key : keys) {
                int slot = org.apache.pulsar.common.util.Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % rangeSize;
                if (slot <= rangeUpperBound) {
                    expectedMessages++;
                }
                producer.newMessage().key(key).value(key).send();
                log.info("Publish message to slot {}", slot);
            }

            // Drain the reader until a one-second read comes back empty.
            List<String> receivedMessages = new ArrayList<>();
            Message<String> message;
            while ((message = reader.readNext(1, TimeUnit.SECONDS)) != null) {
                receivedMessages.add(message.getValue());
            }

            Assert.assertTrue(expectedMessages > 0);
            Assert.assertEquals(receivedMessages.size(), expectedMessages);
            for (String received : receivedMessages) {
                log.info("Receive message {}", received);
                // NOTE(review): this compares the parsed payload ("0".."9"), not its hash slot,
                // so it always holds for these keys — confirm whether the slot was intended.
                Assert.assertTrue(Integer.parseInt(received) <= rangeUpperBound);
            }
        }
    }

    /** Runs the subscription-name scenario first with a role prefix set, then without one. */
    @Test
    public void testReaderSubName() throws Exception {
        for (boolean withRolePrefix : new boolean[] {true, false}) {
            doTestReaderSubName(withRolePrefix);
        }
    }

    /**
     * Checks that an explicitly configured subscription name is the one the reader's underlying
     * consumer(s) report, for both a plain topic and a three-partition topic.
     *
     * <p>All reader types are parameterized so that the per-partition consumer lambda sees a
     * typed consumer instead of {@code Object}. Each reader is closed via try-with-resources,
     * including when an assertion fails.
     *
     * @param z when {@code true}, a subscription role prefix is also set on the builder; the
     *          assertions expect the explicit subscription name to be reported regardless
     * @throws Exception if a reader or the partitioned topic cannot be created
     */
    private void doTestReaderSubName(boolean z) throws Exception {
        final String subName = "my-sub-name";

        // --- non-partitioned topic ---
        final String topic = "persistent://my-property/my-ns/testReaderSubName" + System.currentTimeMillis();
        ReaderBuilder<String> builder = this.pulsarClient.newReader(Schema.STRING)
                .subscriptionName(subName)
                .topic(topic)
                .startMessageId(MessageId.earliest);
        if (z) {
            builder = builder.subscriptionRolePrefix(subName + System.currentTimeMillis());
        }
        try (Reader<String> reader = builder.create()) {
            ReaderImpl<String> readerImpl = (ReaderImpl<String>) reader;
            Assert.assertEquals(readerImpl.getConsumer().getSubscription(), subName);
        }

        // --- partitioned topic: every per-partition consumer must report the same name ---
        final String partitionedTopic = "persistent://my-property/my-ns/testReaderSubName2" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(partitionedTopic, 3);
        ReaderBuilder<String> partitionedBuilder = this.pulsarClient.newReader(Schema.STRING)
                .subscriptionName(subName)
                .topic(partitionedTopic)
                .startMessageId(MessageId.earliest);
        if (z) {
            partitionedBuilder = partitionedBuilder.subscriptionRolePrefix(subName + System.currentTimeMillis());
        }
        try (Reader<String> reader = partitionedBuilder.create()) {
            MultiTopicsReaderImpl<String> multiTopicsReader = (MultiTopicsReaderImpl<String>) reader;
            multiTopicsReader.getMultiTopicsConsumer().getConsumers()
                    .forEach(consumer -> Assert.assertEquals(consumer.getSubscription(), subName));
        }
    }

    /**
     * Checks that a second reader using the same subscription name on the same topic is rejected
     * with {@code ConsumerBusyException} while the first is open, and that the name can be reused
     * once the first reader has been closed.
     *
     * <p>Both readers are managed by try-with-resources: the first is released even if an
     * assertion fails, and an unexpectedly created second reader is closed after
     * {@code Assert.fail} throws.
     */
    @Test
    public void testSameSubName() throws Exception {
        final String topic = "persistent://my-property/my-ns/testSameSubName";
        final String subName = "my-sub-name";

        try (Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                .subscriptionName(subName)
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .create()) {

            // A second reader with the same subscription name must not be created.
            try (Reader<String> duplicate = this.pulsarClient.newReader(Schema.STRING)
                    .subscriptionName(subName)
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .create()) {
                Assert.fail("should fail");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e instanceof PulsarClientException.ConsumerBusyException);
                Assert.assertTrue(e.getMessage().contains("Exclusive consumer is already connected"));
            }

            // After the first reader is closed, the same subscription name can be used again.
            reader.close();
            this.pulsarClient.newReader(Schema.STRING)
                    .subscriptionName(subName)
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .create()
                    .close();
        }
    }
}
