package org.apache.pulsar.broker.systopic;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.socks5.auth.DefaultPasswordAuthImpl;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Broker tests covering partitioned system topics: automatic creation of the
 * {@code __change_events} topic, behaviour of the heartbeat namespace, and the
 * interaction between topic policies / quotas and system topics.
 *
 * <p>Every test runs against a freshly configured broker (see {@link #setup()}).
 */
@Test(groups = {"broker"})
public class PartitionedSystemTopicTest extends BrokerTestBase {
    static final int PARTITIONS = 5;

    /** Local name of the topic-policies system topic. */
    private static final String CHANGE_EVENTS_LOCAL_NAME = "__change_events";

    /**
     * Resets the broker configuration, enables system topics and topic-level policies,
     * sets the default partition count to {@link #PARTITIONS}, and starts the test broker.
     *
     * @throws Exception if the base setup fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        resetConfig();
        conf.setAllowAutoTopicCreation(false);
        conf.setAllowAutoTopicCreationType("partitioned");
        conf.setDefaultNumPartitions(PARTITIONS);
        conf.setManagedLedgerMaxEntriesPerLedger(1);
        conf.setBrokerDeleteInactiveTopicsEnabled(false);
        conf.setSystemTopicEnabled(true);
        conf.setTopicLevelPoliciesEnabled(true);
        super.baseSetup();
    }

    /**
     * Tears down the test broker; runs even when the test method failed.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Builds the name of a persistent topic inside this broker's V2 heartbeat namespace.
     *
     * @param localName local (last) component of the topic name
     * @return the topic name within the heartbeat namespace
     */
    private TopicName heartbeatTopicName(String localName) {
        NamespaceName heartbeatNamespace =
                NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfig());
        return TopicName.get("persistent", heartbeatNamespace, localName);
    }

    /**
     * Opening a topic-policies reader must yield exactly one partitioned topic
     * ({@code __change_events}) with {@link #PARTITIONS} partitions in the namespace.
     */
    @Test
    public void testAutoCreatedPartitionedSystemTopic() throws Exception {
        final String ns = "prop/ns-test";
        admin.namespaces().createNamespace(ns, 2);
        NamespaceEventsSystemTopicFactory factory = new NamespaceEventsSystemTopicFactory(pulsarClient);
        SystemTopicClient.Reader<?> reader =
                factory.createTopicPoliciesSystemTopicClient(NamespaceName.get(ns)).newReader();
        try {
            String changeEventsTopic = String.format("persistent://%s/%s", ns, CHANGE_EVENTS_LOCAL_NAME);
            int partitions = admin.topics().getPartitionedTopicMetadata(changeEventsTopic).partitions;
            Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 1);
            Assert.assertEquals(partitions, PARTITIONS);
            Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
        } finally {
            // Close the reader even when an assertion above fails.
            reader.close();
        }
    }

    /**
     * Subscribes concurrently to one partitioned topic in each of many namespaces and
     * expects every subscription to complete within the test timeout.
     */
    @Test(timeOut = 60000)
    public void testConsumerCreationWhenEnablingTopicPolicy() throws Exception {
        final int namespaceCount = 30;
        // Locale.ROOT keeps the generated tenant name plain ASCII regardless of the JVM default locale.
        final String tenant = "tenant-" + RandomStringUtils.randomAlphabetic(4).toLowerCase(Locale.ROOT);
        admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
        for (int i = 0; i < namespaceCount; i++) {
            String ns = tenant + "/ns-" + i;
            admin.namespaces().createNamespace(ns, 4);
            admin.topics().createPartitionedTopic(ns + "/t1", 2);
        }

        List<CompletableFuture<Consumer<byte[]>>> subscriptions = new ArrayList<>(namespaceCount);
        for (int i = 0; i < namespaceCount; i++) {
            subscriptions.add(pulsarClient.newConsumer()
                    .topic(tenant + "/ns-" + i + "/t1")
                    .subscriptionName("sub")
                    .subscribeAsync());
        }
        FutureUtil.waitForAll(subscriptions).get();

        // All futures are complete at this point, so join() returns immediately.
        for (CompletableFuture<Consumer<byte[]>> subscription : subscriptions) {
            subscription.join().close();
        }
    }

    /**
     * A regular producer and consumer must be able to exchange a message on a topic
     * created under the {@code pulsar/system} namespace.
     */
    @Test
    public void testProduceAndConsumeUnderSystemNamespace() throws Exception {
        // The tenant must be the first segment of the namespace created right below.
        final String systemTenant = "pulsar";
        final String systemNamespace = systemTenant + "/system";
        final String topic = systemNamespace + "/__topic-1";

        TenantInfo tenantInfo = TenantInfo.builder()
                .adminRoles(Sets.newHashSet("admin"))
                .allowedClusters(Sets.newHashSet("test"))
                .build();
        admin.tenants().createTenant(systemTenant, tenantInfo);
        admin.namespaces().createNamespace(systemNamespace, 2);

        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            producer.send("test".getBytes(StandardCharsets.UTF_8));
            try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName("sub1")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe()) {
                Assert.assertNotNull(consumer.receive(5, TimeUnit.SECONDS));
            }
        }
    }

    /**
     * Triggering an offload on the heartbeat namespace's {@code healthcheck} topic must
     * leave its offloaded size at zero; afterwards the offloader on the managed-ledger
     * config can still be replaced.
     */
    @Test
    public void testHealthCheckTopicNotOffload() throws Exception {
        TopicName topicName = heartbeatTopicName("healthcheck");
        Optional<?> topicRef = pulsar.getBrokerService().getTopic(topicName.toString(), true).get();
        Assert.assertTrue(topicRef.isPresent());
        PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();

        ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
        config.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
        admin.brokers().healthcheck(TopicVersion.V2);
        admin.topics().triggerOffload(topicName.toString(), MessageId.earliest);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0L));

        LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class);
        config.setLedgerOffloader(offloader);
        Assert.assertEquals(config.getLedgerOffloader(), offloader);
    }

    /**
     * Running a health check must not bring a {@code __change_events} partition into
     * existence inside the heartbeat namespace.
     */
    @Test
    public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
        admin.brokers().healthcheck(TopicVersion.V2);
        TopicName changeEvents = heartbeatTopicName(CHANGE_EVENTS_LOCAL_NAME);
        // createIfMissing = false: only look the topic up, never create it.
        Optional<?> partition =
                pulsar.getBrokerService().getTopic(changeEvents.getPartition(1).toString(), false).join();
        Assert.assertFalse(partition.isPresent());
    }

    /**
     * Setting a topic policy on the heartbeat namespace's {@code __change_events} topic
     * must be rejected with a {@link PulsarAdminException.ConflictException}.
     */
    @Test
    public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
        admin.brokers().healthcheck(TopicVersion.V2);
        TopicName changeEvents = heartbeatTopicName(CHANGE_EVENTS_LOCAL_NAME);
        for (int partition = 0; partition < PARTITIONS; partition++) {
            pulsar.getBrokerService().getTopic(changeEvents.getPartition(partition).toString(), true).join();
        }
        Assert.assertThrows(PulsarAdminException.ConflictException.class,
                () -> admin.topicPolicies().setMaxConsumers(changeEvents.toString(), 2));
    }

    /**
     * With a {@code message_age} backlog quota using the {@code producer_exception}
     * policy, a new producer must still be able to connect after the only message has
     * been acknowledged.
     */
    @Test
    public void testSetBacklogCausedCreatingProducerFailure() throws Exception {
        final String ns = "prop/ns-test";
        final String topic = ns + "/topic-1";
        final String fullTopic = String.format("persistent://%s", topic);

        admin.namespaces().createNamespace(ns, 2);
        admin.topics().createPartitionedTopic(fullTopic, 1);
        BacklogQuota quota = BacklogQuota.builder()
                .limitTime(2)
                .limitSize(-1L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                .build();
        admin.namespaces().setBacklogQuota(ns, quota, BacklogQuota.BacklogQuotaType.message_age);

        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
            Optional<?> topicRef = pulsar.getBrokerService()
                    .getTopicReference(TopicName.get(fullTopic).getPartition(0).toString());
            Assert.assertTrue(topicRef.isPresent());
            PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();

            // Shorten the ledger rollover window to one second and re-arm the rollover task.
            ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
            config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
            config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
            persistentTopic.getManagedLedger().setConfig(config);
            Whitebox.invokeMethod(persistentTopic.getManagedLedger(),
                    "updateLastLedgerCreatedTimeAndScheduleRolloverTask");

            producer.send("msg-1");
            // NOTE(review): presumably waits past the quota's time limit (2) set above - confirm units.
            Thread.sleep(3000L);

            try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName("sub-1")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .subscribe()) {
                consumer.acknowledge(consumer.receive());
                Thread.sleep(3000L);

                // A dedicated client is used for the second producer; both are closed here so
                // that the client's resources do not outlive the test.
                try (PulsarClient client = PulsarClient.builder()
                        .maxBackoffInterval(3L, TimeUnit.SECONDS)
                        .operationTimeout(5, TimeUnit.SECONDS)
                        .serviceUrl(lookupUrl.toString())
                        .connectionTimeout(2, TimeUnit.SECONDS)
                        .build();
                     Producer<String> secondProducer = client.newProducer(Schema.STRING)
                             .topic(topic)
                             .sendTimeout(3, TimeUnit.SECONDS)
                             .create()) {
                    Assert.assertTrue(secondProducer.isConnected());
                } catch (Exception e) {
                    // Keep the cause so the failure report shows why creation failed.
                    Assert.fail("failed to create producer", e);
                }
            }
        }
    }

    /**
     * With max consumers / producers per topic limited to one at broker, namespace and
     * topic level, the topic-policies system topic must still accept two readers and two
     * writers, and both readers must receive the policy event.
     */
    @Test
    public void testSystemTopicNotCheckExceed() throws Exception {
        final String ns = "prop/ns-test";
        final String topic = ns + "/topic-1";

        admin.namespaces().createNamespace(ns, 2);
        admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);

        conf.setMaxSameAddressConsumersPerTopic(1);
        admin.namespaces().setMaxConsumersPerTopic(ns, 1);
        admin.topicPolicies().setMaxConsumers(topic, 1);

        TopicPoliciesSystemTopicClient systemTopicClient = new NamespaceEventsSystemTopicFactory(pulsarClient)
                .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns));
        SystemTopicClient.Reader<?> reader1 = systemTopicClient.newReader();
        SystemTopicClient.Reader<?> reader2 = systemTopicClient.newReader();
        try {
            conf.setMaxSameAddressProducersPerTopic(1);
            admin.namespaces().setMaxProducersPerTopic(ns, 1);
            admin.topicPolicies().setMaxProducers(topic, 1);

            CompletableFuture<? extends SystemTopicClient.Writer<?>> writer1 = systemTopicClient.newWriterAsync();
            CompletableFuture<? extends SystemTopicClient.Writer<?>> writer2 = systemTopicClient.newWriterAsync();
            CompletableFuture<Void> policyUpdate = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
            CompletableFuture.allOf(writer1, writer2, policyUpdate).join();

            Assert.assertTrue(reader1.hasMoreEvents());
            Assert.assertNotNull(reader1.readNext());
            Assert.assertTrue(reader2.hasMoreEvents());
            Assert.assertNotNull(reader2.readNext());

            writer1.get().close();
            writer2.get().close();
        } finally {
            // Close both readers even when an assertion above fails.
            try {
                reader1.close();
            } finally {
                reader2.close();
            }
        }
    }
}
