package org.apache.pulsar.broker.service;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentTopicTest.class */
public class PersistentTopicTest extends MockedBookKeeperTestCase {
    protected PulsarService pulsar;
    private BrokerService brokerService;
    private ManagedLedgerFactory mlFactoryMock;
    private ServerCnx serverCnx;
    private MetadataStoreExtended store;
    private ManagedLedger ledgerMock;
    private ManagedCursor cursorMock;
    private ConfigurationCacheService configCacheService;
    final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    final String successPartitionTopicName = "persistent://prop/use/ns-abc/successTopic-partition-0";
    final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
    final String successSubName = "successSub";
    final String successSubName2 = "successSub2";
    final String successSubName3 = "successSub3";
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;

    @BeforeMethod
    public void setup() throws Exception {
        this.eventLoopGroup = new NioEventLoopGroup();
        this.executor = OrderedExecutor.newBuilder().numThreads(1).build();
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        serviceConfiguration.setAdvertisedAddress("localhost");
        serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
        this.pulsar = (PulsarService) BrokerTestUtil.spyWithClassAndConstructorArgs(PulsarService.class, serviceConfiguration);
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        ((PulsarService) Mockito.doReturn(Mockito.mock(Compactor.class)).when(this.pulsar)).getCompactor();
        this.mlFactoryMock = (ManagedLedgerFactory) Mockito.mock(ManagedLedgerFactory.class);
        ((PulsarService) Mockito.doReturn(this.mlFactoryMock).when(this.pulsar)).getManagedLedgerFactory();
        ((ManagedLedgerFactory) Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.DeleteLedgerCallback) invocationOnMock.getArgument(1)).deleteLedgerComplete((Object) null);
            return null;
        }).when(this.mlFactoryMock)).asyncDelete((String) ArgumentMatchers.any(), (AsyncCallbacks.DeleteLedgerCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        MockZooKeeper createMockZooKeeper = MockedPulsarServiceBaseTest.createMockZooKeeper();
        ((PulsarService) Mockito.doReturn(createMockZooKeeper).when(this.pulsar)).getZkClient();
        ((PulsarService) Mockito.doReturn(MockedPulsarServiceBaseTest.createMockBookKeeper(this.executor)).when(this.pulsar)).getBookKeeperClient();
        ZooKeeperCache zooKeeperCache = (ZooKeeperCache) Mockito.mock(ZooKeeperCache.class);
        ((ZooKeeperCache) Mockito.doReturn(30).when(zooKeeperCache)).getZkOperationTimeoutSeconds();
        ((PulsarService) Mockito.doReturn(zooKeeperCache).when(this.pulsar)).getLocalZkCache();
        this.configCacheService = (ConfigurationCacheService) Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache zooKeeperDataCache = (ZooKeeperDataCache) Mockito.mock(ZooKeeperDataCache.class);
        ((ConfigurationCacheService) Mockito.doReturn(zooKeeperDataCache).when(this.configCacheService)).policiesCache();
        ((PulsarService) Mockito.doReturn(this.configCacheService).when(this.pulsar)).getConfigurationCache();
        ((ZooKeeperDataCache) Mockito.doReturn(Optional.empty()).when(zooKeeperDataCache)).get((String) ArgumentMatchers.any());
        LocalZooKeeperCacheService localZooKeeperCacheService = (LocalZooKeeperCacheService) Mockito.mock(LocalZooKeeperCacheService.class);
        ((ZooKeeperDataCache) Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zooKeeperDataCache)).getAsync((String) ArgumentMatchers.any());
        ((LocalZooKeeperCacheService) Mockito.doReturn(zooKeeperDataCache).when(localZooKeeperCacheService)).policiesCache();
        ((PulsarService) Mockito.doReturn(this.configCacheService).when(this.pulsar)).getConfigurationCache();
        ((PulsarService) Mockito.doReturn(localZooKeeperCacheService).when(this.pulsar)).getLocalZkCacheService();
        ((PulsarService) Mockito.doReturn(this.executor).when(this.pulsar)).getOrderedExecutor();
        this.store = new ZKMetadataStore(createMockZooKeeper);
        PulsarResources pulsarResources = (PulsarResources) BrokerTestUtil.spyWithClassAndConstructorArgs(PulsarResources.class, this.store, this.store);
        ((PulsarResources) Mockito.doReturn((NamespaceResources) BrokerTestUtil.spyWithClassAndConstructorArgs(NamespaceResources.class, this.store, 30)).when(pulsarResources)).getNamespaceResources();
        ((PulsarService) Mockito.doReturn(pulsarResources).when(this.pulsar)).getPulsarResources();
        this.brokerService = (BrokerService) BrokerTestUtil.spyWithClassAndConstructorArgs(BrokerService.class, this.pulsar, this.eventLoopGroup);
        ((PulsarService) Mockito.doReturn(this.brokerService).when(this.pulsar)).getBrokerService();
        this.serverCnx = (ServerCnx) BrokerTestUtil.spyWithClassAndConstructorArgs(ServerCnx.class, this.pulsar);
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnx)).isActive();
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnx)).isWritable();
        ((ServerCnx) Mockito.doReturn(new InetSocketAddress("localhost", 1234)).when(this.serverCnx)).clientAddress();
        ((ServerCnx) Mockito.doReturn(new PulsarCommandSenderImpl((BrokerInterceptor) null, this.serverCnx)).when(this.serverCnx)).getCommandSender();
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class);
        Channel channel = (Channel) Mockito.mock(Channel.class);
        ((Channel) Mockito.doReturn(Mockito.spy(DefaultEventLoop.class)).when(channel)).eventLoop();
        ((ChannelHandlerContext) Mockito.doReturn(channel).when(channelHandlerContext)).channel();
        ((ServerCnx) Mockito.doReturn(channelHandlerContext).when(this.serverCnx)).ctx();
        NamespaceService namespaceService = (NamespaceService) Mockito.mock(NamespaceService.class);
        ((NamespaceService) Mockito.doReturn(CompletableFuture.completedFuture((NamespaceBundle) Mockito.mock(NamespaceBundle.class))).when(namespaceService)).getBundleAsync((TopicName) ArgumentMatchers.any());
        ((PulsarService) Mockito.doReturn(namespaceService).when(this.pulsar)).getNamespaceService();
        ((NamespaceService) Mockito.doReturn(true).when(namespaceService)).isServiceUnitOwned((ServiceUnitId) ArgumentMatchers.any());
        ((NamespaceService) Mockito.doReturn(true).when(namespaceService)).isServiceUnitActive((TopicName) ArgumentMatchers.any());
        ((NamespaceService) Mockito.doReturn(CompletableFuture.completedFuture(true)).when(namespaceService)).checkTopicOwnership((TopicName) ArgumentMatchers.any());
        setupMLAsyncCallbackMocks();
    }

    @AfterMethod(alwaysRun = true)
    public void teardown() throws Exception {
        this.brokerService.getTopics().clear();
        this.brokerService.close();
        try {
            this.pulsar.close();
            this.executor.shutdownNow();
            this.eventLoopGroup.shutdownGracefully().get();
        } catch (Exception e) {
            log.warn("Failed to close pulsar service", e);
            throw e;
        }
    }

    @Test
    public void testCreateTopic() {
        final ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        ((ManagedLedger) Mockito.doReturn(new ArrayList()).when(managedLedger)).getCursors();
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.1
            public Object answer(InvocationOnMock invocationOnMock) {
                ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(managedLedger, (Object) null);
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(Mockito.anyString(), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        try {
            this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/topic1").thenAccept(topic -> {
                Assert.assertTrue(topic.toString().contains("persistent://prop/use/ns-abc/topic1"));
            }).exceptionally(th -> {
                Assert.fail("should not fail");
                return null;
            }).get(1L, TimeUnit.SECONDS);
        } catch (Exception e) {
            Assert.fail("Should not fail or time out");
        }
    }

    @Test
    public void testCreateTopicMLFailure() {
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.2
            public Object answer(InvocationOnMock invocationOnMock) {
                new Thread(() -> {
                    ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
                }).start();
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(Mockito.anyString(), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        try {
            this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/topic3").get(1L, TimeUnit.SECONDS);
            Assert.fail("should have failed");
        } catch (TimeoutException e) {
            Assert.fail("Should not time out");
        } catch (Exception e2) {
        }
    }

    @Test
    public void testPublishMessage() throws Exception {
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock -> {
            ByteBuf byteBuf = (ByteBuf) invocationOnMock.getArguments()[0];
            ((AsyncCallbacks.AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(PositionImpl.latest, byteBuf, (Topic.PublishContext) invocationOnMock.getArguments()[2]);
            return null;
        }).when(this.ledgerMock)).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        final ByteBuf wrappedBuffer = Unpooled.wrappedBuffer("content".getBytes());
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        persistentTopic.publishMessage(wrappedBuffer, new Topic.PublishContext() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.3
            public void completed(Exception exc, long j, long j2) {
                Assert.assertEquals(j, PositionImpl.latest.getLedgerId());
                Assert.assertEquals(j2, PositionImpl.latest.getEntryId());
                countDownLatch.countDown();
            }

            public void setMetadataFromEntryData(ByteBuf byteBuf) {
                Assert.assertEquals(countDownLatch.getCount(), 1L);
                Assert.assertEquals(byteBuf.array(), wrappedBuffer.array());
            }
        });
        Assert.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
    }

    @Test
    public void testDispatcherMultiConsumerReadFailed() throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) BrokerTestUtil.spyWithClassAndConstructorArgs(PersistentTopic.class, "persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        ManagedCursor managedCursor = (ManagedCursor) Mockito.mock(ManagedCursor.class);
        Mockito.when(managedCursor.getName()).thenReturn("cursor");
        new PersistentDispatcherMultipleConsumers(persistentTopic, managedCursor, (Subscription) null).readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), (Object) null);
        ((PersistentTopic) Mockito.verify(persistentTopic, Mockito.atLeast(1))).getBrokerService();
    }

    @Test
    public void testDispatcherSingleConsumerReadFailed() throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) BrokerTestUtil.spyWithClassAndConstructorArgs(PersistentTopic.class, "persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        ManagedCursor managedCursor = (ManagedCursor) Mockito.mock(ManagedCursor.class);
        Mockito.when(managedCursor.getName()).thenReturn("cursor");
        new PersistentDispatcherSingleActiveConsumer(managedCursor, CommandSubscribe.SubType.Exclusive, 1, persistentTopic, (Subscription) null).readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), (Consumer) Mockito.mock(Consumer.class));
        ((PersistentTopic) Mockito.verify(persistentTopic, Mockito.atLeast(1))).getBrokerService();
    }

    @Test
    public void testPublishMessageMLFailure() throws Exception {
        ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        ((ManagedLedger) Mockito.doReturn(new ArrayList()).when(managedLedger)).getCursors();
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", managedLedger, this.brokerService);
        new MessageMetadata().setPublishTime(System.currentTimeMillis()).setProducerName("prod-name").setSequenceId(1L);
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer("content".getBytes());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.4
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback) invocationOnMock.getArguments()[1]).addFailed(new ManagedLedgerException("Managed ledger failure"), invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when(managedLedger)).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        persistentTopic.publishMessage(wrappedBuffer, (exc, j, j2) -> {
            if (exc == null) {
                Assert.fail("publish should have failed");
            } else {
                countDownLatch.countDown();
            }
        });
        Assert.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
    }

    @Test
    public void testAddRemoveProducer() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        Producer producer = new Producer(persistentTopic, this.serverCnx, 1L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
        persistentTopic.addProducer(producer, new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        try {
            persistentTopic.addProducer(producer, new CompletableFuture()).join();
            Assert.fail("Should have failed with naming exception because producer 'null' is already connected to the topic");
        } catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.NamingException.class);
        }
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        try {
            persistentTopic.addProducer(new Producer(new PersistentTopic("persistent://prop/use/ns-abc/failTopic", this.ledgerMock, this.brokerService), this.serverCnx, 2L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty()), new CompletableFuture());
            Assert.fail("should have failed");
        } catch (IllegalArgumentException e2) {
        }
        persistentTopic.removeProducer(new Producer(persistentTopic, this.serverCnx, 1L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty()));
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        Assert.assertSame(persistentTopic.getProducers().get(producer.getProducerName()), producer);
        persistentTopic.removeProducer(producer);
        Assert.assertEquals(persistentTopic.getProducers().size(), 0);
        persistentTopic.removeProducer(producer);
    }

    @Test
    public void testProducerOverwrite() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        Producer producer = new Producer(persistentTopic, this.serverCnx, 1L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, true, ProducerAccessMode.Shared, Optional.empty());
        Producer producer2 = new Producer(persistentTopic, this.serverCnx, 2L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, true, ProducerAccessMode.Shared, Optional.empty());
        try {
            persistentTopic.addProducer(producer, new CompletableFuture()).join();
            persistentTopic.addProducer(producer2, new CompletableFuture()).join();
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.NamingException.class);
        }
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        Producer producer3 = new Producer(persistentTopic, this.serverCnx, 2L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 1L, false, ProducerAccessMode.Shared, Optional.empty());
        try {
            persistentTopic.addProducer(producer3, new CompletableFuture()).join();
            Assert.fail("should have failed");
        } catch (Exception e2) {
            Assert.assertEquals(e2.getCause().getClass(), BrokerServiceException.NamingException.class);
        }
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        persistentTopic.removeProducer(producer);
        Assert.assertEquals(persistentTopic.getProducers().size(), 0);
        Producer producer4 = new Producer(persistentTopic, this.serverCnx, 2L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 2L, false, ProducerAccessMode.Shared, Optional.empty());
        persistentTopic.addProducer(producer3, new CompletableFuture());
        persistentTopic.addProducer(producer4, new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        persistentTopic.getProducers().values().forEach(producer5 -> {
            Assert.assertEquals(producer5.getEpoch(), 2L);
        });
        persistentTopic.removeProducer(producer4);
        Assert.assertEquals(persistentTopic.getProducers().size(), 0);
        persistentTopic.addProducer(new Producer(persistentTopic, this.serverCnx, 2L, "pulsar.repl.cluster1", "appid1", false, (Map) null, SchemaVersion.Latest, 1L, false, ProducerAccessMode.Shared, Optional.empty()), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        persistentTopic.addProducer(new Producer(persistentTopic, this.serverCnx, 2L, "pulsar.repl.cluster1", "appid1", false, (Map) null, SchemaVersion.Latest, 2L, false, ProducerAccessMode.Shared, Optional.empty()), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        persistentTopic.getProducers().values().forEach(producer6 -> {
            Assert.assertEquals(producer6.getEpoch(), 2L);
        });
        persistentTopic.addProducer(new Producer(persistentTopic, this.serverCnx, 2L, "pulsar.repl.cluster1", "appid1", false, (Map) null, SchemaVersion.Latest, 3L, true, ProducerAccessMode.Shared, Optional.empty()), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        persistentTopic.getProducers().values().forEach(producer7 -> {
            Assert.assertEquals(producer7.getEpoch(), 3L);
        });
    }

    private void testMaxProducers() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        persistentTopic.addProducer(new Producer(persistentTopic, this.serverCnx, 1L, "prod-name1", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty()), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        persistentTopic.addProducer(new Producer(persistentTopic, this.serverCnx, 2L, "prod-name2", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty()), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 2);
        try {
            persistentTopic.addProducer(new Producer(persistentTopic, this.serverCnx, 3L, "prod-name3", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty()), new CompletableFuture()).join();
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
        }
    }

    @Test
    public void testMaxProducersForBroker() throws Exception {
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        ((ServiceConfiguration) Mockito.doReturn(2).when(serviceConfiguration)).getMaxProducersPerTopic();
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        testMaxProducers();
    }

    @Test
    public void testMaxProducersForNamespace() throws Exception {
        ((PulsarService) Mockito.doReturn((ServiceConfiguration) Mockito.spy(ServiceConfiguration.class)).when(this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_producers_per_topic = 2;
        Mockito.when(this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path(new String[]{"policies", TopicName.get("persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        testMaxProducers();
    }

    private Producer getMockedProducerWithSpecificAddress(Topic topic, long j, InetAddress inetAddress) throws Exception {
        ServerCnx serverCnx = (ServerCnx) BrokerTestUtil.spyWithClassAndConstructorArgs(ServerCnx.class, this.pulsar);
        ((ServerCnx) Mockito.doReturn(true).when(serverCnx)).isActive();
        ((ServerCnx) Mockito.doReturn(true).when(serverCnx)).isWritable();
        ((ServerCnx) Mockito.doReturn(new InetSocketAddress(inetAddress, 1234)).when(serverCnx)).clientAddress();
        ((ServerCnx) Mockito.doReturn(inetAddress.getHostAddress()).when(serverCnx)).clientSourceAddress();
        ((ServerCnx) Mockito.doReturn(new PulsarCommandSenderImpl((BrokerInterceptor) null, serverCnx)).when(serverCnx)).getCommandSender();
        return new Producer(topic, serverCnx, j, "producer" + j, "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
    }

    @Test
    public void testMaxSameAddressProducers() throws Exception {
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        ((ServiceConfiguration) Mockito.doReturn(2).when(serviceConfiguration)).getMaxSameAddressProducersPerTopic();
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        InetAddress byName = InetAddress.getByName("127.0.0.1");
        InetAddress byName2 = InetAddress.getByName("0.0.0.0");
        String hostAddress = byName.getHostAddress();
        String hostAddress2 = byName2.getHostAddress();
        Producer mockedProducerWithSpecificAddress = getMockedProducerWithSpecificAddress(persistentTopic, 1L, byName);
        persistentTopic.addProducer(mockedProducerWithSpecificAddress, new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressProducers(hostAddress), 1);
        persistentTopic.addProducer(getMockedProducerWithSpecificAddress(persistentTopic, 2L, byName), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 2);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressProducers(hostAddress), 2);
        try {
            persistentTopic.addProducer(getMockedProducerWithSpecificAddress(persistentTopic, 3L, byName), new CompletableFuture()).join();
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
        }
        Assert.assertEquals(persistentTopic.getProducers().size(), 2);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressProducers(hostAddress), 2);
        persistentTopic.addProducer(getMockedProducerWithSpecificAddress(persistentTopic, 4L, byName2), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 3);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressProducers(hostAddress2), 1);
        persistentTopic.addProducer(getMockedProducerWithSpecificAddress(persistentTopic, 5L, byName2), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 4);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressProducers(hostAddress2), 2);
        try {
            persistentTopic.addProducer(getMockedProducerWithSpecificAddress(persistentTopic, 6L, byName2), new CompletableFuture()).join();
            Assert.fail("should have failed");
        } catch (Exception e2) {
            Assert.assertEquals(e2.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
        }
        Assert.assertEquals(persistentTopic.getProducers().size(), 4);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressProducers(hostAddress2), 2);
        persistentTopic.removeProducer(mockedProducerWithSpecificAddress);
        Assert.assertEquals(persistentTopic.getProducers().size(), 3);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressProducers(hostAddress), 1);
        persistentTopic.addProducer(getMockedProducerWithSpecificAddress(persistentTopic, 7L, byName), new CompletableFuture());
        Assert.assertEquals(persistentTopic.getProducers().size(), 4);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressProducers(hostAddress), 2);
    }

    @Test
    public void testSubscribeFail() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CommandSubscribe subType = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        try {
            persistentTopic.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), subType.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
            Assert.fail("should fail with exception");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.NamingException);
        }
    }

    @Test
    public void testSubscribeUnsubscribe() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CommandSubscribe subType = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        persistentTopic.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), subType.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        try {
            persistentTopic.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), subType.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
            Assert.fail("should fail with exception");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }
        persistentTopic.unsubscribe("successSub").get();
        Assert.assertNull(persistentTopic.getSubscription("successSub"));
    }

    @Test
    public void testChangeSubscriptionType() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "change-sub-type", this.cursorMock, false);
        Consumer consumer = new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest);
        persistentSubscription.addConsumer(consumer);
        consumer.close();
        CommandSubscribe.SubType subType = CommandSubscribe.SubType.Exclusive;
        Iterator it = Lists.newArrayList(new CommandSubscribe.SubType[]{CommandSubscribe.SubType.Shared, CommandSubscribe.SubType.Failover, CommandSubscribe.SubType.Key_Shared, CommandSubscribe.SubType.Exclusive}).iterator();
        while (it.hasNext()) {
            CommandSubscribe.SubType subType2 = (CommandSubscribe.SubType) it.next();
            Dispatcher dispatcher = persistentSubscription.getDispatcher();
            Consumer consumer2 = new Consumer(persistentSubscription, subType2, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest);
            persistentSubscription.addConsumer(consumer2);
            Assert.assertTrue(persistentSubscription.getDispatcher().isConsumerConnected());
            Assert.assertFalse(persistentSubscription.getDispatcher().isClosed());
            Assert.assertEquals(persistentSubscription.getDispatcher().getType(), subType2);
            Assert.assertFalse(dispatcher.isConsumerConnected());
            Assert.assertTrue(dispatcher.isClosed());
            Assert.assertEquals(dispatcher.getType(), subType);
            consumer2.close();
            subType = subType2;
        }
    }

    @Test
    public void testAddRemoveConsumer() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false);
        Consumer consumer = new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        persistentSubscription.addConsumer(consumer);
        Assert.assertTrue(persistentSubscription.getDispatcher().isConsumerConnected());
        try {
            persistentSubscription.addConsumer(consumer).get();
            Assert.fail("Should fail with ConsumerBusyException");
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }
        persistentSubscription.removeConsumer(consumer);
        Assert.assertFalse(persistentSubscription.getDispatcher().isConsumerConnected());
        try {
            persistentSubscription.removeConsumer(consumer);
            Assert.fail("Should fail with ServerMetadataException");
        } catch (BrokerServiceException e2) {
            Assert.assertTrue(e2 instanceof BrokerServiceException.ServerMetadataException);
        }
    }

    @Test
    public void testAddRemoveConsumerDurableCursor() throws Exception {
        ((ManagedCursor) Mockito.doReturn(false).when(this.cursorMock)).isDurable();
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "non-durable-sub", this.cursorMock, false);
        Consumer consumer = new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        persistentSubscription.addConsumer(consumer);
        Assert.assertFalse(persistentSubscription.getDispatcher().isClosed());
        persistentSubscription.removeConsumer(consumer);
        for (int i = 0; i < 100 && !persistentSubscription.getDispatcher().isClosed(); i++) {
            Thread.sleep(100L);
        }
        Assert.assertTrue(persistentSubscription.getDispatcher().isClosed());
    }

    private void testMaxConsumersShared() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false);
        PersistentSubscription persistentSubscription2 = new PersistentSubscription(persistentTopic, "sub-2", this.cursorMock, false);
        Method declaredMethod = AbstractTopic.class.getDeclaredMethod("addConsumerToSubscription", Subscription.class, Consumer.class);
        declaredMethod.setAccessible(true);
        ConcurrentOpenHashMap build = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        build.put("sub-1", persistentSubscription);
        build.put("sub-2", persistentSubscription2);
        Field declaredField = persistentTopic.getClass().getDeclaredField("subscriptions");
        declaredField.setAccessible(true);
        declaredField.set(persistentTopic, build);
        declaredMethod.invoke(persistentTopic, persistentSubscription, new Consumer(persistentSubscription, CommandSubscribe.SubType.Shared, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest));
        Assert.assertEquals(persistentSubscription.getConsumers().size(), 1);
        declaredMethod.invoke(persistentTopic, persistentSubscription, new Consumer(persistentSubscription, CommandSubscribe.SubType.Shared, persistentTopic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest));
        Assert.assertEquals(persistentSubscription.getConsumers().size(), 2);
        try {
            ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription, new Consumer(persistentSubscription, CommandSubscribe.SubType.Shared, persistentTopic.getName(), 3L, 0, "Cons3", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest))).get();
            Assert.fail("should have failed");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 2);
        declaredMethod.invoke(persistentTopic, persistentSubscription2, new Consumer(persistentSubscription2, CommandSubscribe.SubType.Shared, persistentTopic.getName(), 4L, 0, "Cons4", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest));
        Assert.assertEquals(persistentSubscription2.getConsumers().size(), 1);
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 3);
        try {
            ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription2, new Consumer(persistentSubscription2, CommandSubscribe.SubType.Shared, persistentTopic.getName(), 5L, 0, "Cons5", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest))).get();
            Assert.fail("should have failed");
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }
    }

    @Test
    public void testMaxConsumersSharedForBroker() throws Exception {
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        ((ServiceConfiguration) Mockito.doReturn(2).when(serviceConfiguration)).getMaxConsumersPerSubscription();
        ((ServiceConfiguration) Mockito.doReturn(3).when(serviceConfiguration)).getMaxConsumersPerTopic();
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        testMaxConsumersShared();
    }

    @Test
    public void testMaxConsumersSharedForNamespace() throws Exception {
        ((PulsarService) Mockito.doReturn((ServiceConfiguration) Mockito.spy(ServiceConfiguration.class)).when(this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_consumers_per_subscription = 2;
        policies.max_consumers_per_topic = 3;
        Mockito.when((Policies) this.pulsar.getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path(new String[]{"policies", TopicName.get("persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(policies);
        testMaxConsumersShared();
    }

    private void testMaxConsumersFailover() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false);
        PersistentSubscription persistentSubscription2 = new PersistentSubscription(persistentTopic, "sub-2", this.cursorMock, false);
        Method declaredMethod = AbstractTopic.class.getDeclaredMethod("addConsumerToSubscription", Subscription.class, Consumer.class);
        declaredMethod.setAccessible(true);
        ConcurrentOpenHashMap build = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        build.put("sub-1", persistentSubscription);
        build.put("sub-2", persistentSubscription2);
        Field declaredField = persistentTopic.getClass().getDeclaredField("subscriptions");
        declaredField.setAccessible(true);
        declaredField.set(persistentTopic, build);
        declaredMethod.invoke(persistentTopic, persistentSubscription, new Consumer(persistentSubscription, CommandSubscribe.SubType.Failover, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest));
        Assert.assertEquals(persistentSubscription.getConsumers().size(), 1);
        declaredMethod.invoke(persistentTopic, persistentSubscription, new Consumer(persistentSubscription, CommandSubscribe.SubType.Failover, persistentTopic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest));
        Assert.assertEquals(persistentSubscription.getConsumers().size(), 2);
        try {
            ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription, new Consumer(persistentSubscription, CommandSubscribe.SubType.Failover, persistentTopic.getName(), 3L, 0, "Cons3", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest))).get();
            Assert.fail("should have failed");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 2);
        declaredMethod.invoke(persistentTopic, persistentSubscription2, new Consumer(persistentSubscription2, CommandSubscribe.SubType.Failover, persistentTopic.getName(), 4L, 0, "Cons4", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest));
        Assert.assertEquals(persistentSubscription2.getConsumers().size(), 1);
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 3);
        try {
            ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription2, new Consumer(persistentSubscription2, CommandSubscribe.SubType.Failover, persistentTopic.getName(), 5L, 0, "Cons5", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest))).get();
            Assert.fail("should have failed");
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }
    }

    @Test
    public void testMaxConsumersFailoverForBroker() throws Exception {
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        ((ServiceConfiguration) Mockito.doReturn(2).when(serviceConfiguration)).getMaxConsumersPerSubscription();
        ((ServiceConfiguration) Mockito.doReturn(3).when(serviceConfiguration)).getMaxConsumersPerTopic();
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        testMaxConsumersFailover();
    }

    @Test
    public void testMaxConsumersFailoverForNamespace() throws Exception {
        ((PulsarService) Mockito.doReturn((ServiceConfiguration) Mockito.spy(ServiceConfiguration.class)).when(this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_consumers_per_subscription = 2;
        policies.max_consumers_per_topic = 3;
        Mockito.when((Policies) this.pulsar.getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path(new String[]{"policies", TopicName.get("persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(policies);
        testMaxConsumersFailover();
    }

    private Consumer getMockedConsumerWithSpecificAddress(Topic topic, Subscription subscription, long j, InetAddress inetAddress) throws Exception {
        ServerCnx serverCnx = (ServerCnx) BrokerTestUtil.spyWithClassAndConstructorArgs(ServerCnx.class, this.pulsar);
        ((ServerCnx) Mockito.doReturn(true).when(serverCnx)).isActive();
        ((ServerCnx) Mockito.doReturn(true).when(serverCnx)).isWritable();
        ((ServerCnx) Mockito.doReturn(new InetSocketAddress(inetAddress, 1234)).when(serverCnx)).clientAddress();
        ((ServerCnx) Mockito.doReturn(inetAddress.getHostAddress()).when(serverCnx)).clientSourceAddress();
        ((ServerCnx) Mockito.doReturn(new PulsarCommandSenderImpl((BrokerInterceptor) null, serverCnx)).when(serverCnx)).getCommandSender();
        return new Consumer(subscription, CommandSubscribe.SubType.Shared, topic.getName(), j, 0, "consumer" + j, 50000, serverCnx, "appid1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
    }

    @Test
    public void testMaxSameAddressConsumers() throws Exception {
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        ((ServiceConfiguration) Mockito.doReturn(2).when(serviceConfiguration)).getMaxSameAddressConsumersPerTopic();
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub1", this.cursorMock, false);
        PersistentSubscription persistentSubscription2 = new PersistentSubscription(persistentTopic, "sub2", this.cursorMock, false);
        InetAddress byName = InetAddress.getByName("127.0.0.1");
        InetAddress byName2 = InetAddress.getByName("0.0.0.0");
        String hostAddress = byName.getHostAddress();
        String hostAddress2 = byName2.getHostAddress();
        Method declaredMethod = AbstractTopic.class.getDeclaredMethod("addConsumerToSubscription", Subscription.class, Consumer.class);
        declaredMethod.setAccessible(true);
        ConcurrentOpenHashMap build = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        build.put("sub1", persistentSubscription);
        build.put("sub2", persistentSubscription2);
        Field declaredField = persistentTopic.getClass().getDeclaredField("subscriptions");
        declaredField.setAccessible(true);
        declaredField.set(persistentTopic, build);
        Consumer mockedConsumerWithSpecificAddress = getMockedConsumerWithSpecificAddress(persistentTopic, persistentSubscription, 1L, byName);
        ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription, mockedConsumerWithSpecificAddress)).get();
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 1);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress), 1);
        Assert.assertEquals(persistentSubscription.getNumberOfSameAddressConsumers(hostAddress), 1);
        ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription2, getMockedConsumerWithSpecificAddress(persistentTopic, persistentSubscription2, 2L, byName))).get();
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 2);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress), 2);
        Assert.assertEquals(persistentSubscription.getNumberOfSameAddressConsumers(hostAddress), 1);
        Assert.assertEquals(persistentSubscription2.getNumberOfSameAddressConsumers(hostAddress), 1);
        ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription, getMockedConsumerWithSpecificAddress(persistentTopic, persistentSubscription, 3L, byName2))).get();
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 3);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress), 2);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress2), 1);
        Assert.assertEquals(persistentSubscription.getNumberOfSameAddressConsumers(hostAddress), 1);
        Assert.assertEquals(persistentSubscription.getNumberOfSameAddressConsumers(hostAddress2), 1);
        ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription2, getMockedConsumerWithSpecificAddress(persistentTopic, persistentSubscription2, 4L, byName2))).get();
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 4);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress), 2);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress2), 2);
        Assert.assertEquals(persistentSubscription2.getNumberOfSameAddressConsumers(hostAddress), 1);
        Assert.assertEquals(persistentSubscription2.getNumberOfSameAddressConsumers(hostAddress2), 1);
        try {
            ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription, getMockedConsumerWithSpecificAddress(persistentTopic, persistentSubscription, 5L, byName))).get();
            Assert.fail("should have failed");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 4);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress), 2);
        Assert.assertEquals(persistentSubscription.getNumberOfSameAddressConsumers(hostAddress), 1);
        try {
            ((CompletableFuture) declaredMethod.invoke(persistentTopic, persistentSubscription2, getMockedConsumerWithSpecificAddress(persistentTopic, persistentSubscription2, 6L, byName2))).get();
            Assert.fail("should have failed");
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 4);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress2), 2);
        Assert.assertEquals(persistentSubscription2.getNumberOfSameAddressConsumers(hostAddress2), 1);
        mockedConsumerWithSpecificAddress.close();
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 3);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress), 1);
        Assert.assertEquals(persistentSubscription.getNumberOfSameAddressConsumers(hostAddress), 0);
        declaredMethod.invoke(persistentTopic, persistentSubscription, getMockedConsumerWithSpecificAddress(persistentTopic, persistentSubscription, 7L, byName));
        Assert.assertEquals(persistentTopic.getNumberOfConsumers(), 4);
        Assert.assertEquals(persistentTopic.getNumberOfSameAddressConsumers(hostAddress), 2);
        Assert.assertEquals(persistentSubscription.getNumberOfSameAddressConsumers(hostAddress), 1);
    }

    @Test
    public void testUbsubscribeRaceConditions() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub-1", this.cursorMock, false);
        Consumer consumer = new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        persistentSubscription.addConsumer(consumer);
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.5
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete((Object) null);
                Thread.sleep(1000L);
                return null;
            }
        }).when(this.ledgerMock)).asyncDeleteCursor(Mockito.matches(".*success.*"), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            newCachedThreadPool.submit(() -> {
                persistentSubscription.doUnsubscribe(consumer);
                return null;
            }).get();
            try {
                Thread.sleep(10L);
                persistentSubscription.addConsumer(new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest)).get();
                Assert.fail();
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof BrokerServiceException.SubscriptionFencedException);
            }
        } finally {
            if (Collections.singletonList(newCachedThreadPool).get(0) != null) {
                newCachedThreadPool.shutdownNow();
            }
        }
    }

    @Test
    public void testCloseTopic() throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        Field declaredField = AbstractTopic.class.getDeclaredField("isFenced");
        declaredField.setAccessible(true);
        Field declaredField2 = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
        declaredField2.setAccessible(true);
        Assert.assertFalse(((Boolean) declaredField.get(persistentTopic)).booleanValue());
        Assert.assertFalse(((Boolean) declaredField2.get(persistentTopic)).booleanValue());
        persistentTopic.close().get();
        Assert.assertFalse(this.brokerService.getTopicReference("persistent://prop/use/ns-abc/successTopic").isPresent());
        Assert.assertTrue(((Boolean) declaredField.get(persistentTopic)).booleanValue());
        Assert.assertTrue(((Boolean) declaredField2.get(persistentTopic)).booleanValue());
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer("content".getBytes());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        persistentTopic.publishMessage(wrappedBuffer, (exc, j, j2) -> {
            Assert.assertTrue(exc instanceof BrokerServiceException.TopicFencedException);
            countDownLatch.countDown();
        });
        Assert.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
        Assert.assertTrue(((Boolean) declaredField.get(persistentTopic)).booleanValue());
        Assert.assertTrue(((Boolean) declaredField2.get(persistentTopic)).booleanValue());
    }

    @Test
    public void testDeleteTopic() throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        Field declaredField = AbstractTopic.class.getDeclaredField("isFenced");
        declaredField.setAccessible(true);
        Field declaredField2 = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
        declaredField2.setAccessible(true);
        Assert.assertFalse(((Boolean) declaredField.get(persistentTopic)).booleanValue());
        Assert.assertFalse(((Boolean) declaredField2.get(persistentTopic)).booleanValue());
        persistentTopic.delete().get();
        Assert.assertFalse(this.brokerService.getTopicReference("persistent://prop/use/ns-abc/successTopic").isPresent());
        Assert.assertTrue(((Boolean) declaredField.get(persistentTopic)).booleanValue());
        Assert.assertTrue(((Boolean) declaredField2.get(persistentTopic)).booleanValue());
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer("content".getBytes());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        persistentTopic.publishMessage(wrappedBuffer, (exc, j, j2) -> {
            Assert.assertTrue(exc instanceof BrokerServiceException.TopicFencedException);
            countDownLatch.countDown();
        });
        Assert.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
        Assert.assertTrue(((Boolean) declaredField.get(persistentTopic)).booleanValue());
        Assert.assertTrue(((Boolean) declaredField2.get(persistentTopic)).booleanValue());
        PersistentTopic persistentTopic2 = (PersistentTopic) this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        Producer producer = new Producer(persistentTopic2, this.serverCnx, 1L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
        persistentTopic2.addProducer(producer, new CompletableFuture()).join();
        Assert.assertTrue(persistentTopic2.delete().isCompletedExceptionally());
        Assert.assertFalse(((Boolean) declaredField.get(persistentTopic2)).booleanValue());
        Assert.assertFalse(((Boolean) declaredField2.get(persistentTopic2)).booleanValue());
        persistentTopic2.removeProducer(producer);
        CommandSubscribe subType = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        persistentTopic2.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        Assert.assertTrue(persistentTopic2.delete().isCompletedExceptionally());
        Assert.assertFalse(((Boolean) declaredField.get(persistentTopic2)).booleanValue());
        Assert.assertFalse(((Boolean) declaredField2.get(persistentTopic2)).booleanValue());
        persistentTopic2.unsubscribe("successSub");
    }

    @Test
    public void testDeleteAndUnsubscribeTopic() throws Exception {
        final PersistentTopic persistentTopic = (PersistentTopic) this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        CommandSubscribe subType = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setRequestId(1L).setReadCompacted(false).setSubType(CommandSubscribe.SubType.Exclusive);
        persistentTopic.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), subType.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.6
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    cyclicBarrier.await();
                    Assert.assertFalse(persistentTopic.delete().isCompletedExceptionally());
                } catch (Exception e) {
                    e.printStackTrace();
                    atomicBoolean.set(true);
                } finally {
                    countDownLatch.countDown();
                }
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.7
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    cyclicBarrier.await();
                    persistentTopic.unsubscribe("successSub");
                } catch (Exception e) {
                    e.printStackTrace();
                    atomicBoolean.set(true);
                } finally {
                    countDownLatch.countDown();
                }
            }
        };
        thread.start();
        thread2.start();
        countDownLatch.await();
        Assert.assertFalse(atomicBoolean.get());
    }

    @Test(enabled = false)
    public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
        final PersistentTopic persistentTopic = (PersistentTopic) this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        CommandSubscribe subType = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        persistentTopic.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), subType.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.8
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    cyclicBarrier.await();
                    Thread.sleep(5L, 0);
                    PersistentTopicTest.log.info("deleter outcome is {}", persistentTopic.delete().get());
                } catch (Exception e) {
                    e.printStackTrace();
                    atomicBoolean.set(true);
                } finally {
                    countDownLatch.countDown();
                }
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.9
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    cyclicBarrier.await();
                    PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic.getSubscriptions().get("successSub");
                    PersistentTopicTest.log.info("unsubscriber outcome is {}", persistentSubscription.doUnsubscribe((Consumer) persistentSubscription.getConsumers().get(0)).get());
                } catch (Exception e) {
                    e.printStackTrace();
                    atomicBoolean.set(true);
                } finally {
                    countDownLatch.countDown();
                }
            }
        };
        thread.start();
        thread2.start();
        countDownLatch.await();
        Assert.assertFalse(atomicBoolean.get());
    }

    @Test
    public void testDeleteTopicRaceConditions() throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.10
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                Thread.sleep(1000L);
                ((AsyncCallbacks.DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete((Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncDelete((AsyncCallbacks.DeleteLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteLedgerCallback.class), ArgumentMatchers.any());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            newCachedThreadPool.submit(() -> {
                persistentTopic.delete();
                return null;
            }).get();
            try {
                Thread.sleep(10L);
                persistentTopic.addProducer(new Producer(persistentTopic, this.serverCnx, 1L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty()), new CompletableFuture()).join();
                Assert.fail("Should have failed");
            } catch (Exception e) {
                Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.TopicFencedException.class);
            }
            CommandSubscribe subType = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
            try {
                persistentTopic.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), subType.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
                Assert.fail("should have failed");
            } catch (ExecutionException e2) {
                Assert.assertTrue(e2.getCause() instanceof BrokerServiceException.TopicFencedException);
            }
        } finally {
            if (Collections.singletonList(newCachedThreadPool).get(0) != null) {
                newCachedThreadPool.shutdownNow();
            }
        }
    }

    void setupMLAsyncCallbackMocks() {
        this.ledgerMock = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        this.cursorMock = (ManagedCursor) Mockito.mock(ManagedCursorImpl.class);
        final CompletableFuture completableFuture = new CompletableFuture();
        ((ManagedLedger) Mockito.doReturn(new ArrayList()).when(this.ledgerMock)).getCursors();
        ((ManagedCursor) Mockito.doReturn("mockCursor").when(this.cursorMock)).getName();
        ((ManagedCursor) Mockito.doReturn(true).when(this.cursorMock)).isDurable();
        ((ManagedCursor) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.12
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                return Boolean.valueOf(completableFuture.complete(null));
            }
        }).when(this.cursorMock)).asyncClose(new AsyncCallbacks.CloseCallback() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.11
            public void closeComplete(Object obj) {
                PersistentTopicTest.log.info("[{}] Successfully closed cursor ledger", "mockCursor");
                completableFuture.complete(null);
            }

            public void closeFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentTopicTest.log.error("Error closing cursor for subscription", managedLedgerException);
                completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
            }
        }, (Object) null);
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.13
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(PersistentTopicTest.this.ledgerMock, (Object) null);
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(Mockito.matches(".*success.*"), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.14
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(Mockito.matches(".*fail.*"), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.15
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1L, 1L), (ByteBuf) null, invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when(this.ledgerMock)).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.16
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(PersistentTopicTest.this.cursorMock, (Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncOpenCursor(Mockito.matches(".*success.*"), (CommandSubscribe.InitialPosition) ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.17
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(PersistentTopicTest.this.cursorMock, (Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncOpenCursor(Mockito.matches(".*success.*"), (CommandSubscribe.InitialPosition) ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (Map) ArgumentMatchers.any(Map.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.18
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.CloseCallback) invocationOnMock.getArguments()[0]).closeComplete((Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncClose((AsyncCallbacks.CloseCallback) ArgumentMatchers.any(AsyncCallbacks.CloseCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.19
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete((Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncDelete((AsyncCallbacks.DeleteLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteLedgerCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.20
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete((Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncDeleteCursor(Mockito.matches(".*success.*"), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.MarkDeleteCallback) invocationOnMock.getArguments()[2]).markDeleteComplete(invocationOnMock.getArguments()[3]);
            return null;
        }).when(this.cursorMock)).asyncMarkDelete((Position) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (AsyncCallbacks.MarkDeleteCallback) ArgumentMatchers.any(AsyncCallbacks.MarkDeleteCallback.class), ArgumentMatchers.any());
    }

    @Test
    public void testFailoverSubscription() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CommandSubscribe subType = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Failover);
        persistentTopic.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), subType.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        PersistentTopic persistentTopic2 = new PersistentTopic("persistent://prop/use/ns-abc/successTopic-partition-0", this.ledgerMock, this.brokerService);
        CommandSubscribe subType2 = new CommandSubscribe().setConsumerId(1L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Failover);
        persistentTopic2.subscribe(this.serverCnx, subType2.getSubscription(), subType2.getConsumerId(), subType2.getSubType(), 0, subType2.getConsumerName(), subType2.isDurable(), (MessageId) null, Collections.emptyMap(), subType2.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        CommandSubscribe subType3 = new CommandSubscribe().setConsumerId(2L).setConsumerName("C2").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Failover);
        persistentTopic2.subscribe(this.serverCnx, subType3.getSubscription(), subType3.getConsumerId(), subType3.getSubType(), 0, subType3.getConsumerName(), subType3.isDurable(), (MessageId) null, Collections.emptyMap(), subType3.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), 1L);
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), "C1");
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerId(), 2L);
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerName(), "C2");
        CommandSubscribe subType4 = new CommandSubscribe().setConsumerId(3L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Failover);
        persistentTopic2.subscribe(this.serverCnx, subType4.getSubscription(), subType4.getConsumerId(), subType4.getSubType(), 0, subType4.getConsumerName(), subType4.isDurable(), (MessageId) null, Collections.emptyMap(), subType4.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), 1L);
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), "C1");
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerId(), 3L);
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerName(), "C1");
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(2)).consumerId(), 2L);
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(2)).consumerName(), "C2");
        CommandSubscribe subType5 = new CommandSubscribe().setConsumerId(2L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        try {
            persistentTopic2.subscribe(this.serverCnx, subType5.getSubscription(), subType5.getConsumerId(), subType5.getSubType(), 0, subType5.getConsumerName(), subType5.isDurable(), (MessageId) null, Collections.emptyMap(), subType5.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
            Assert.fail("should fail with exception");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.SubscriptionBusyException);
        }
        CommandSubscribe subType6 = new CommandSubscribe().setConsumerId(4L).setConsumerName("C3").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub2").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        persistentTopic2.subscribe(this.serverCnx, subType6.getSubscription(), subType6.getConsumerId(), subType6.getSubType(), 0, subType6.getConsumerName(), subType6.isDurable(), (MessageId) null, Collections.emptyMap(), subType6.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, (KeySharedMeta) null).get();
        persistentTopic2.unsubscribe("successSub2").get();
        Assert.assertNull(persistentTopic2.getSubscription("successSub2"));
        PersistentSubscription subscription = persistentTopic2.getSubscription("successSub");
        subscription.removeConsumer((Consumer) subscription.getDispatcher().getConsumers().get(0));
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), 3L);
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), "C1");
        subscription.removeConsumer((Consumer) subscription.getDispatcher().getConsumers().get(0));
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), 2L);
        Assert.assertEquals(((Consumer) persistentTopic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), "C2");
        persistentTopic2.unsubscribe("successSub").get();
        Assert.assertNull(persistentTopic2.getSubscription("successSub"));
    }

    @Test
    public void testAtomicReplicationRemoval() throws Exception {
        ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        ((ManagedLedger) Mockito.doNothing().when(managedLedger)).asyncDeleteCursor((String) ArgumentMatchers.any(), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doReturn(new ArrayList()).when(managedLedger)).getCursors();
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/global/ns-abc/successTopic", managedLedger, this.brokerService);
        String str = persistentTopic.getReplicatorPrefix() + ".remote";
        ConcurrentOpenHashMap replicators = persistentTopic.getReplicators();
        PulsarClient build = PulsarClient.builder().serviceUrl(new URL("http://" + this.pulsar.getAdvertisedAddress() + ":" + this.pulsar.getConfiguration().getBrokerServicePort().get()).toString()).build();
        try {
            ManagedCursor managedCursor = (ManagedCursor) Mockito.mock(ManagedCursorImpl.class);
            ((ManagedCursor) Mockito.doReturn("remote").when(managedCursor)).getName();
            this.brokerService.getReplicationClients().put("remote", build);
            PersistentReplicator persistentReplicator = (PersistentReplicator) Mockito.spy(new PersistentReplicator(persistentTopic, managedCursor, "local", "remote", this.brokerService));
            replicators.put(str, persistentReplicator);
            Method declaredMethod = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
            declaredMethod.setAccessible(true);
            declaredMethod.invoke(persistentTopic, str);
            Mockito.when(this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path(new String[]{"policies", TopicName.get("persistent://prop/global/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(new Policies()));
            persistentTopic.startReplProducers();
            ((PersistentReplicator) Mockito.verify(persistentReplicator, Mockito.times(0))).startProducer();
            ArgumentCaptor forClass = ArgumentCaptor.forClass(AsyncCallbacks.DeleteCursorCallback.class);
            ((ManagedLedger) Mockito.verify(managedLedger)).asyncDeleteCursor((String) ArgumentMatchers.any(), (AsyncCallbacks.DeleteCursorCallback) forClass.capture(), ArgumentMatchers.any());
            ((AsyncCallbacks.DeleteCursorCallback) forClass.getValue()).deleteCursorComplete((Object) null);
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test
    public void testClosingReplicationProducerTwice() throws Exception {
        ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        ((ManagedLedger) Mockito.doNothing().when(managedLedger)).asyncDeleteCursor((String) ArgumentMatchers.any(), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doReturn(new ArrayList()).when(managedLedger)).getCursors();
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/global/ns/testClosingReplicationProducerTwice", managedLedger, this.brokerService);
        PulsarClientImpl pulsarClientImpl = (PulsarClient) Mockito.spy(PulsarClient.builder().serviceUrl(new URL("http://" + this.pulsar.getAdvertisedAddress() + ":" + this.pulsar.getConfiguration().getBrokerServicePort().get()).toString()).build());
        try {
            PulsarClientImpl pulsarClientImpl2 = pulsarClientImpl;
            ((PulsarClientImpl) Mockito.doReturn(new CompletableFuture()).when(pulsarClientImpl2)).createProducerAsync((ProducerConfigurationData) ArgumentMatchers.any(ProducerConfigurationData.class), (Schema) ArgumentMatchers.any(Schema.class));
            ManagedCursor managedCursor = (ManagedCursor) Mockito.mock(ManagedCursorImpl.class);
            ((ManagedCursor) Mockito.doReturn("remote").when(managedCursor)).getName();
            this.brokerService.getReplicationClients().put("remote", pulsarClientImpl);
            PersistentReplicator persistentReplicator = new PersistentReplicator(persistentTopic, managedCursor, "local", "remote", this.brokerService);
            ((PulsarClientImpl) Mockito.verify(pulsarClientImpl2)).createProducerAsync((ProducerConfigurationData) ArgumentMatchers.any(ProducerConfigurationData.class), (Schema) ArgumentMatchers.any(), (ProducerInterceptors) Mockito.eq((Object) null));
            persistentReplicator.disconnect(false);
            persistentReplicator.disconnect(false);
            persistentReplicator.startProducer();
            ((PulsarClientImpl) Mockito.verify(pulsarClientImpl2, Mockito.times(2))).createProducerAsync((ProducerConfigurationData) ArgumentMatchers.any(), (Schema) ArgumentMatchers.any(), (ProducerInterceptors) ArgumentMatchers.any());
            if (Collections.singletonList(pulsarClientImpl).get(0) != null) {
                pulsarClientImpl.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(pulsarClientImpl).get(0) != null) {
                pulsarClientImpl.close();
            }
            throw th;
        }
    }

    @Test
    public void testCompactorSubscription() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CompactedTopic compactedTopic = (CompactedTopic) Mockito.mock(CompactedTopic.class);
        Mockito.when(compactedTopic.newCompactedLedger((Position) ArgumentMatchers.any(Position.class), ArgumentMatchers.anyLong())).thenReturn(CompletableFuture.completedFuture((CompactedTopicContext) Mockito.mock(CompactedTopicContext.class)));
        CompactorSubscription compactorSubscription = new CompactorSubscription(persistentTopic, compactedTopic, "__compaction", this.cursorMock);
        PositionImpl positionImpl = new PositionImpl(1L, 1L);
        compactorSubscription.acknowledgeMessage(Collections.singletonList(positionImpl), CommandAck.AckType.Cumulative, ImmutableMap.of("CompactedTopicLedger", 202112766L));
        ((CompactedTopic) Mockito.verify(compactedTopic, Mockito.times(1))).newCompactedLedger(positionImpl, 202112766L);
    }

    @Test
    public void testCompactorSubscriptionUpdatedOnInit() throws Exception {
        ImmutableMap of = ImmutableMap.of("CompactedTopicLedger", 202112766L);
        PositionImpl positionImpl = new PositionImpl(1L, 1L);
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock -> {
            return of;
        }).when(this.cursorMock)).getProperties();
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock2 -> {
            return positionImpl;
        }).when(this.cursorMock)).getMarkDeletedPosition();
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CompactedTopic compactedTopic = (CompactedTopic) Mockito.mock(CompactedTopic.class);
        Mockito.when(compactedTopic.newCompactedLedger((Position) ArgumentMatchers.any(Position.class), ArgumentMatchers.anyLong())).thenReturn(CompletableFuture.completedFuture(null));
        new CompactorSubscription(persistentTopic, compactedTopic, "__compaction", this.cursorMock);
        ((CompactedTopic) Mockito.verify(compactedTopic, Mockito.times(1))).newCompactedLedger(positionImpl, 202112766L);
    }

    @Test
    public void testCompactionTriggeredAfterThresholdFirstInvocation() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor) Mockito.doReturn(completableFuture).when(compactor)).compact(Mockito.anyString());
        Policies policies = new Policies();
        policies.compaction_threshold = 1L;
        Mockito.when(this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path(new String[]{"policies", TopicName.get("persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        persistentTopic.checkCompaction();
        ((Compactor) Mockito.verify(compactor, Mockito.times(0))).compact(Mockito.anyString());
        ((ManagedLedger) Mockito.doReturn(10L).when(this.ledgerMock)).getTotalSize();
        ((ManagedLedger) Mockito.doReturn(10L).when(this.ledgerMock)).getEstimatedBacklogSize();
        persistentTopic.checkCompaction();
        ((Compactor) Mockito.verify(compactor, Mockito.times(1))).compact(Mockito.anyString());
        persistentTopic.checkCompaction();
        ((Compactor) Mockito.verify(compactor, Mockito.times(1))).compact(Mockito.anyString());
    }

    @Test
    public void testCompactionTriggeredAfterThresholdSecondInvocation() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor) Mockito.doReturn(completableFuture).when(compactor)).compact(Mockito.anyString());
        ManagedCursor managedCursor = (ManagedCursor) Mockito.mock(ManagedCursor.class);
        ((ManagedLedger) Mockito.doReturn(Lists.newArrayList(new ManagedCursor[]{managedCursor})).when(this.ledgerMock)).getCursors();
        ((ManagedCursor) Mockito.doReturn("__compaction").when(managedCursor)).getName();
        Policies policies = new Policies();
        policies.compaction_threshold = 1L;
        Mockito.when(this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path(new String[]{"policies", TopicName.get("persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        persistentTopic.checkCompaction();
        ((Compactor) Mockito.verify(compactor, Mockito.times(0))).compact(Mockito.anyString());
        ((ManagedCursor) Mockito.doReturn(10L).when(managedCursor)).getEstimatedSizeSinceMarkDeletePosition();
        persistentTopic.checkCompaction();
        ((Compactor) Mockito.verify(compactor, Mockito.times(1))).compact(Mockito.anyString());
        persistentTopic.checkCompaction();
        ((Compactor) Mockito.verify(compactor, Mockito.times(1))).compact(Mockito.anyString());
    }

    @Test
    public void testCompactionDisabledWithZeroThreshold() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor) Mockito.doReturn(completableFuture).when(compactor)).compact(Mockito.anyString());
        Policies policies = new Policies();
        policies.compaction_threshold = 0L;
        Mockito.when(this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path(new String[]{"policies", TopicName.get("persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        ((ManagedLedger) Mockito.doReturn(1000L).when(this.ledgerMock)).getEstimatedBacklogSize();
        new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService).checkCompaction();
        ((Compactor) Mockito.verify(compactor, Mockito.times(0))).compact(Mockito.anyString());
    }

    @Test
    public void testBacklogCursor() throws Exception {
        this.pulsar.getConfiguration().setManagedLedgerCursorBackloggedThreshold(10);
        ManagedLedgerImpl open = this.factory.open("cache_backlog_ledger");
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", open, this.brokerService);
        ManagedCursor openCursor = open.openCursor("c1");
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "sub-1", openCursor, false);
        Consumer consumer = new Consumer(persistentSubscription, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        persistentTopic.getSubscriptions().put(Codec.decode(openCursor.getName()), persistentSubscription);
        persistentSubscription.addConsumer(consumer);
        ManagedCursor openCursor2 = open.openCursor("c2");
        PersistentSubscription persistentSubscription2 = new PersistentSubscription(persistentTopic, "sub-2", openCursor2, false);
        Consumer consumer2 = new Consumer(persistentSubscription2, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-2", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        persistentTopic.getSubscriptions().put(Codec.decode(openCursor2.getName()), persistentSubscription2);
        persistentSubscription2.addConsumer(consumer2);
        ManagedCursor openCursor3 = open.openCursor("c3");
        PersistentSubscription persistentSubscription3 = new PersistentSubscription(persistentTopic, "sub-3", openCursor3, false);
        Consumer consumer3 = new Consumer(persistentSubscription2, CommandSubscribe.SubType.Exclusive, persistentTopic.getName(), 3L, 0, "Cons2", 50000, this.serverCnx, "myrole-3", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest);
        persistentTopic.getSubscriptions().put(Codec.decode(openCursor3.getName()), persistentSubscription3);
        Assert.assertTrue(openCursor.isActive());
        Assert.assertTrue(openCursor2.isActive());
        Assert.assertTrue(openCursor3.isActive());
        persistentTopic.checkBackloggedCursors();
        Assert.assertTrue(openCursor.isActive());
        Assert.assertTrue(openCursor2.isActive());
        Assert.assertFalse(openCursor3.isActive());
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10 + 1; i++) {
            final ByteBuf messageWithMetadata = getMessageWithMetadata("entry".getBytes());
            open.asyncAddEntry(messageWithMetadata, new AsyncCallbacks.AddEntryCallback() { // from class: org.apache.pulsar.broker.service.PersistentTopicTest.21
                public void addComplete(Position position, ByteBuf byteBuf, Object obj) {
                    countDownLatch.countDown();
                    messageWithMetadata.release();
                }

                public void addFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    countDownLatch.countDown();
                    messageWithMetadata.release();
                }
            }, (Object) null);
        }
        countDownLatch.await();
        Assert.assertTrue(openCursor.isActive());
        Assert.assertTrue(openCursor2.isActive());
        Assert.assertFalse(openCursor3.isActive());
        persistentTopic.checkBackloggedCursors();
        Assert.assertFalse(openCursor.isActive());
        Assert.assertFalse(openCursor2.isActive());
        Assert.assertFalse(openCursor3.isActive());
        for (Entry entry : openCursor.readEntries(50)) {
            log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
            entry.release();
        }
        for (Entry entry2 : openCursor3.readEntries(50)) {
            log.info("Read entry. Position={} Content='{}'", entry2.getPosition(), new String(entry2.getData()));
            entry2.release();
        }
        persistentTopic.checkBackloggedCursors();
        Assert.assertTrue(openCursor.isActive());
        Assert.assertFalse(openCursor2.isActive());
        Assert.assertFalse(openCursor3.isActive());
        persistentSubscription3.addConsumer(consumer3);
        for (Entry entry3 : openCursor3.readEntries(50)) {
            log.info("Read entry. Position={} Content='{}'", entry3.getPosition(), new String(entry3.getData()));
            entry3.release();
        }
        persistentTopic.checkBackloggedCursors();
        Assert.assertTrue(openCursor3.isActive());
    }

    @Test
    public void testCheckInactiveSubscriptions() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        ConcurrentOpenHashMap build = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        PersistentSubscription persistentSubscription = (PersistentSubscription) BrokerTestUtil.spyWithClassAndConstructorArgs(PersistentSubscription.class, persistentTopic, "nonDeletableSubscription1", this.cursorMock, false);
        build.put(persistentSubscription.getName(), persistentSubscription);
        PersistentSubscription persistentSubscription2 = (PersistentSubscription) BrokerTestUtil.spyWithClassAndConstructorArgs(PersistentSubscription.class, persistentTopic, "deletableSubscription1", this.cursorMock, false);
        build.put(persistentSubscription2.getName(), persistentSubscription2);
        PersistentSubscription persistentSubscription3 = (PersistentSubscription) BrokerTestUtil.spyWithClassAndConstructorArgs(PersistentSubscription.class, persistentTopic, "nonDeletableSubscription2", this.cursorMock, true);
        build.put(persistentSubscription3.getName(), persistentSubscription3);
        Field declaredField = persistentTopic.getClass().getDeclaredField("subscriptions");
        declaredField.setAccessible(true);
        declaredField.set(persistentTopic, build);
        Method declaredMethod = AbstractTopic.class.getDeclaredMethod("addConsumerToSubscription", Subscription.class, Consumer.class);
        declaredMethod.setAccessible(true);
        declaredMethod.invoke(persistentTopic, persistentSubscription, new Consumer(persistentSubscription, CommandSubscribe.SubType.Shared, persistentTopic.getName(), 1L, 0, "consumer1", 50000, this.serverCnx, "app1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, (KeySharedMeta) null, MessageId.latest));
        Mockito.when(this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path(new String[]{"policies", TopicName.get("persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(new Policies()));
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        ((ServiceConfiguration) Mockito.doReturn(5).when(serviceConfiguration)).getSubscriptionExpirationTimeMinutes();
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        ((ManagedCursor) Mockito.doReturn(Long.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(6L))).when(this.cursorMock)).getLastActive();
        persistentTopic.checkInactiveSubscriptions();
        ((PersistentSubscription) Mockito.verify(persistentSubscription, Mockito.times(0))).delete();
        ((PersistentSubscription) Mockito.verify(persistentSubscription2, Mockito.times(1))).delete();
        ((PersistentSubscription) Mockito.verify(persistentSubscription3, Mockito.times(0))).delete();
    }

    @Test
    public void testTopicFencingTimeout() throws Exception {
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        Method declaredMethod = PersistentTopic.class.getDeclaredMethod("fence", new Class[0]);
        declaredMethod.setAccessible(true);
        Method declaredMethod2 = PersistentTopic.class.getDeclaredMethod("unfence", new Class[0]);
        declaredMethod2.setAccessible(true);
        Field declaredField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
        declaredField.setAccessible(true);
        Field declaredField2 = AbstractTopic.class.getDeclaredField("isFenced");
        declaredField2.setAccessible(true);
        Field declaredField3 = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
        declaredField3.setAccessible(true);
        ((ServiceConfiguration) Mockito.doReturn(10).when(serviceConfiguration)).getTopicFencingTimeoutSeconds();
        declaredMethod.invoke(persistentTopic, new Object[0]);
        declaredMethod2.invoke(persistentTopic, new Object[0]);
        ScheduledFuture scheduledFuture = (ScheduledFuture) declaredField.get(persistentTopic);
        Assert.assertTrue(scheduledFuture.isDone());
        Assert.assertTrue(scheduledFuture.isCancelled());
        Assert.assertFalse(((Boolean) declaredField2.get(persistentTopic)).booleanValue());
        Assert.assertFalse(((Boolean) declaredField3.get(persistentTopic)).booleanValue());
        ((ServiceConfiguration) Mockito.doReturn(1).when(serviceConfiguration)).getTopicFencingTimeoutSeconds();
        declaredMethod.invoke(persistentTopic, new Object[0]);
        Thread.sleep(2000L);
        ScheduledFuture scheduledFuture2 = (ScheduledFuture) declaredField.get(persistentTopic);
        Assert.assertTrue(scheduledFuture2.isDone());
        Assert.assertFalse(scheduledFuture2.isCancelled());
        Assert.assertTrue(((Boolean) declaredField2.get(persistentTopic)).booleanValue());
        Assert.assertTrue(((Boolean) declaredField3.get(persistentTopic)).booleanValue());
    }

    @Test
    public void testTopicCloseFencingTimeout() throws Exception {
        this.pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
        Method declaredMethod = PersistentTopic.class.getDeclaredMethod("fence", new Class[0]);
        declaredMethod.setAccessible(true);
        Field declaredField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
        declaredField.setAccessible(true);
        PersistentTopic persistentTopic = (PersistentTopic) this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        declaredMethod.invoke(persistentTopic, new Object[0]);
        persistentTopic.close().get();
        Assert.assertFalse(this.brokerService.getTopicReference("persistent://prop/use/ns-abc/successTopic").isPresent());
        ScheduledFuture scheduledFuture = (ScheduledFuture) declaredField.get(persistentTopic);
        Assert.assertTrue(scheduledFuture.isDone());
        Assert.assertTrue(scheduledFuture.isCancelled());
    }

    @Test
    public void testGetDurableSubscription() throws Exception {
        ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        ManagedCursor managedCursor = (ManagedCursor) Mockito.mock(ManagedCursorImpl.class);
        Position position = (Position) Mockito.mock(Position.class);
        ((ManagedCursor) Mockito.doReturn("test").when(managedCursor)).getName();
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.FindEntryCallback) invocationOnMock.getArguments()[2]).findEntryComplete(position, invocationOnMock.getArguments()[3]);
            return null;
        }).when(managedCursor)).asyncFindNewestMatching((ManagedCursor.FindPositionConstraint) ArgumentMatchers.any(), (Predicate) ArgumentMatchers.any(), (AsyncCallbacks.FindEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock2 -> {
            ((AsyncCallbacks.ResetCursorCallback) invocationOnMock2.getArguments()[1]).resetComplete((Object) null);
            return null;
        }).when(managedCursor)).asyncResetCursor((Position) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), (AsyncCallbacks.ResetCursorCallback) ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock3 -> {
            ((AsyncCallbacks.DeleteCursorCallback) invocationOnMock3.getArguments()[1]).deleteCursorComplete((Object) null);
            return null;
        }).when(managedLedger)).asyncDeleteCursor(Mockito.matches(".*success.*"), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock4 -> {
            ((AsyncCallbacks.OpenCursorCallback) invocationOnMock4.getArguments()[3]).openCursorComplete(managedCursor, (Object) null);
            return null;
        }).when(managedLedger)).asyncOpenCursor((String) ArgumentMatchers.any(), (CommandSubscribe.InitialPosition) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", managedLedger, this.brokerService);
        CommandSubscribe subType = new CommandSubscribe().setConsumerId(1L).setDurable(true).setStartMessageRollbackDurationSec(60L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        persistentTopic.subscribe(this.serverCnx, subType.getSubscription(), subType.getConsumerId(), subType.getSubType(), 0, subType.getConsumerName(), subType.isDurable(), (MessageId) null, Collections.emptyMap(), subType.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, subType.getStartMessageRollbackDurationSec(), false, (KeySharedMeta) null).get();
        persistentTopic.unsubscribe("successSub").get();
    }

    @Test
    public void testDisconnectProducer() throws Exception {
        Producer producer = new Producer(new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService), this.serverCnx, 1L, "prod-name", "appid1", false, (Map) null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
        Assert.assertFalse(producer.isDisconnecting());
        producer.disconnect();
        producer.disconnect();
        ((ServerCnx) Mockito.verify(this.serverCnx)).execute((Runnable) ArgumentMatchers.any());
    }

    @Test
    public void testKeySharedMetadataExposedToStats() throws Exception {
        PersistentTopic persistentTopic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription persistentSubscription = new PersistentSubscription(persistentTopic, "key-shared-stats1", this.cursorMock, false);
        PersistentSubscription persistentSubscription2 = new PersistentSubscription(persistentTopic, "key-shared-stats2", this.cursorMock, false);
        PersistentSubscription persistentSubscription3 = new PersistentSubscription(persistentTopic, "key-shared-stats3", this.cursorMock, false);
        Consumer consumer = new Consumer(persistentSubscription, CommandSubscribe.SubType.Key_Shared, persistentTopic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(false), MessageId.latest);
        persistentSubscription.addConsumer(consumer);
        consumer.close();
        SubscriptionStatsImpl stats = persistentSubscription.getStats(false, false);
        Assert.assertEquals(stats.keySharedMode, "AUTO_SPLIT");
        Assert.assertFalse(stats.allowOutOfOrderDelivery);
        Consumer consumer2 = new Consumer(persistentSubscription2, CommandSubscribe.SubType.Key_Shared, persistentTopic.getName(), 2L, 0, "Cons2", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(true), MessageId.latest);
        persistentSubscription2.addConsumer(consumer2);
        consumer2.close();
        SubscriptionStatsImpl stats2 = persistentSubscription2.getStats(false, false);
        Assert.assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
        Assert.assertTrue(stats2.allowOutOfOrderDelivery);
        KeySharedMeta allowOutOfOrderDelivery = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY).setAllowOutOfOrderDelivery(false);
        allowOutOfOrderDelivery.addHashRange().setStart(0).setEnd(65535);
        Consumer consumer3 = new Consumer(persistentSubscription3, CommandSubscribe.SubType.Key_Shared, persistentTopic.getName(), 3L, 0, "Cons3", 50000, this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, allowOutOfOrderDelivery, MessageId.latest);
        persistentSubscription3.addConsumer(consumer3);
        consumer3.close();
        SubscriptionStatsImpl stats3 = persistentSubscription3.getStats(false, false);
        Assert.assertEquals(stats3.keySharedMode, "STICKY");
        Assert.assertFalse(stats3.allowOutOfOrderDelivery);
    }

    private ByteBuf getMessageWithMetadata(byte[] bArr) {
        MessageMetadata sequenceId = new MessageMetadata().setPublishTime(System.currentTimeMillis()).setProducerName("prod-name").setSequenceId(0L);
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(bArr, 0, bArr.length);
        int serializedSize = sequenceId.getSerializedSize();
        int i = 4 + serializedSize;
        ByteBuf buffer = PulsarByteBufAllocator.DEFAULT.buffer(i, i);
        buffer.writeInt(serializedSize);
        sequenceId.writeTo(buffer);
        return ByteBufPair.coalesce(ByteBufPair.get(buffer, wrappedBuffer));
    }
}
