package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/client/impl/MultiTopicsReaderTest.class */
public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
    private static final String subscription = "reader-multi-topics-sub";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-property", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2"}), Sets.newHashSet(new String[]{"test"})));
        Policies policies = new Policies();
        policies.replication_clusters = Sets.newHashSet(new String[]{"test"});
        policies.retention_policies = new RetentionPolicies(-1, -1);
        this.admin.namespaces().createNamespace("my-property/my-ns", policies);
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut = 30000)
    public void testReadMessageWithoutBatching() throws Exception {
        String str = "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        testReadMessages(str, false);
    }

    @Test(timeOut = 20000)
    public void testReadMessageWithoutBatchingWithMessageInclusive() throws Exception {
        String str = "persistent://my-property/my-ns/my-reader-topic-inclusive" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        Set<String> publishMessages = publishMessages(str, 10, false);
        Reader create = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.latest).startMessageIdInclusive().readerName(subscription).create();
        int i = 0;
        while (create.hasMessageAvailable()) {
            if (publishMessages.remove(create.readNext(5, TimeUnit.SECONDS).getKey())) {
                i++;
            }
        }
        Assert.assertEquals(i, 3);
        Assert.assertFalse(create.hasMessageAvailable());
        create.close();
    }

    @Test(timeOut = 10000)
    public void testReadMessageWithBatching() throws Exception {
        String str = "persistent://my-property/my-ns/my-reader-topic-with-batching" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        testReadMessages(str, true);
    }

    @Test(timeOut = 10000)
    public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
        String str = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        Set<String> publishMessages = publishMessages(str, 15, true);
        Reader create = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.latest).startMessageIdInclusive().readerName(subscription).create();
        while (create.hasMessageAvailable()) {
            publishMessages.remove(create.readNext(2, TimeUnit.SECONDS).getKey());
        }
        Assert.assertEquals(publishMessages.size(), 15 - 3);
        Assert.assertFalse(publishMessages.contains("key14"));
        Assert.assertFalse(publishMessages.contains("key13"));
        Assert.assertFalse(publishMessages.contains("key12"));
        Assert.assertFalse(create.hasMessageAvailable());
        create.close();
    }

    @Test(timeOut = 10000)
    public void testReaderWithTimeLong() throws Exception {
        Message readNext;
        String str = "persistent://my-property/my-ns/testReadFromPartition" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        this.admin.namespaces().setRetention("my-property/my-ns", new RetentionPolicies(-1, -1));
        ProducerBuilder newProducer = this.pulsarClient.newProducer();
        newProducer.topic(str);
        newProducer.enableBatching(false);
        Producer create = newProducer.create();
        long currentTimeMillis = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5L);
        for (int i = 0; i < 10; i++) {
            TypedMessageBuilderImpl value = create.newMessage().value(("old" + i).getBytes());
            value.getMetadataBuilder().setPublishTime(currentTimeMillis).setSequenceId(i).setProducerName(create.getProducerName()).setReplicatedFrom("us-west1");
            value.send();
        }
        long currentTimeMillis2 = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1L);
        MessageId messageId = null;
        for (int i2 = 0; i2 < 10; i2++) {
            TypedMessageBuilderImpl value2 = create.newMessage().value(("new" + i2).getBytes());
            value2.getMetadataBuilder().setPublishTime(currentTimeMillis2).setProducerName(create.getProducerName()).setReplicatedFrom("us-west1");
            MessageId send = value2.send();
            if (messageId == null) {
                messageId = send;
            }
        }
        Reader create2 = this.pulsarClient.newReader().topic(str).startMessageFromRollbackDuration(2L, TimeUnit.HOURS).create();
        ArrayList newArrayList = Lists.newArrayList();
        while (create2.hasMessageAvailable() && (readNext = create2.readNext(1, TimeUnit.SECONDS)) != null) {
            newArrayList.add(readNext.getMessageId());
        }
        Assert.assertEquals(newArrayList.size(), 10);
        restartBroker();
        Assert.assertFalse(create2.hasMessageAvailable());
        create2.close();
        create.close();
    }

    @Test(timeOut = 10000)
    public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException, PulsarAdminException {
        String str = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        Reader create = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).create();
        try {
            create = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).create();
            try {
                Assert.assertEquals(this.admin.topics().getSubscriptions(str).size(), 2);
                Iterator it = this.admin.topics().getPartitionedInternalStats(str).partitions.values().iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(((PersistentTopicInternalStats) it.next()).cursors.size(), 2);
                }
                create.close();
                Assert.assertEquals(this.admin.topics().getSubscriptions(str).size(), 1);
                Iterator it2 = this.admin.topics().getPartitionedInternalStats(str).partitions.values().iterator();
                while (it2.hasNext()) {
                    Assert.assertEquals(((PersistentTopicInternalStats) it2.next()).cursors.size(), 1);
                }
                create.close();
                Assert.assertEquals(this.admin.topics().getSubscriptions(str).size(), 0);
                Iterator it3 = this.admin.topics().getPartitionedInternalStats(str).partitions.values().iterator();
                while (it3.hasNext()) {
                    Assert.assertEquals(((PersistentTopicInternalStats) it3.next()).cursors.size(), 0);
                }
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void testMultiReaderSeek() throws Exception {
        String str = "persistent://my-property/my-ns/testKeyHashRangeReader" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        publishMessages(str, 100, false);
    }

    @Test
    public void testMultiTopicSeekByFunction() throws Exception {
        String str = "persistent://my-property/my-ns/test" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 4);
        publishMessages(str, 20, false);
        Reader create = this.pulsarClient.newReader().startMessageIdInclusive().startMessageId(MessageId.latest).topic(str).subscriptionName("my-sub").create();
        long currentTimeMillis = System.currentTimeMillis();
        create.seek(str2 -> {
            return Long.valueOf(currentTimeMillis);
        });
        Assert.assertNull(create.readNext(1, TimeUnit.SECONDS));
        create.seek(str3 -> {
            switch (TopicName.get(str3).getPartitionIndex()) {
                case SHARED_VALUE:
                    return MessageId.latest;
                case 1:
                    return MessageId.earliest;
                case 2:
                    return Long.valueOf(currentTimeMillis);
                case Test.TestMessage.INTFIELD_FIELD_NUMBER /* 3 */:
                    return Long.valueOf(currentTimeMillis - 999999);
                default:
                    return null;
            }
        });
        int i = 0;
        while (create.readNext(1, TimeUnit.SECONDS) != null) {
            i++;
        }
        Assert.assertEquals(i, 0 + (20 / 4) + 0 + (20 / 4));
    }

    @org.testng.annotations.Test
    public void testMultiTopicSeekByFunctionWithException() throws Exception {
        String str = "persistent://my-property/my-ns/test" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 4);
        publishMessages(str, 20, false);
        Reader create = this.pulsarClient.newReader().startMessageIdInclusive().startMessageId(MessageId.latest).topic(str).subscriptionName("my-sub").create();
        long currentTimeMillis = System.currentTimeMillis();
        create.seek(str2 -> {
            return Long.valueOf(currentTimeMillis);
        });
        Assert.assertNull(create.readNext(1, TimeUnit.SECONDS));
        try {
            create.seek(str3 -> {
                switch (TopicName.get(str3).getPartitionIndex()) {
                    case SHARED_VALUE:
                        throw new RuntimeException("test");
                    case 1:
                        return MessageId.latest;
                    case 2:
                        return MessageId.earliest;
                    case Test.TestMessage.INTFIELD_FIELD_NUMBER /* 3 */:
                        return Long.valueOf(currentTimeMillis - 999999);
                    default:
                        return null;
                }
            });
        } catch (Exception e) {
            Assert.assertEquals(e.getMessage(), "test");
            Assert.assertTrue(e instanceof RuntimeException);
        }
    }

    @org.testng.annotations.Test(timeOut = 20000)
    public void testMultiTopic() throws Exception {
        String str = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
        String str2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str2, 3);
        List asList = Arrays.asList(str, str2, "persistent://my-property/my-ns/topic3" + UUID.randomUUID());
        PulsarClientImpl pulsarClientImpl = this.pulsarClient;
        Reader create = this.pulsarClient.newReader(Schema.STRING).startMessageId(MessageId.earliest).topics(asList).readerName("my-reader").create();
        ArrayList arrayList = new ArrayList();
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            arrayList.add(this.pulsarClient.newProducer(Schema.STRING).topic((String) it.next()).create());
        }
        HashSet hashSet = new HashSet();
        for (int i = 0; i < arrayList.size(); i++) {
            Producer producer = (Producer) arrayList.get(i);
            for (int i2 = 0; i2 < 10; i2++) {
                String str3 = i + "msg" + i2;
                producer.send(str3);
                hashSet.add(str3);
            }
        }
        while (create.hasMessageAvailable()) {
            hashSet.remove(create.readNext(5, TimeUnit.SECONDS).getValue());
        }
        Assert.assertEquals(hashSet.size(), 0);
        Assert.assertEquals(pulsarClientImpl.consumersCount(), 1);
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((Producer) it2.next()).close();
        }
        create.close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(pulsarClientImpl.consumersCount(), 0);
        });
    }

    /* JADX WARN: Finally extract failed */
    @org.testng.annotations.Test(timeOut = 10000)
    public void testKeyHashRangeReader() throws Exception {
        Message readNext;
        List<String> asList = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        String str = "persistent://my-property/my-ns/testKeyHashRangeReader" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        try {
            this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).keyHashRange(new Range[]{Range.of(0, 10000), Range.of(8000, 12000)}).create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e) {
        }
        try {
            this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).keyHashRange(new Range[]{Range.of(30000, 20000)}).create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e2) {
        }
        try {
            this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).keyHashRange(new Range[]{Range.of(80000, 90000)}).create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e3) {
        }
        Reader create = this.pulsarClient.newReader(Schema.STRING).topic(str).startMessageId(MessageId.earliest).keyHashRange(new Range[]{Range.of(0, 32768)}).create();
        try {
            Producer create2 = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).create();
            try {
                int i = 0;
                for (String str2 : asList) {
                    if (Murmur3_32Hash.getInstance().makeHash(str2.getBytes()) % 65536 <= 32768) {
                        i++;
                    }
                    create2.newMessage().key(str2).value(str2).send();
                }
                ArrayList arrayList = new ArrayList();
                do {
                    readNext = create.readNext(1, TimeUnit.SECONDS);
                    if (readNext != null) {
                        arrayList.add((String) readNext.getValue());
                    }
                } while (readNext != null);
                Assert.assertTrue(i > 0);
                Assert.assertEquals(arrayList.size(), i);
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    Assert.assertTrue(Integer.parseInt((String) it.next()) <= 32768);
                }
                if (Collections.singletonList(create2).get(0) != null) {
                    create2.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create2).get(0) != null) {
                    create2.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void testReadMessages(String str, boolean z) throws Exception {
        Set<String> publishMessages = publishMessages(str, 9, z);
        Reader create = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).readerName(subscription).create();
        while (create.hasMessageAvailable()) {
            publishMessages.remove(create.readNext(5, TimeUnit.SECONDS).getKey());
        }
        Assert.assertEquals(publishMessages.size(), 0);
        Assert.assertFalse(this.pulsarClient.newReader().topic(str).startMessageId(MessageId.latest).readerName("reader-multi-topics-sublatest").create().hasMessageAvailable());
    }

    private Set<String> publishMessages(String str, int i, boolean z) throws Exception {
        HashSet hashSet = new HashSet();
        ProducerBuilder newProducer = this.pulsarClient.newProducer();
        newProducer.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
        newProducer.batchingMaxPublishDelay(1L, TimeUnit.DAYS);
        newProducer.topic(str);
        if (z) {
            newProducer.enableBatching(true);
            newProducer.batchingMaxMessages(i);
        } else {
            newProducer.enableBatching(false);
            newProducer.maxPendingMessages(1);
        }
        Producer create = newProducer.create();
        try {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                String str2 = "key" + i2;
                byte[] bytes = ("my-message-" + i2).getBytes();
                if (z) {
                    arrayList.add(create.newMessage().key(str2).value(bytes).sendAsync());
                } else {
                    create.newMessage().key(str2).value(bytes).send();
                }
                hashSet.add(str2);
            }
            create.flush();
            FutureUtil.waitForAll(arrayList).get();
            if (create != null) {
                create.close();
            }
            return hashSet;
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
