package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.assertj.core.util.Sets;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.class */
public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
    @DataProvider(name = "subscriptionTypes")
    public static Object[] subscriptionTypes() {
        return new Object[]{SubscriptionType.Exclusive, SubscriptionType.Failover, SubscriptionType.Shared, SubscriptionType.Key_Shared};
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setBrokerEntryMetadataInterceptors(Sets.newTreeSet(new String[]{"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor", "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"}));
        this.conf.setExposingBrokerEntryMetadataToClientEnabled(true);
        baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    @Test(dataProvider = "subscriptionTypes")
    public void testProduceAndConsume(SubscriptionType subscriptionType) throws Exception {
        String newTopicName = newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(newTopicName).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{newTopicName}).subscriptionType(subscriptionType).subscriptionName("my-sub").subscribe();
            for (int i = 0; i < 10; i++) {
                try {
                    create.send(String.valueOf(i).getBytes());
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    throw th;
                }
            }
            int i2 = 0;
            for (int i3 = 0; i3 < 10; i3++) {
                i2++;
                Assert.assertEquals(i3, Integer.valueOf(new String((byte[]) subscribe.receive().getValue())).intValue());
            }
            Assert.assertEquals(10, i2);
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test(timeOut = 20000)
    public void testPeekMessage() throws Exception {
        String newTopicName = newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(newTopicName).create();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            create.newMessage().eventTime(200L).deliverAt(300L).value("hello".getBytes()).send();
            this.admin.topics().createSubscription(newTopicName, "my-sub", MessageId.earliest);
            List peekMessages = this.admin.topics().peekMessages(newTopicName, "my-sub", 1);
            Assert.assertEquals(peekMessages.size(), 1);
            MessageImpl messageImpl = (MessageImpl) peekMessages.get(0);
            Assert.assertEquals(messageImpl.getData(), "hello".getBytes());
            Assert.assertEquals(messageImpl.getEventTime(), 200L);
            Assert.assertEquals(messageImpl.getDeliverAtTime(), 300L);
            Assert.assertTrue(messageImpl.getPublishTime() >= currentTimeMillis);
            BrokerEntryMetadata brokerEntryMetadata = messageImpl.getBrokerEntryMetadata();
            Assert.assertEquals(brokerEntryMetadata.getIndex(), 0L);
            Assert.assertTrue(brokerEntryMetadata.getBrokerTimestamp() >= currentTimeMillis);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void testGetMessageById() throws Exception {
        String newTopicName = newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(newTopicName).create();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            MessageIdImpl send = create.newMessage().eventTime(200L).deliverAt(300L).value("hello".getBytes()).send();
            this.admin.topics().createSubscription(newTopicName, "my-sub", MessageId.earliest);
            MessageImpl messageById = this.admin.topics().getMessageById(newTopicName, send.getLedgerId(), send.getEntryId());
            Assert.assertEquals(messageById.getData(), "hello".getBytes());
            Assert.assertEquals(messageById.getEventTime(), 200L);
            Assert.assertEquals(messageById.getDeliverAtTime(), 300L);
            Assert.assertTrue(messageById.getPublishTime() >= currentTimeMillis);
            BrokerEntryMetadata brokerEntryMetadata = messageById.getBrokerEntryMetadata();
            Assert.assertEquals(brokerEntryMetadata.getIndex(), 0L);
            Assert.assertTrue(brokerEntryMetadata.getBrokerTimestamp() >= currentTimeMillis);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void testExamineMessage() throws Exception {
        String newTopicName = newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(newTopicName).create();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            create.newMessage().eventTime(200L).deliverAt(300L).value("hello".getBytes()).send();
            this.admin.topics().createSubscription(newTopicName, "my-sub", MessageId.earliest);
            MessageImpl examineMessage = this.admin.topics().examineMessage(newTopicName, "earliest", 1L);
            Assert.assertEquals(examineMessage.getData(), "hello".getBytes());
            Assert.assertEquals(examineMessage.getEventTime(), 200L);
            Assert.assertEquals(examineMessage.getDeliverAtTime(), 300L);
            Assert.assertTrue(examineMessage.getPublishTime() >= currentTimeMillis);
            BrokerEntryMetadata brokerEntryMetadata = examineMessage.getBrokerEntryMetadata();
            Assert.assertEquals(brokerEntryMetadata.getIndex(), 0L);
            Assert.assertTrue(brokerEntryMetadata.getBrokerTimestamp() >= currentTimeMillis);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void testGetLastMessageId() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-test").create();
        try {
            create.newMessage().value("hello".getBytes()).send();
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic-test"}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("my-sub").subscribe();
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test(timeOut = 20000)
    public void testConsumerGetBrokerEntryMetadataForIndividualMessage() throws Exception {
        String newTopicName = newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(newTopicName).enableBatching(false).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{newTopicName}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("my-sub").subscribe();
            try {
                long currentTimeMillis = System.currentTimeMillis();
                for (int i = 0; i < 10; i++) {
                    create.send(String.valueOf(i).getBytes());
                }
                for (int i2 = 0; i2 < 10; i2++) {
                    Message receive = subscribe.receive();
                    Assert.assertTrue(receive.hasBrokerPublishTime() && ((Long) receive.getBrokerPublishTime().orElse(-1L)).longValue() >= currentTimeMillis);
                    Assert.assertTrue(receive.hasIndex() && ((Long) receive.getIndex().orElse(-1L)).longValue() == ((long) i2));
                }
                create.close();
                subscribe.close();
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test(timeOut = 20000)
    public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws Exception {
        String newTopicName = newTopicName();
        Producer create = this.pulsarClient.newProducer().topic(newTopicName).enableBatching(true).batchingMaxPublishDelay(1L, TimeUnit.MINUTES).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{newTopicName}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("my-sub").subscribe();
            try {
                long currentTimeMillis = System.currentTimeMillis();
                int i = 0;
                while (i < 5) {
                    create.sendAsync(String.valueOf(i).getBytes());
                    i++;
                }
                create.flush();
                while (i < 10) {
                    create.sendAsync(String.valueOf(i).getBytes());
                    i++;
                }
                create.flush();
                for (int i2 = 0; i2 < i; i2++) {
                    Message receive = subscribe.receive();
                    Assert.assertTrue(receive.hasBrokerPublishTime() && ((Long) receive.getBrokerPublishTime().orElse(-1L)).longValue() >= currentTimeMillis);
                    Assert.assertTrue(receive.hasIndex() && ((Long) receive.getIndex().orElse(-1L)).longValue() == ((long) i2));
                }
                create.close();
                subscribe.close();
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }
}
