package org.apache.pulsar.client.api;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/client/api/MessageDispatchThrottlingTest.class */
public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessageDispatchThrottlingTest.class);

    /**
     * Selects which kind of dispatch-rate limit a parameterized test configures.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/client/api/MessageDispatchThrottlingTest$DispatchRateType.class */
    public enum DispatchRateType {
        // Tests configure the limit through dispatchThrottlingRateInMsg.
        messageRate,
        // Tests configure the limit through dispatchThrottlingRateInByte.
        byteRate
    }

    /**
     * Per-test initialization: names the cluster "test" on the configuration and then
     * delegates to the superclass setup routines, in that order.
     *
     * @throws Exception if either superclass setup routine fails
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        // The cluster name must be set before internalSetup() runs; the tests below
        // create namespaces restricted to the "test" cluster.
        this.conf.setClusterName("test");
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Per-test teardown: delegates to the superclass cleanup and then resets the
     * configuration. Declared with {@code alwaysRun = true} so it also runs after a
     * failed test.
     *
     * @throws Exception if the superclass cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        // NOTE(review): presumably discards configuration changes made by individual
        // tests (e.g. dynamic throttling settings) — confirm against the base class.
        super.resetConfig();
    }

    /**
     * Supplies the subscription types that parameterized tests run against.
     *
     * @return one single-element row per subscription type (Shared, Exclusive)
     */
    @DataProvider(name = "subscriptions")
    public Object[][] subscriptionsProvider() {
        // Must be a two-dimensional array: a plain Object[] is not assignable to Object[][].
        return new Object[][]{
                {SubscriptionType.Shared},
                {SubscriptionType.Exclusive}
        };
    }

    /**
     * Supplies the dispatch-rate kinds that parameterized tests run against.
     *
     * @return one single-element row per {@link DispatchRateType} (messageRate, byteRate)
     */
    @DataProvider(name = "dispatchRateType")
    public Object[][] dispatchRateProvider() {
        // Must be a two-dimensional array: a plain Object[] is not assignable to Object[][].
        return new Object[][]{
                {DispatchRateType.messageRate},
                {DispatchRateType.byteRate}
        };
    }

    /**
     * Builds the cross product of {@link #subscriptionsProvider()} and
     * {@link #dispatchRateProvider()}: every subscription row is concatenated with every
     * dispatch-rate row, subscription columns first.
     *
     * @return one row per (subscription type, dispatch-rate type) combination
     */
    @DataProvider(name = "subscriptionAndDispatchRateType")
    public Object[][] subDisTypeProvider() {
        // Typed list instead of a raw one, so no unchecked cast is needed on toArray().
        LinkedList<Object[]> rows = new LinkedList<>();
        for (Object[] subscriptionRow : subscriptionsProvider()) {
            for (Object[] rateTypeRow : dispatchRateProvider()) {
                rows.add(merge(subscriptionRow, rateTypeRow));
            }
        }
        return rows.toArray(new Object[0][]);
    }

    /**
     * Concatenates two arrays into a new array.
     *
     * @param tArr  elements placed first in the result
     * @param tArr2 elements appended after those of {@code tArr}
     * @param <T>   element type
     * @return a new array of length {@code tArr.length + tArr2.length} holding all
     *         elements of {@code tArr} followed by all elements of {@code tArr2}
     */
    public static <T> T[] merge(T[] tArr, T[] tArr2) {
        T[] merged = Arrays.copyOf(tArr, tArr.length + tArr2.length);
        // Copy exactly tArr2.length elements. Using tArr.length here truncated the
        // result (or threw ArrayIndexOutOfBoundsException) when the lengths differed.
        System.arraycopy(tArr2, 0, merged, tArr.length, tArr2.length);
        return merged;
    }

    /**
     * Checks that a topic picks up namespace dispatch-rate changes made after it was
     * loaded: first a message-rate limit, then a byte-rate limit. After each change the
     * namespace policies are expected to report the new rate under the "test" cluster.
     */
    @Test
    public void testMessageRateDynamicallyChange() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingBlock";
        final int maxAttempts = 5;

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
        // Nothing has been configured yet, so the topic must not have a limiter.
        Assert.assertFalse(topic.getDispatchRateLimiter().isPresent());

        // (1) Configure a message-rate limit and wait for the limiter to appear.
        DispatchRate msgDispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(100)
                .dispatchThrottlingRateInByte(-1L)
                .ratePeriodInSecond(360)
                .build();
        admin.namespaces().setDispatchRate(namespace, msgDispatchRate);

        boolean limiterCreated = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topic.getDispatchRateLimiter().isPresent()) {
                limiterCreated = true;
                break;
            }
            // No point sleeping after the final attempt.
            if (attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(limiterCreated);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), msgDispatchRate);

        Policies policiesAfterMsgRate = admin.namespaces().getPolicies(namespace);
        HashMap<String, DispatchRate> expectedRates = Maps.newHashMap();
        expectedRates.put("test", msgDispatchRate);
        Assert.assertEquals(policiesAfterMsgRate.clusterDispatchRate, expectedRates);
        Assert.assertEquals(policiesAfterMsgRate.topicDispatchRate, expectedRates);

        // (2) Switch to a byte-rate limit and wait for the limiter to report it.
        DispatchRate byteDispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(-1)
                .dispatchThrottlingRateInByte(500)
                .ratePeriodInSecond(360)
                .build();
        admin.namespaces().setDispatchRate(namespace, byteDispatchRate);

        boolean byteRateApplied = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() == 500) {
                byteRateApplied = true;
                break;
            }
            if (attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(byteRateApplied);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), byteDispatchRate);

        Policies policiesAfterByteRate = admin.namespaces().getPolicies(namespace);
        expectedRates.put("test", byteDispatchRate);
        Assert.assertEquals(policiesAfterByteRate.clusterDispatchRate, expectedRates);
        Assert.assertEquals(policiesAfterByteRate.topicDispatchRate, expectedRates);

        producer.close();
    }

    /**
     * With a namespace dispatch rate of 100 (messages or bytes, depending on
     * {@code dispatchRateType}), publishes 500 messages of 80 bytes and expects the
     * listener to have seen fewer than 200 of them by the time publishing finishes.
     *
     * @param subscriptionType subscription type used by the consumer
     * @param dispatchRateType whether the limit is configured in messages or in bytes
     */
    @Test(dataProvider = "subscriptionAndDispatchRateType", timeOut = 5000)
    public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscriptionType, DispatchRateType dispatchRateType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingBlock";
        final int maxAttempts = 5;
        final int numMessages = 500;

        DispatchRate dispatchRate;
        if (DispatchRateType.messageRate.equals(dispatchRateType)) {
            dispatchRate = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(100)
                    .dispatchThrottlingRateInByte(-1L)
                    .ratePeriodInSecond(360)
                    .build();
        } else {
            dispatchRate = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(-1)
                    .dispatchThrottlingRateInByte(100L)
                    .ratePeriodInSecond(360)
                    .build();
        }

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the topic's limiter reports either kind of limit.
        boolean rateApplied = false;
        for (int attempt = 0; attempt < maxAttempts && !rateApplied; attempt++) {
            rateApplied = topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0
                    || topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0;
            if (!rateApplied && attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        AtomicInteger received = new AtomicInteger(0);
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscriptionType)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(new byte[80]);
        }

        // Throttling must have kept the listener well short of the published count.
        Assert.assertTrue(received.get() < 200);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Sets broker-level per-topic limits through dynamic configuration (5 messages and
     * 1048576 bytes), publishes 500 messages to a Shared subscription and expects that
     * not all of them were received half a second after publishing.
     */
    @Test
    public void testClusterMsgByteRateLimitingClusterConfig() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingBlock";
        final int messageRate = 5;
        final long byteRate = 1048576L;
        final int numMessages = 500;

        int initialMsgRate = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();
        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", Integer.toString(messageRate));
        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte", Long.toString(byteRate));

        // Give the dynamic update a few chances to reach the broker configuration,
        // backing off slightly longer on every attempt.
        for (int attempt = 0; attempt < 5; attempt++) {
            boolean stillInitial =
                    pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg() == initialMsgRate;
            if (stillInitial) {
                Thread.sleep(50 + (attempt * 10));
            }
        }
        Assert.assertNotEquals(
                Integer.valueOf(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg()),
                Integer.valueOf(initialMsgRate));

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        AtomicInteger received = new AtomicInteger(0);
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }

        // Leave the consumer time to drain whatever the broker is willing to dispatch.
        Thread.sleep(500L);
        Assert.assertNotEquals(Integer.valueOf(received.get()), numMessages);

        consumer.close();
        producer.close();
        // Put the message-rate setting back to what it was when the test started.
        pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initialMsgRate);
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * With a limit of 10 messages per 1-second period, publishes 20 messages and waits
     * until the listener has counted all of them; the test timeout bounds the wait.
     *
     * @param subscriptionType subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000)
    public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingAll";
        final int maxAttempts = 5;
        final int numMessages = 20;

        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(10)
                .dispatchThrottlingRateInByte(-1L)
                .ratePeriodInSecond(1)
                .build();
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the topic's limiter reports a message-rate limit.
        boolean rateApplied = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
                rateApplied = true;
                break;
            }
            if (attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        CountDownLatch allReceived = new CountDownLatch(numMessages);
        AtomicInteger received = new AtomicInteger(0);
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscriptionType)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                    allReceived.countDown();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }

        // Blocks until every message has been delivered.
        allReceived.await();
        Assert.assertEquals(received.get(), numMessages);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * With a limit of 100 bytes per 1-second period, publishes 20 messages of 10 bytes
     * each and waits until the listener has counted all of them; the test timeout
     * bounds the wait.
     *
     * @param subscriptionType subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000)
    public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingAll";
        final int maxAttempts = 5;
        final int numMessages = 20;

        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(-1)
                .dispatchThrottlingRateInByte(100L)
                .ratePeriodInSecond(1)
                .build();
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the topic's limiter reports a byte-rate limit.
        boolean rateApplied = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0) {
                rateApplied = true;
                break;
            }
            if (attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        CountDownLatch allReceived = new CountDownLatch(numMessages);
        AtomicInteger received = new AtomicInteger(0);
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscriptionType)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                    allReceived.countDown();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(new byte[10]);
        }

        // Blocks until every message has been delivered.
        allReceived.await();
        Assert.assertEquals(received.get(), numMessages);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Attaches five consumers to the same Shared subscription under a limit of
     * 5 messages per 360-second period, publishes 500 messages and expects that the
     * consumers together did not receive all of them half a second later.
     */
    @Test(timeOut = 5000)
    public void testRateLimitingMultipleConsumers() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers";
        final int maxAttempts = 5;
        final int numConsumers = 5;
        final int numMessages = 500;

        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(5)
                .dispatchThrottlingRateInByte(-1L)
                .ratePeriodInSecond(360)
                .build();
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the topic's limiter reports a message-rate limit.
        boolean rateApplied = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
                rateApplied = true;
                break;
            }
            if (attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        // All consumers share one counter, so it reflects the subscription-wide total.
        AtomicInteger received = new AtomicInteger(0);
        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                });
        Consumer<?>[] consumers = new Consumer<?>[numConsumers];
        for (int idx = 0; idx < numConsumers; idx++) {
            consumers[idx] = consumerBuilder.subscribe();
        }
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }

        // Leave the consumers time to drain whatever the broker is willing to dispatch.
        Thread.sleep(500L);
        Assert.assertNotEquals(Integer.valueOf(received.get()), numMessages);

        // Close in subscription order.
        for (int idx = 0; idx < numConsumers; idx++) {
            consumers[idx].close();
        }
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Sets the broker-level per-topic message limit to 5 through dynamic configuration,
     * publishes 500 messages and expects that not all of them were received half a
     * second after publishing.
     *
     * @param subscriptionType subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000)
    public void testClusterRateLimitingConfiguration(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingBlock";
        final int messageRate = 5;
        final int numMessages = 500;

        int initialMsgRate = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();
        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", Integer.toString(messageRate));

        // Give the dynamic update a few chances to reach the broker configuration,
        // backing off slightly longer on every attempt.
        for (int attempt = 0; attempt < 5; attempt++) {
            boolean stillInitial =
                    pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg() == initialMsgRate;
            if (stillInitial) {
                Thread.sleep(50 + (attempt * 10));
            }
        }
        Assert.assertNotEquals(
                Integer.valueOf(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg()),
                Integer.valueOf(initialMsgRate));

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        AtomicInteger received = new AtomicInteger(0);
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscriptionType)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }

        // Leave the consumer time to drain whatever the broker is willing to dispatch.
        Thread.sleep(500L);
        Assert.assertNotEquals(Integer.valueOf(received.get()), numMessages);

        consumer.close();
        producer.close();
        // Put the message-rate setting back to what it was when the test started.
        pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initialMsgRate);
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Configures both a message limit (5) and a byte limit (10) for a 360-second period,
     * builds a 200-message backlog of 50-byte payloads while no consumer is attached,
     * then re-subscribes and checks the received byte count.
     *
     * @param subscriptionType subscription type used by the consumers
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000)
    public void testMessageByteRateThrottlingCombined(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingAll";
        final int maxAttempts = 5;
        final int numMessages = 200;

        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(5)
                .dispatchThrottlingRateInByte(10L)
                .ratePeriodInSecond(360)
                .build();
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the limiter reports BOTH a message limit and a byte limit.
        boolean rateApplied = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0
                    && topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0) {
                rateApplied = true;
                break;
            }
            if (attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        AtomicInteger received = new AtomicInteger(0);
        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscriptionType)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                });

        // Subscribe once so the subscription exists, then disconnect before publishing
        // so the messages accumulate as backlog.
        Consumer<byte[]> initialConsumer = consumerBuilder.subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());
        initialConsumer.close();

        byte[] payload = new byte[50];
        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(payload);
        }

        Consumer<byte[]> backlogConsumer = consumerBuilder.subscribe();
        // NOTE(review): this compares an Integer with a Long, which are never equal, so
        // the assertion cannot fail as written — confirm the intended check.
        Assert.assertNotEquals(Integer.valueOf(50 * received.get()), 20L);

        backlogConsumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Creates a "global" cluster and a namespace whose replication clusters are set to
     * "test", applies a limit of 5 messages per 360-second period, publishes 500
     * messages and expects that not all of them were received half a second later.
     */
    @Test
    public void testGlobalNamespaceThrottling() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingBlock";
        final int maxAttempts = 5;
        final int numMessages = 500;

        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(5)
                .dispatchThrottlingRateInByte(-1L)
                .ratePeriodInSecond(360)
                .build();

        admin.clusters().createCluster("global", ClusterData.builder().serviceUrl("http://global:8080").build());
        admin.namespaces().createNamespace(namespace);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the topic's limiter reports either kind of limit.
        boolean rateApplied = false;
        for (int attempt = 0; attempt < maxAttempts && !rateApplied; attempt++) {
            rateApplied = topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0
                    || topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0;
            if (!rateApplied && attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        AtomicInteger received = new AtomicInteger(0);
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                })
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(new byte[80]);
        }

        // Leave the consumer time to drain whatever the broker is willing to dispatch.
        Thread.sleep(500L);
        Assert.assertNotEquals(Integer.valueOf(received.get()), numMessages);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Enables {@code dispatchThrottlingOnNonBacklogConsumerEnabled}, applies a limit of
     * 10 messages per 360-second period, publishes 500 messages to a connected consumer
     * and expects it to have received fewer than 20 by the time publishing finishes.
     *
     * @param subscriptionType subscription type used by the consumer
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000)
    public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingBlock";
        final int maxAttempts = 5;
        final int numMessages = 500;

        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(10)
                .dispatchThrottlingRateInByte(-1L)
                .ratePeriodInSecond(360)
                .build();
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setDispatchRate(namespace, dispatchRate);
        admin.brokers().updateDynamicConfiguration("dispatchThrottlingOnNonBacklogConsumerEnabled", Boolean.TRUE.toString());

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the topic's limiter reports a message-rate limit.
        boolean rateApplied = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
                rateApplied = true;
                break;
            }
            if (attempt != maxAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

        // Also flip the flag directly on the configuration object; reverted at the end.
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        AtomicInteger received = new AtomicInteger(0);
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscriptionType)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    received.incrementAndGet();
                })
                .subscribe();

        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(new byte[80]);
        }

        // Throttling must have kept the listener well short of the published count.
        Assert.assertTrue(received.get() < 20);

        consumer.close();
        producer.close();
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Checks precedence between the broker-level limit and the namespace policy:
     * the topic starts with the broker value (100), switches to the namespace value
     * (500) once one is set, and returns to the broker value when the namespace rate is
     * set to 0. A second topic created afterwards is expected to report 100 as well.
     */
    @Test
    public void testClusterPolicyOverrideConfiguration() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName1 = "persistent://" + namespace + "/throttlingOverride1";
        final String topicName2 = "persistent://" + namespace + "/throttlingOverride2";
        final int clusterMessageRate = 100;

        int initialMsgRate = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();
        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", Integer.toString(clusterMessageRate));

        // Give the dynamic update a few chances to reach the broker configuration.
        for (int attempt = 0; attempt < 5; attempt++) {
            boolean stillInitial =
                    pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg() == initialMsgRate;
            if (stillInitial) {
                Thread.sleep(50 + (attempt * 10));
            }
        }
        Assert.assertNotEquals(
                Integer.valueOf(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg()),
                Integer.valueOf(initialMsgRate));

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1).create();
        PersistentTopic topic1 = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName1).get();

        // (1) No namespace policy yet: the broker-level value applies.
        Assert.assertEquals(100L, topic1.getDispatchRateLimiter().get().getDispatchRateOnMsg());

        // (2) A namespace policy overrides the broker-level value.
        DispatchRate namespaceRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(500)
                .dispatchThrottlingRateInByte(0L)
                .ratePeriodInSecond(1)
                .build();
        admin.namespaces().setDispatchRate(namespace, namespaceRate);
        for (int attempt = 0; attempt < 5; attempt++) {
            boolean notYetOverridden = topic1.getDispatchRateLimiter().get().getDispatchRateOnMsg() != 500;
            if (notYetOverridden) {
                Thread.sleep(50 + (attempt * 10));
            }
        }
        Assert.assertEquals(500, topic1.getDispatchRateLimiter().get().getDispatchRateOnMsg());

        // (3) A zero namespace rate falls back to the broker-level value.
        DispatchRate zeroRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(0)
                .dispatchThrottlingRateInByte(0L)
                .ratePeriodInSecond(1)
                .build();
        admin.namespaces().setDispatchRate(namespace, zeroRate);
        for (int attempt = 0; attempt < 5; attempt++) {
            boolean stillOverridden = topic1.getDispatchRateLimiter().get().getDispatchRateOnMsg() == 500;
            if (stillOverridden) {
                Thread.sleep(50 + (attempt * 10));
            }
        }
        Assert.assertEquals(100L, topic1.getDispatchRateLimiter().get().getDispatchRateOnMsg());

        // (4) A topic created after the fallback also reports the broker-level value.
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();
        PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName2).get();
        Assert.assertEquals(100L, topic2.getDispatchRateLimiter().get().getDispatchRateOnMsg());

        producer1.close();
        producer2.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Checks the state of a topic's dispatch rate limiter after the topic has been closed.
     *
     * <p>A dispatch rate (10 msg / 1024 bytes per second) is set on the namespace, ten messages are
     * published and then consumed, and the topic's limiter is captured. After the producer, the
     * subscription and the topic itself are closed, the captured limiter is expected to report
     * {@code -1} for both its message rate and its byte rate.
     *
     * @param subscriptionType subscription type supplied by the "subscriptions" data provider
     * @throws Exception if any admin, client or broker call fails
     */
    @Test(dataProvider = "subscriptions", timeOut = 10000)
    public void testClosingRateLimiter(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final int messageCount = 10;
        final String typeName = subscriptionType.name();
        final String topicName = "persistent://my-property/throttling_ns/closingRateLimiter" + typeName;
        final String subscriptionName = "mySubscription" + subscriptionType.name();

        DispatchRate namespaceRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(10)
                .dispatchThrottlingRateInByte(1024L)
                .ratePeriodInSecond(1)
                .build();
        this.admin.namespaces().createNamespace("my-property/throttling_ns", Sets.newHashSet("test"));
        this.admin.namespaces().setDispatchRate("my-property/throttling_ns", namespaceRate);

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .create();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(subscriptionType)
                .subscribe();
        PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Publish everything first, then drain and acknowledge the same number of messages.
        for (int sent = 0; sent < messageCount; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        int acknowledged = 0;
        while (acknowledged < messageCount) {
            consumer.acknowledge(consumer.receive());
            acknowledged++;
        }

        // Keep a handle on the limiter so it can still be inspected once the topic is gone.
        Assert.assertTrue(topic.getDispatchRateLimiter().isPresent());
        DispatchRateLimiter limiter = topic.getDispatchRateLimiter().get();

        producer.close();
        consumer.unsubscribe();
        consumer.close();
        topic.close().get();

        Assert.assertEquals(limiter.getDispatchRateOnMsg(), -1L);
        Assert.assertEquals(limiter.getDispatchRateOnByte(), -1L);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Exercises {@code DispatchRateLimiter.getPoliciesDispatchRate} for the {@code TOPIC} limiter type
     * against a {@link Policies} object that is filled in step by step.
     *
     * <p>Expected results for cluster "test":
     * <ol>
     *   <li>empty policies: {@code null};</li>
     *   <li>only {@code clusterDispatchRate} populated: that entry is returned;</li>
     *   <li>{@code topicDispatchRate} populated as well: the topic entry is returned instead.</li>
     * </ol>
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void testDispatchRateCompatibility1() throws Exception {
        final String cluster = "test";
        final Policies policies = new Policies();
        final Optional<Policies> wrappedPolicies = Optional.of(policies);

        DispatchRateImpl clusterRate = DispatchRateImpl.builder()
                .dispatchThrottlingRateInMsg(10)
                .dispatchThrottlingRateInByte(512L)
                .ratePeriodInSecond(1)
                .build();
        DispatchRateImpl topicRate = DispatchRateImpl.builder()
                .dispatchThrottlingRateInMsg(200)
                .dispatchThrottlingRateInByte(1024L)
                .ratePeriodInSecond(1)
                .build();

        // Nothing configured yet.
        Assert.assertNull(
                DispatchRateLimiter.getPoliciesDispatchRate(cluster, wrappedPolicies, DispatchRateLimiter.Type.TOPIC));

        // Only the clusterDispatchRate map carries an entry.
        policies.clusterDispatchRate.put(cluster, clusterRate);
        Assert.assertEquals(
                DispatchRateLimiter.getPoliciesDispatchRate(cluster, wrappedPolicies, DispatchRateLimiter.Type.TOPIC),
                clusterRate);

        // Both maps carry an entry; the topicDispatchRate one is expected back.
        policies.topicDispatchRate.put(cluster, topicRate);
        Assert.assertEquals(
                DispatchRateLimiter.getPoliciesDispatchRate(cluster, wrappedPolicies, DispatchRateLimiter.Type.TOPIC),
                topicRate);
    }

    /**
     * Drives a freshly constructed {@code TOPIC}-type {@link DispatchRateLimiter} through three
     * {@code onPoliciesUpdate} calls and checks the rates it reports after each one.
     *
     * <p>Expected message/byte rates:
     * <ol>
     *   <li>empty policies: {@code -1 / -1};</li>
     *   <li>{@code clusterDispatchRate} entry for "test": {@code 100 / 512};</li>
     *   <li>additional {@code topicDispatchRate} entry for "test": {@code 200 / 1024}.</li>
     * </ol>
     *
     * @throws Exception if any admin, client or broker call fails
     */
    @Test
    public void testDispatchRateCompatibility2() throws Exception {
        final String cluster = "test";
        final String topicName = "persistent://my-property/dispatch-rate-compatibility/t1";

        this.admin.namespaces().createNamespace("my-property/dispatch-rate-compatibility", Sets.newHashSet("test"));
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .create();
        PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        DispatchRateLimiter limiter = new DispatchRateLimiter(topic, DispatchRateLimiter.Type.TOPIC);
        Policies policies = new Policies();
        DispatchRateImpl clusterRate = DispatchRateImpl.builder()
                .dispatchThrottlingRateInMsg(100)
                .dispatchThrottlingRateInByte(512L)
                .ratePeriodInSecond(1)
                .build();
        DispatchRateImpl topicRate = DispatchRateImpl.builder()
                .dispatchThrottlingRateInMsg(200)
                .dispatchThrottlingRateInByte(1024L)
                .ratePeriodInSecond(1)
                .build();

        // Step 1: no rate configured anywhere in the policies.
        limiter.onPoliciesUpdate(policies);
        long msgRate = limiter.getDispatchRateOnMsg();
        Assert.assertEquals(msgRate, -1L);
        long byteRate = limiter.getDispatchRateOnByte();
        Assert.assertEquals(byteRate, -1L);

        // Step 2: only the clusterDispatchRate map has an entry.
        policies.clusterDispatchRate.put(cluster, clusterRate);
        limiter.onPoliciesUpdate(policies);
        msgRate = limiter.getDispatchRateOnMsg();
        Assert.assertEquals(msgRate, 100L);
        byteRate = limiter.getDispatchRateOnByte();
        Assert.assertEquals(byteRate, 512L);

        // Step 3: the topicDispatchRate map has an entry too.
        policies.topicDispatchRate.put(cluster, topicRate);
        limiter.onPoliciesUpdate(policies);
        msgRate = limiter.getDispatchRateOnMsg();
        Assert.assertEquals(msgRate, 200L);
        byteRate = limiter.getDispatchRateOnByte();
        Assert.assertEquals(byteRate, 1024L);

        producer.close();
        topic.close().get();
    }

    /**
     * Shuts down the broker's {@code statsUpdater} executor and deactivates every cursor of the given
     * managed ledger.
     *
     * <p>The executor is a non-public field of {@link BrokerService}, so it is read reflectively.
     * NOTE(review): presumably the executor is stopped so that no periodic task touches the cursors
     * again after they have been deactivated here; confirm against {@code BrokerService}.
     *
     * @param managedLedgerImpl ledger whose cursors are to be deactivated
     * @throws Exception if the reflective field access fails
     */
    protected void deactiveCursors(ManagedLedgerImpl managedLedgerImpl) throws Exception {
        final Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
        statsUpdaterField.setAccessible(true);

        final Object rawExecutor = statsUpdaterField.get(this.pulsar.getBrokerService());
        final ScheduledExecutorService statsUpdater = (ScheduledExecutorService) rawExecutor;
        statsUpdater.shutdownNow();

        managedLedgerImpl.getCursors().forEach(managedLedgerImpl::deactivateCursor);
    }

    /**
     * Exercises a namespace dispatch rate that is flagged {@code relativeToPublishRate(true)}.
     *
     * <p>Flow of the test:
     * <ol>
     *   <li>set a dispatch rate of 1 msg/s (bytes unlimited) with the relative flag on the namespace
     *       and poll up to ten times until the topic's limiter reports a positive message rate;</li>
     *   <li>subscribe, deactivate the ledger's cursors, and round-trip a single message;</li>
     *   <li>overwrite the topic's {@code lastUpdatedAvgPublishRateInMsg} field with 1000 via
     *       reflection and publish 1000 messages;</li>
     *   <li>require that all 1000 messages are received within 1100 ms overall.</li>
     * </ol>
     *
     * @param subscriptionType subscription type supplied by the "subscriptions" data provider
     * @throws Exception if any admin, client, broker or reflective call fails
     */
    @Test(dataProvider = "subscriptions")
    public void testRelativeMessageRateLimitingThrottling(SubscriptionType subscriptionType) throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final int maxPollAttempts = 10;
        final int messageCount = 1000;
        final String topicName = "persistent://my-property/relative_throttling_ns/relative-throttle" + subscriptionType;

        DispatchRate relativeRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1)
                .dispatchThrottlingRateInByte(-1L)
                .ratePeriodInSecond(1)
                .relativeToPublishRate(true)
                .build();
        this.admin.namespaces().createNamespace("my-property/relative_throttling_ns", Sets.newHashSet("test"));
        this.admin.namespaces().setDispatchRate("my-property/relative_throttling_ns", relativeRate);

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .create();
        PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Poll until the limiter reports a positive message rate; no sleep after the final attempt.
        boolean rateApplied = false;
        for (int attempt = 0; attempt < maxPollAttempts; attempt++) {
            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
                rateApplied = true;
                break;
            }
            if (attempt != maxPollAttempts - 1) {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(this.admin.namespaces().getDispatchRate("my-property/relative_throttling_ns"), relativeRate);
        Thread.sleep(2000L);

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(subscriptionType)
                .subscribe();
        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

        // A single message must get through before the publish rate is overridden.
        producer.send("test".getBytes());
        Assert.assertNotNull(consumer.receive());

        // Override the topic's recorded average publish rate through reflection.
        Field publishRateField = PersistentTopic.class.getDeclaredField("lastUpdatedAvgPublishRateInMsg");
        publishRateField.setAccessible(true);
        publishRateField.set(topic, messageCount);

        for (int sent = 0; sent < messageCount; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }

        final long budgetNanos = TimeUnit.MILLISECONDS.toNanos(1100L);
        final long startNanos = System.nanoTime();
        int receivedCount = 0;
        while (receivedCount < messageCount) {
            Message<byte[]> message = consumer.receive((int) budgetNanos, TimeUnit.NANOSECONDS);
            receivedCount++;
            Assert.assertNotNull(message);

            // The budget applies to the whole batch, measured from the first receive call.
            long elapsedNanos = System.nanoTime() - startNanos;
            if (elapsedNanos > budgetNanos) {
                log.info("Test has only received {} messages in {}ms, {} expected",
                        receivedCount, TimeUnit.NANOSECONDS.toMillis(elapsedNanos), messageCount);
                Assert.fail("Messages not received in time");
            }
            log.info("Received {}-{}", message.getMessageId(), new String(message.getData()));
        }
        Assert.assertEquals(receivedCount, messageCount);
        Assert.assertTrue(System.nanoTime() - startNanos < budgetNanos);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Synthetic lambda-deserialization hook, reproduced here as decompiler output.
     *
     * <p>Given a {@link SerializedLambda}, the method reads the implementation method name, maps it
     * (first {@code switch}, by {@code hashCode()} plus {@code equals}) to a selector value {@code z},
     * and then (second {@code switch}) checks the recorded functional-interface metadata against
     * {@code org/apache/pulsar/client/api/MessageListener#received} before returning a listener lambda
     * built from the captured arguments. Two of the cases capture an {@link AtomicInteger} and a
     * {@link CountDownLatch}; the remaining cases capture only an {@link AtomicInteger}.
     *
     * <p>NOTE(review): this text does not look like valid Java as written: {@code z} is declared
     * {@code boolean} but assigned {@code -1} and integer values, several labels are spelled
     * {@code case true:}, and {@code SHARED_VALUE} is not declared in the visible code. These look like
     * decompiler artefacts; presumably {@code z} was an {@code int} selector ranging over 0..8. Confirm
     * before relying on or editing this method.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return a listener lambda matching the serialized description
     * @throws IllegalArgumentException if the method name or the recorded metadata matches no known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Stage 1: resolve the implementation method name to a selector (hash first, then exact equals).
        switch (implMethodName.hashCode()) {
            case -1945974371:
                if (implMethodName.equals("lambda$testMessageRateLimitingNotReceiveAllMessages$ad24eb19$1")) {
                    z = 8;
                    break;
                }
                break;
            case -1940413510:
                if (implMethodName.equals("lambda$testClusterMsgByteRateLimitingClusterConfig$7c7b6d72$1")) {
                    z = 7;
                    break;
                }
                break;
            case -1557052072:
                if (implMethodName.equals("lambda$testNonBacklogConsumerWithThrottlingEnabled$5077506f$1")) {
                    z = 5;
                    break;
                }
                break;
            case -830795056:
                if (implMethodName.equals("lambda$testMessageRateLimitingReceiveAllMessagesAfterThrottling$2d6838bb$1")) {
                    z = 6;
                    break;
                }
                break;
            case -784550774:
                if (implMethodName.equals("lambda$testRateLimitingMultipleConsumers$a9f0323b$1")) {
                    z = 2;
                    break;
                }
                break;
            case -690807194:
                if (implMethodName.equals("lambda$testMessageByteRateThrottlingCombined$e8bb689e$1")) {
                    z = true;
                    break;
                }
                break;
            case -139781069:
                if (implMethodName.equals("lambda$testClusterRateLimitingConfiguration$5077506f$1")) {
                    z = 4;
                    break;
                }
                break;
            case -9089164:
                if (implMethodName.equals("lambda$testBytesRateLimitingReceiveAllMessagesAfterThrottling$2d6838bb$1")) {
                    z = false;
                    break;
                }
                break;
            case 560378785:
                if (implMethodName.equals("lambda$testGlobalNamespaceThrottling$7c7b6d72$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        // Stage 2: validate the recorded metadata for the selected lambda and rebuild it.
        // NOTE(review): the implMethodKind value 6 presumably corresponds to
        // MethodHandleInfo.REF_invokeStatic; confirm against the JDK documentation.
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return (consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message.getData()));
                        atomicInteger.incrementAndGet();
                        countDownLatch.countDown();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer2, message2) -> {
                        Assert.assertNotNull(message2, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message2.getData()));
                        atomicInteger2.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger3 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer3, message3) -> {
                        Assert.assertNotNull(message3, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message3.getData()));
                        atomicInteger3.incrementAndGet();
                    };
                }
                break;
            case Test.TestMessage.INTFIELD_FIELD_NUMBER /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger4 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer4, message4) -> {
                        Assert.assertNotNull(message4, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message4.getData()));
                        atomicInteger4.incrementAndGet();
                    };
                }
                break;
            case Test.TestMessage.TESTENUM_FIELD_NUMBER /* 4 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger5 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer5, message5) -> {
                        Assert.assertNotNull(message5, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message5.getData()));
                        atomicInteger5.incrementAndGet();
                    };
                }
                break;
            case Test.TestMessage.NESTEDFIELD_FIELD_NUMBER /* 5 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger6 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer6, message6) -> {
                        Assert.assertNotNull(message6, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message6.getData()));
                        atomicInteger6.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger7 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch2 = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return (consumer7, message7) -> {
                        Assert.assertNotNull(message7, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message7.getData()));
                        atomicInteger7.incrementAndGet();
                        countDownLatch2.countDown();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger8 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer8, message8) -> {
                        Assert.assertNotNull(message8, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message8.getData()));
                        atomicInteger8.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageDispatchThrottlingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger9 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer9, message9) -> {
                        Assert.assertNotNull(message9, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message9.getData()));
                        atomicInteger9.incrementAndGet();
                    };
                }
                break;
        }
        // No case matched, or the matched case failed its metadata check.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
