package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker test checking that the message-expiry check advances a subscription's
 * mark-delete position past unconsumed messages once the configured TTL has elapsed,
 * including after the topic has been unloaded.
 */
public class MessageTTLTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(MessageTTLTest.class);

    /** Topic that every step of the test publishes to and inspects. */
    private static final String TOPIC_NAME = "persistent://prop/ns-abc/testttl";

    /** Subscription whose cursor is expected to be moved by the expiry check. */
    private static final String SUBSCRIPTION_NAME = "ttl-sub-1";

    /** Number of messages published before the expiry check runs. */
    private static final int MESSAGE_COUNT = 50;

    /**
     * Ledger id that the assertions expect the published entries to live in.
     * NOTE(review): this value is hard-coded and presumably depends on how many ledgers
     * the test harness has already allocated -- confirm before reusing elsewhere.
     */
    private static final long EXPECTED_LEDGER_ID = 3L;

    /**
     * Configures a one-second default TTL, disables deletion of inactive topics, and
     * then starts the shared test environment through {@code baseSetup()}.
     *
     * @throws Exception if the base setup fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setTtlDurationDefaultInSeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
        super.baseSetup();
    }

    /**
     * Tears the test environment down; runs even when earlier configuration or test
     * methods failed ({@code alwaysRun = true}).
     *
     * @throws Exception if the cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes {@value #MESSAGE_COUNT} messages to a topic with an idle subscription,
     * unloads the topic, waits for twice the TTL, runs the expiry check, and asserts that
     * the subscription's mark-delete position moved from entry {@code -1} to the last
     * published entry.
     *
     * @throws Exception on any client, admin, or timing failure
     */
    @Test
    public void testMessageExpiryAfterTopicUnload() throws Exception {
        // Create the subscription up front and close the consumer immediately, so the
        // cursor exists but nothing ever acknowledges the messages.
        this.pulsarClient.newConsumer()
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscribe()
                .close();

        // try-with-resources guarantees the producer is closed even if a send fails.
        // Batching is disabled so that every message is sent individually.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(TOPIC_NAME)
                .enableBatching(false)
                .create()) {
            // Fire all sends asynchronously (in index order), then wait for all of them.
            FutureUtil.waitForAll(
                    IntStream.range(0, MESSAGE_COUNT)
                            .mapToObj(i -> producer.sendAsync(
                                    ("my-message-" + i).getBytes(StandardCharsets.UTF_8)))
                            .collect(Collectors.toList()))
                    .get();
        }

        this.admin.topics().unload(TOPIC_NAME);
        // Result intentionally discarded.
        // NOTE(review): presumably this call is here to get the topic loaded again
        // after the unload -- confirm.
        this.admin.topics().getStats(TOPIC_NAME);

        PersistentTopicInternalStats.CursorStats statsBeforeExpire =
                this.admin.topics().getInternalStats(TOPIC_NAME).cursors.get(SUBSCRIPTION_NAME);
        log.info("markDeletePosition before expire {}", statsBeforeExpire.markDeletePosition);
        Assert.assertEquals(statsBeforeExpire.markDeletePosition,
                PositionImpl.get(EXPECTED_LEDGER_ID, -1L).toString());

        // Sleep for twice the configured TTL (seconds -> milliseconds, times two) so
        // every published message is older than the TTL when the check runs.
        Thread.sleep(this.conf.getTtlDurationDefaultInSeconds() * 2000L);
        log.info("***** run message expiry now");
        runMessageExpiryCheck();

        PersistentTopicInternalStats.CursorStats statsAfterExpire =
                this.admin.topics().getInternalStats(TOPIC_NAME).cursors.get(SUBSCRIPTION_NAME);
        log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
        // Entry ids are zero-based, so the last published entry is MESSAGE_COUNT - 1.
        Assert.assertEquals(statsAfterExpire.markDeletePosition,
                PositionImpl.get(EXPECTED_LEDGER_ID, MESSAGE_COUNT - 1).toString());
    }
}
