package org.apache.pulsar.broker.transaction.pendingack;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import javassist.bytecode.Opcode;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests that inspect, via reflection, the broker-side in-memory acknowledgment state of a
 * {@link SubscriptionType#Shared} subscription after transactional and non-transactional
 * acknowledgments have been interleaved and the transaction has been committed.
 *
 * <p>Each round sends {@link #MESSAGES_PER_ROUND} messages but receives and acknowledges one fewer,
 * so one message per round is deliberately left un-acknowledged.
 */
public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(PendingAckInMemoryDeleteTest.class);
    private static final String TENANT = "tnx";
    private static final String NAMESPACE1 = "tnx/ns1";

    /** Topic name as passed to the client builders. */
    private static final String NORMAL_TOPIC = NAMESPACE1 + "/normal-topic";
    /** Key under which the topic is looked up in the broker's {@code topics} map. */
    private static final String PERSISTENT_NORMAL_TOPIC = "persistent://" + NORMAL_TOPIC;
    private static final String SUBSCRIPTION_NAME = "test";

    private static final int ROUNDS = 2;
    private static final int MESSAGES_PER_ROUND = 1000;
    /** Number of messages received and acknowledged per round; one message is left over. */
    private static final int ACKED_PER_ROUND = MESSAGES_PER_ROUND - 1;
    /** Maximum number of messages per batch used by the batching test. */
    private static final int BATCHING_MAX_MESSAGES = 200;

    /**
     * Starts the test cluster, creates the cluster/tenant/namespace metadata and the transaction
     * coordinator assign topic, and builds a transaction-enabled client.
     */
    @BeforeMethod
    protected void setup() throws Exception {
        internalSetup();
        // Only the port (the last ':'-separated token) of the broker service URL is reused here.
        String[] urlParts = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
        String port = urlParts[urlParts.length - 1];
        this.admin.clusters().createCluster("test", new ClusterData("http://localhost:" + port));
        this.admin.tenants().createTenant(TENANT,
                new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
        this.admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace(NAMESPACE1);
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
        this.pulsarClient = PulsarClient.builder()
                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .enableTransaction(true)
                .build();
        // NOTE(review): presumably gives the transaction components time to initialise - confirm.
        Thread.sleep(3000L);
    }

    @AfterMethod
    protected void cleanup() {
        super.internalCleanup();
    }

    /**
     * Non-batched producer: after each committed round the pending-ack handle must hold no
     * per-transaction individual acks, and the first broker-side consumer is expected to hold one
     * pending ack after the first round and two after the second.
     */
    @Test
    public void txnAckTestNoBatchAndSharedSubMemoryDeleteTest() throws Exception {
        // try-with-resources closes the producer first and then the consumer, even when an
        // assertion in the middle of the test fails.
        try (Consumer<byte[]> consumer = subscribeShared();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(NORMAL_TOPIC)
                     .enableBatching(false)
                     .create()) {
            for (int round = 0; round < ROUNDS; round++) {
                sendMessages(producer, MESSAGES_PER_ROUND);
                Transaction txn = getTxn();
                receiveAndAckAlternately(consumer, txn, ACKED_PER_ROUND);
                txn.commit().get();

                int brokersWithTopic = 0;
                for (int b = 0; b < getPulsarServiceList().size(); b++) {
                    Optional<Topic> topic = lookupLoadedTopic(getPulsarServiceList().get(b).getBrokerService());
                    if (!topic.isPresent()) {
                        continue;
                    }
                    PersistentSubscription subscription =
                            (PersistentSubscription) topic.get().getSubscription(SUBSCRIPTION_NAME);
                    assertNoIndividualAcksOfTransaction(subscription);
                    assertPendingAcksOfFirstConsumer(subscription, round == 0 ? 1L : 2L);
                    brokersWithTopic++;
                }
                // Exactly one broker is expected to have the topic loaded.
                Assert.assertEquals(brokersWithTopic, 1);
            }
        }
    }

    /**
     * Batched producer: after each committed round the pending-ack handle must hold no
     * per-transaction individual acks, while the cursor's {@code batchDeletedIndexes} map and the
     * first consumer's pending acks are each expected to have exactly one entry. The two left-over
     * messages are then acknowledged (one normally, one in a transaction) and both structures are
     * expected to be empty once that transaction commits.
     */
    @Test
    public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception {
        try (Consumer<byte[]> consumer = subscribeShared();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(NORMAL_TOPIC)
                     .enableBatching(true)
                     .batchingMaxMessages(BATCHING_MAX_MESSAGES)
                     .create()) {
            // Captured in the last round and re-checked after the loop; the assertion on
            // brokersWithTopic guarantees both are set before they are dereferenced.
            PersistentSubscription subscription = null;
            ConcurrentSkipListMap<?, ?> batchDeletedIndexes = null;

            for (int round = 0; round < ROUNDS; round++) {
                sendMessages(producer, MESSAGES_PER_ROUND);
                Transaction txn = getTxn();
                receiveAndAckAlternately(consumer, txn, ACKED_PER_ROUND);
                txn.commit().get();

                int brokersWithTopic = 0;
                for (int b = 0; b < getPulsarServiceList().size(); b++) {
                    Optional<Topic> topic = lookupLoadedTopic(getPulsarServiceList().get(b).getBrokerService());
                    if (!topic.isPresent()) {
                        continue;
                    }
                    subscription = (PersistentSubscription) topic.get().getSubscription(SUBSCRIPTION_NAME);
                    assertNoIndividualAcksOfTransaction(subscription);
                    batchDeletedIndexes = readBatchDeletedIndexes(subscription);
                    // Same expectation in every round: one partially-acked batch, one pending ack.
                    Assert.assertEquals(batchDeletedIndexes.size(), 1);
                    assertPendingAcksOfFirstConsumer(subscription, 1L);
                    brokersWithTopic++;
                }
                Assert.assertEquals(brokersWithTopic, 1);
            }

            Transaction finalTxn = getTxn();

            // First left-over message: plain acknowledgment.
            consumer.acknowledge(consumer.receive().getMessageId());
            Assert.assertEquals(batchDeletedIndexes.size(), 1);
            assertPendingAcksOfFirstConsumer(subscription, 1L);

            // Second left-over message: transactional acknowledgment, not yet committed.
            consumer.acknowledgeAsync(consumer.receive().getMessageId(), finalTxn).get();
            Assert.assertEquals(batchDeletedIndexes.size(), 1);
            assertPendingAcksOfFirstConsumer(subscription, 0L);

            finalTxn.commit().get();
            Assert.assertEquals(batchDeletedIndexes.size(), 0);
            assertPendingAcksOfFirstConsumer(subscription, 0L);
        }
    }

    /** Creates a new transaction with a two second timeout. */
    private Transaction getTxn() throws Exception {
        return this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
    }

    /** Subscribes the shared consumer that both tests use. */
    private Consumer<byte[]> subscribeShared() throws Exception {
        return this.pulsarClient.newConsumer()
                .topic(NORMAL_TOPIC)
                .subscriptionName(SUBSCRIPTION_NAME)
                .enableBatchIndexAcknowledgment(true)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(2L, TimeUnit.SECONDS)
                .acknowledgmentGroupTime(0L, TimeUnit.MICROSECONDS)
                .subscribe();
    }

    /** Fires {@code count} asynchronous sends without waiting for their completion. */
    private static void sendMessages(Producer<byte[]> producer, int count) {
        for (int sent = 0; sent < count; sent++) {
            producer.newMessage().value("hello".getBytes()).sendAsync();
        }
    }

    /**
     * Receives {@code count} messages; even-indexed ones are acknowledged within {@code txn},
     * odd-indexed ones are acknowledged normally.
     */
    private static void receiveAndAckAlternately(Consumer<byte[]> consumer, Transaction txn, int count)
            throws Exception {
        for (int index = 0; index < count; index++) {
            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(message);
            if (index % 2 == 0) {
                consumer.acknowledgeAsync(message.getMessageId(), txn).get();
                log.info("txn receive msgId: {}, count: {}", message.getMessageId(), index);
            } else {
                consumer.acknowledge(message.getMessageId());
                log.info("normal receive msgId: {}, count: {}", message.getMessageId(), index);
            }
        }
    }

    /**
     * Reads the private {@code topics} map of the broker service and returns the test topic, or an
     * empty optional when the broker has no entry for it.
     */
    private static Optional<Topic> lookupLoadedTopic(BrokerService brokerService) throws Exception {
        Field topicsField = BrokerService.class.getDeclaredField("topics");
        topicsField.setAccessible(true);
        // The field is only reachable reflectively, so its generic type cannot be checked here.
        @SuppressWarnings("unchecked")
        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
                (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) topicsField.get(brokerService);
        CompletableFuture<Optional<Topic>> topicFuture = topics.get(PERSISTENT_NORMAL_TOPIC);
        if (topicFuture == null) {
            return Optional.empty();
        }
        return topicFuture.get();
    }

    /** Asserts that the subscription's pending-ack handle has an empty {@code individualAckOfTransaction} map. */
    private static void assertNoIndividualAcksOfTransaction(PersistentSubscription subscription) throws Exception {
        Field pendingAckHandleField = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
        pendingAckHandleField.setAccessible(true);
        PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) pendingAckHandleField.get(subscription);

        Field individualAckField = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
        individualAckField.setAccessible(true);
        Assert.assertTrue(((HashMap<?, ?>) individualAckField.get(pendingAckHandle)).isEmpty());
    }

    /** Reads the private {@code batchDeletedIndexes} map of the subscription's cursor. */
    private static ConcurrentSkipListMap<?, ?> readBatchDeletedIndexes(PersistentSubscription subscription)
            throws Exception {
        ManagedCursorImpl cursor = (ManagedCursorImpl) subscription.getCursor();
        Field batchDeletedIndexesField = ManagedCursorImpl.class.getDeclaredField("batchDeletedIndexes");
        batchDeletedIndexesField.setAccessible(true);
        return (ConcurrentSkipListMap<?, ?>) batchDeletedIndexesField.get(cursor);
    }

    /** Asserts the number of pending acks tracked by the subscription's first broker-side consumer. */
    private static void assertPendingAcksOfFirstConsumer(PersistentSubscription subscription, long expected) {
        org.apache.pulsar.broker.service.Consumer brokerConsumer =
                (org.apache.pulsar.broker.service.Consumer) subscription.getConsumers().get(0);
        Assert.assertEquals(brokerConsumer.getPendingAcks().size(), expected);
    }
}
