package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups = {SaslConstants.SASL_BROKER_PROTOCOL})
/* loaded from: input_file:org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.class */
public class ConsumedLedgersTrimTest extends BrokerTestBase {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ConsumedLedgersTrimTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void setup() throws Exception {
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void TestConsumedLedgersTrim() throws Exception {
        this.conf.setRetentionCheckIntervalInSeconds(1);
        super.baseSetup();
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/TestConsumedLedgersTrim").producerName("producer-name").create();
        try {
            Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/ns-abc/TestConsumedLedgersTrim").subscriptionName("my-subscriber-name").subscribe();
            try {
                Assert.assertNotNull((Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/TestConsumedLedgersTrim").get());
                PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic("persistent://prop/ns-abc/TestConsumedLedgersTrim").get();
                ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
                config.setRetentionSizeInMB(1L);
                config.setRetentionTime(1, TimeUnit.SECONDS);
                config.setMaxEntriesPerLedger(2);
                config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
                for (int i = 0; i < 10; i++) {
                    create.send(new byte[1048576]);
                }
                ManagedLedgerImpl managedLedger = persistentTopic.getManagedLedger();
                Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 10 / 2);
                Thread.sleep(1200L);
                Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 10 / 2);
                for (int i2 = 0; i2 < 10; i2++) {
                    Message<byte[]> receive = subscribe.receive(2, TimeUnit.SECONDS);
                    Assert.assertNotNull(receive);
                    subscribe.acknowledge((Message<?>) receive);
                }
                Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 10 / 2);
                Thread.sleep(1500L);
                Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
        this.conf.setRetentionCheckIntervalInSeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
        super.baseSetup();
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions").producerName("producer-name").create();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions").get();
            ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
            config.setRetentionSizeInMB(-1L);
            config.setRetentionTime(1, TimeUnit.SECONDS);
            config.setMaxEntriesPerLedger(1000);
            config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
            MessageId messageId = (MessageId) persistentTopic.getLastMessageId().get();
            LOG.info("lastmessageid " + messageId);
            for (int i = 0; i < 7; i++) {
                create.send(new byte[1048576]);
            }
            Assert.assertEquals(persistentTopic.getManagedLedger().getLedgersInfoAsList().size(), 1);
            MessageId lastMessageId = this.pulsar.getAdminClient().topics().getLastMessageId("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions");
            LOG.info("messageIdBeforeRestart " + lastMessageId);
            Assert.assertNotEquals(lastMessageId, messageId);
            restartBroker();
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                Assert.assertNotNull(((Optional) this.pulsar.getBrokerService().getTopicIfExists("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions").get(3L, TimeUnit.SECONDS)).get());
            });
            this.pulsar.getAdminClient().topics().getStats("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions");
            MessageId lastMessageId2 = this.pulsar.getAdminClient().topics().getLastMessageId("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions");
            LOG.info("lastmessageid " + lastMessageId2);
            Assert.assertEquals(lastMessageId2, lastMessageId);
            PersistentTopic persistentTopic2 = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions").get();
            ManagedLedgerConfig config2 = persistentTopic2.getManagedLedger().getConfig();
            config2.setRetentionSizeInMB(-1L);
            config2.setRetentionTime(1, TimeUnit.SECONDS);
            config2.setMaxEntriesPerLedger(1);
            config2.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
            ManagedLedgerImpl managedLedger = persistentTopic2.getManagedLedger();
            Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
            Thread.sleep(3000L);
            CompletableFuture completableFuture = new CompletableFuture();
            managedLedger.trimConsumedLedgersInBackground(completableFuture);
            completableFuture.join();
            MessageId lastMessageId3 = this.pulsar.getAdminClient().topics().getLastMessageId("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions");
            LOG.info("lastmessageid " + lastMessageId3);
            Assert.assertEquals(lastMessageId3, MessageId.earliest);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }
}
