package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Verifies that consumers which stopped receiving messages because they were holding
 * unacknowledged messages resume consumption once those messages are acknowledged.
 *
 * <p>The message listener used by the test is a serializable lambda; the compiler
 * generates the matching {@code $deserializeLambda$} hook, so it is intentionally not
 * declared by hand in this class.
 */
@Test(groups = {"broker-impl"})
public class KeySharedSubscriptionTest extends ProducerConsumerBase {

    /** Per-consumer unacked-message limit applied to the broker configuration in {@link #setup()}. */
    private static final int MAX_UNACKED_MESSAGES_PER_CONSUMER = 10;

    /** Number of consumers attached to the subscription; also the number of distinct message keys. */
    private static final int CONSUMER_COUNT = 3;

    /** Number of messages published by the test. */
    private static final int TOTAL_MESSAGES = 1000;

    /**
     * Modulus applied to the key hash when building sticky ranges.
     * NOTE(review): presumably equals the broker's default Key_Shared hash range size - confirm.
     */
    private static final int HASH_RANGE_SIZE = 65536;

    /** How long the listeners must stay silent before delivery is considered stalled or finished. */
    private static final long QUIET_PERIOD_SECONDS = 5L;

    /** Upper bound for the wait that follows the acknowledgement of the backlog. */
    private static final long DRAIN_TIMEOUT_SECONDS = 15L;

    /**
     * Applies the per-consumer unacked-message limit to the broker configuration and then
     * runs the base-class setup (hook declared in {@code MockedPulsarServiceBaseTest}).
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        // The limit must be set before internalSetup() is invoked.
        this.conf.setMaxUnackedMessagesPerConsumer(MAX_UNACKED_MESSAGES_PER_CONSUMER);
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Runs the base-class cleanup after every test method, even when the test failed.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies the subscription types the test is executed with.
     *
     * @return one row per subscription type: {@code Shared} and {@code Key_Shared}
     */
    @DataProvider
    public Object[][] subType() {
        return new Object[][] {
                {SubscriptionType.Shared},
                {SubscriptionType.Key_Shared}
        };
    }

    /**
     * Publishes {@value #TOTAL_MESSAGES} keyed messages to {@value #CONSUMER_COUNT} consumers that
     * initially do not acknowledge anything, waits until delivery goes quiet, then acknowledges
     * everything received so far, switches the listeners to acknowledge-on-receipt and checks that
     * every published message is eventually received.
     *
     * @param subscriptionType subscription type under test, provided by {@link #subType()}
     * @throws PulsarClientException if a client operation fails
     */
    @Test(dataProvider = "subType")
    public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType subscriptionType)
            throws PulsarClientException {
        final String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5);
        final Map<Consumer<byte[]>, List<MessageId>> receivedByConsumer = new ConcurrentHashMap<>();
        final Set<MessageId> publishedIds = Sets.newConcurrentHashSet();
        final Set<MessageId> receivedIds = Sets.newConcurrentHashSet();
        final AtomicLong lastReceiveTime = new AtomicLong();
        final AtomicBoolean ackEnabled = new AtomicBoolean(false);

        final List<Consumer<byte[]>> consumers = new ArrayList<>();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerBuilder<byte[]> builder = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub-1")
                    .subscriptionType(subscriptionType)
                    .messageListener((consumer, message) -> {
                        lastReceiveTime.set(System.currentTimeMillis());
                        // Remember every delivery per consumer so the backlog can be acked in bulk later.
                        receivedByConsumer.computeIfAbsent(consumer, key -> new ArrayList<>())
                                .add(message.getMessageId());
                        receivedIds.add(message.getMessageId());
                        // Acknowledge only after the test has lifted the restriction.
                        if (ackEnabled.get()) {
                            try {
                                consumer.acknowledge(message);
                            } catch (PulsarClientException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    });
            if (subscriptionType == SubscriptionType.Key_Shared) {
                // Pin consumer i to the single hash slot of "key-i".
                int hash = Murmur3_32Hash.getInstance()
                        .makeHash(("key-" + i).getBytes(StandardCharsets.UTF_8)) % HASH_RANGE_SIZE;
                builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash)));
            }
            consumers.add(builder.subscribe());
        }

        final Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(true)
                .batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS)
                // Batch size is one below the per-consumer unacked limit configured in setup().
                .batchingMaxMessages(9)
                .batcherBuilder(BatcherBuilder.KEY_BASED)
                .create();
        for (int i = 0; i < TOTAL_MESSAGES; i++) {
            producer.newMessage()
                    .key("key-" + (i % CONSUMER_COUNT))
                    .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
                    .sendAsync()
                    .thenAccept(publishedIds::add);
        }

        // Wait until no listener has seen a message for the whole quiet period.
        Awaitility.await()
                .pollDelay(QUIET_PERIOD_SECONDS, TimeUnit.SECONDS)
                .until(() -> isQuiet(lastReceiveTime));

        // Lift the restriction: ack on receipt from now on and ack everything received so far.
        ackEnabled.set(true);
        for (Map.Entry<Consumer<byte[]>, List<MessageId>> entry : receivedByConsumer.entrySet()) {
            entry.getKey().acknowledge(entry.getValue());
        }

        // Restart the quiet-period clock, then wait for delivery to finish again.
        lastReceiveTime.set(System.currentTimeMillis());
        Awaitility.await()
                .atMost(DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .pollDelay(QUIET_PERIOD_SECONDS, TimeUnit.SECONDS)
                .until(() -> isQuiet(lastReceiveTime));

        Assert.assertEquals(publishedIds.size(), TOTAL_MESSAGES);
        Assert.assertEquals(publishedIds.size(), receivedIds.size());
        Assert.assertTrue(receivedIds.containsAll(publishedIds));

        producer.close();
        for (Consumer<byte[]> consumer : consumers) {
            consumer.close();
        }
    }

    /**
     * Tells whether more than the quiet period has elapsed since the recorded timestamp.
     *
     * @param lastActivity epoch-millisecond timestamp of the most recent activity
     * @return {@code true} if the elapsed time exceeds {@value #QUIET_PERIOD_SECONDS} seconds
     */
    private static boolean isQuiet(AtomicLong lastActivity) {
        return System.currentTimeMillis() - lastActivity.get() > TimeUnit.SECONDS.toMillis(QUIET_PERIOD_SECONDS);
    }
}
