package org.apache.pulsar.broker.service.persistent;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleState;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.apache.zookeeper.MockZooKeeper;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.class */
public class PersistentSubscriptionTest {
    private PulsarService pulsarMock;
    private BrokerService brokerMock;
    private ManagedLedgerFactory mlFactoryMock;
    private MetadataStore store;
    private ManagedLedger ledgerMock;
    private ManagedCursorImpl cursorMock;
    private PersistentTopic topic;
    private PersistentSubscription persistentSubscription;
    private Consumer consumerMock;
    private ManagedLedgerConfig managedLedgerConfigMock;
    final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    final String subName = "subscriptionName";
    final TxnID txnID1 = new TxnID(1, 1);
    final TxnID txnID2 = new TxnID(1, 2);
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;

    @BeforeMethod
    public void setup() throws Exception {
        this.executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-subscription-test").build();
        this.eventLoopGroup = new NioEventLoopGroup();
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(ServiceConfiguration.class);
        serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
        serviceConfiguration.setTransactionCoordinatorEnabled(true);
        serviceConfiguration.setClusterName("pulsar-cluster");
        this.pulsarMock = (PulsarService) BrokerTestUtil.spyWithClassAndConstructorArgs(PulsarService.class, serviceConfiguration);
        PulsarResources pulsarResources = (PulsarResources) Mockito.mock(PulsarResources.class);
        ((PulsarService) Mockito.doReturn(pulsarResources).when(this.pulsarMock)).getPulsarResources();
        NamespaceResources namespaceResources = (NamespaceResources) Mockito.mock(NamespaceResources.class);
        ((PulsarResources) Mockito.doReturn(namespaceResources).when(pulsarResources)).getNamespaceResources();
        ((NamespaceResources) Mockito.doReturn(Optional.of(new Policies())).when(namespaceResources)).getPoliciesIfCached((NamespaceName) ArgumentMatchers.any());
        ((PulsarService) Mockito.doReturn(new InMemTransactionBufferProvider()).when(this.pulsarMock)).getTransactionBufferProvider();
        ((PulsarService) Mockito.doReturn(new TransactionPendingAckStoreProvider() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscriptionTest.1
            public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription persistentSubscription) {
                return CompletableFuture.completedFuture(new PendingAckStore() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscriptionTest.1.1
                    public void replayAsync(PendingAckHandleImpl pendingAckHandleImpl, ExecutorService executorService) {
                        try {
                            Field declaredField = PendingAckHandleState.class.getDeclaredField("state");
                            declaredField.setAccessible(true);
                            declaredField.set(pendingAckHandleImpl, PendingAckHandleState.State.Ready);
                        } catch (IllegalAccessException | NoSuchFieldException e) {
                            Assert.fail();
                        }
                    }

                    public CompletableFuture<Void> closeAsync() {
                        return CompletableFuture.completedFuture(null);
                    }

                    public CompletableFuture<Void> appendIndividualAck(TxnID txnID, List<MutablePair<PositionImpl, Integer>> list) {
                        return CompletableFuture.completedFuture(null);
                    }

                    public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, PositionImpl positionImpl) {
                        return CompletableFuture.completedFuture(null);
                    }

                    public CompletableFuture<Void> appendCommitMark(TxnID txnID, CommandAck.AckType ackType) {
                        return CompletableFuture.completedFuture(null);
                    }

                    public CompletableFuture<Void> appendAbortMark(TxnID txnID, CommandAck.AckType ackType) {
                        return CompletableFuture.completedFuture(null);
                    }
                });
            }

            public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription persistentSubscription) {
                return CompletableFuture.completedFuture(true);
            }
        }).when(this.pulsarMock)).getTransactionPendingAckStoreProvider();
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsarMock)).getConfiguration();
        ((PulsarService) Mockito.doReturn(Mockito.mock(Compactor.class)).when(this.pulsarMock)).getCompactor();
        this.mlFactoryMock = (ManagedLedgerFactory) Mockito.mock(ManagedLedgerFactory.class);
        ((PulsarService) Mockito.doReturn(this.mlFactoryMock).when(this.pulsarMock)).getManagedLedgerFactory();
        MockZooKeeper createMockZooKeeper = MockedPulsarServiceBaseTest.createMockZooKeeper();
        ((PulsarService) Mockito.doReturn(MockedPulsarServiceBaseTest.createMockBookKeeper(this.executor)).when(this.pulsarMock)).getBookKeeperClient();
        this.store = new ZKMetadataStore(createMockZooKeeper);
        ((PulsarService) Mockito.doReturn(this.store).when(this.pulsarMock)).getLocalMetadataStore();
        ((PulsarService) Mockito.doReturn(this.store).when(this.pulsarMock)).getConfigurationMetadataStore();
        this.brokerMock = (BrokerService) BrokerTestUtil.spyWithClassAndConstructorArgs(BrokerService.class, this.pulsarMock, this.eventLoopGroup);
        ((BrokerService) Mockito.doNothing().when(this.brokerMock)).unloadNamespaceBundlesGracefully();
        ((PulsarService) Mockito.doReturn(this.brokerMock).when(this.pulsarMock)).getBrokerService();
        this.ledgerMock = (ManagedLedger) Mockito.mock(ManagedLedgerImpl.class);
        this.cursorMock = (ManagedCursorImpl) Mockito.mock(ManagedCursorImpl.class);
        this.managedLedgerConfigMock = (ManagedLedgerConfig) Mockito.mock(ManagedLedgerConfig.class);
        ((ManagedLedger) Mockito.doReturn(new ManagedCursorContainer()).when(this.ledgerMock)).getCursors();
        ((ManagedCursorImpl) Mockito.doReturn("mockCursor").when(this.cursorMock)).getName();
        ((ManagedCursorImpl) Mockito.doReturn(new PositionImpl(1L, 50L)).when(this.cursorMock)).getMarkDeletedPosition();
        ((ManagedCursorImpl) Mockito.doReturn(this.ledgerMock).when(this.cursorMock)).getManagedLedger();
        ((ManagedLedger) Mockito.doReturn(this.managedLedgerConfigMock).when(this.ledgerMock)).getConfig();
        ((ManagedLedgerConfig) Mockito.doReturn(false).when(this.managedLedgerConfigMock)).isAutoSkipNonRecoverableData();
        this.topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerMock);
        this.consumerMock = (Consumer) Mockito.mock(Consumer.class);
        this.persistentSubscription = new PersistentSubscription(this.topic, "subscriptionName", this.cursorMock, false);
    }

    @AfterMethod(alwaysRun = true)
    public void teardown() throws Exception {
        this.brokerMock.close();
        try {
            this.pulsarMock.close();
            this.store.close();
            this.executor.shutdownNow();
            if (this.eventLoopGroup != null) {
                this.eventLoopGroup.shutdownGracefully().get();
            }
        } catch (Exception e) {
            log.warn("Failed to close pulsar service", e);
            throw e;
        }
    }

    @Test
    public void testCanAcknowledgeAndCommitForTransaction() throws ExecutionException, InterruptedException {
        ((ManagedCursorImpl) Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.DeleteCallback) invocationOnMock.getArguments()[1]).deleteComplete(invocationOnMock.getArguments()[2]);
            return null;
        }).when(this.cursorMock)).asyncDelete((Iterable) ArgumentMatchers.any(List.class), (AsyncCallbacks.DeleteCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCallback.class), ArgumentMatchers.any());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new MutablePair(new PositionImpl(1L, 1L), 0));
        arrayList.add(new MutablePair(new PositionImpl(1L, 3L), 0));
        arrayList.add(new MutablePair(new PositionImpl(1L, 5L), 0));
        ((ManagedCursorImpl) Mockito.doAnswer(invocationOnMock2 -> {
            Assert.assertTrue(Arrays.deepEquals(((List) invocationOnMock2.getArguments()[0]).toArray(), arrayList.toArray()));
            ((AsyncCallbacks.MarkDeleteCallback) invocationOnMock2.getArguments()[2]).markDeleteComplete(invocationOnMock2.getArguments()[3]);
            return null;
        }).when(this.cursorMock)).asyncMarkDelete((Position) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (AsyncCallbacks.MarkDeleteCallback) ArgumentMatchers.any(AsyncCallbacks.MarkDeleteCallback.class), ArgumentMatchers.any());
        this.persistentSubscription.transactionIndividualAcknowledge(this.txnID1, arrayList);
        this.persistentSubscription.endTxn(this.txnID1.getMostSigBits(), this.txnID1.getLeastSigBits(), 0, -1L).get();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new PositionImpl(3L, 100L));
        this.persistentSubscription.transactionCumulativeAcknowledge(this.txnID1, arrayList2);
        ((ManagedCursorImpl) Mockito.doAnswer(invocationOnMock3 -> {
            Assert.assertEquals(((PositionImpl) invocationOnMock3.getArguments()[0]).compareTo(new PositionImpl(3L, 100L)), 0);
            ((AsyncCallbacks.MarkDeleteCallback) invocationOnMock3.getArguments()[2]).markDeleteComplete(invocationOnMock3.getArguments()[3]);
            return null;
        }).when(this.cursorMock)).asyncMarkDelete((Position) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (AsyncCallbacks.MarkDeleteCallback) ArgumentMatchers.any(AsyncCallbacks.MarkDeleteCallback.class), ArgumentMatchers.any());
        this.persistentSubscription.endTxn(this.txnID1.getMostSigBits(), this.txnID1.getLeastSigBits(), 0, -1L).get();
    }

    @Test
    public void testCanAcknowledgeAndAbortForTransaction() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new MutablePair(new PositionImpl(2L, 1L), 0));
        arrayList.add(new MutablePair(new PositionImpl(2L, 3L), 0));
        arrayList.add(new MutablePair(new PositionImpl(2L, 5L), 0));
        ((ManagedCursorImpl) Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.DeleteCallback) invocationOnMock.getArguments()[1]).deleteComplete(invocationOnMock.getArguments()[2]);
            return null;
        }).when(this.cursorMock)).asyncDelete((Iterable) ArgumentMatchers.any(List.class), (AsyncCallbacks.DeleteCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCallback.class), ArgumentMatchers.any());
        ((Consumer) Mockito.doReturn(CommandSubscribe.SubType.Exclusive).when(this.consumerMock)).subType();
        Awaitility.await().until(() -> {
            try {
                this.persistentSubscription.addConsumer(this.consumerMock);
                return true;
            } catch (Exception e) {
                return false;
            }
        });
        this.persistentSubscription.transactionIndividualAcknowledge(this.txnID1, arrayList);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new PositionImpl(1L, 100L));
        this.persistentSubscription.transactionCumulativeAcknowledge(this.txnID1, arrayList2).get();
        arrayList2.clear();
        arrayList2.add(new PositionImpl(2L, 1L));
        try {
            this.persistentSubscription.transactionIndividualAcknowledge(this.txnID2, arrayList).get();
            Assert.fail("Single acknowledge for transaction2 should fail. ");
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getMessage(), "[persistent://prop/use/ns-abc/successTopic][subscriptionName] Transaction:(1,2) try to ack message:2:1 in pending ack status.");
        }
        arrayList2.clear();
        arrayList2.add(new PositionImpl(2L, 50L));
        try {
            this.persistentSubscription.transactionCumulativeAcknowledge(this.txnID2, arrayList2).get();
            Assert.fail("Cumulative acknowledge for transaction2 should fail. ");
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof TransactionConflictException);
            Assert.assertEquals(e2.getCause().getMessage(), "[persistent://prop/use/ns-abc/successTopic][subscriptionName] Transaction:(1,2) try to cumulative batch ack position: 2:50 within range of current currentPosition: 1:100");
        }
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(new PositionImpl(1L, 1L));
        arrayList3.add(new PositionImpl(1L, 3L));
        arrayList3.add(new PositionImpl(1L, 5L));
        arrayList3.add(new PositionImpl(3L, 1L));
        arrayList3.add(new PositionImpl(3L, 3L));
        arrayList3.add(new PositionImpl(3L, 5L));
        this.persistentSubscription.acknowledgeMessage(arrayList3, CommandAck.AckType.Individual, Collections.emptyMap());
        this.persistentSubscription.endTxn(this.txnID1.getMostSigBits(), this.txnID2.getLeastSigBits(), 1, -1L);
        arrayList2.clear();
        arrayList2.add(new PositionImpl(2L, 50L));
        this.persistentSubscription.transactionCumulativeAcknowledge(this.txnID2, arrayList2);
        arrayList.clear();
        arrayList.add(new MutablePair(new PositionImpl(2L, 1L), 0));
        this.persistentSubscription.transactionIndividualAcknowledge(this.txnID2, arrayList);
    }
}
