package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.exceptions.VeniceChecksumException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.Utils;
import java.nio.ByteBuffer;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.class */
public class StoreBufferServiceTest {
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    private final KafkaKey key = new KafkaKey(MessageType.PUT, (byte[]) null);
    private final Put put = new Put(ByteBuffer.allocate(0), 0, 0, ByteBuffer.allocate(0));
    private final KafkaMessageEnvelope value = new KafkaMessageEnvelope(Integer.valueOf(MessageType.PUT.getValue()), new ProducerMetadata(), this.put, (LeaderMetadata) null);
    private final LeaderProducedRecordContext leaderContext = LeaderProducedRecordContext.newPutRecord(0, 0, this.key.getKey(), this.put);
    private static final int TIMEOUT_IN_MS = 1000;

    @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testRun(boolean z) throws Exception {
        StoreBufferService storeBufferService = new StoreBufferService(1, 10000L, 1000L, z);
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Utils.getUniqueString("test_topic") + "_v1");
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 1);
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl2 = new PubSubTopicPartitionImpl(topic, 2);
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl3 = new PubSubTopicPartitionImpl(topic, 3);
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl4 = new PubSubTopicPartitionImpl(topic, 4);
        ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl, -1L, 0L, 0);
        ImmutablePubSubMessage immutablePubSubMessage2 = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl2, -1L, 0L, 0);
        ImmutablePubSubMessage immutablePubSubMessage3 = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl3, -1L, 0L, 0);
        ImmutablePubSubMessage immutablePubSubMessage4 = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl4, -1L, 0L, 0);
        storeBufferService.putConsumerRecord(immutablePubSubMessage, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        storeBufferService.putConsumerRecord(immutablePubSubMessage2, storeIngestionTask, (LeaderProducedRecordContext) null, 2, "blah", 0L);
        storeBufferService.putConsumerRecord(immutablePubSubMessage3, storeIngestionTask, this.leaderContext, 3, "blah", 0L);
        storeBufferService.putConsumerRecord(immutablePubSubMessage4, storeIngestionTask, this.leaderContext, 4, "blah", 0L);
        storeBufferService.start();
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask, Mockito.timeout(1000L))).processConsumerRecord(immutablePubSubMessage, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask, Mockito.timeout(1000L))).processConsumerRecord(immutablePubSubMessage2, (LeaderProducedRecordContext) null, 2, "blah", 0L);
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask, Mockito.timeout(1000L))).processConsumerRecord(immutablePubSubMessage3, this.leaderContext, 3, "blah", 0L);
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask, Mockito.timeout(1000L))).processConsumerRecord(immutablePubSubMessage4, this.leaderContext, 4, "blah", 0L);
        storeBufferService.stop();
        Assert.assertThrows(VeniceException.class, () -> {
            storeBufferService.drainBufferedRecordsFromTopicPartition(pubSubTopicPartitionImpl);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testRunWhenThrowException(boolean z) throws Exception {
        StoreBufferService storeBufferService = new StoreBufferService(1, 10000L, 1000L, z);
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Utils.getUniqueString("test_topic") + "_v1");
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 1);
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl2 = new PubSubTopicPartitionImpl(topic, 2);
        ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl, -1L, 0L, 0);
        ImmutablePubSubMessage immutablePubSubMessage2 = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl2, -1L, 0L, 0);
        VeniceException veniceException = new VeniceException("test_exception");
        ((StoreIngestionTask) Mockito.doThrow(new Throwable[]{veniceException}).when(storeIngestionTask)).processConsumerRecord(immutablePubSubMessage, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        storeBufferService.putConsumerRecord(immutablePubSubMessage, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        storeBufferService.putConsumerRecord(immutablePubSubMessage2, storeIngestionTask, (LeaderProducedRecordContext) null, 2, "blah", 0L);
        storeBufferService.start();
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask, Mockito.timeout(1000L))).processConsumerRecord(immutablePubSubMessage, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask, Mockito.timeout(1000L))).processConsumerRecord(immutablePubSubMessage2, (LeaderProducedRecordContext) null, 2, "blah", 0L);
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask)).setIngestionException(1, veniceException);
        storeBufferService.stop();
    }

    @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testDrainBufferedRecordsWhenNotExists(boolean z) throws Exception {
        StoreBufferService storeBufferService = new StoreBufferService(1, 10000L, 1000L, z);
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Utils.getUniqueString("test_topic") + "_v1");
        ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(this.key, this.value, new PubSubTopicPartitionImpl(topic, 1), -1L, 0L, 0);
        storeBufferService.start();
        storeBufferService.putConsumerRecord(immutablePubSubMessage, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        storeBufferService.internalDrainBufferedRecordsFromTopicPartition(new PubSubTopicPartitionImpl(topic, 2), 3, 50);
        storeBufferService.stop();
    }

    @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testDrainBufferedRecordsWhenExists(boolean z) throws Exception {
        StoreBufferService storeBufferService = new StoreBufferService(1, 10000L, 1000L, z);
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(Utils.getUniqueString("test_topic") + "_v1"), 1);
        ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl, 100L, 0L, 0);
        storeBufferService.start();
        storeBufferService.putConsumerRecord(immutablePubSubMessage, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        storeBufferService.internalDrainBufferedRecordsFromTopicPartition(pubSubTopicPartitionImpl, 3, 50);
        storeBufferService.stop();
    }

    @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testGetDrainerIndexForConsumerRecordSeparateDrainer(boolean z) {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Utils.getUniqueString("test_topic") + "_v1");
        int[] iArr = new int[16];
        for (int i = 0; i < 16; i++) {
            iArr[i] = 0;
        }
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        ((VeniceServerConfig) Mockito.doReturn(8).when(veniceServerConfig)).getDrainerPoolSizeSortedInput();
        ((VeniceServerConfig) Mockito.doReturn(8).when(veniceServerConfig)).getDrainerPoolSizeUnsortedInput();
        ((VeniceServerConfig) Mockito.doReturn(1000L).when(veniceServerConfig)).getStoreWriterBufferNotifyDelta();
        ((VeniceServerConfig) Mockito.doReturn(10000L).when(veniceServerConfig)).getStoreWriterBufferMemoryCapacity();
        ((VeniceServerConfig) Mockito.doReturn(Boolean.valueOf(z)).when(veniceServerConfig)).isStoreWriterBufferAfterLeaderLogicEnabled();
        SeparatedStoreBufferService separatedStoreBufferService = new SeparatedStoreBufferService(veniceServerConfig);
        for (int i2 = 0; i2 < 32; i2++) {
            ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(this.key, this.value, new PubSubTopicPartitionImpl(topic, i2), 100L, 0L, 0);
            if (i2 < 16) {
                int drainerIndexForConsumerRecord = separatedStoreBufferService.sortedServiceDelegate.getDrainerIndexForConsumerRecord(immutablePubSubMessage, i2);
                iArr[drainerIndexForConsumerRecord] = iArr[drainerIndexForConsumerRecord] + 1;
            } else {
                int drainerIndexForConsumerRecord2 = separatedStoreBufferService.unsortedServiceDelegate.getDrainerIndexForConsumerRecord(immutablePubSubMessage, i2) + 8;
                iArr[drainerIndexForConsumerRecord2] = iArr[drainerIndexForConsumerRecord2] + 1;
            }
        }
        for (int i3 = 0; i3 < 16; i3++) {
            Assert.assertNotNull(Long.valueOf(separatedStoreBufferService.getDrainerQueueMemoryUsage(i3)));
        }
        int i4 = 32 / 16;
        for (int i5 = 0; i5 < 16; i5++) {
            Assert.assertEquals(iArr[i5], i4);
        }
    }

    @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testGetDrainerIndexForConsumerRecord(boolean z) {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Utils.getUniqueString("test_topic") + "_v1");
        int[] iArr = new int[8];
        for (int i = 0; i < 8; i++) {
            iArr[i] = 0;
        }
        StoreBufferService storeBufferService = new StoreBufferService(8, 10000L, 1000L, z);
        for (int i2 = 0; i2 < 64; i2++) {
            int drainerIndexForConsumerRecord = storeBufferService.getDrainerIndexForConsumerRecord(new ImmutablePubSubMessage(this.key, this.value, new PubSubTopicPartitionImpl(topic, i2), 100L, 0L, 0), i2);
            iArr[drainerIndexForConsumerRecord] = iArr[drainerIndexForConsumerRecord] + 1;
        }
        int i3 = 64 / 8;
        for (int i4 = 0; i4 < 8; i4++) {
            Assert.assertEquals(iArr[i4], i3);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testRunWhenThrowVeniceCheckSumFailException(boolean z) throws Exception {
        StoreBufferService storeBufferService = new StoreBufferService(1, 10000L, 1000L, z);
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Utils.getUniqueString("test_topic") + "_v1");
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 1);
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl2 = new PubSubTopicPartitionImpl(topic, 2);
        ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl, -1L, 0L, 0);
        ImmutablePubSubMessage immutablePubSubMessage2 = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl2, -1L, 0L, 0);
        VeniceChecksumException veniceChecksumException = new VeniceChecksumException("test_exception");
        ((StoreIngestionTask) Mockito.doThrow(new Throwable[]{veniceChecksumException}).when(storeIngestionTask)).processConsumerRecord(immutablePubSubMessage, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        storeBufferService.putConsumerRecord(immutablePubSubMessage, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        storeBufferService.putConsumerRecord(immutablePubSubMessage2, storeIngestionTask, (LeaderProducedRecordContext) null, 2, "blah", 0L);
        storeBufferService.start();
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask, Mockito.timeout(1000L))).processConsumerRecord(immutablePubSubMessage, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask, Mockito.timeout(1000L))).processConsumerRecord(immutablePubSubMessage2, (LeaderProducedRecordContext) null, 2, "blah", 0L);
        storeBufferService.getMaxMemoryUsagePerDrainer();
        for (int i = 0; i < 1; i++) {
            Assert.assertTrue(storeBufferService.getTopicToTimeSpentMap(i).size() == 0);
        }
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask)).setIngestionException(1, veniceChecksumException);
        ((StoreIngestionTask) Mockito.verify(storeIngestionTask)).recordChecksumVerificationFailure();
        storeBufferService.stop();
    }

    @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testPutConsumerRecord(boolean z) throws InterruptedException {
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        ((VeniceServerConfig) Mockito.doReturn(8).when(veniceServerConfig)).getDrainerPoolSizeSortedInput();
        ((VeniceServerConfig) Mockito.doReturn(8).when(veniceServerConfig)).getDrainerPoolSizeUnsortedInput();
        ((VeniceServerConfig) Mockito.doReturn(1000L).when(veniceServerConfig)).getStoreWriterBufferNotifyDelta();
        ((VeniceServerConfig) Mockito.doReturn(10000L).when(veniceServerConfig)).getStoreWriterBufferMemoryCapacity();
        ((VeniceServerConfig) Mockito.doReturn(Boolean.valueOf(z)).when(veniceServerConfig)).isStoreWriterBufferAfterLeaderLogicEnabled();
        StoreBufferService storeBufferService = (StoreBufferService) Mockito.mock(StoreBufferService.class);
        StoreBufferService storeBufferService2 = (StoreBufferService) Mockito.mock(StoreBufferService.class);
        SeparatedStoreBufferService separatedStoreBufferService = new SeparatedStoreBufferService(8, 8, storeBufferService, storeBufferService2);
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Utils.getUniqueString("test_topic") + "_v1");
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 1);
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl2 = new PubSubTopicPartitionImpl(topic, 2);
        ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl, 0L, 0L, 0);
        ImmutablePubSubMessage immutablePubSubMessage2 = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl2, 0L, 0L, 0);
        ImmutablePubSubMessage immutablePubSubMessage3 = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl, 1L, 0L, 0);
        ImmutablePubSubMessage immutablePubSubMessage4 = new ImmutablePubSubMessage(this.key, this.value, pubSubTopicPartitionImpl2, 1L, 0L, 0);
        separatedStoreBufferService.putConsumerRecord(immutablePubSubMessage, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreBufferService) Mockito.verify(storeBufferService2)).putConsumerRecord(immutablePubSubMessage, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        Mockito.when(Boolean.valueOf(partitionConsumptionState.isDeferredWrite())).thenReturn(true);
        Mockito.when(storeIngestionTask.getPartitionConsumptionState(1)).thenReturn(partitionConsumptionState);
        separatedStoreBufferService.putConsumerRecord(immutablePubSubMessage2, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreBufferService) Mockito.verify(storeBufferService)).putConsumerRecord(immutablePubSubMessage2, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        separatedStoreBufferService.putConsumerRecord(immutablePubSubMessage3, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreBufferService) Mockito.verify(storeBufferService)).putConsumerRecord(immutablePubSubMessage3, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreBufferService) Mockito.verify(storeBufferService, Mockito.never())).drainBufferedRecordsFromTopicPartition((PubSubTopicPartition) ArgumentMatchers.any());
        ((StoreBufferService) Mockito.verify(storeBufferService2, Mockito.never())).drainBufferedRecordsFromTopicPartition((PubSubTopicPartition) ArgumentMatchers.any());
        Mockito.when(Boolean.valueOf(partitionConsumptionState.isDeferredWrite())).thenReturn(false);
        separatedStoreBufferService.putConsumerRecord(immutablePubSubMessage4, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreBufferService) Mockito.verify(storeBufferService2)).putConsumerRecord(immutablePubSubMessage4, storeIngestionTask, (LeaderProducedRecordContext) null, 1, "blah", 0L);
        ((StoreBufferService) Mockito.verify(storeBufferService)).drainBufferedRecordsFromTopicPartition((PubSubTopicPartition) ArgumentMatchers.any());
        ((StoreBufferService) Mockito.verify(storeBufferService2)).drainBufferedRecordsFromTopicPartition((PubSubTopicPartition) ArgumentMatchers.any());
    }
}
