package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests how one-way geo-replication interacts with transactions.
 *
 * <p>The class enables the transaction coordinator on both clusters and then checks that
 * <ul>
 *   <li>messages received on the remote cluster carry no transaction id in their metadata,</li>
 *   <li>transaction log / pending-ack topics are not loaded as broker topics and the transaction
 *       buffer snapshot topics have no replicators, and</li>
 *   <li>a message published in a still-open transaction is not delivered on the remote cluster.</li>
 * </ul>
 */
@Test(groups = {"broker"})
public class ReplicationTxnTest extends OneWayReplicatorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReplicationTxnTest.class);

    /** Bundle count passed to {@code createNamespace} for the system namespace. */
    private static final int SYSTEM_NAMESPACE_BUNDLES = 4;

    /** Value applied to {@code transactionBufferSegmentedSnapshotEnabled} in the broker config. */
    private final boolean transactionBufferSegmentedSnapshotEnabled = false;

    /** Partition count of the transaction coordinator assign topic; also the number of txn logs checked. */
    private final int txnLogPartitions = 4;

    /** Delegates to the base class setup; re-declared to attach the TestNG lifecycle annotation. */
    @Override
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    /** Delegates to the base class cleanup; re-declared to attach the TestNG lifecycle annotation. */
    @Override
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /**
     * Builds the client with transactions enabled, which the tests need to open transactions.
     *
     * @param clientBuilder the pre-configured builder supplied by the base class
     * @return a client with transaction support switched on
     * @throws Exception if the client cannot be built
     */
    @Override
    protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception {
        return clientBuilder.enableTransaction(true).build();
    }

    /**
     * Applies the base defaults and then switches on system topics, topic-level policies, the
     * transaction coordinator and batched writes for the transaction log and pending-ack store.
     *
     * <p>All four arguments are forwarded unchanged to the base implementation.
     */
    @Override
    public void setConfigDefaults(ServiceConfiguration config, String clusterName,
                                  LocalBookkeeperEnsemble bookkeeperEnsemble,
                                  ZookeeperServerTest zookeeperServer) {
        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, zookeeperServer);
        config.setSystemTopicEnabled(true);
        config.setTopicLevelPoliciesEnabled(true);
        config.setTransactionCoordinatorEnabled(true);
        config.setTransactionLogBatchedWriteEnabled(true);
        config.setTransactionPendingAckBatchedWriteEnabled(true);
        config.setTransactionBufferSegmentedSnapshotEnabled(this.transactionBufferSegmentedSnapshotEnabled);
    }

    /**
     * Creates the base tenants/clusters/namespaces and, on each of the two clusters, the system
     * tenant, the system namespace and the partitioned transaction coordinator assign topic.
     *
     * @throws Exception if any admin or metadata operation fails
     */
    @Override
    public void createDefaultTenantsAndClustersAndNamespace() throws Exception {
        super.createDefaultTenantsAndClustersAndNamespace();

        final String systemTenant = NamespaceName.SYSTEM_NAMESPACE.getTenant();
        final String systemNamespace = NamespaceName.SYSTEM_NAMESPACE.toString();

        // First cluster.
        this.admin1.tenants().createTenant(systemTenant, newSystemTenantInfo());
        this.admin1.namespaces().createNamespace(systemNamespace, SYSTEM_NAMESPACE_BUNDLES);
        this.pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                        new PartitionedTopicMetadata(this.txnLogPartitions));

        // Second cluster.
        this.admin2.tenants().createTenant(systemTenant, newSystemTenantInfo());
        this.admin2.namespaces().createNamespace(systemNamespace, SYSTEM_NAMESPACE_BUNDLES);
        this.pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                        new PartitionedTopicMetadata(this.txnLogPartitions));
    }

    /** Returns tenant info with no admin roles and {@code r1}/{@code r2} as allowed clusters. */
    private static TenantInfoImpl newSystemTenantInfo() {
        return new TenantInfoImpl(Collections.emptySet(), Sets.newHashSet("r1", "r2"));
    }

    /**
     * Publishes one plain and one transactional message through {@code client1}, consumes both
     * there, and then verifies that {@code client2} receives two messages without a transaction id.
     *
     * <p>Producer and consumers are closed even when an assertion fails.
     *
     * @param topic        the topic to publish to and consume from
     * @param subscription the subscription name used by both consumers
     * @throws Exception if any client operation fails
     */
    private void pubAndSubOneMsg(String topic, String subscription) throws Exception {
        try (Consumer<String> consumer1 = this.client1.newConsumer(Schema.STRING).topic(topic)
                     .subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
             Producer<String> producer1 = this.client1.newProducer(Schema.STRING).topic(topic).create()) {
            // Plain message, acknowledged inside the transaction.
            producer1.newMessage().value("msg1").send();
            Transaction txn = this.client1.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES).build().get();
            Message<String> firstMsg = consumer1.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(firstMsg);
            Assert.assertEquals(firstMsg.getValue(), "msg1");
            consumer1.acknowledgeAsync(firstMsg.getMessageId(), txn).join();

            // Transactional message, visible to the consumer only after the commit.
            producer1.newMessage(txn).value("msg2").send();
            txn.commit().get();
            Message<String> secondMsg = consumer1.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(secondMsg);
            Assert.assertEquals(secondMsg.getValue(), "msg2");
            consumer1.acknowledgeAsync(secondMsg.getMessageId()).join();

            // Both messages must arrive through the second client without transaction ids.
            try (Consumer<String> consumer2 = this.client2.newConsumer(Schema.STRING).topic(topic)
                    .subscriptionName(subscription).subscribe()) {
                receiveAndVerifyNoTxnId(consumer2);
                receiveAndVerifyNoTxnId(consumer2);
            }
        }
    }

    /**
     * Receives one message (waiting up to 15 seconds), asserts that its metadata has neither
     * transaction-id field set, and acknowledges it.
     */
    private void receiveAndVerifyNoTxnId(Consumer<String> consumer) throws Exception {
        Message<String> msg = consumer.receive(15, TimeUnit.SECONDS);
        Assert.assertNotNull(msg);
        // The metadata is not exposed by the Message API, so it is read reflectively.
        MessageMetadata metadata = WhiteboxImpl.getInternalState(msg, "msgMetadata");
        Assert.assertFalse(metadata.hasTxnidMostBits());
        Assert.assertFalse(metadata.hasTxnidLeastBits());
        consumer.acknowledge(msg);
    }

    /**
     * Asserts that the given topic has no replicators on the given broker.
     *
     * @param brokerService the broker to look the topic up on
     * @param topicName     the topic to inspect
     * @throws Exception if the topic lookup fails
     */
    private void verifyNoReplicator(BrokerService brokerService, TopicName topicName) throws Exception {
        // NOTE(review): the second argument presumably means "create if missing" - confirm.
        CompletableFuture<? extends Optional<?>> topicFuture =
                brokerService.getTopic(topicName.toString(), true);
        if (topicFuture == null) {
            return;
        }
        PersistentTopic persistentTopic = (PersistentTopic) topicFuture.join().get();
        Assert.assertTrue(persistentTopic.getReplicators().isEmpty());
    }

    /**
     * Best-effort deletion of a pending-ack store topic through both admin clients.
     *
     * <p>Failures are logged and otherwise ignored, so a failed delete never fails the test.
     */
    private void deletePendingAckTopicQuietly(String pendingAckTopic) {
        try {
            this.admin1.topics().delete(pendingAckTopic);
        } catch (Exception ignored) {
            log.info("Ignoring failure to delete pending-ack topic {} via admin1", pendingAckTopic, ignored);
        }
        try {
            this.admin2.topics().delete(pendingAckTopic);
        } catch (Exception ignored) {
            log.info("Ignoring failure to delete pending-ack topic {} via admin2", pendingAckTopic, ignored);
        }
    }

    @Test
    public void testTxnLogNotBeReplicated() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        final String subscription = "s1";
        this.admin1.topics().createNonPartitionedTopic(topic);
        waitReplicatorStarted(topic);
        this.admin1.topics().createSubscription(topic, subscription, MessageId.earliest);
        this.admin2.topics().createSubscription(topic, subscription, MessageId.earliest);

        pubAndSubOneMsg(topic, subscription);

        // Fixed wait before the negative checks below, which assert that something did NOT happen.
        Thread.sleep(3000L);

        // Each transaction log exists as a managed ledger but is not loaded as a broker topic.
        for (int partition = 0; partition < this.txnLogPartitions; partition++) {
            TopicName txnLog = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE,
                    "__transaction_log_" + partition);
            Assert.assertNotNull(this.pulsar1.getManagedLedgerFactory()
                    .getManagedLedgerInfo(txnLog.getPersistenceNamingEncoding()));
            Assert.assertFalse(this.broker1.getTopics().containsKey(txnLog.toString()));
        }

        // The same holds for the pending-ack store of the subscription.
        final TopicName pendingAckTopic =
                TopicName.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subscription));
        Assert.assertNotNull(this.pulsar1.getManagedLedgerFactory()
                .getManagedLedgerInfo(pendingAckTopic.getPersistenceNamingEncoding()));
        Assert.assertFalse(this.broker1.getTopics().containsKey(pendingAckTopic.toString()));

        // Transaction buffer snapshot topics of the namespace have no replicators.
        final NamespaceName namespace = TopicName.get(topic).getNamespaceObject();
        final String[] snapshotTopics = {
            "__transaction_buffer_snapshot",
            "__transaction_buffer_snapshot_segments",
            "__transaction_buffer_snapshot_indexes",
        };
        for (String snapshotTopic : snapshotTopics) {
            verifyNoReplicator(this.broker1,
                    TopicName.get(TopicDomain.persistent.value(), namespace, snapshotTopic));
        }

        cleanupTopics(() -> {
            this.admin1.topics().delete(topic);
            this.admin2.topics().delete(topic);
            deletePendingAckTopicQuietly(pendingAckTopic.toString());
        });
    }

    @Test
    public void testOngoingMessagesWillNotBeReplicated() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        final String subscription = "s1";
        this.admin1.topics().createNonPartitionedTopic(topic);
        waitReplicatorStarted(topic);
        this.admin1.topics().createSubscription(topic, subscription, MessageId.earliest);
        this.admin2.topics().createSubscription(topic, subscription, MessageId.earliest);

        try (Producer<String> producer1 = this.client1.newProducer(Schema.STRING).topic(topic).create()) {
            // Publish inside a transaction that is deliberately never committed.
            Transaction txn = this.client1.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.HOURS).build().get();
            producer1.newMessage(txn).value("msg1").send();

            try (Consumer<String> consumer2 = this.client2.newConsumer(Schema.STRING).topic(topic)
                    .subscriptionName(subscription).subscribe()) {
                // Nothing may be delivered through the second client.
                Assert.assertNull(consumer2.receive(15, TimeUnit.SECONDS));

                // The replication cursor still has entries left to read.
                PersistentTopic persistentTopic =
                        (PersistentTopic) this.broker1.getTopic(topic, false).join().get();
                GeoPersistentReplicator replicator = (GeoPersistentReplicator)
                        persistentTopic.getReplicators().values().iterator().next();
                Assert.assertTrue(replicator.getCursor().hasMoreEntries());
            }
        }

        cleanupTopics(() -> {
            this.admin1.topics().delete(topic);
            this.admin2.topics().delete(topic);
            deletePendingAckTopicQuietly(
                    TopicName.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subscription))
                            .toString());
        });
    }
}
