package org.apache.pulsar.broker;

import com.google.common.collect.Sets;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.TopicEventsListener;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/TopicEventsListenerTest.class */
public class TopicEventsListenerTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(TopicEventsListenerTest.class);
    static final Queue<String> events = new ConcurrentLinkedQueue();
    volatile String topicNameToWatch;
    String namespace;

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topicType")
    public static Object[][] topicType() {
        return new Object[]{new Object[]{"persistent", "partitioned", true}, new Object[]{"persistent", "non-partitioned", true}, new Object[]{"non-persistent", "partitioned", true}, new Object[]{"non-persistent", "non-partitioned", true}, new Object[]{"persistent", "partitioned", false}, new Object[]{"persistent", "non-partitioned", false}, new Object[]{"non-persistent", "partitioned", false}, new Object[]{"non-persistent", "non-partitioned", false}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topicTypeNoDelete")
    public static Object[][] topicTypeNoDelete() {
        return new Object[]{new Object[]{"persistent", "partitioned"}, new Object[]{"persistent", "non-partitioned"}, new Object[]{"non-persistent", "partitioned"}, new Object[]{"non-persistent", "non-partitioned"}};
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
        this.pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
        this.pulsar.getBrokerService().addTopicEventListener(new TopicEventsListener[]{(str, topicEvent, eventStage, th) -> {
            log.info("got event {}__{} for topic {}", new Object[]{topicEvent, eventStage, str});
            if (str.equals(this.topicNameToWatch)) {
                if (log.isDebugEnabled()) {
                    log.debug("got event {}__{} for topic {} with detailed stack", new Object[]{topicEvent, eventStage, str, new Exception("tracing event source")});
                }
                events.add(topicEvent.toString() + "__" + eventStage.toString());
            }
        }});
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @BeforeMethod
    protected void setupTest() throws Exception {
        this.namespace = "prop/" + UUID.randomUUID();
        this.admin.namespaces().createNamespace(this.namespace, Sets.newHashSet(new String[]{"test"}));
        Assert.assertTrue(this.admin.namespaces().getNamespaces("prop").contains(this.namespace));
        this.admin.namespaces().setRetention(this.namespace, new RetentionPolicies(3, 10));
        PulsarAdmin createPulsarAdmin = createPulsarAdmin();
        try {
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(createPulsarAdmin.namespaces().getRetention(this.namespace), new RetentionPolicies(3, 10));
            });
            if (createPulsarAdmin != null) {
                createPulsarAdmin.close();
            }
            events.clear();
        } catch (Throwable th) {
            if (createPulsarAdmin != null) {
                try {
                    createPulsarAdmin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @AfterMethod(alwaysRun = true)
    protected void cleanupTest() throws Exception {
        deleteNamespaceWithRetry(this.namespace, true);
    }

    @Test(dataProvider = "topicType")
    public void testEvents(String str, String str2, boolean z) throws Exception {
        String str3 = str + "://" + this.namespace + "/topic-" + UUID.randomUUID();
        createTopicAndVerifyEvents(str2, str3);
        events.clear();
        if (str2.equals("partitioned")) {
            this.admin.topics().deletePartitionedTopic(str3, z);
        } else {
            this.admin.topics().delete(str3, z);
        }
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(events.toArray(), new String[]{"DELETE__BEFORE", "UNLOAD__BEFORE", "UNLOAD__SUCCESS", "DELETE__SUCCESS"});
        });
    }

    @Test(dataProvider = "topicType")
    public void testEventsWithUnload(String str, String str2, boolean z) throws Exception {
        String str3 = str + "://" + this.namespace + "/topic-" + UUID.randomUUID();
        createTopicAndVerifyEvents(str2, str3);
        events.clear();
        this.admin.topics().unload(str3);
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(events.toArray(), new String[]{"UNLOAD__BEFORE", "UNLOAD__SUCCESS"});
        });
        events.clear();
        if (str2.equals("partitioned")) {
            this.admin.topics().deletePartitionedTopic(str3, z);
        } else {
            this.admin.topics().delete(str3, z);
        }
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(events.toArray(), new String[]{"DELETE__BEFORE", "DELETE__SUCCESS"});
        });
    }

    @Test(dataProvider = "topicType")
    public void testEventsActiveSub(String str, String str2, boolean z) throws Exception {
        String str3 = str + "://" + this.namespace + "/topic-" + UUID.randomUUID();
        createTopicAndVerifyEvents(str2, str3);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str3}).subscriptionName("sub").subscribe();
        Producer create = this.pulsarClient.newProducer().topic(str3).create();
        for (int i = 0; i < 10; i++) {
            create.send("hello".getBytes());
        }
        subscribe.receive();
        events.clear();
        try {
            if (str2.equals("partitioned")) {
                this.admin.topics().deletePartitionedTopic(str3, z);
            } else {
                this.admin.topics().delete(str3, z);
            }
        } catch (PulsarAdminException e) {
            if (z) {
                throw e;
            }
            Assert.assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") || e.getMessage().contains("connected producers/consumers"));
        }
        String[] strArr = z ? new String[]{"DELETE__BEFORE", "UNLOAD__BEFORE", "UNLOAD__SUCCESS", "DELETE__SUCCESS"} : new String[]{"DELETE__BEFORE", "DELETE__FAILURE"};
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(events.size() <= 4 ? (String[]) events.toArray(new String[0]) : (String[]) ArrayUtils.subarray((String[]) events.toArray(new String[0]), 0, 4), strArr);
        });
        subscribe.close();
        create.close();
    }

    @Test(dataProvider = "topicTypeNoDelete")
    public void testTopicAutoGC(String str, String str2) throws Exception {
        createTopicAndVerifyEvents(str2, str + "://" + this.namespace + "/topic-" + UUID.randomUUID());
        this.admin.namespaces().setInactiveTopicPolicies(this.namespace, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
        this.admin.namespaces().setRetention(this.namespace, new RetentionPolicies());
        PulsarAdmin createPulsarAdmin = createPulsarAdmin();
        try {
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(createPulsarAdmin.namespaces().getRetention(this.namespace), new RetentionPolicies());
            });
            if (createPulsarAdmin != null) {
                createPulsarAdmin.close();
            }
            events.clear();
            runGC();
            Awaitility.waitAtMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
                Assert.assertEquals(events.toArray(), new String[]{"UNLOAD__BEFORE", "UNLOAD__SUCCESS"});
            });
        } catch (Throwable th) {
            if (createPulsarAdmin != null) {
                try {
                    createPulsarAdmin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void createTopicAndVerifyEvents(String str, String str2) throws Exception {
        String[] strArr;
        if (str.equals("partitioned")) {
            this.topicNameToWatch = str2 + "-partition-1";
            this.admin.topics().createPartitionedTopic(str2, 2);
            triggerPartitionsCreation(str2);
            strArr = new String[]{"LOAD__BEFORE", "CREATE__BEFORE", "CREATE__SUCCESS", "LOAD__SUCCESS"};
        } else {
            this.topicNameToWatch = str2;
            this.admin.topics().createNonPartitionedTopic(str2);
            strArr = new String[]{"LOAD__BEFORE", "LOAD__FAILURE", "LOAD__BEFORE", "CREATE__BEFORE", "CREATE__SUCCESS", "LOAD__SUCCESS"};
        }
        String[] strArr2 = strArr;
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(events.toArray(), strArr2);
        });
    }

    private PulsarAdmin createPulsarAdmin() throws PulsarClientException {
        return PulsarAdmin.builder().serviceHttpUrl(this.brokerUrl != null ? this.brokerUrl.toString() : this.brokerUrlTls.toString()).build();
    }

    private void triggerPartitionsCreation(String str) throws Exception {
        this.pulsarClient.newProducer().topic(str).create().close();
    }
}
