package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {SaslConstants.SASL_BROKER_PROTOCOL})
/* loaded from: input_file:org/apache/pulsar/broker/service/TopicTerminationTest.class */
public class TopicTerminationTest extends BrokerTestBase {
    private final String topicName = "persistent://prop/ns-abc/topic0";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSimpleTermination() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        create.send("test-msg-1".getBytes());
        create.send("test-msg-2".getBytes());
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), create.send("test-msg-3".getBytes()));
        try {
            create.send("test-msg-4".getBytes());
            Assert.fail("Should have thrown exception");
        } catch (PulsarClientException.TopicTerminatedException e) {
        }
    }

    @Test(groups = {SaslConstants.SASL_BROKER_PROTOCOL})
    public void testCreateProducerOnTerminatedTopic() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        create.send("test-msg-1".getBytes());
        create.send("test-msg-2".getBytes());
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), create.send("test-msg-3".getBytes()));
        try {
            this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").create();
            Assert.fail("Should have thrown exception");
        } catch (PulsarClientException.TopicTerminatedException e) {
        }
    }

    @Test(timeOut = 20000)
    public void testTerminateWhilePublishing() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        ArrayList arrayList = new ArrayList();
        Thread thread = new Thread(() -> {
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
            }
            for (int i = 0; i < 1000; i++) {
                arrayList.add(create.sendAsync("test".getBytes()));
            }
        });
        thread.start();
        cyclicBarrier.await();
        this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get();
        thread.join();
        boolean z = false;
        try {
            FutureUtil.waitForAll(arrayList).get();
        } catch (Exception e) {
        }
        for (int i = 0; i < 1000; i++) {
            Assert.assertTrue(((CompletableFuture) arrayList.get(i)).isDone());
            if (z) {
                Assert.assertTrue(((CompletableFuture) arrayList.get(i)).isCompletedExceptionally());
            }
            z = ((CompletableFuture) arrayList.get(i)).isCompletedExceptionally();
        }
    }

    @Test(groups = {SaslConstants.SASL_BROKER_PROTOCOL})
    public void testDoubleTerminate() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        create.send("test-msg-1".getBytes());
        create.send("test-msg-2".getBytes());
        MessageId send = create.send("test-msg-3".getBytes());
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), send);
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), send);
    }

    @Test(groups = {SaslConstants.SASL_BROKER_PROTOCOL})
    public void testTerminatePartitionedTopic() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://prop/ns-abc/topic0", 4);
        try {
            this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get();
            Assert.fail("Should have failed");
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getClass(), PulsarAdminException.NotAllowedException.class);
        }
    }

    @Test(timeOut = 20000)
    public void testSimpleTerminationConsumer() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/ns-abc/topic0").subscriptionName("my-sub").subscribe();
        MessageId send = create.send("test-msg-1".getBytes());
        MessageId send2 = create.send("test-msg-2".getBytes());
        Message<byte[]> receive = subscribe.receive();
        Assert.assertEquals(receive.getMessageId(), send);
        subscribe.acknowledge((Message<?>) receive);
        MessageId send3 = create.send("test-msg-3".getBytes());
        Assert.assertFalse(subscribe.hasReachedEndOfTopic());
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), send3);
        Message<byte[]> receive2 = subscribe.receive();
        Assert.assertEquals(receive2.getMessageId(), send2);
        subscribe.acknowledge((Message<?>) receive2);
        Message<byte[]> receive3 = subscribe.receive();
        Assert.assertEquals(receive3.getMessageId(), send3);
        subscribe.acknowledge((Message<?>) receive3);
        Assert.assertNull(subscribe.receive(100, TimeUnit.MILLISECONDS));
        Thread.sleep(100L);
        Assert.assertTrue(subscribe.hasReachedEndOfTopic());
    }

    @Test(timeOut = 20000)
    public void testSimpleTerminationMessageListener() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/ns-abc/topic0").subscriptionName("my-sub").messageListener(new MessageListener<byte[]>() { // from class: org.apache.pulsar.broker.service.TopicTerminationTest.1
            @Override // org.apache.pulsar.client.api.MessageListener
            public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
            }

            @Override // org.apache.pulsar.client.api.MessageListener
            public void reachedEndOfTopic(Consumer<byte[]> consumer) {
                countDownLatch.countDown();
                Assert.assertTrue(consumer.hasReachedEndOfTopic());
            }
        }).subscribe();
        create.send("test-msg-1".getBytes());
        create.send("test-msg-2".getBytes());
        MessageId send = create.send("test-msg-3".getBytes());
        subscribe.acknowledgeCumulative(send);
        Thread.sleep(100L);
        Assert.assertFalse(subscribe.hasReachedEndOfTopic());
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), send);
        Assert.assertTrue(countDownLatch.await(3L, TimeUnit.SECONDS));
        Assert.assertTrue(subscribe.hasReachedEndOfTopic());
    }

    @Test(timeOut = 20000)
    public void testSimpleTerminationReader() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        MessageId send = create.send("test-msg-1".getBytes());
        MessageId send2 = create.send("test-msg-2".getBytes());
        MessageId send3 = create.send("test-msg-3".getBytes());
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), send3);
        Reader<byte[]> create2 = this.pulsarClient.newReader().topic("persistent://prop/ns-abc/topic0").startMessageId(MessageId.earliest).create();
        Assert.assertEquals(create2.readNext().getMessageId(), send);
        Assert.assertEquals(create2.readNext().getMessageId(), send2);
        Assert.assertEquals(create2.readNext().getMessageId(), send3);
        Assert.assertNull(create2.readNext(100, TimeUnit.MILLISECONDS));
        Thread.sleep(100L);
        Assert.assertTrue(create2.hasReachedEndOfTopic());
    }

    @Test(timeOut = 20000)
    public void testSimpleTerminationReaderListener() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Reader<byte[]> create2 = this.pulsarClient.newReader().topic("persistent://prop/ns-abc/topic0").startMessageId(MessageId.latest).readerListener(new ReaderListener<byte[]>() { // from class: org.apache.pulsar.broker.service.TopicTerminationTest.2
            @Override // org.apache.pulsar.client.api.ReaderListener
            public void received(Reader<byte[]> reader, Message<byte[]> message) {
            }

            @Override // org.apache.pulsar.client.api.ReaderListener
            public void reachedEndOfTopic(Reader<byte[]> reader) {
                countDownLatch.countDown();
                Assert.assertTrue(reader.hasReachedEndOfTopic());
            }
        }).create();
        create.send("test-msg-1".getBytes());
        create.send("test-msg-2".getBytes());
        MessageId send = create.send("test-msg-3".getBytes());
        Thread.sleep(100L);
        Assert.assertFalse(create2.hasReachedEndOfTopic());
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), send);
        Assert.assertTrue(countDownLatch.await(3L, TimeUnit.SECONDS));
        Assert.assertTrue(create2.hasReachedEndOfTopic());
    }

    @Test(timeOut = 20000)
    public void testSubscribeOnTerminatedTopic() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        create.send("test-msg-1".getBytes());
        Assert.assertEquals(this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get(), create.send("test-msg-2".getBytes()));
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/ns-abc/topic0").subscriptionName("my-sub").subscribe();
        Thread.sleep(200L);
        Assert.assertTrue(subscribe.hasReachedEndOfTopic());
    }

    @Test(timeOut = 20000)
    public void testSubscribeOnTerminatedTopicWithNoMessages() throws Exception {
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        this.admin.topics().terminateTopicAsync("persistent://prop/ns-abc/topic0").get();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/ns-abc/topic0").subscriptionName("my-sub").subscribe();
        Thread.sleep(200L);
        Assert.assertTrue(subscribe.hasReachedEndOfTopic());
    }
}
