package org.apache.pulsar.client.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Sets;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ConsumerAckTest.class */
public class ConsumerAckTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerAckTest.class);
    private TransactionImpl transaction;
    private PulsarClient clientWithStats;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/client/impl/ConsumerAckTest$AckStatsInterceptor.class */
    public static class AckStatsInterceptor implements ConsumerInterceptor<String> {
        private final List<MessageId> individualAckedMessageIdList = new CopyOnWriteArrayList();
        private final List<MessageId> cumulativeAckedMessageIdList = new CopyOnWriteArrayList();

        private AckStatsInterceptor() {
        }

        public void close() {
        }

        public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
            return message;
        }

        public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
            if (th != null) {
                ConsumerAckTest.log.error("[{}] Failed to acknowledge {}", consumer.getConsumerName(), messageId);
            } else {
                this.individualAckedMessageIdList.add(messageId);
            }
        }

        public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
            if (th != null) {
                ConsumerAckTest.log.error("[{}] Failed to acknowledge {}", consumer.getConsumerName(), messageId);
            } else {
                this.cumulativeAckedMessageIdList.add(messageId);
            }
        }

        public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
        }

        public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/client/impl/ConsumerAckTest$AckTestData.class */
    public static class AckTestData implements Closeable {
        private final ConsumerImpl<String> consumer;
        private final AckStatsInterceptor interceptor;
        private final List<MessageId> messageIds;

        public int size() {
            return this.messageIds.size();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.interceptor.close();
            this.consumer.close();
        }

        public AckTestData(ConsumerImpl<String> consumerImpl, AckStatsInterceptor ackStatsInterceptor, List<MessageId> list) {
            this.consumer = consumerImpl;
            this.interceptor = ackStatsInterceptor;
            this.messageIds = list;
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        this.clientWithStats = newPulsarClient(this.lookupUrl.toString(), 30);
        this.transaction = (TransactionImpl) Mockito.mock(TransactionImpl.class);
        ((TransactionImpl) Mockito.doReturn(1L).when(this.transaction)).getTxnIdLeastBits();
        ((TransactionImpl) Mockito.doReturn(1L).when(this.transaction)).getTxnIdMostBits();
        ((TransactionImpl) Mockito.doReturn(Transaction.State.OPEN).when(this.transaction)).getState();
        CompletableFuture completedFuture = CompletableFuture.completedFuture(null);
        ((TransactionImpl) Mockito.doNothing().when(this.transaction)).registerAckOp((CompletableFuture) ArgumentMatchers.any());
        ((TransactionImpl) Mockito.doReturn(true).when(this.transaction)).checkIfOpen((CompletableFuture) ArgumentMatchers.any());
        ((TransactionImpl) Mockito.doReturn(completedFuture).when(this.transaction)).registerAckedTopic((String) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        this.clientWithStats.close();
        super.internalCleanup();
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testAckResponse() throws PulsarClientException, InterruptedException {
        Producer create = this.pulsarClient.newProducer(Schema.INT32).topic("testAckResponse").enableBatching(false).create();
        try {
            ConsumerImpl subscribe = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{"testAckResponse"}).subscriptionName("sub").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
            try {
                create.send(1);
                create.send(2);
                try {
                    subscribe.acknowledgeAsync(new MessageIdImpl(1L, 1L, 1), this.transaction).get();
                    Assert.fail();
                } catch (ExecutionException e) {
                    Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
                }
                try {
                    subscribe.acknowledgeAsync(subscribe.receive().getMessageId(), this.transaction).get();
                    Assert.fail();
                } catch (ExecutionException e2) {
                    Assert.assertTrue(e2.getCause() instanceof PulsarClientException.NotAllowedException);
                }
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testIndividualAck() throws Exception {
        AckTestData prepareDataForAck = prepareDataForAck("test-individual-ack");
        try {
            Iterator<MessageId> it = prepareDataForAck.messageIds.iterator();
            while (it.hasNext()) {
                prepareDataForAck.consumer.acknowledge(it.next());
            }
            Assert.assertEquals(prepareDataForAck.interceptor.individualAckedMessageIdList, prepareDataForAck.messageIds);
            Assert.assertEquals(prepareDataForAck.consumer.getStats().getNumAcksSent(), prepareDataForAck.size());
            Assert.assertTrue(prepareDataForAck.consumer.getUnAckedMessageTracker().isEmpty());
            if (Collections.singletonList(prepareDataForAck).get(0) != null) {
                prepareDataForAck.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(prepareDataForAck).get(0) != null) {
                prepareDataForAck.close();
            }
            throw th;
        }
    }

    @Test
    public void testIndividualAckList() throws Exception {
        AckTestData prepareDataForAck = prepareDataForAck("test-individual-ack-list");
        try {
            prepareDataForAck.consumer.acknowledge(prepareDataForAck.messageIds);
            Assert.assertEquals(prepareDataForAck.interceptor.individualAckedMessageIdList, prepareDataForAck.messageIds);
            Assert.assertEquals(prepareDataForAck.consumer.getStats().getNumAcksSent(), prepareDataForAck.size());
            Assert.assertTrue(prepareDataForAck.consumer.getUnAckedMessageTracker().isEmpty());
        } finally {
            if (Collections.singletonList(prepareDataForAck).get(0) != null) {
                prepareDataForAck.close();
            }
        }
    }

    @Test
    public void testCumulativeAck() throws Exception {
        AckTestData prepareDataForAck = prepareDataForAck("test-cumulative-ack");
        try {
            System.out.println(prepareDataForAck.size());
            prepareDataForAck.consumer.acknowledgeCumulative(prepareDataForAck.messageIds.get(prepareDataForAck.size() - 1));
            Assert.assertEquals(prepareDataForAck.interceptor.cumulativeAckedMessageIdList.get(0), prepareDataForAck.messageIds.get(prepareDataForAck.messageIds.size() - 1));
            Assert.assertEquals(prepareDataForAck.consumer.getStats().getNumAcksSent(), 2L);
            Assert.assertTrue(prepareDataForAck.consumer.getUnAckedMessageTracker().isEmpty());
        } finally {
            if (Collections.singletonList(prepareDataForAck).get(0) != null) {
                prepareDataForAck.close();
            }
        }
    }

    private AckTestData prepareDataForAck(String str) throws PulsarClientException {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(true).batchingMaxMessages(9).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).create();
        try {
            create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).create();
            try {
                AckStatsInterceptor ackStatsInterceptor = new AckStatsInterceptor();
                ConsumerImpl subscribe = this.clientWithStats.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("sub").intercept(new ConsumerInterceptor[]{ackStatsInterceptor}).ackTimeout(10L, TimeUnit.SECONDS).subscribe();
                create.send("msg-0");
                for (int i = 1; i < 10; i++) {
                    create.sendAsync("msg-" + i);
                }
                ArrayList arrayList = new ArrayList();
                for (int i2 = 0; i2 < 10; i2++) {
                    Message receive = subscribe.receive(3, TimeUnit.SECONDS);
                    Assert.assertNotNull(receive);
                    arrayList.add(receive.getMessageId());
                }
                MessageId messageId = (MessageId) arrayList.get(0);
                MessageId discardBatch = MessageIdAdvUtils.discardBatch((MessageId) arrayList.get(1));
                for (int i3 = 2; i3 < arrayList.size(); i3++) {
                    Assert.assertEquals(MessageIdAdvUtils.discardBatch((MessageId) arrayList.get(i3)), discardBatch);
                }
                Assert.assertTrue(ackStatsInterceptor.individualAckedMessageIdList.isEmpty());
                Assert.assertTrue(ackStatsInterceptor.cumulativeAckedMessageIdList.isEmpty());
                Assert.assertEquals(subscribe.getStats().getNumAcksSent(), 0L);
                Assert.assertNotNull(subscribe.getUnAckedMessageTracker().messageIdPartitionMap);
                Assert.assertEquals(subscribe.getUnAckedMessageTracker().messageIdPartitionMap.keySet(), Sets.newHashSet(new MessageId[]{messageId, discardBatch}));
                AckTestData ackTestData = new AckTestData(subscribe, ackStatsInterceptor, arrayList);
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                return ackTestData;
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }
}
