package org.apache.pulsar.client.api;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.class */
public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SimpleProducerConsumerStatTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetupForStatsTest();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "batch")
    public Object[][] batchMessageDelayMsProvider() {
        return new Object[]{new Object[]{0}, new Object[]{1000}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "batch_with_timeout")
    public Object[][] ackTimeoutSecProvider() {
        return new Object[]{new Object[]{0, 0}, new Object[]{0, 2}, new Object[]{1000, 0}, new Object[]{1000, 2}};
    }

    @Test(dataProvider = "batch_with_timeout")
    public void testSyncProducerAndConsumer(int i, int i2) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        ConsumerBuilder<byte[]> subscriptionName = this.pulsarClient.newConsumer().topic("persistent://my-property/tp1/my-ns/my-topic1").subscriptionName("my-subscriber-name");
        boolean z = i2 > 0;
        if (i2 > 0) {
            subscriptionName.ackTimeout(i2, TimeUnit.SECONDS);
        }
        Consumer<?> subscribe = subscriptionName.subscribe();
        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/tp1/my-ns/my-topic1");
        if (i != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay(i, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        }
        Producer<?> create = producerBuilder.create();
        for (int i3 = 0; i3 < 11; i3++) {
            create.send(("my-message-" + i3).getBytes());
        }
        Message<?> message = null;
        HashSet newHashSet = Sets.newHashSet();
        for (int i4 = 0; i4 < 11; i4++) {
            message = subscribe.receive(5, TimeUnit.SECONDS);
            String str = new String(message.getData());
            log.info("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i4);
        }
        subscribe.acknowledgeCumulative(message);
        Thread.sleep(2000L);
        subscribe.close();
        create.close();
        validatingLogInfo(subscribe, create, z);
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(dataProvider = "batch_with_timeout")
    public void testAsyncProducerAndAsyncAck(int i, int i2) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        ConsumerBuilder<byte[]> subscriptionName = this.pulsarClient.newConsumer().topic("persistent://my-property/tp1/my-ns/my-topic2").subscriptionName("my-subscriber-name");
        if (i2 > 0) {
            subscriptionName.ackTimeout(i2, TimeUnit.SECONDS);
        }
        Consumer<?> subscribe = subscriptionName.subscribe();
        ProducerBuilder<byte[]> messageRoutingMode = this.pulsarClient.newProducer().topic("persistent://my-property/tp1/my-ns/my-topic2").messageRoutingMode(MessageRoutingMode.SinglePartition);
        if (i != 0) {
            messageRoutingMode.enableBatching(true).batchingMaxPublishDelay(i, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        } else {
            messageRoutingMode.enableBatching(false);
        }
        Producer<?> create = messageRoutingMode.create();
        ArrayList newArrayList = Lists.newArrayList();
        for (int i3 = 0; i3 < 50; i3++) {
            newArrayList.add(create.sendAsync(("my-message-" + i3).getBytes()));
        }
        log.info("Waiting for async publish to complete");
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        Message<?> message = null;
        HashSet newHashSet = Sets.newHashSet();
        for (int i4 = 0; i4 < 50; i4++) {
            message = subscribe.receive(5, TimeUnit.SECONDS);
            testMessageOrderAndDuplicates(newHashSet, new String(message.getData()), "my-message-" + i4);
        }
        CompletableFuture<Void> acknowledgeCumulativeAsync = subscribe.acknowledgeCumulativeAsync(message);
        log.info("Waiting for async ack to complete");
        acknowledgeCumulativeAsync.get();
        Thread.sleep(2000L);
        subscribe.close();
        create.close();
        validatingLogInfo(subscribe, create, i == 0 && i2 > 0);
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(dataProvider = "batch_with_timeout")
    public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int i, int i2) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        ConsumerBuilder<byte[]> subscriptionName = this.pulsarClient.newConsumer().topic("persistent://my-property/tp1/my-ns/my-topic2").subscriptionName("my-subscriber-name");
        if (i2 > 0) {
            subscriptionName.ackTimeout(i2, TimeUnit.SECONDS);
        }
        Consumer<?> subscribe = subscriptionName.subscribe();
        ProducerBuilder<byte[]> messageRoutingMode = this.pulsarClient.newProducer().topic("persistent://my-property/tp1/my-ns/my-topic2").messageRoutingMode(MessageRoutingMode.SinglePartition);
        if (i != 0) {
            messageRoutingMode.enableBatching(true).batchingMaxPublishDelay(i, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        } else {
            messageRoutingMode.enableBatching(false);
        }
        Producer<?> create = messageRoutingMode.create();
        ArrayList newArrayList = Lists.newArrayList();
        for (int i3 = 0; i3 < 101; i3++) {
            newArrayList.add(create.sendAsync(("my-message-" + i3).getBytes()));
        }
        log.info("Waiting for async publish to complete");
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        Message<?> message = null;
        HashSet newHashSet = Sets.newHashSet();
        for (int i4 = 0; i4 < 101; i4++) {
            CompletableFuture<Message<?>> receiveAsync = subscribe.receiveAsync();
            Thread.sleep(10L);
            message = receiveAsync.get();
            String str = new String(message.getData());
            log.info("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i4);
        }
        CompletableFuture<Void> acknowledgeCumulativeAsync = subscribe.acknowledgeCumulativeAsync(message);
        log.info("Waiting for async ack to complete");
        acknowledgeCumulativeAsync.get();
        Thread.sleep(5000L);
        subscribe.close();
        create.close();
        validatingLogInfo(subscribe, create, i == 0 && i2 > 0);
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(dataProvider = "batch", timeOut = 100000)
    public void testMessageListener(int i) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        CountDownLatch countDownLatch = new CountDownLatch(100);
        Consumer<?> subscribe = this.pulsarClient.newConsumer().topic("persistent://my-property/tp1/my-ns/my-topic3").subscriptionName("my-subscriber-name").ackTimeout(100L, TimeUnit.SECONDS).messageListener((consumer, message) -> {
            Assert.assertNotNull(message, "Message cannot be null");
            log.debug("Received message [{}] in the listener", new String(message.getData()));
            consumer.acknowledgeAsync((Message<?>) message);
            countDownLatch.countDown();
        }).subscribe();
        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/tp1/my-ns/my-topic3");
        if (i != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay(i, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        }
        Producer<?> create = producerBuilder.create();
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < 100; i2++) {
            newArrayList.add(create.sendAsync(("my-message-" + i2).getBytes()));
        }
        log.info("Waiting for async publish to complete");
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        Thread.sleep(5000L);
        log.info("Waiting for message listener to ack all messages");
        Assert.assertTrue(countDownLatch.await(100, TimeUnit.SECONDS), "Timed out waiting for message listener acks");
        subscribe.close();
        create.close();
        validatingLogInfo(subscribe, create, true);
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test(dataProvider = "batch")
    public void testSendTimeout(int i) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://my-property/tp1/my-ns/my-topic5").subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder<byte[]> sendTimeout = this.pulsarClient.newProducer().topic("persistent://my-property/tp1/my-ns/my-topic5").sendTimeout(1, TimeUnit.SECONDS);
        if (i != 0) {
            sendTimeout.enableBatching(true).batchingMaxPublishDelay(2 * i, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        }
        Producer<byte[]> create = sendTimeout.create();
        stopBroker();
        try {
            create.sendAsync("my-message".getBytes()).get();
            Assert.fail("Send operation should have failed");
        } catch (ExecutionException e) {
        }
        startBroker();
        Assert.assertNull(subscribe.receive(3, TimeUnit.SECONDS));
        subscribe.close();
        create.close();
        Thread.sleep(1000L);
        ConsumerStats stats = subscribe.getStats();
        ProducerStats stats2 = create.getStats();
        Assert.assertEquals(stats2.getTotalMsgsSent(), 0L);
        Assert.assertEquals(stats2.getTotalSendFailed(), 1L);
        Assert.assertEquals(stats.getTotalMsgsReceived(), 0L);
        Assert.assertEquals(stats.getTotalMsgsReceived(), stats.getTotalAcksSent());
        log.info("-- Exiting {} test --", this.methodName);
    }

    public void testBatchMessagesRateOut() throws PulsarClientException, InterruptedException, PulsarAdminException {
        log.info("-- Starting {} test --", this.methodName);
        double d = 17.0d;
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://my-property/cluster/my-ns/testBatchMessagesRateOut").subscriptionName("my-subscriber-name").subscribe();
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://my-property/cluster/my-ns/testBatchMessagesRateOut").batchingMaxMessages(5).enableBatching(true).batchingMaxPublishDelay(2L, TimeUnit.SECONDS).create();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        new Thread(() -> {
            RateLimiter create2 = RateLimiter.create(d);
            while (atomicBoolean.get()) {
                create2.acquire();
                create.sendAsync("Hello World".getBytes());
                CompletableFuture receiveAsync = subscribe.receiveAsync();
                Objects.requireNonNull(subscribe);
                receiveAsync.thenAccept(subscribe::acknowledgeAsync);
            }
        }).start();
        Thread.sleep(2000L);
        atomicBoolean.set(false);
        this.pulsar.getBrokerService().updateRates();
        Assert.assertTrue(this.admin.topics().getStats("persistent://my-property/cluster/my-ns/testBatchMessagesRateOut").msgRateOut > 17.0d / ((double) 5));
        subscribe.unsubscribe();
        log.info("-- Exiting {} test --", this.methodName);
    }

    public void validatingLogInfo(Consumer<?> consumer, Producer<?> producer, boolean z) throws InterruptedException {
        Thread.sleep(1000L);
        ConsumerStats stats = consumer.getStats();
        ProducerStats stats2 = producer.getStats();
        Assert.assertEquals(stats2.getTotalMsgsSent(), stats.getTotalMsgsReceived());
        Assert.assertEquals(stats2.getTotalBytesSent(), stats.getTotalBytesReceived());
        Assert.assertEquals(stats2.getTotalMsgsSent(), stats2.getTotalAcksReceived());
        if (z) {
            Assert.assertEquals(stats.getTotalMsgsReceived(), stats.getTotalAcksSent());
        }
    }

    @Test
    public void testAddBrokerLatencyStats() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://my-property/tp1/my-ns/my-topic1").create();
        for (int i = 0; i < 120; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        this.pulsar.getBrokerService().updateRates();
        JsonArray metrics = this.admin.brokerStats().getMetrics();
        boolean z = false;
        for (int i2 = 0; i2 < metrics.size(); i2++) {
            try {
                if (metrics.get(i2).getAsJsonObject().get("metrics").toString().contains("brk_AddEntryLatencyBuckets")) {
                    JsonObject asJsonObject = metrics.get(i2).getAsJsonObject().get("metrics").getAsJsonObject();
                    for (String str : asJsonObject.keySet()) {
                        if (str.startsWith("brk_AddEntryLatencyBuckets") && asJsonObject.get(str).getAsDouble() > CMAESOptimizer.DEFAULT_STOPFITNESS) {
                            z = true;
                        }
                    }
                    System.out.println(asJsonObject.toString());
                }
            } catch (Exception e) {
            }
        }
        Assert.assertTrue(z);
        create.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -705350206:
                if (implMethodName.equals("lambda$testMessageListener$a07a3e3e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/SimpleProducerConsumerStatTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message.getData()));
                        consumer.acknowledgeAsync((Message<?>) message);
                        countDownLatch.countDown();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
