package org.apache.pulsar.broker.stats;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.junit.Assert;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.class */
public class LedgerOffloaderMetricsTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void setup() throws Exception {
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testTopicLevelMetrics() throws Exception {
        this.conf.setExposeTopicLevelMetricsInPrometheus(true);
        super.baseSetup();
        this.admin.namespaces().createNamespace("prop/ns-abc1");
        String[] strArr = new String[3];
        LedgerOffloaderStatsImpl offloaderStats = this.pulsar.getOffloaderStats();
        LedgerOffloader ledgerOffloader = (LedgerOffloader) Mockito.mock(LedgerOffloader.class);
        PersistentTopic persistentTopic = (Topic) Mockito.mock(PersistentTopic.class);
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(Optional.of(persistentTopic));
        BrokerService brokerService = (BrokerService) Mockito.spy(this.pulsar.getBrokerService());
        ((PulsarService) Mockito.doReturn(brokerService).when(this.pulsar)).getBrokerService();
        for (int i = 0; i < 3; i++) {
            String str = "persistent://prop/ns-abc1/testMetrics" + UUID.randomUUID();
            strArr[i] = str;
            this.admin.topics().createNonPartitionedTopic(str);
            ((BrokerService) Mockito.doReturn(completableFuture).when(brokerService)).getTopicIfExists(str);
            Assert.assertTrue(persistentTopic instanceof PersistentTopic);
            ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
            ((PersistentTopic) Mockito.doReturn(managedLedger).when(persistentTopic)).getManagedLedger();
            ManagedLedgerConfig managedLedgerConfig = (ManagedLedgerConfig) Mockito.mock(ManagedLedgerConfig.class);
            ((ManagedLedger) Mockito.doReturn(managedLedgerConfig).when(managedLedger)).getConfig();
            ((ManagedLedgerConfig) Mockito.doReturn(ledgerOffloader).when(managedLedgerConfig)).getLedgerOffloader();
            offloaderStats.recordOffloadError(str);
            offloaderStats.recordOffloadError(str);
            offloaderStats.recordOffloadBytes(str, 100L);
            offloaderStats.recordReadLedgerLatency(str, 1000L, TimeUnit.NANOSECONDS);
            offloaderStats.recordReadOffloadError(str);
            offloaderStats.recordReadOffloadError(str);
            offloaderStats.recordReadOffloadIndexLatency(str, 1000000L, TimeUnit.NANOSECONDS);
            offloaderStats.recordReadOffloadBytes(str, 100000L);
            offloaderStats.recordWriteToStorageError(str);
            offloaderStats.recordWriteToStorageError(str);
        }
        for (String str2 : strArr) {
            Assert.assertEquals(offloaderStats.getOffloadError(str2), 2L);
            Assert.assertEquals(offloaderStats.getOffloadBytes(str2), 100L);
            Assert.assertEquals((long) offloaderStats.getReadLedgerLatency(str2).sum, 1L);
            Assert.assertEquals(offloaderStats.getReadOffloadError(str2), 2L);
            Assert.assertEquals((long) offloaderStats.getReadOffloadIndexLatency(str2).sum, 1000L);
            Assert.assertEquals(offloaderStats.getReadOffloadBytes(str2), 100000L);
            Assert.assertEquals(offloaderStats.getWriteStorageError(str2), 2L);
        }
    }

    @Test
    public void testNamespaceLevelMetrics() throws Exception {
        this.conf.setExposeTopicLevelMetricsInPrometheus(false);
        super.baseSetup();
        LedgerOffloaderStatsImpl offloaderStats = this.pulsar.getOffloaderStats();
        LedgerOffloader ledgerOffloader = (LedgerOffloader) Mockito.mock(LedgerOffloader.class);
        PersistentTopic persistentTopic = (Topic) Mockito.mock(PersistentTopic.class);
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(Optional.of(persistentTopic));
        BrokerService brokerService = (BrokerService) Mockito.spy(this.pulsar.getBrokerService());
        ((PulsarService) Mockito.doReturn(brokerService).when(this.pulsar)).getBrokerService();
        LinkedList linkedList = new LinkedList();
        HashMap hashMap = new HashMap();
        int i = 0;
        while (i < 2) {
            String str = i == 1 ? "prop/ns-abc2" : "prop/ns-abc1";
            hashMap.put(str, new ArrayList());
            this.admin.namespaces().createNamespace(str);
            String str2 = "persistent://" + str + "/testMetrics";
            for (int i2 = 0; i2 < 6; i2++) {
                String str3 = str2 + UUID.randomUUID();
                ((List) hashMap.get(str)).add(str3);
                linkedList.add(str3);
                this.admin.topics().createNonPartitionedTopic(str3);
                ((BrokerService) Mockito.doReturn(completableFuture).when(brokerService)).getTopicIfExists(str3);
                Assert.assertTrue(persistentTopic instanceof PersistentTopic);
                ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
                ((PersistentTopic) Mockito.doReturn(managedLedger).when(persistentTopic)).getManagedLedger();
                ManagedLedgerConfig managedLedgerConfig = (ManagedLedgerConfig) Mockito.mock(ManagedLedgerConfig.class);
                ((ManagedLedger) Mockito.doReturn(managedLedgerConfig).when(managedLedger)).getConfig();
                ((ManagedLedgerConfig) Mockito.doReturn(ledgerOffloader).when(managedLedgerConfig)).getLedgerOffloader();
                Mockito.when(managedLedger.getName()).thenAnswer(invocationOnMock -> {
                    return (String) linkedList.poll();
                });
                offloaderStats.recordOffloadError(str3);
                offloaderStats.recordOffloadBytes(str3, 100L);
                offloaderStats.recordReadLedgerLatency(str3, 1000L, TimeUnit.NANOSECONDS);
                offloaderStats.recordReadOffloadError(str3);
                offloaderStats.recordReadOffloadIndexLatency(str3, 1000000L, TimeUnit.NANOSECONDS);
                offloaderStats.recordReadOffloadBytes(str3, 100000L);
                offloaderStats.recordWriteToStorageError(str3);
            }
            i++;
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            String str4 = (String) ((List) entry.getValue()).get(0);
            Assert.assertTrue(offloaderStats.getOffloadError(str4) >= 1);
            Assert.assertTrue(offloaderStats.getOffloadBytes(str4) >= 100);
            Assert.assertTrue(((long) offloaderStats.getReadLedgerLatency(str4).sum) >= 1);
            Assert.assertTrue(offloaderStats.getReadOffloadError(str4) >= 1);
            Assert.assertTrue(((long) offloaderStats.getReadOffloadIndexLatency(str4).sum) >= 1000);
            Assert.assertTrue(offloaderStats.getReadOffloadBytes(str4) >= 100000);
            Assert.assertTrue(offloaderStats.getWriteStorageError(str4) >= 1);
        }
    }
}
