package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.class */
public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorSubscriptionTest.class);

    /**
     * Runs once before the tests in this class (TestNG time-out: 300000 ms) and delegates entirely
     * to {@link ReplicatorTestBase#setup()}.
     *
     * @throws Exception whatever the base-class setup throws
     */
    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @BeforeClass(timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    /**
     * Runs once after the tests in this class, even when earlier configuration or tests failed
     * ({@code alwaysRun = true}), and delegates entirely to {@link ReplicatorTestBase#cleanup()}.
     *
     * @throws Exception whatever the base-class cleanup throws
     */
    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /**
     * Checks that no message is lost when consumption of a replicated subscription is interrupted
     * and later resumed.
     *
     * <p>Six messages are published. A first consumer reads three of them and is closed, the test
     * sleeps for twice the configured replicated-subscription snapshot frequency, and a second
     * consumer drains whatever is left. Duplicates are tolerated while reading; the ordered,
     * de-duplicated payloads received must equal the payloads sent.
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test
    public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/mytopic";
        String subscriptionName = "cluster-subscription";
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        // Each client gets its own variable and its own try-with-resources scope, so both are
        // closed exactly once on every path. Previously a single variable was reassigned, which
        // leaked the r1 client and closed the r2 client twice.
        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(this.url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            createReplicatedSubscription(client1, topicName, subscriptionName, true);

            try (PulsarClient client2 = PulsarClient.builder()
                    .serviceUrl(this.url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .build()) {
                createReplicatedSubscription(client2, topicName, subscriptionName, true);

                // NOTE(review): the code being replaced used one reassigned client variable for
                // everything below, so which cluster each step targeted was ambiguous. Producing
                // and first consuming on r1, then finishing on r2, is assumed from the test name
                // and the duplicate tolerance - confirm against the original intent.
                Set<String> sentMessages = new LinkedHashSet<>();
                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
                    for (int i = 0; i < 6; i++) {
                        String body = "message" + i;
                        producer.send(body.getBytes(StandardCharsets.UTF_8));
                        sentMessages.add(body);
                    }
                }

                Set<String> receivedMessages = new LinkedHashSet<>();
                try (Consumer<byte[]> firstConsumer = client1.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subscriptionName)
                        .replicateSubscriptionState(true)
                        .subscribe()) {
                    readMessages(firstConsumer, receivedMessages, 3, true);
                }

                // Give the brokers two snapshot periods before switching consumers.
                Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                try (Consumer<byte[]> secondConsumer = client2.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subscriptionName)
                        .replicateSubscriptionState(true)
                        .subscribe()) {
                    // -1 means "read until a receive times out".
                    readMessages(secondConsumer, receivedMessages, -1, true);
                }

                Assert.assertEquals(new ArrayList<>(sentMessages), new ArrayList<>(receivedMessages),
                        "Sent and received messages don't match.");
            }
        }
    }

    /**
     * Consumes half of a topic's messages through cluster r1, then switches to a consumer on
     * cluster r2 and checks that, between the two consumers, every sent message is received.
     *
     * <p>Before switching, the test waits until the r1 topic has exactly one connected replicator
     * and a completed snapshot, until the r2 topic has a replicated-subscription controller, and
     * until subscription {@code s1} exists on the r2 topic.
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test
    public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
        String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
        String subscriptionName = "s1";
        Set<String> sentMessages = new LinkedHashSet<>();
        Set<String> receivedMessages = Collections.synchronizedSet(new LinkedHashSet<>());

        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
        this.admin1.topics().createNonPartitionedTopic(topicName);
        this.admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest, true);
        PersistentTopic topic1 =
                (PersistentTopic) this.pulsar1.getBrokerService().getTopic(topicName, false).join().get();

        // try-with-resources guarantees the clients, producer and consumers are released even
        // when an assertion fails part-way through.
        try (PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString()).build();
             Producer<String> producer = client1.newProducer(Schema.STRING)
                     .topic(topicName)
                     .enableBatching(false)
                     .create()) {

            try (Consumer<String> consumer1 = client1.newConsumer(Schema.STRING)
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .replicateSubscriptionState(true)
                    .subscribe()) {
                for (int i = 0; i < 10; i++) {
                    String body = i + "";
                    producer.send(body);
                    sentMessages.add(body);
                }

                Awaitility.await().untilAsserted(() -> {
                    ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators();
                    Assert.assertTrue(replicators != null && replicators.size() == 1, "Replicator should started");
                    Assert.assertTrue(replicators.values().iterator().next().isConnected(),
                            "Replicator should be connected");
                    Assert.assertTrue(
                            topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
                            "One snapshot should be finished");
                });

                PersistentTopic topic2 =
                        (PersistentTopic) this.pulsar2.getBrokerService().getTopic(topicName, false).join().get();
                Awaitility.await().untilAsserted(() ->
                        Assert.assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
                                "Replicated subscription controller should created"));

                for (int i = 10; i < 20; i++) {
                    String body = i + "";
                    producer.send(body);
                    sentMessages.add(body);
                }

                // Consume and acknowledge only the first half through r1.
                for (int i = 0; i < 10; i++) {
                    Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
                    Assert.assertNotNull(message, "Should not receive null.");
                    receivedMessages.add(message.getValue());
                    consumer1.acknowledge(message);
                }

                Awaitility.await().untilAsserted(() ->
                        Assert.assertNotNull(topic2.getSubscriptions().get(subscriptionName),
                                "Subscription should created"));
            } // consumer1 is closed here, before the r2 consumer attaches

            try (PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString()).build();
                 Consumer<?> consumer2 = client2.newConsumer(Schema.AUTO_CONSUME())
                         .topic(topicName)
                         .subscriptionName(subscriptionName)
                         .replicateSubscriptionState(true)
                         .subscribe()) {
                // Each attempt drains until a receive times out, then compares the totals; the
                // received set keeps accumulating across attempts.
                Awaitility.await().untilAsserted(() -> {
                    Message<?> message;
                    while ((message = consumer2.receive(2, TimeUnit.SECONDS)) != null) {
                        receivedMessages.add(message.getValue().toString());
                        consumer2.acknowledge(message);
                    }
                    Assert.assertEquals(receivedMessages.size(), sentMessages.size());
                });
            }
        }
    }

    /**
     * Checks that replicated-subscription snapshots are only taken while there is traffic.
     *
     * <p>Steps, each separated by a sleep of twice the configured snapshot frequency:
     * <ol>
     *   <li>with a subscription but no messages, no snapshot has completed;</li>
     *   <li>after ten messages, the last position and last completed snapshot id are recorded;</li>
     *   <li>with no further traffic, both values are unchanged;</li>
     *   <li>after ten more messages, both values have changed.</li>
     * </ol>
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test
    public void testReplicationSnapshotStopWhenNoTraffic() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/mytopic";
        String subscriptionName = "cluster-subscription";
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        // Separate variables and scopes for the two clients: the previous single, reassigned
        // variable leaked the r1 client and closed the r2 client more than once.
        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(this.url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            createReplicatedSubscription(client1, topicName, subscriptionName, true);

            Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
            PersistentTopic topic1 =
                    (PersistentTopic) this.pulsar1.getBrokerService().getTopic(topicName, false).get().get();
            ReplicatedSubscriptionsController controller1 = topic1.getReplicatedSubscriptionController().get();
            Assert.assertFalse(controller1.getLastCompletedSnapshotId().isPresent());

            try (PulsarClient client2 = PulsarClient.builder()
                    .serviceUrl(this.url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .build()) {
                // NOTE(review): producers are created on the r2 client because that is what the
                // reassigned variable referred to in the code being replaced - confirm whether
                // r1 was intended.
                try (Producer<String> producer = client2.newProducer(Schema.STRING).topic(topicName).create()) {
                    for (int i = 0; i < 10; i++) {
                        producer.send("hello-" + i);
                    }
                }

                Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
                Position position1 = topic1.getLastPosition();
                String snapshot1 = controller1.getLastCompletedSnapshotId().get();

                // NOTE(review): this second handle is also looked up through pulsar1, exactly as
                // before; if it was meant to observe cluster r2 it should use pulsar2 - confirm.
                PersistentTopic topic2 =
                        (PersistentTopic) this.pulsar1.getBrokerService().getTopic(topicName, false).get().get();
                ReplicatedSubscriptionsController controller2 = topic2.getReplicatedSubscriptionController().get();
                Position position2 = topic2.getLastPosition();
                String snapshot2 = controller2.getLastCompletedSnapshotId().get();

                // No traffic for two snapshot periods: nothing may have moved.
                Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
                Assert.assertEquals(topic1.getLastPosition(), position1);
                Assert.assertEquals(controller1.getLastCompletedSnapshotId().get(), snapshot1);
                Assert.assertEquals(topic2.getLastPosition(), position2);
                Assert.assertEquals(controller2.getLastCompletedSnapshotId().get(), snapshot2);

                // Traffic resumes: positions and snapshot ids must advance. The producer stays
                // open until after the assertions, matching the previous close point.
                try (Producer<String> producer = client2.newProducer(Schema.STRING).topic(topicName).create()) {
                    for (int i = 0; i < 10; i++) {
                        producer.send("hello-" + i);
                    }
                    Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
                    Assert.assertNotEquals(topic1.getLastPosition(), position1);
                    Assert.assertNotEquals(controller1.getLastCompletedSnapshotId().get(), snapshot1);
                    Assert.assertNotEquals(topic2.getLastPosition(), position2);
                    Assert.assertNotEquals(controller2.getLastCompletedSnapshotId().get(), snapshot2);
                }
            }
        }
    }

    /**
     * Exercises the admin API that switches replication of a subscription's state on and off, for
     * a non-partitioned topic.
     *
     * <p>With replication disabled on both clusters, the full set of messages is expected to be
     * readable once per cluster without duplicates. With replication re-enabled, messages are
     * consumed in two stages and each stage must read fewer messages than were published in
     * total, while the union of received payloads equals the payloads sent.
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test(timeOut = 30000)
    public void testReplicatedSubscriptionRestApi1() throws Exception {
        final int numMessages = 20;
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/topic-rest-api1";
        String subName = "sub";
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        // Separate variables and scopes for the two clients: the previous single, reassigned
        // variable leaked the r1 client and closed the r2 client more than once.
        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(this.url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            createReplicatedSubscription(client1, topicName, subName, true);

            try (PulsarClient client2 = PulsarClient.builder()
                    .serviceUrl(this.url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .build()) {
                createReplicatedSubscription(client2, topicName, subName, true);

                // The flag is per cluster: disabling it through admin1 leaves r2 untouched.
                Assert.assertTrue(this.admin1.topics().getStats(topicName).getSubscriptions().get(subName).isReplicated());
                this.admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
                Assert.assertFalse(this.admin1.topics().getStats(topicName).getSubscriptions().get(subName).isReplicated());
                Assert.assertTrue(this.admin2.topics().getStats(topicName).getSubscriptions().get(subName).isReplicated());
                this.admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
                Assert.assertFalse(this.admin2.topics().getStats(topicName).getSubscriptions().get(subName).isReplicated());

                // The disabled state must still be reported after the topic is unloaded.
                this.admin1.topics().unload(topicName);
                Awaitility.await().untilAsserted(() -> Assert.assertFalse(
                        this.admin1.topics().getStats(topicName).getSubscriptions().get(subName).isReplicated()));

                Set<String> sentMessages = new LinkedHashSet<>();
                Set<String> receivedMessages = new LinkedHashSet<>();

                // NOTE(review): the code being replaced routed every producer and consumer
                // through one reassigned client variable. Two consecutive consumers cannot both
                // read all messages without duplicates from the same cluster's subscription, so
                // the r1-then-r2 split below is assumed - confirm against the original intent.
                try (Producer<byte[]> producer = client1.newProducer().topic(topicName).enableBatching(false).create()) {
                    publishMessages(producer, 0, numMessages, sentMessages);
                }

                try (Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe()) {
                    readMessages(consumer, receivedMessages, numMessages, false);
                    Assert.assertEquals(receivedMessages, sentMessages);
                }

                receivedMessages.clear();
                try (Consumer<byte[]> consumer = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe()) {
                    readMessages(consumer, receivedMessages, numMessages, false);
                    Assert.assertEquals(receivedMessages, sentMessages);
                }

                // Re-enable replication, again one cluster at a time.
                this.admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
                Assert.assertTrue(this.admin1.topics().getStats(topicName).getSubscriptions().get(subName).isReplicated());
                Assert.assertFalse(this.admin2.topics().getStats(topicName).getSubscriptions().get(subName).isReplicated());
                this.admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
                Assert.assertTrue(this.admin2.topics().getStats(topicName).getSubscriptions().get(subName).isReplicated());

                sentMessages.clear();
                receivedMessages.clear();

                try (Producer<byte[]> producer = client1.newProducer().topic(topicName).enableBatching(false).create()) {
                    publishMessages(producer, 0, numMessages / 2, sentMessages);
                }
                Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                int numReceivedMessages1;
                try (Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe()) {
                    numReceivedMessages1 = readMessages(consumer, receivedMessages, numMessages / 2, true);
                }

                try (Producer<byte[]> producer = client1.newProducer().topic(topicName).enableBatching(false).create()) {
                    publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages);
                }
                Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                int numReceivedMessages2;
                try (Consumer<byte[]> consumer = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe()) {
                    numReceivedMessages2 = readMessages(consumer, receivedMessages, -1, true);
                }

                Assert.assertEquals(receivedMessages, sentMessages);
                Assert.assertTrue(numReceivedMessages1 < numMessages,
                        String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages));
                Assert.assertTrue(numReceivedMessages2 < numMessages,
                        String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages));
            }
        }
    }

    /**
     * Checks the admin call that reports whether a subscription's state is replicated, for a
     * non-partitioned topic and for a three-partition topic.
     *
     * <p>For the partitioned topic the result is expected to contain one entry per partition,
     * keyed {@code <topic>-partition-<n>}; enabling replication on a single partition must change
     * only that partition's entry.
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test
    public void testGetReplicatedSubscriptionStatus() throws Exception {
        final int numPartitions = 3;
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String plainTopic = "persistent://" + namespace + "/tp-no-part";
        String partitionedTopic = "persistent://" + namespace + "/tp-with-part";
        String plainSub = "sub1";
        String partitionedSub = "sub2";
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.topics().createNonPartitionedTopic(plainTopic);
        this.admin1.topics().createPartitionedTopic(partitionedTopic, numPartitions);

        // try-with-resources closes the client exactly once, whether the test passes or fails.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            // Non-partitioned topic: a single entry keyed by the topic name.
            createReplicatedSubscription(client, plainTopic, plainSub, true);
            Awaitility.await().untilAsserted(() -> Assert.assertTrue(
                    this.admin1.topics().getReplicatedSubscriptionStatus(plainTopic, plainSub).get(plainTopic)));

            this.admin1.topics().setReplicatedSubscriptionStatus(plainTopic, plainSub, false);
            Awaitility.await().untilAsserted(() -> Assert.assertFalse(
                    this.admin1.topics().getReplicatedSubscriptionStatus(plainTopic, plainSub).get(plainTopic)));

            // Partitioned topic: every partition starts out replicated.
            createReplicatedSubscription(client, partitionedTopic, partitionedSub, true);
            Awaitility.await().untilAsserted(() -> {
                Map<String, Boolean> status =
                        this.admin1.topics().getReplicatedSubscriptionStatus(partitionedTopic, partitionedSub);
                Assert.assertEquals(status.size(), numPartitions);
                for (int i = 0; i < numPartitions; i++) {
                    Assert.assertTrue(status.get(partitionedTopic + "-partition-" + i));
                }
            });

            // Disabling on the partitioned topic name applies to every partition.
            this.admin1.topics().setReplicatedSubscriptionStatus(partitionedTopic, partitionedSub, false);
            Awaitility.await().untilAsserted(() -> {
                Map<String, Boolean> status =
                        this.admin1.topics().getReplicatedSubscriptionStatus(partitionedTopic, partitionedSub);
                Assert.assertEquals(status.size(), numPartitions);
                for (int i = 0; i < numPartitions; i++) {
                    Assert.assertFalse(status.get(partitionedTopic + "-partition-" + i));
                }
            });

            // Enabling on one partition must leave the other partitions disabled.
            this.admin1.topics().setReplicatedSubscriptionStatus(partitionedTopic + "-partition-2", partitionedSub, true);
            Awaitility.await().untilAsserted(() -> {
                Map<String, Boolean> status =
                        this.admin1.topics().getReplicatedSubscriptionStatus(partitionedTopic, partitionedSub);
                Assert.assertEquals(status.size(), numPartitions);
                for (int i = 0; i < numPartitions; i++) {
                    boolean replicated = status.get(partitionedTopic + "-partition-" + i);
                    if (i == 2) {
                        Assert.assertTrue(replicated);
                    } else {
                        Assert.assertFalse(replicated);
                    }
                }
            });
        }
    }

    /**
     * Exercises the admin API that switches replication of a subscription's state on and off, for
     * a two-partition topic. The replicated flag is checked on every partition's stats.
     *
     * <p>With replication disabled on both clusters, the full set of messages is expected to be
     * readable once per cluster without duplicates. With replication re-enabled, messages are
     * consumed in two stages and each stage must read fewer messages than were published in
     * total, while the union of received payloads equals the payloads sent.
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test(timeOut = 30000)
    public void testReplicatedSubscriptionRestApi2() throws Exception {
        final int numMessages = 20;
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/topic-rest-api2";
        String subName = "sub";
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
        this.admin1.topics().createPartitionedTopic(topicName, 2);

        // Separate variables and scopes for the two clients: the previous single, reassigned
        // variable leaked the r1 client and closed the r2 client more than once.
        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(this.url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            createReplicatedSubscription(client1, topicName, subName, true);

            try (PulsarClient client2 = PulsarClient.builder()
                    .serviceUrl(this.url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .build()) {
                createReplicatedSubscription(client2, topicName, subName, true);

                for (TopicStats stats : this.admin1.topics().getPartitionedStats(topicName, true).getPartitions().values()) {
                    Assert.assertTrue(stats.getSubscriptions().get(subName).isReplicated());
                }

                this.admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
                for (TopicStats stats : this.admin1.topics().getPartitionedStats(topicName, true).getPartitions().values()) {
                    Assert.assertFalse(stats.getSubscriptions().get(subName).isReplicated());
                }

                this.admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
                for (TopicStats stats : this.admin2.topics().getPartitionedStats(topicName, true).getPartitions().values()) {
                    Assert.assertFalse(stats.getSubscriptions().get(subName).isReplicated());
                }

                Set<String> sentMessages = new LinkedHashSet<>();
                Set<String> receivedMessages = new LinkedHashSet<>();

                // NOTE(review): the code being replaced routed every producer and consumer
                // through one reassigned client variable. Two consecutive consumers cannot both
                // read all messages without duplicates from the same cluster's subscription, so
                // the r1-then-r2 split below is assumed - confirm against the original intent.
                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
                    publishMessages(producer, 0, numMessages, sentMessages);
                }

                try (Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe()) {
                    readMessages(consumer, receivedMessages, numMessages, false);
                    Assert.assertEquals(receivedMessages, sentMessages);
                }

                receivedMessages.clear();
                try (Consumer<byte[]> consumer = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe()) {
                    readMessages(consumer, receivedMessages, numMessages, false);
                    Assert.assertEquals(receivedMessages, sentMessages);
                }

                // Re-enable replication on both clusters and verify every partition reports it.
                this.admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
                for (TopicStats stats : this.admin1.topics().getPartitionedStats(topicName, true).getPartitions().values()) {
                    Assert.assertTrue(stats.getSubscriptions().get(subName).isReplicated());
                }

                this.admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
                for (TopicStats stats : this.admin2.topics().getPartitionedStats(topicName, true).getPartitions().values()) {
                    Assert.assertTrue(stats.getSubscriptions().get(subName).isReplicated());
                }

                sentMessages.clear();
                receivedMessages.clear();

                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
                    publishMessages(producer, 0, numMessages / 2, sentMessages);
                }
                Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                int numReceivedMessages1;
                try (Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe()) {
                    numReceivedMessages1 = readMessages(consumer, receivedMessages, numMessages / 2, true);
                }

                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
                    publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages);
                }
                Thread.sleep(2 * this.config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                int numReceivedMessages2;
                try (Consumer<byte[]> consumer = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe()) {
                    numReceivedMessages2 = readMessages(consumer, receivedMessages, -1, true);
                }

                Assert.assertEquals(receivedMessages, sentMessages);
                Assert.assertTrue(numReceivedMessages1 < numMessages,
                        String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages));
                Assert.assertTrue(numReceivedMessages2 < numMessages,
                        String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages));
            }
        }
    }

    /**
     * Checks that a replicated subscription is re-created on the remote cluster after the remote
     * topic's replicator producers have been closed.
     *
     * <p>Steps: publish and consume one message on r1 and wait until the r2 topic has a connected
     * replicator to {@code r1} and the subscription; delete the subscription on r2 and invoke
     * {@code PersistentTopic.closeReplProducersIfNoBacklog} reflectively so the r2 replicator
     * disconnects; then publish and consume six more messages on r1 and wait until the r2 topic's
     * replicator is connected and the subscription exists again.
     *
     * @throws Exception on any admin, client, reflection or assertion failure
     */
    @Test
    public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/when-replicator-producer-is-closed";
        String subscriptionName = "sub";
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(this.url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {

            // Phase 1: establish the subscription on both clusters. Resources close in reverse
            // declaration order, i.e. producer first and then consumer, as before.
            try (Consumer<byte[]> consumer = client1.newConsumer()
                         .topic(topicName)
                         .subscriptionName(subscriptionName)
                         .replicateSubscriptionState(true)
                         .subscribe();
                 Producer<byte[]> producer = client1.newProducer()
                         .topic(topicName)
                         .enableBatching(false)
                         .messageRoutingMode(MessageRoutingMode.SinglePartition)
                         .create()) {
                producer.send("message".getBytes(StandardCharsets.UTF_8));
                Assert.assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);

                Awaitility.await().until(() -> this.pulsar2.getBrokerService().getTopics().containsKey(topicName));
                PersistentTopic remoteTopic =
                        (PersistentTopic) this.pulsar2.getBrokerService().getTopic(topicName, false).join().get();
                Awaitility.await().untilAsserted(() ->
                        Assert.assertTrue(remoteTopic.getReplicators().get("r1").isConnected()));
                Awaitility.await().untilAsserted(() ->
                        Assert.assertNotNull(remoteTopic.getSubscription(subscriptionName)));
            }

            // Phase 2: drop the subscription on r2 and force its replicator producers closed.
            this.admin2.topics().deleteSubscription(topicName, subscriptionName);
            PersistentTopic topic2 =
                    (PersistentTopic) this.pulsar2.getBrokerService().getTopic(topicName, false).join().get();
            Assert.assertNull(topic2.getSubscription(subscriptionName));

            // Looked up via getDeclaredMethod + setAccessible, so presumably not public.
            Method closeReplProducers = PersistentTopic.class.getDeclaredMethod("closeReplProducersIfNoBacklog");
            closeReplProducers.setAccessible(true);
            ((CompletableFuture<?>) closeReplProducers.invoke(topic2)).join();
            Assert.assertFalse(topic2.getReplicators().get("r1").isConnected());

            // Phase 3: new traffic on r1 must bring the replicator and subscription back on r2.
            try (Producer<byte[]> producer = client1.newProducer()
                    .topic(topicName)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create()) {
                for (int i = 0; i < 6; i++) {
                    producer.send(("message" + i).getBytes(StandardCharsets.UTF_8));
                }
            }

            try (Consumer<byte[]> consumer = client1.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .replicateSubscriptionState(true)
                    .subscribe()) {
                Assert.assertEquals(readMessages(consumer, new LinkedHashSet<>(), 6, false), 6);
                Awaitility.await().untilAsserted(() ->
                        Assert.assertTrue(topic2.getReplicators().get("r1").isConnected()));
                Awaitility.await().untilAsserted(() ->
                        Assert.assertNotNull(topic2.getSubscription(subscriptionName)));
            }
        }
    }

    /**
     * Sends a run of consecutively numbered messages and records each payload.
     *
     * <p>Payloads are the UTF-8 bytes of {@code "msg" + n} for {@code n} from {@code i}
     * (inclusive) to {@code i + i2} (exclusive).
     *
     * @param producer producer used to send each message synchronously
     * @param i        number used for the first message
     * @param i2       how many messages to send
     * @param set      receives the string form of every payload sent
     * @throws PulsarClientException if a send fails
     */
    void publishMessages(Producer<byte[]> producer, int i, int i2, Set<String> set) throws PulsarClientException {
        final int endExclusive = i + i2;
        int index = i;
        while (index < endExclusive) {
            String payload = "msg" + index;
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
            set.add(payload);
            index++;
        }
    }

    /**
     * Receives, records and acknowledges messages until a limit is reached or the topic runs dry.
     *
     * <p>Reading stops as soon as {@code i} messages have been received, or when a receive call
     * returns {@code null} after its two-second time-out, whichever comes first.
     *
     * @param consumer consumer to read from
     * @param set      receives the UTF-8 decoded payload of every message read
     * @param i        maximum number of messages to read, or {@code -1} for no limit
     * @param z        {@code true} to tolerate payloads already present in {@code set};
     *                 {@code false} to fail the test on the first duplicate
     * @return the number of messages received, duplicates included
     * @throws PulsarClientException if receiving or acknowledging fails
     */
    int readMessages(Consumer<byte[]> consumer, Set<String> set, int i, boolean z) throws PulsarClientException {
        int received = 0;
        while (i == -1 || received < i) {
            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
            if (message == null) {
                // Time-out with nothing delivered: treat the topic as drained. Without this exit
                // the loop never terminated.
                break;
            }
            received++;
            String body = new String(message.getValue(), StandardCharsets.UTF_8);
            if (!z) {
                Assert.assertFalse(set.contains(body), "Duplicate message '" + body + "' detected.");
            }
            set.add(body);
            consumer.acknowledge(message);
        }
        return received;
    }

    /**
     * Creates a subscription by subscribing a throw-away consumer and closing it straight away.
     *
     * @param pulsarClient client used to create the consumer
     * @param str          topic to subscribe to
     * @param str2         subscription name
     * @param z            value passed to {@code replicateSubscriptionState} on the consumer builder
     * @throws PulsarClientException if subscribing or closing fails
     */
    void createReplicatedSubscription(PulsarClient pulsarClient, String str, String str2, boolean z) throws PulsarClientException {
        Consumer<byte[]> throwaway = pulsarClient.newConsumer()
                .topic(new String[]{str})
                .subscriptionName(str2)
                .replicateSubscriptionState(z)
                .subscribe();
        throwaway.close();
    }
}
