package org.apache.pulsar.broker.stats;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/stats/TransactionMetricsTest.class */
/**
 * Tests for the Prometheus metrics exposed by a broker that has the transaction coordinator enabled.
 *
 * <p>Every test starts a broker through {@link BrokerTestBase}, drives transaction activity, renders the
 * metrics with {@link PrometheusMetricsGenerator} and checks the parsed samples.
 */
public class TransactionMetricsTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionMetricsTest.class);

    /** Matches a {@code # TYPE <metric> <type>} comment line; group 1 is the metric name, group 2 the type. */
    private static final Pattern TYPE_LINE = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");

    /** Captures the leading metric name of a sample line (everything before the labels or the value). */
    private static final Pattern SAMPLE_NAME = Pattern.compile("^(\\w+)");

    /**
     * Clients that a test replaced with a transaction-enabled one. They are closed in {@link #cleanup()}.
     * NOTE(review): assumes the base class only closes the client currently stored in {@code pulsarClient}
     * - confirm against MockedPulsarServiceBaseTest.
     */
    private final List<PulsarClient> replacedClients = new ArrayList<>();

    /**
     * Starts the broker with the transaction coordinator enabled and creates the system tenant, the system
     * namespace and the single-partition coordinator-assign topic the tests rely on.
     */
    @Override
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        ServiceConfiguration conf = getDefaultConf();
        conf.setTransactionCoordinatorEnabled(true);
        super.baseSetup(conf);

        TenantInfo systemTenant = TenantInfo.builder()
                .adminRoles(Sets.newHashSet("appid1"))
                .allowedClusters(Sets.newHashSet("test"))
                .build();
        this.admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), systemTenant);
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
    }

    /** Closes every client a test replaced, then delegates to the base-class cleanup. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        for (PulsarClient client : replacedClients) {
            try {
                client.close();
            } catch (Exception e) {
                // Best effort: a failing close must not prevent the broker from being torn down.
                log.warn("Failed to close a replaced Pulsar client", e);
            }
        }
        replacedClients.clear();
        super.internalCleanup();
    }

    /**
     * Opens one transaction on coordinator 0 and two on coordinator 1, then expects
     * {@code pulsar_txn_active_count} to report 1 and 2 respectively.
     */
    @Test
    public void testTransactionCoordinatorMetrics() throws Exception {
        this.admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
        TransactionCoordinatorID firstCoordinator = TransactionCoordinatorID.get(0L);
        TransactionCoordinatorID secondCoordinator = TransactionCoordinatorID.get(1L);
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(firstCoordinator);
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(secondCoordinator);
        awaitStoreCount(2);

        storeFor(firstCoordinator).newTransaction(10000L, null).get();
        storeFor(secondCoordinator).newTransaction(10000L, null).get();
        storeFor(secondCoordinator).newTransaction(10000L, null).get();

        Collection<PrometheusMetricsTest.Metric> activeCounts = scrapeMetrics(true).get("pulsar_txn_active_count");
        Assert.assertEquals(activeCounts.size(), 2, "number of pulsar_txn_active_count samples");
        for (PrometheusMetricsTest.Metric sample : activeCounts) {
            String coordinator = sample.tags.get("coordinator_id");
            // Coordinator "0" owns one open transaction; the other coordinator owns two.
            double expected = "0".equals(coordinator) ? 1.0d : 2.0d;
            Assert.assertEquals(sample.value, expected, "active transactions on coordinator " + coordinator);
        }
    }

    /**
     * Creates 120 client transactions, ends the even-indexed ones with action 0 and the odd-indexed ones with
     * action 1, and checks the created/committed/aborted counters. Afterwards a short-lived transaction is left
     * to expire so the timeout, append-log and execution-latency metrics can be checked as well.
     */
    @Test
    public void testTransactionCoordinatorRateMetrics() throws Exception {
        final int txnCount = 120;
        final String topic = "persistent://prop/ns-abc1/test_coordinator_metrics";
        final String subscription = "test_coordinator_metrics";

        this.admin.namespaces().createNamespace("prop/ns-abc1");
        TransactionCoordinatorID coordinatorId = TransactionCoordinatorID.get(0L);
        this.admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(coordinatorId);
        this.admin.topics().createNonPartitionedTopic(topic);
        this.admin.topics().createSubscription(topic, subscription, MessageId.earliest);
        awaitStoreCount(1, 2000L);

        // The consumer is deliberately created on the client provided by the base setup, before it is replaced.
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .subscriptionName(subscription)
                .topic(topic)
                .subscribe();
        List<TxnID> txnIds = new ArrayList<>(txnCount);
        switchToTransactionalClient();

        for (int n = 0; n < txnCount; n++) {
            TransactionImpl txn = (TransactionImpl) newClientTransaction(10L);
            TxnID txnId = new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
            txnIds.add(txnId);
            if (n == 1) {
                // NOTE(review): the original test rebuilds the client here and then acknowledges with the
                // transaction obtained from the previous client; kept as is - confirm the intent.
                switchToTransactionalClient();
                consumer.acknowledgeAsync(new MessageIdImpl(1000L, 1000L, -1), txn).get();
            } else if (n % 2 == 0) {
                this.pulsar.getTransactionMetadataStoreService()
                        .addProducedPartitionToTxn(txnId, Collections.singletonList(topic))
                        .get();
            } else {
                TransactionSubscription ackedSubscription = TransactionSubscription.builder()
                        .topic(topic)
                        .subscription(subscription)
                        .build();
                this.pulsar.getTransactionMetadataStoreService()
                        .addAckedPartitionToTxn(txnId, Collections.singletonList(ackedSubscription))
                        .get();
            }
        }

        for (int n = 0; n < txnCount; n++) {
            // The assertions below expect action 0 to be counted as a commit and action 1 as an abort.
            int action = (n % 2 == 0) ? 0 : 1;
            this.pulsar.getTransactionMetadataStoreService().endTransaction(txnIds.get(n), action, false).get();
        }

        this.pulsar.getBrokerService().updateRates();
        Multimap<String, PrometheusMetricsTest.Metric> metrics = scrapeMetrics(true);
        assertSingleSample(metrics, "pulsar_txn_created_count", txnCount);
        assertSingleSample(metrics, "pulsar_txn_committed_count", txnCount / 2);
        assertSingleSample(metrics, "pulsar_txn_aborted_count", txnCount / 2);

        // Open a transaction with a one second timeout and wait until its metadata can no longer be read.
        TxnID expiringTxn = storeFor(coordinatorId).newTransaction(1000L, null).get();
        Awaitility.await().atMost(2000L, TimeUnit.MILLISECONDS).until(() -> {
            try {
                storeFor(coordinatorId).getTxnMeta(expiringTxn).get();
                return false;
            } catch (Exception expected) {
                // A failed lookup is the condition being waited for.
                return true;
            }
        });

        Multimap<String, PrometheusMetricsTest.Metric> metricsAfterTimeout = scrapeMetrics(true);
        assertSingleSample(metricsAfterTimeout, "pulsar_txn_timeout_count", 1.0d);
        assertSingleSample(metricsAfterTimeout, "pulsar_txn_append_log_count", (txnCount * 4) + 3);
        assertSingleSample(metricsAfterTimeout, "pulsar_txn_execution_latency_le_5000", 1.0d);
    }

    /**
     * Acknowledges one message inside a transaction and checks the storage metrics reported for the
     * subscription and for the samples tagged {@code transaction.subscription}, first with the generator's
     * first flag set and then with it cleared.
     */
    @Test
    public void testManagedLedgerMetrics() throws Exception {
        final String topic = "persistent://prop/ns-abc1/test_managed_ledger_metrics";
        final String subscription = "test_managed_ledger_metrics";

        this.admin.namespaces().createNamespace("prop/ns-abc1");
        this.admin.topics().createNonPartitionedTopic(topic);
        this.admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0L)).get();
        this.admin.topics().createSubscription(topic, subscription, MessageId.earliest);
        awaitStoreCount(1, 2000L);

        switchToTransactionalClient();
        Consumer<byte[]> consumer = newKeySharedConsumer(topic, subscription);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create();
        Transaction txn = newClientTransaction(10L);
        // The payload length drives the exact sizes asserted below, so the charset is pinned.
        producer.send("hello pulsar".getBytes(StandardCharsets.UTF_8));
        consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn).get();

        Multimap<String, PrometheusMetricsTest.Metric> detailed = scrapeMetrics(true);
        Collection<PrometheusMetricsTest.Metric> storageSize = detailed.get("pulsar_storage_size");
        checkManagedLedgerMetrics(subscription, 32.0d, storageSize);
        checkManagedLedgerMetrics("transaction.subscription", 252.0d, storageSize);
        Collection<PrometheusMetricsTest.Metric> logicalSize = detailed.get("pulsar_storage_logical_size");
        checkManagedLedgerMetrics(subscription, 16.0d, logicalSize);
        checkManagedLedgerMetrics("transaction.subscription", 126.0d, logicalSize);
        Collection<PrometheusMetricsTest.Metric> backlogSize = detailed.get("pulsar_storage_backlog_size");
        checkManagedLedgerMetrics(subscription, 16.0d, backlogSize);
        checkManagedLedgerMetrics("transaction.subscription", 126.0d, backlogSize);

        Multimap<String, PrometheusMetricsTest.Metric> summary = scrapeMetrics(false);
        Assert.assertEquals(summary.get("pulsar_storage_size").size(), 3,
                "number of pulsar_storage_size samples");
        Assert.assertEquals(summary.get("pulsar_storage_logical_size").size(), 3,
                "number of pulsar_storage_logical_size samples");
        Assert.assertEquals(summary.get("pulsar_storage_backlog_size").size(), 2,
                "number of pulsar_storage_backlog_size samples");
    }

    /**
     * Checks that a subscription which has not yet acknowledged inside a transaction contributes no
     * {@code pulsar_storage_size} sample, and that a sample appears once it has done so.
     */
    @Test
    public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception {
        final String topic = "persistent://prop/ns-abc1/testManagedLedgerMetricsWhenPendingAckNotInit";
        final String activeSubscription = "test_managed_ledger_metrics";
        final String idleSubscription = "test_pending_ack_no_init";

        this.admin.namespaces().createNamespace("prop/ns-abc1");
        this.admin.topics().createNonPartitionedTopic(topic);
        this.admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0L)).get();
        this.admin.topics().createSubscription(topic, activeSubscription, MessageId.earliest);
        this.admin.topics().createSubscription(topic, idleSubscription, MessageId.earliest);
        awaitStoreCount(1, 2000L);

        switchToTransactionalClient();
        Consumer<byte[]> activeConsumer = newKeySharedConsumer(topic, activeSubscription);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create();
        Transaction txn = newClientTransaction(10L);
        // The payload length drives the exact sizes asserted below, so the charset is pinned.
        producer.send("hello pulsar".getBytes(StandardCharsets.UTF_8));
        activeConsumer.acknowledgeAsync(activeConsumer.receive().getMessageId(), txn).get();

        Collection<PrometheusMetricsTest.Metric> storageSize = scrapeMetrics(true).get("pulsar_storage_size");
        checkManagedLedgerMetrics(activeSubscription, 32.0d, storageSize);
        for (PrometheusMetricsTest.Metric sample : storageSize) {
            Assert.assertFalse(sample.tags.containsValue(idleSubscription),
                    "unexpected pulsar_storage_size sample for " + idleSubscription);
        }

        Consumer<byte[]> idleConsumer = newKeySharedConsumer(topic, idleSubscription);
        Transaction secondTxn = newClientTransaction(10L);
        idleConsumer.acknowledgeAsync(idleConsumer.receive().getMessageId(), secondTxn).get();

        checkManagedLedgerMetrics(idleSubscription, 32.0d, scrapeMetrics(true).get("pulsar_storage_size"));
    }

    /**
     * Verifies two rules on the rendered metrics text: a metric name has at most one {@code # TYPE} line, and
     * that line precedes the first sample of the same metric name.
     */
    @Test
    public void testDuplicateMetricTypeDefinitions() throws Exception {
        this.admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
        TransactionCoordinatorID firstCoordinator = TransactionCoordinatorID.get(0L);
        TransactionCoordinatorID secondCoordinator = TransactionCoordinatorID.get(1L);
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(firstCoordinator);
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(secondCoordinator);
        awaitStoreCount(2);

        switchToTransactionalClient();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic("persistent://my-property/use/my-ns/my-topic1")
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
        Transaction txn = newClientTransaction(5L);
        for (int n = 0; n < 10; n++) {
            producer.newMessage(txn).value(("my-message-" + n).getBytes(StandardCharsets.UTF_8)).send();
        }

        String metricsText = scrapeMetricsText(false);
        Map<String, String> typeDefinitions = new HashMap<>();
        Set<String> sampledNames = new HashSet<>();
        for (String line : Splitter.on("\n").split(metricsText)) {
            if (line.isEmpty()) {
                continue;
            }
            if (!line.startsWith("#")) {
                // Remember which metric names already produced a sample so a late TYPE line can be detected.
                Matcher sample = SAMPLE_NAME.matcher(line);
                if (sample.lookingAt()) {
                    sampledNames.add(sample.group(1));
                }
                continue;
            }

            Matcher type = TYPE_LINE.matcher(line);
            Preconditions.checkArgument(type.matches(), "Unexpected comment line in metrics output: %s", line);
            String metricName = type.group(1);
            if (typeDefinitions.containsKey(metricName)) {
                log.warn("Metrics output:\n{}", metricsText);
                Assert.fail("Duplicate type definition found for TYPE definition " + metricName);
            }
            typeDefinitions.put(metricName, type.group(2));
            if (sampledNames.contains(metricName)) {
                log.warn("Metrics output:\n{}", metricsText);
                Assert.fail("TYPE definition for " + metricName + " appears after first sample");
            }
        }
    }

    /**
     * Asserts that at least one sample carries {@code tagValue} among its tag values and that every such sample
     * has the expected value.
     *
     * @param tagValue tag value identifying the samples of interest
     * @param expectedValue value each matching sample must report
     * @param samples samples of a single metric
     */
    private void checkManagedLedgerMetrics(String tagValue, double expectedValue,
                                           Collection<PrometheusMetricsTest.Metric> samples) {
        boolean found = false;
        for (PrometheusMetricsTest.Metric sample : samples) {
            if (sample.tags.containsValue(tagValue)) {
                Assert.assertEquals(sample.value, expectedValue, "value of sample tagged " + tagValue);
                found = true;
            }
        }
        Assert.assertTrue(found, "no sample tagged " + tagValue);
    }

    /** Asserts that {@code name} has exactly one sample and that the sample reports {@code expectedValue}. */
    private static void assertSingleSample(Multimap<String, PrometheusMetricsTest.Metric> metrics, String name,
                                           double expectedValue) {
        Collection<PrometheusMetricsTest.Metric> samples = metrics.get(name);
        Assert.assertEquals(samples.size(), 1, "number of " + name + " samples");
        for (PrometheusMetricsTest.Metric sample : samples) {
            Assert.assertEquals(sample.value, expectedValue, "value of " + name);
        }
    }

    /**
     * Renders the broker metrics as text.
     *
     * @param firstFlag forwarded as the first boolean argument of {@code PrometheusMetricsGenerator.generate}
     *                  (presumably "include topic-level metrics" - confirm against the generator); the two
     *                  remaining boolean arguments are always {@code false}
     */
    private String scrapeMetricsText(boolean firstFlag) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(this.pulsar, firstFlag, false, false, out);
        // The Prometheus text exposition format is defined as UTF-8.
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Renders the broker metrics and parses them into samples keyed by metric name. */
    private Multimap<String, PrometheusMetricsTest.Metric> scrapeMetrics(boolean firstFlag) throws Exception {
        return PrometheusMetricsTest.parseMetrics(scrapeMetricsText(firstFlag));
    }

    /** Returns the metadata store registered for {@code coordinatorId}. */
    private TransactionMetadataStore storeFor(TransactionCoordinatorID coordinatorId) {
        return this.pulsar.getTransactionMetadataStoreService().getStores().get(coordinatorId);
    }

    /** Waits, with Awaitility's default timeout, until {@code expected} metadata stores are registered. */
    private void awaitStoreCount(int expected) {
        Awaitility.await().until(
                () -> this.pulsar.getTransactionMetadataStoreService().getStores().size() == expected);
    }

    /** Waits at most {@code timeoutMillis} milliseconds until {@code expected} metadata stores are registered. */
    private void awaitStoreCount(int expected, long timeoutMillis) {
        Awaitility.await().atMost(timeoutMillis, TimeUnit.MILLISECONDS).until(
                () -> this.pulsar.getTransactionMetadataStoreService().getStores().size() == expected);
    }

    /**
     * Replaces {@code pulsarClient} with a transaction-enabled client. The previous client is kept open, because
     * consumers created from it may still be in use, and is closed in {@link #cleanup()}.
     */
    private void switchToTransactionalClient() throws Exception {
        if (this.pulsarClient != null) {
            replacedClients.add(this.pulsarClient);
        }
        this.pulsarClient = PulsarClient.builder()
                .serviceUrl(this.lookupUrl.toString())
                .enableTransaction(true)
                .build();
    }

    /** Starts a transaction on the current client with the given timeout in seconds. */
    private Transaction newClientTransaction(long timeoutSeconds) throws Exception {
        return this.pulsarClient.newTransaction()
                .withTransactionTimeout(timeoutSeconds, TimeUnit.SECONDS)
                .build()
                .get();
    }

    /** Subscribes a Key_Shared consumer with a receiver queue of 10 to {@code topic}. */
    private Consumer<byte[]> newKeySharedConsumer(String topic, String subscription) throws Exception {
        return this.pulsarClient.newConsumer()
                .topic(topic)
                .receiverQueueSize(10)
                .subscriptionName(subscription)
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscribe();
    }
}
