package org.apache.pulsar.broker.service;

import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.class */
public class AbstractBaseDispatcherTest {
    private AbstractBaseDispatcherTestHelper helper;
    private ServiceConfiguration svcConfig;
    private PersistentSubscription subscriptionMock;

    /* loaded from: input_file:org/apache/pulsar/broker/service/AbstractBaseDispatcherTest$AbstractBaseDispatcherTestHelper.class */
    private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {
        private final Optional<DispatchRateLimiter> dispatchRateLimiter;

        protected AbstractBaseDispatcherTestHelper(Subscription subscription, ServiceConfiguration serviceConfiguration, DispatchRateLimiter dispatchRateLimiter) {
            super(subscription, serviceConfiguration);
            this.dispatchRateLimiter = Optional.ofNullable(dispatchRateLimiter);
        }

        public Optional<DispatchRateLimiter> getRateLimiter() {
            return this.dispatchRateLimiter;
        }

        protected boolean isConsumersExceededOnSubscription() {
            return false;
        }

        public boolean trackDelayedDelivery(long j, long j2, MessageMetadata messageMetadata) {
            return true;
        }

        protected void reScheduleRead() {
        }

        public void addConsumer(Consumer consumer) throws BrokerServiceException {
        }

        public void removeConsumer(Consumer consumer) throws BrokerServiceException {
        }

        public void consumerFlow(Consumer consumer, int i) {
        }

        public boolean isConsumerConnected() {
            return false;
        }

        public List<Consumer> getConsumers() {
            return null;
        }

        public boolean canUnsubscribe(Consumer consumer) {
            return false;
        }

        public CompletableFuture<Void> close() {
            return null;
        }

        public boolean isClosed() {
            return false;
        }

        public CompletableFuture<Void> disconnectActiveConsumers(boolean z) {
            return null;
        }

        public CompletableFuture<Void> disconnectAllConsumers(boolean z) {
            return null;
        }

        public void reset() {
        }

        public CommandSubscribe.SubType getType() {
            return null;
        }

        public void redeliverUnacknowledgedMessages(Consumer consumer, long j) {
        }

        public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> list) {
        }

        public void addUnAckedMessages(int i) {
        }

        public RedeliveryTracker getRedeliveryTracker() {
            return null;
        }
    }

    @BeforeMethod
    public void setup() throws Exception {
        this.svcConfig = (ServiceConfiguration) Mockito.mock(ServiceConfiguration.class);
        Mockito.when(Boolean.valueOf(this.svcConfig.isDispatchThrottlingForFilteredEntriesEnabled())).thenReturn(true);
        this.subscriptionMock = (PersistentSubscription) Mockito.mock(PersistentSubscription.class);
        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null);
    }

    @Test
    public void testFilterEntriesForConsumerOfNullElement() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(null);
        SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
        Assert.assertEquals(this.helper.filterEntriesForConsumer(arrayList, EntryBatchSizes.get(arrayList.size()), threadLocal, null, null, false, null), 0);
    }

    @Test
    public void testFilterEntriesForConsumerOfEntryFilter() throws Exception {
        Topic topic = (Topic) Mockito.mock(Topic.class);
        Mockito.when(this.subscriptionMock.getTopic()).thenReturn(topic);
        Mockito.when(topic.getBrokerService()).thenReturn((BrokerService) Mockito.mock(BrokerService.class));
        EntryFilterWithClassLoader entryFilterWithClassLoader = (EntryFilterWithClassLoader) Mockito.mock(EntryFilterWithClassLoader.class);
        Mockito.when(entryFilterWithClassLoader.filterEntry((Entry) ArgumentMatchers.any(Entry.class), (FilterContext) ArgumentMatchers.any(FilterContext.class))).thenReturn(EntryFilter.FilterResult.REJECT);
        Mockito.when(topic.getEntryFilters()).thenReturn(ImmutableMap.of("key", entryFilterWithClassLoader));
        DispatchRateLimiter dispatchRateLimiter = (DispatchRateLimiter) Mockito.mock(DispatchRateLimiter.class);
        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, dispatchRateLimiter);
        ArrayList arrayList = new ArrayList();
        EntryImpl create = EntryImpl.create(1L, 2L, createMessage("message1", 1));
        long length = create.getLength();
        arrayList.add(create);
        Assert.assertEquals(this.helper.filterEntriesForConsumer(arrayList, EntryBatchSizes.get(arrayList.size()), SendMessageInfo.getThreadLocal(), null, (ManagedCursor) Mockito.mock(ManagedCursor.class), false, null), 0);
        ((DispatchRateLimiter) Mockito.verify(dispatchRateLimiter)).tryDispatchPermit(1L, length);
    }

    @Test
    public void testFilterEntriesForConsumerOfTxnMsgAbort() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(EntryImpl.create(1L, 1L, createTnxAbortMessage("message1", 1)));
        SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
        Assert.assertEquals(this.helper.filterEntriesForConsumer(arrayList, EntryBatchSizes.get(arrayList.size()), threadLocal, null, null, false, null), 0);
    }

    @Test
    public void testFilterEntriesForConsumerOfTxnBufferAbort() {
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.mock(PersistentTopic.class);
        Mockito.when(this.subscriptionMock.getTopic()).thenReturn(persistentTopic);
        Mockito.when(Boolean.valueOf(persistentTopic.isTxnAborted((TxnID) ArgumentMatchers.any(TxnID.class)))).thenReturn(true);
        ArrayList arrayList = new ArrayList();
        arrayList.add(EntryImpl.create(1L, 1L, createTnxMessage("message1", 1)));
        SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
        Assert.assertEquals(this.helper.filterEntriesForConsumer(arrayList, EntryBatchSizes.get(arrayList.size()), threadLocal, null, null, false, null), 0);
    }

    @Test
    public void testFilterEntriesForConsumerOfServerOnlyMarker() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(EntryImpl.create(1L, 1L, Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster")));
        SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
        Assert.assertEquals(this.helper.filterEntriesForConsumer(arrayList, EntryBatchSizes.get(arrayList.size()), threadLocal, null, null, false, null), 0);
    }

    @Test
    public void testFilterEntriesForConsumerOfDelayedMsg() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(EntryImpl.create(1L, 1L, createDelayedMessage("message1", 1)));
        SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
        Assert.assertEquals(this.helper.filterEntriesForConsumer(arrayList, EntryBatchSizes.get(arrayList.size()), threadLocal, null, null, false, null), 0);
    }

    private ByteBuf createMessage(String str, int i) {
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, new MessageMetadata().setSequenceId(i).setProducerName("testProducer").setPartitionKeyB64Encoded(false).setPublishTime(System.currentTimeMillis()), Unpooled.copiedBuffer(str.getBytes(StandardCharsets.UTF_8)));
    }

    private ByteBuf createTnxMessage(String str, int i) {
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, new MessageMetadata().setSequenceId(i).setProducerName("testProducer").setPartitionKeyB64Encoded(false).setPublishTime(System.currentTimeMillis()).setTxnidMostBits(8L).setTxnidLeastBits(0L), Unpooled.copiedBuffer(str.getBytes(StandardCharsets.UTF_8)));
    }

    private ByteBuf createTnxAbortMessage(String str, int i) {
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, new MessageMetadata().setSequenceId(i).setProducerName("testProducer").setPartitionKeyB64Encoded(false).setPublishTime(System.currentTimeMillis()).setTxnidMostBits(8L).setTxnidLeastBits(0L).setMarkerType(22), Unpooled.copiedBuffer(str.getBytes(StandardCharsets.UTF_8)));
    }

    private ByteBuf createDelayedMessage(String str, int i) {
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, new MessageMetadata().setSequenceId(i).setProducerName("testProducer").setPartitionKeyB64Encoded(false).setPublishTime(System.currentTimeMillis()).setDeliverAtTime(System.currentTimeMillis() + 5000), Unpooled.copiedBuffer(str.getBytes(StandardCharsets.UTF_8)));
    }
}
