package org.apache.pulsar.client.impl;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ProducerConsumerInternalTest.class */
public class ProducerConsumerInternalTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ProducerConsumerInternalTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSameProducerRegisterTwice() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl producerImpl = (ProducerImpl) this.pulsarClient.newProducer().topic(newUniqueName).create();
        removeServiceProducerMaintainedByServerCnx(getServiceProducer(producerImpl, newUniqueName));
        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
        commandCloseProducer.setProducerId(producerImpl.producerId);
        producerImpl.getClientCnx().handleCloseProducer(commandCloseProducer);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(producerImpl.getState().toString(), "Ready");
        });
    }

    @Test
    public void testSameProducerRegisterTwiceWithSpecifiedProducerName() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl producerImpl = (ProducerImpl) this.pulsarClient.newProducer().producerName("p1").topic(newUniqueName).create();
        removeServiceProducerMaintainedByServerCnx(getServiceProducer(producerImpl, newUniqueName));
        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
        commandCloseProducer.setProducerId(producerImpl.producerId);
        producerImpl.getClientCnx().handleCloseProducer(commandCloseProducer);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(producerImpl.getState().toString(), "Ready", "The producer registration failed");
        });
    }

    private void removeServiceProducerMaintainedByServerCnx(MockedPulsarServiceBaseTest.ServiceProducer serviceProducer) {
        ServerCnx cnx = serviceProducer.getServiceProducer().getCnx();
        cnx.removedProducer(serviceProducer.getServiceProducer());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(cnx.getProducers().containsKey(serviceProducer.getServiceProducer().getProducerId()));
        });
    }

    @Test(groups = {"flaky"})
    public void testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{newUniqueName.toString()}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("subscription1").subscribe();
        ClientCnx clientCnx = subscribe.getClientCnx();
        ServerCnx cnx = ((Consumer) ((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(newUniqueName, false).join()).get()).getSubscription("subscription1").getDispatcher().getConsumers().get(0)).cnx();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        cnx.execute(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        clientCnx.close();
        Thread.sleep(1000L);
        countDownLatch.countDown();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(subscribe.getState(), HandlerState.State.Ready);
        });
        subscribe.close();
        this.admin.topics().delete(newUniqueName, false);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "containerBuilder")
    public Object[][] containerBuilderProvider() {
        return new Object[]{new Object[]{BatcherBuilder.DEFAULT}, new Object[]{BatcherBuilder.KEY_BASED}};
    }

    @Test(timeOut = 30000, dataProvider = "containerBuilder")
    public void testSendTimerCheckForBatchContainer(BatcherBuilder batcherBuilder) throws Exception {
        ProducerImpl create = this.pulsarClient.newProducer().topic(BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_")).batcherBuilder(batcherBuilder).sendTimeout(1, TimeUnit.SECONDS).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxMessages(1000).create();
        try {
            log.info("Before sendAsync msg-0: {}", Long.valueOf(System.nanoTime()));
            CompletableFuture sendAsync = create.sendAsync("msg-0".getBytes());
            sendAsync.thenAccept(messageId -> {
                log.info("msg-0 done: {} (msgId: {})", Long.valueOf(System.nanoTime()), messageId);
            });
            sendAsync.get();
            create.triggerSendTimer();
            Thread.sleep(1950L);
            log.info("Before sendAsync msg-1: {}", Long.valueOf(System.nanoTime()));
            CompletableFuture sendAsync2 = create.sendAsync("msg-1".getBytes());
            sendAsync2.thenAccept(messageId2 -> {
                log.info("msg-1 done: {} (msgId: {})", Long.valueOf(System.nanoTime()), messageId2);
            });
            sendAsync2.get();
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testRetentionPolicyByProducingMessages() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        this.pulsar.getConfiguration().setManagedLedgerMaxEntriesPerLedger(10);
        this.pulsar.getConfiguration().setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        this.pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(0);
        this.pulsar.getConfiguration().setDefaultRetentionSizeInMB(0);
        Producer create = this.pulsarClient.newProducer().topic(newUniqueName).sendTimeout(1, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            org.apache.pulsar.client.api.Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{newUniqueName}).subscriptionName("my-sub").subscribe();
            for (int i = 0; i < 10; i++) {
                try {
                    create.newMessage().sendAsync();
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    throw th;
                }
            }
            for (int i2 = 0; i2 < 10; i2++) {
                Message receive = subscribe.receive();
                Assert.assertNotNull(receive);
                subscribe.acknowledge(receive);
            }
            Awaitility.await().untilAsserted(() -> {
                this.admin.topics().trimTopic(newUniqueName);
                PersistentTopicInternalStats persistentTopicInternalStats = (PersistentTopicInternalStats) this.admin.topics().getInternalStatsAsync(newUniqueName).get();
                Assert.assertEquals(persistentTopicInternalStats.currentLedgerEntries, 0L);
                Assert.assertEquals(persistentTopicInternalStats.ledgers.size(), 1);
            });
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }
}
