package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.class */
/**
 * Integration test for shadow topics that runs against a real (in-process) metadata store and
 * BookKeeper ensemble instead of mocks.
 *
 * <p>The fixture starts a {@link LocalBookkeeperEnsemble} on a locked free port, then boots a
 * {@link PulsarService} whose metadata store URL points at that ensemble, and finally creates the
 * cluster, the {@code public} tenant and the {@code public/default} namespace used by the test.
 */
public class ShadowTopicRealBkTest {
    /** Name of the single cluster used by the broker configuration and the tenant policy. */
    private static final String CLUSTER = "test";

    private final int zkPort = PortManager.nextLockedFreePort();
    private final LocalBookkeeperEnsemble bk =
            new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort);
    private PulsarService pulsar;
    private PulsarAdmin admin;

    /**
     * Starts the local ensemble and the broker, then provisions cluster, tenant and namespace.
     *
     * @throws Exception if the ensemble, the broker or any admin call fails
     */
    @BeforeClass
    public void setup() throws Exception {
        bk.start();

        final ServiceConfiguration config = new ServiceConfiguration();
        config.setClusterName(CLUSTER);
        config.setAdvertisedAddress("localhost");
        // Port 0 is passed for both listeners so no fixed port is hard-coded in the test.
        config.setBrokerServicePort(Optional.of(0));
        config.setWebServicePort(Optional.of(0));
        config.setMetadataStoreUrl("zk:localhost:" + zkPort);

        pulsar = new PulsarService(config);
        pulsar.start();
        admin = pulsar.getAdminClient();

        admin.clusters().createCluster(CLUSTER, ClusterData.builder()
                .serviceUrl(pulsar.getWebServiceAddress())
                .brokerServiceUrl(pulsar.getBrokerServiceUrl())
                .build());
        admin.tenants().createTenant("public",
                TenantInfo.builder().allowedClusters(Set.of(CLUSTER)).build());
        admin.namespaces().createNamespace("public/default");
    }

    /**
     * Shuts down the broker (if it was created) and always stops the local ensemble.
     *
     * @throws Exception if closing the broker or stopping the ensemble fails
     */
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        try {
            if (pulsar != null) {
                pulsar.close();
            }
        } finally {
            // Stop the ensemble even when the broker fails to close, so it is not left running.
            bk.stop();
        }
    }

    /**
     * Publishes one message to a source topic and verifies that it can be consumed from the
     * attached shadow topic, and that the shadow topic's managed ledger can report the earliest
     * publish time in its backlog.
     *
     * @throws Exception if any admin, client or broker call fails or times out
     */
    @Test
    public void testReadFromStorage() throws Exception {
        final String sourceTopic = TopicName.get("test-read-from-source").toString();
        final String shadowTopic = sourceTopic + "-shadow";

        admin.topics().createNonPartitionedTopic(sourceTopic);
        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
        admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));

        // Wait until the source topic has a started replicator registered for the shadow topic.
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            final ShadowReplicator replicator =
                    (ShadowReplicator) getPersistentTopic(sourceTopic).getShadowReplicators().get(shadowTopic);
            Assert.assertNotNull(replicator);
            Assert.assertEquals(String.valueOf(replicator.getState()), "Started");
        });

        // The client is obtained from the broker and is not closed here.
        final PulsarClient client = pulsar.getClient();
        final byte[] payload = "message".getBytes();

        try (var producer = client.newProducer().topic(sourceTopic).create()) {
            // Publish before the shadow subscription exists; the consumer is created afterwards.
            producer.send(payload);

            try (var consumer = client.newConsumer()
                    .topic(shadowTopic)
                    .subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                // 1. The message written to the source topic is readable through the shadow topic.
                final Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(received);
                Assert.assertEquals(received.getValue(), payload);

                // 2. The shadow topic is backed by a shadow managed ledger, and that ledger can
                //    resolve the earliest publish time in its backlog within the timeout.
                final ManagedLedger shadowLedger = getPersistentTopic(shadowTopic).getManagedLedger();
                Assert.assertTrue(shadowLedger instanceof ShadowManagedLedgerImpl);
                shadowLedger.getEarliestMessagePublishTimeInBacklog().get(3L, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * Looks up an already-loaded topic on the broker and returns it as a {@link PersistentTopic}.
     *
     * @param topic fully qualified topic name
     * @return the persistent topic instance held by the broker service
     * @throws Exception if the lookup fails or the topic does not exist
     */
    private PersistentTopic getPersistentTopic(String topic) throws Exception {
        return (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().orElseThrow();
    }
}
