package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/client/impl/RawReaderTest.class */
public class RawReaderTest extends MockedPulsarServiceBaseTest {
    private static final String subscription = "foobar-sub";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-property", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2"}), Sets.newHashSet(new String[]{"test"})));
        this.admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet(new String[]{"test"}));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    private Set<String> publishMessages(String str, int i) throws Exception {
        return publishMessages(str, i, false);
    }

    private Set<String> publishMessages(String str, int i, boolean z) throws Exception {
        HashSet hashSet = new HashSet();
        Producer create = this.pulsarClient.newProducer().enableBatching(z).batchingMaxMessages(10).messageRoutingMode(MessageRoutingMode.SinglePartition).maxPendingMessages(i).topic(str).create();
        CompletableFuture completableFuture = null;
        for (int i2 = 0; i2 < i; i2++) {
            try {
                String str2 = "key" + i2;
                completableFuture = create.newMessage().key(str2).value(("my-message-" + i2).getBytes()).sendAsync();
                hashSet.add(str2);
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        completableFuture.get();
        if (create != null) {
            create.close();
        }
        return hashSet;
    }

    public static String extractKey(RawMessage rawMessage) {
        return Commands.parseMessageMetadata(rawMessage.getHeadersAndPayload()).getPartitionKey();
    }

    @Test
    public void testRawReader() throws Exception {
        RawMessage rawMessage;
        Set<String> publishMessages = publishMessages("persistent://my-property/my-ns/my-raw-topic", 10);
        RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
        MessageId messageId = (MessageId) rawReader.getLastMessageIdAsync().get();
        while (true) {
            rawMessage = (RawMessage) rawReader.readNextAsync().get();
            try {
                Assert.assertTrue(publishMessages.remove(extractKey(rawMessage)));
                if (messageId.compareTo(rawMessage.getMessageId()) == 0) {
                    break;
                } else if (rawMessage != null) {
                    rawMessage.close();
                }
            } catch (Throwable th) {
                if (rawMessage != null) {
                    try {
                        rawMessage.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (rawMessage != null) {
            rawMessage.close();
        }
        Assert.assertTrue(publishMessages.isEmpty());
    }

    @Test
    public void testSeekToStart() throws Exception {
        RawMessage rawMessage;
        publishMessages("persistent://my-property/my-ns/my-raw-topic", 10);
        HashSet hashSet = new HashSet();
        RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
        MessageId messageId = (MessageId) rawReader.getLastMessageIdAsync().get();
        while (true) {
            rawMessage = (RawMessage) rawReader.readNextAsync().get();
            try {
                hashSet.add(extractKey(rawMessage));
                if (messageId.compareTo(rawMessage.getMessageId()) == 0) {
                    break;
                } else if (rawMessage != null) {
                    rawMessage.close();
                }
            } finally {
            }
        }
        if (rawMessage != null) {
            rawMessage.close();
        }
        Assert.assertEquals(hashSet.size(), 10);
        rawReader.seekAsync(MessageId.earliest).get();
        while (true) {
            rawMessage = (RawMessage) rawReader.readNextAsync().get();
            try {
                Assert.assertTrue(hashSet.remove(extractKey(rawMessage)));
                if (messageId.compareTo(rawMessage.getMessageId()) == 0) {
                    break;
                } else if (rawMessage != null) {
                    rawMessage.close();
                }
            } finally {
            }
        }
        if (rawMessage != null) {
            rawMessage.close();
        }
        Assert.assertTrue(hashSet.isEmpty());
    }

    @Test
    public void testSeekToMiddle() throws Exception {
        RawMessage rawMessage;
        publishMessages("persistent://my-property/my-ns/my-raw-topic", 10);
        HashSet hashSet = new HashSet();
        RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
        int i = 0;
        MessageId messageId = null;
        MessageId messageId2 = (MessageId) rawReader.getLastMessageIdAsync().get();
        while (true) {
            rawMessage = (RawMessage) rawReader.readNextAsync().get();
            try {
                i++;
                if (i > 10 / 2) {
                    if (messageId == null) {
                        messageId = rawMessage.getMessageId();
                    }
                    hashSet.add(extractKey(rawMessage));
                }
                if (messageId2.compareTo(rawMessage.getMessageId()) == 0) {
                    break;
                } else if (rawMessage != null) {
                    rawMessage.close();
                }
            } finally {
            }
        }
        if (rawMessage != null) {
            rawMessage.close();
        }
        Assert.assertEquals(hashSet.size(), 10 / 2);
        rawReader.seekAsync(messageId).get();
        while (true) {
            rawMessage = (RawMessage) rawReader.readNextAsync().get();
            try {
                Assert.assertTrue(hashSet.remove(extractKey(rawMessage)));
                if (messageId2.compareTo(rawMessage.getMessageId()) == 0) {
                    break;
                } else if (rawMessage != null) {
                    rawMessage.close();
                }
            } finally {
            }
        }
        if (rawMessage != null) {
            rawMessage.close();
        }
        Assert.assertTrue(hashSet.isEmpty());
    }

    @Test
    public void testFlowControl() throws Exception {
        publishMessages("persistent://my-property/my-ns/my-raw-topic", 5000);
        RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 5000 + 1; i++) {
            arrayList.add(rawReader.readNextAsync());
        }
        int i2 = 0;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                RawMessage rawMessage = (RawMessage) ((Future) it.next()).get(1L, TimeUnit.SECONDS);
                try {
                    String extractKey = extractKey(rawMessage);
                    Assert.assertTrue(hashSet.add(extractKey), "Received duplicated key '" + extractKey + "' : already received keys = " + hashSet);
                    if (rawMessage != null) {
                        rawMessage.close();
                    }
                } catch (Throwable th) {
                    if (rawMessage != null) {
                        try {
                            rawMessage.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                    break;
                }
            } catch (TimeoutException e) {
                i2++;
            }
        }
        Assert.assertEquals(i2, 1);
        Assert.assertEquals(hashSet.size(), 5000);
    }

    @Test
    public void testFlowControlBatch() throws Exception {
        publishMessages("persistent://my-property/my-ns/my-raw-topic", 5000, true);
        RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
        HashSet hashSet = new HashSet();
        while (true) {
            try {
                RawMessage rawMessage = (RawMessage) rawReader.readNextAsync().get(1L, TimeUnit.SECONDS);
                try {
                    Assert.assertTrue(RawBatchConverter.isReadableBatch(rawMessage));
                    Iterator it = RawBatchConverter.extractIdsAndKeysAndSize(rawMessage).iterator();
                    while (it.hasNext()) {
                        String str = (String) ((ImmutableTriple) it.next()).middle;
                        Assert.assertTrue(hashSet.add(str), "Received duplicated key '" + str + "' : already received keys = " + hashSet);
                    }
                    if (rawMessage != null) {
                        rawMessage.close();
                    }
                } finally {
                }
            } catch (TimeoutException e) {
                Assert.assertEquals(hashSet.size(), 5000);
                return;
            }
        }
    }

    @Test
    public void testBatchingExtractKeysAndIds() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-raw-topic").maxPendingMessages(3).enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1L, TimeUnit.HOURS).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            create.newMessage().key("key1").value("my-content-1".getBytes()).sendAsync();
            create.newMessage().key("key2").value("my-content-2".getBytes()).sendAsync();
            create.newMessage().key("key3").value("my-content-3".getBytes()).send();
            if (create != null) {
                create.close();
            }
            RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
            try {
                RawMessage rawMessage = (RawMessage) rawReader.readNextAsync().get();
                try {
                    List extractIdsAndKeysAndSize = RawBatchConverter.extractIdsAndKeysAndSize(rawMessage);
                    Assert.assertEquals(extractIdsAndKeysAndSize.size(), 3);
                    Assert.assertTrue(((MessageId) ((ImmutableTriple) extractIdsAndKeysAndSize.get(0)).getLeft()).compareTo((MessageId) ((ImmutableTriple) extractIdsAndKeysAndSize.get(1)).getLeft()) < 0);
                    Assert.assertTrue(((MessageId) ((ImmutableTriple) extractIdsAndKeysAndSize.get(1)).getLeft()).compareTo((MessageId) ((ImmutableTriple) extractIdsAndKeysAndSize.get(2)).getLeft()) < 0);
                    Assert.assertEquals((String) ((ImmutableTriple) extractIdsAndKeysAndSize.get(0)).getMiddle(), "key1");
                    Assert.assertEquals((String) ((ImmutableTriple) extractIdsAndKeysAndSize.get(1)).getMiddle(), "key2");
                    Assert.assertEquals((String) ((ImmutableTriple) extractIdsAndKeysAndSize.get(2)).getMiddle(), "key3");
                    if (rawMessage != null) {
                        rawMessage.close();
                    }
                } finally {
                }
            } finally {
                rawReader.closeAsync().get();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testBatchingRebatch() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-raw-topic").maxPendingMessages(3).enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1L, TimeUnit.HOURS).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            create.newMessage().key("key1").value("my-content-1".getBytes()).sendAsync();
            create.newMessage().key("key2").value("my-content-2".getBytes()).sendAsync();
            create.newMessage().key("key3").value("my-content-3".getBytes()).send();
            if (create != null) {
                create.close();
            }
            RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
            try {
                RawMessage rawMessage = (RawMessage) rawReader.readNextAsync().get();
                try {
                    RawMessage rawMessage2 = (RawMessage) RawBatchConverter.rebatchMessage(rawMessage, (str, messageId) -> {
                        return str.equals("key2");
                    }).get();
                    List extractIdsAndKeysAndSize = RawBatchConverter.extractIdsAndKeysAndSize(rawMessage2);
                    Assert.assertEquals(extractIdsAndKeysAndSize.size(), 1);
                    Assert.assertEquals((String) ((ImmutableTriple) extractIdsAndKeysAndSize.get(0)).getMiddle(), "key2");
                    rawMessage2.close();
                    Assert.assertEquals(rawMessage.getHeadersAndPayload().refCnt(), 1);
                    if (rawMessage != null) {
                        rawMessage.close();
                    }
                } finally {
                }
            } finally {
                rawReader.closeAsync().get();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testAcknowledgeWithProperties() throws Exception {
        RawMessage rawMessage;
        Set<String> publishMessages = publishMessages("persistent://my-property/my-ns/my-raw-topic", 10);
        RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
        MessageId messageId = (MessageId) rawReader.getLastMessageIdAsync().get();
        while (true) {
            rawMessage = (RawMessage) rawReader.readNextAsync().get();
            try {
                Assert.assertTrue(publishMessages.remove(extractKey(rawMessage)));
                if (messageId.compareTo(rawMessage.getMessageId()) == 0) {
                    break;
                } else if (rawMessage != null) {
                    rawMessage.close();
                }
            } catch (Throwable th) {
                if (rawMessage != null) {
                    try {
                        rawMessage.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (rawMessage != null) {
            rawMessage.close();
        }
        Assert.assertTrue(publishMessages.isEmpty());
        HashMap hashMap = new HashMap();
        hashMap.put("foobar", 244837814099658L);
        rawReader.acknowledgeCumulativeAsync(messageId, hashMap).get();
        ManagedLedger managedLedger = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://my-property/my-ns/my-raw-topic").get()).getManagedLedger();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(managedLedger.openCursor(subscription).getProperties().get("foobar"), 244837814099658L);
        });
    }

    @Test
    public void testReadCancellationOnClose() throws Exception {
        publishMessages("persistent://my-property/my-ns/my-raw-topic", 10 / 2);
        RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://my-property/my-ns/my-raw-topic", subscription).get();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(rawReader.readNextAsync());
        }
        for (int i2 = 0; i2 < 10 / 2; i2++) {
            ((Future) arrayList.remove(0)).get();
        }
        rawReader.closeAsync().get();
        while (!arrayList.isEmpty()) {
            try {
                ((Future) arrayList.remove(0)).get();
                Assert.fail("Should have been cancelled");
            } catch (CancellationException e) {
            }
        }
    }
}
