package org.apache.pulsar.broker.service.nonpersistent;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockObjectFactory;
import org.testng.Assert;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link NonPersistentStickyKeyDispatcherMultipleConsumers#sendMessages(List)}.
 *
 * <p>The dispatcher under test is wired to Mockito mocks of the topic, subscription, broker and
 * configuration. The static {@code DispatchRateLimiter.isDispatchRateNeeded(...)} call is stubbed
 * through PowerMock to return {@code false}.
 */
@PrepareForTest({DispatchRateLimiter.class})
@PowerMockIgnore({"org.apache.logging.log4j.*"})
public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
    /** Ledger id shared by every entry created in these tests. */
    private static final long LEDGER_ID = 1L;

    private PulsarService pulsarMock;
    private BrokerService brokerMock;
    private NonPersistentTopic topicMock;
    private NonPersistentSubscription subscriptionMock;
    private ServiceConfiguration configMock;
    private ChannelPromise channelMock;
    private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher;
    final String topicName = "non-persistent://public/default/testTopic";

    /**
     * Supplies the PowerMock object factory so TestNG instantiates this class in a way that
     * supports the static mocking declared via {@code @PrepareForTest}.
     *
     * @return a new {@link PowerMockObjectFactory}
     */
    @ObjectFactory
    public IObjectFactory getObjectFactory() {
        return new PowerMockObjectFactory();
    }

    /**
     * Builds a fresh mock graph (config -> pulsar -> broker -> topic) and a new dispatcher before
     * each test method.
     *
     * @throws Exception declared so that any failure while wiring the mocks fails the test
     */
    @BeforeMethod
    public void setup() throws Exception {
        configMock = Mockito.mock(ServiceConfiguration.class);
        Mockito.doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled();
        Mockito.doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
        Mockito.doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
        Mockito.doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();

        pulsarMock = Mockito.mock(PulsarService.class);
        Mockito.doReturn(configMock).when(pulsarMock).getConfiguration();

        brokerMock = Mockito.mock(BrokerService.class);
        Mockito.doReturn(pulsarMock).when(brokerMock).pulsar();

        topicMock = Mockito.mock(NonPersistentTopic.class);
        Mockito.doReturn(brokerMock).when(topicMock).getBrokerService();
        Mockito.doReturn(topicName).when(topicMock).getName();

        channelMock = Mockito.mock(ChannelPromise.class);
        subscriptionMock = Mockito.mock(NonPersistentSubscription.class);

        // Stub the static rate-limiter check so it always reports that no dispatch rate is needed.
        PowerMockito.mockStatic(DispatchRateLimiter.class);
        PowerMockito.when(DispatchRateLimiter.isDispatchRateNeeded(
                ArgumentMatchers.any(BrokerService.class),
                ArgumentMatchers.any(Optional.class),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.any(DispatchRateLimiter.Type.class)))
                .thenReturn(false);

        nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
                topicMock, subscriptionMock, new HashRangeAutoSplitStickyKeyConsumerSelector());
    }

    /**
     * A consumer that reports 1000 available permits and a writable connection must receive the
     * entries through exactly one {@code sendMessages} call with {@code null} batch-index acks.
     *
     * @throws BrokerServiceException if the consumer cannot be added to the dispatcher
     */
    @Test(timeOut = 10000)
    public void testSendMessage() throws BrokerServiceException {
        Consumer consumer = Mockito.mock(Consumer.class);
        Mockito.when(consumer.getAvailablePermits()).thenReturn(1000);
        Mockito.when(consumer.isWritable()).thenReturn(true);
        nonpersistentDispatcher.addConsumer(consumer);

        List<Entry> entries = createTestEntries();
        stubSendMessagesWithEntryChecks(consumer);

        dispatchExpectingNoException(entries);

        verifySendMessagesCalls(consumer, 1);
    }

    /**
     * A consumer mock left with Mockito's defaults (0 available permits, not writable) must not
     * receive any {@code sendMessages} call.
     *
     * @throws BrokerServiceException if the consumer cannot be added to the dispatcher
     */
    @Test(timeOut = 10000)
    public void testSendMessageRespectFlowControl() throws BrokerServiceException {
        Consumer consumer = Mockito.mock(Consumer.class);
        nonpersistentDispatcher.addConsumer(consumer);

        List<Entry> entries = createTestEntries();
        stubSendMessagesWithEntryChecks(consumer);

        dispatchExpectingNoException(entries);

        verifySendMessagesCalls(consumer, 0);
    }

    /**
     * Creates the two entries used by every test: entry ids 1 and 2 on {@link #LEDGER_ID}, with
     * payloads {@code "message1"} and {@code "message2"}.
     *
     * @return a mutable list holding the two entries in entry-id order
     */
    private List<Entry> createTestEntries() {
        List<Entry> entries = new ArrayList<>();
        entries.add(EntryImpl.create(LEDGER_ID, 1L, createMessage("message1", 1)));
        entries.add(EntryImpl.create(LEDGER_ID, 2L, createMessage("message2", 2)));
        return entries;
    }

    /**
     * Stubs {@code consumer.sendMessages(...)} so that every invocation checks the dispatched
     * entries: the n-th entry (1-based) must have ledger id {@link #LEDGER_ID}, entry id
     * {@code n} and payload {@code "message" + n}.
     *
     * @param consumer the consumer mock to stub
     */
    @SuppressWarnings("unchecked") // Mockito hands back the raw List captured from the invocation.
    private void stubSendMessagesWithEntryChecks(Consumer consumer) {
        Mockito.doAnswer(invocation -> {
            ChannelPromise promise = Mockito.mock(ChannelPromise.class);
            List<Entry> dispatched = invocation.getArgument(0, List.class);
            for (int position = 1; position <= dispatched.size(); position++) {
                Entry entry = dispatched.get(position - 1);
                Assert.assertEquals(entry.getLedgerId(), LEDGER_ID);
                Assert.assertEquals(entry.getEntryId(), position);
                ByteBuf data = entry.getDataBuffer();
                // Result is discarded; the call is made so the metadata header is consumed and
                // the remaining readable bytes are (presumably) only the payload.
                Commands.parseMessageMetadata(data);
                Assert.assertEquals(data.toString(StandardCharsets.UTF_8), "message" + position);
            }
            return promise;
        }).when(consumer).sendMessages(
                ArgumentMatchers.any(List.class),
                ArgumentMatchers.any(EntryBatchSizes.class),
                ArgumentMatchers.any(),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(RedeliveryTracker.class));
    }

    /**
     * Sends the entries through the dispatcher and fails the test if any {@link Exception}
     * escapes from the call.
     *
     * @param entries the entries to dispatch
     */
    private void dispatchExpectingNoException(List<Entry> entries) {
        try {
            nonpersistentDispatcher.sendMessages(entries);
        } catch (Exception e) {
            Assert.fail("Failed to sendMessages.", e);
        }
    }

    /**
     * Verifies how many times {@code consumer.sendMessages(...)} was invoked with {@code null}
     * batch-index acks.
     *
     * @param consumer      the consumer mock to inspect
     * @param expectedCalls the exact number of matching invocations expected
     */
    @SuppressWarnings("unchecked") // any(List.class) yields a raw List matcher.
    private void verifySendMessagesCalls(Consumer consumer, int expectedCalls) {
        Mockito.verify(consumer, Mockito.times(expectedCalls)).sendMessages(
                ArgumentMatchers.any(List.class),
                ArgumentMatchers.any(EntryBatchSizes.class),
                ArgumentMatchers.<EntryBatchIndexesAcks>eq(null),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(RedeliveryTracker.class));
    }

    /**
     * Serializes a message with the default partition key {@code "testKey"}.
     *
     * @param payload    the UTF-8 text used as the message payload
     * @param sequenceId the sequence id stored in the message metadata
     * @return a buffer containing the serialized metadata followed by the payload
     */
    private ByteBuf createMessage(String payload, int sequenceId) {
        return createMessage(payload, sequenceId, "testKey");
    }

    /**
     * Serializes a message (metadata plus payload) using a CRC32C checksum.
     *
     * @param payload    the UTF-8 text used as the message payload
     * @param sequenceId the sequence id stored in the message metadata
     * @param key        the partition key stored in the message metadata (not Base64 encoded)
     * @return a buffer containing the serialized metadata followed by the payload
     */
    private ByteBuf createMessage(String payload, int sequenceId, String key) {
        MessageMetadata metadata = new MessageMetadata()
                .setSequenceId(sequenceId)
                .setProducerName("testProducer")
                .setPartitionKey(key)
                .setPartitionKeyB64Encoded(false)
                .setPublishTime(System.currentTimeMillis());
        ByteBuf body = Unpooled.copiedBuffer(payload.getBytes(StandardCharsets.UTF_8));
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata, body);
    }
}
