package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/MessageParserTest.class */
public class MessageParserTest extends MockedPulsarServiceBaseTest {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", new ClusterData(this.pulsar.getWebServiceAddress()));
        this.admin.tenants().createTenant("my-tenant", new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace("my-tenant/my-ns", Sets.newHashSet("test"));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testWithoutBatches() throws Exception {
        TopicName topicName = TopicName.get("persistent://my-tenant/my-ns/my-topic");
        Producer create = this.pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic("persistent://my-tenant/my-ns/my-topic").create();
        for (int i = 0; i < 10; i++) {
            try {
                create.send("hello-" + i);
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (create != null) {
            create.close();
        }
        ManagedCursor newNonDurableCursor = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://my-tenant/my-ns/my-topic").get()).getManagedLedger().newNonDurableCursor(PositionImpl.earliest);
        for (int i2 = 0; i2 < 10; i2++) {
            Entry entry = (Entry) newNonDurableCursor.readEntriesOrWait(1).get(0);
            ArrayList newArrayList = Lists.newArrayList();
            try {
                MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(), rawMessage -> {
                    newArrayList.add(rawMessage);
                }, 5242880);
                entry.release();
                Assert.assertEquals(newArrayList.size(), 1);
                Assert.assertEquals(((RawMessage) newArrayList.get(0)).getData(), Unpooled.wrappedBuffer(("hello-" + i2).getBytes()));
                newArrayList.forEach((v0) -> {
                    v0.release();
                });
            } catch (Throwable th3) {
                entry.release();
                throw th3;
            }
        }
    }

    @Test
    public void testWithBatches() throws Exception {
        TopicName topicName = TopicName.get("persistent://my-tenant/my-ns/my-topic-with-batch");
        Producer create = this.pulsarClient.newProducer(Schema.STRING).enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.SECONDS).topic("persistent://my-tenant/my-ns/my-topic-with-batch").create();
        ManagedCursor newNonDurableCursor = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://my-tenant/my-ns/my-topic-with-batch").get()).getManagedLedger().newNonDurableCursor(PositionImpl.earliest);
        for (int i = 0; i < 10 - 1; i++) {
            create.sendAsync("hello-" + i);
        }
        create.send("hello-" + (10 - 1));
        Assert.assertEquals(newNonDurableCursor.getNumberOfEntriesInBacklog(false), 1L);
        Entry entry = (Entry) newNonDurableCursor.readEntriesOrWait(1).get(0);
        ArrayList newArrayList = Lists.newArrayList();
        try {
            MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(), rawMessage -> {
                newArrayList.add(rawMessage);
            }, 5242880);
            entry.release();
            Assert.assertEquals(newArrayList.size(), 10);
            for (int i2 = 0; i2 < 10; i2++) {
                Assert.assertEquals(((RawMessage) newArrayList.get(i2)).getData(), Unpooled.wrappedBuffer(("hello-" + i2).getBytes()));
            }
            newArrayList.forEach((v0) -> {
                v0.release();
            });
            create.close();
        } catch (Throwable th) {
            entry.release();
            throw th;
        }
    }
}
