package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.stats.KafkaConsumerServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.class */
public class KafkaConsumerServiceTest {
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    private final KafkaPubSubMessageDeserializer pubSubDeserializer = new KafkaPubSubMessageDeserializer(new OptimizedKafkaValueSerializer(), new LandFillObjectPool(KafkaMessageEnvelope::new), new LandFillObjectPool(KafkaMessageEnvelope::new));

    @Test
    public void testTopicWiseGetConsumer() throws Exception {
        ApacheKafkaConsumerAdapter apacheKafkaConsumerAdapter = (ApacheKafkaConsumerAdapter) Mockito.mock(ApacheKafkaConsumerAdapter.class);
        Mockito.when(Boolean.valueOf(apacheKafkaConsumerAdapter.hasAnySubscription())).thenReturn(true);
        PubSubConsumerAdapter pubSubConsumerAdapter = (ApacheKafkaConsumerAdapter) Mockito.mock(ApacheKafkaConsumerAdapter.class);
        Mockito.when(Boolean.valueOf(pubSubConsumerAdapter.hasAnySubscription())).thenReturn(true);
        String uniqueString = Utils.getUniqueString("test_consumer_service1");
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(uniqueString, 1));
        Mockito.when(storeIngestionTask.getVersionTopic()).thenReturn(topic);
        Mockito.when(Boolean.valueOf(storeIngestionTask.isHybridMode())).thenReturn(true);
        PubSubTopic topic2 = this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(Utils.getUniqueString("test_consumer_service2"), 1));
        StoreIngestionTask storeIngestionTask2 = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        Mockito.when(storeIngestionTask2.getVersionTopic()).thenReturn(topic2);
        Mockito.when(Boolean.valueOf(storeIngestionTask2.isHybridMode())).thenReturn(true);
        PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory = (PubSubConsumerAdapterFactory) Mockito.mock(PubSubConsumerAdapterFactory.class);
        Mockito.when(pubSubConsumerAdapterFactory.create((VeniceProperties) Mockito.any(), ArgumentMatchers.anyBoolean(), (PubSubMessageDeserializer) Mockito.any(), (String) Mockito.any())).thenReturn(apacheKafkaConsumerAdapter, new PubSubConsumerAdapter[]{pubSubConsumerAdapter});
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", "test_kafka_url");
        MetricsRepository metricsRepository = (MetricsRepository) Mockito.mock(MetricsRepository.class);
        ((MetricsRepository) Mockito.doReturn((Sensor) Mockito.mock(Sensor.class)).when(metricsRepository)).sensor(Mockito.anyString(), (Sensor[]) Mockito.any());
        TopicWiseKafkaConsumerService topicWiseKafkaConsumerService = new TopicWiseKafkaConsumerService(pubSubConsumerAdapterFactory, properties, 1000L, 2, (EventThrottler) Mockito.mock(EventThrottler.class), (EventThrottler) Mockito.mock(EventThrottler.class), (KafkaClusterBasedRecordThrottler) Mockito.mock(KafkaClusterBasedRecordThrottler.class), metricsRepository, "test_kafka_cluster_alias", TimeUnit.MINUTES.toMillis(1L), (TopicExistenceChecker) Mockito.mock(TopicExistenceChecker.class), false, this.pubSubDeserializer, SystemTime.INSTANCE, (KafkaConsumerServiceStats) null, false);
        topicWiseKafkaConsumerService.start();
        PubSubTopic versionTopic = storeIngestionTask.getVersionTopic();
        SharedKafkaConsumer assignConsumerFor = topicWiseKafkaConsumerService.assignConsumerFor(versionTopic, new PubSubTopicPartitionImpl(versionTopic, 0));
        PubSubTopic versionTopic2 = storeIngestionTask2.getVersionTopic();
        SharedKafkaConsumer assignConsumerFor2 = topicWiseKafkaConsumerService.assignConsumerFor(versionTopic2, new PubSubTopicPartitionImpl(versionTopic2, 0));
        Assert.assertNotEquals(assignConsumerFor, assignConsumerFor2, "We should avoid to share consumer when there is consumer not assigned topic.");
        Set<PubSubTopicPartition> pubSubTopicPartitionsSet = getPubSubTopicPartitionsSet(topic, 5);
        Mockito.when(apacheKafkaConsumerAdapter.getAssignment()).thenReturn(pubSubTopicPartitionsSet);
        Set<PubSubTopicPartition> pubSubTopicPartitionsSet2 = getPubSubTopicPartitionsSet(topic2, 3);
        Mockito.when(pubSubConsumerAdapter.getAssignment()).thenReturn(pubSubTopicPartitionsSet2);
        assignConsumerFor.setCurrentAssignment(pubSubTopicPartitionsSet);
        assignConsumerFor2.setCurrentAssignment(pubSubTopicPartitionsSet2);
        PubSubTopic topic3 = this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(Utils.getUniqueString("test_consumer_service3"), 1));
        StoreIngestionTask storeIngestionTask3 = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        Mockito.when(storeIngestionTask3.getVersionTopic()).thenReturn(topic3);
        Mockito.when(Boolean.valueOf(storeIngestionTask3.isHybridMode())).thenReturn(true);
        PubSubTopic versionTopic3 = storeIngestionTask3.getVersionTopic();
        Assert.assertEquals(topicWiseKafkaConsumerService.assignConsumerFor(versionTopic3, new PubSubTopicPartitionImpl(versionTopic3, 0)), assignConsumerFor2, "The assigned consumer should come with least partitions, when no zero loaded consumer available.");
        topicWiseKafkaConsumerService.stop();
    }

    private Set<PubSubTopicPartition> getPubSubTopicPartitionsSet(PubSubTopic pubSubTopic, int i) {
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < i; i2++) {
            hashSet.add(new PubSubTopicPartitionImpl(pubSubTopic, i2));
        }
        return hashSet;
    }

    @Test
    public void testTopicWiseGetConsumerForHybridMode() throws Exception {
        ApacheKafkaConsumerAdapter apacheKafkaConsumerAdapter = (ApacheKafkaConsumerAdapter) Mockito.mock(ApacheKafkaConsumerAdapter.class);
        Mockito.when(Boolean.valueOf(apacheKafkaConsumerAdapter.hasAnySubscription())).thenReturn(false);
        PubSubConsumerAdapter pubSubConsumerAdapter = (ApacheKafkaConsumerAdapter) Mockito.mock(ApacheKafkaConsumerAdapter.class);
        Mockito.when(Boolean.valueOf(apacheKafkaConsumerAdapter.hasAnySubscription())).thenReturn(false);
        PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory = (PubSubConsumerAdapterFactory) Mockito.mock(PubSubConsumerAdapterFactory.class);
        Mockito.when(pubSubConsumerAdapterFactory.create((VeniceProperties) Mockito.any(), ArgumentMatchers.anyBoolean(), (PubSubMessageDeserializer) Mockito.any(), (String) Mockito.any())).thenReturn(apacheKafkaConsumerAdapter, new PubSubConsumerAdapter[]{pubSubConsumerAdapter});
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", "test_kafka_url");
        MetricsRepository metricsRepository = (MetricsRepository) Mockito.mock(MetricsRepository.class);
        ((MetricsRepository) Mockito.doReturn((Sensor) Mockito.mock(Sensor.class)).when(metricsRepository)).sensor(Mockito.anyString(), (Sensor[]) Mockito.any());
        TopicWiseKafkaConsumerService topicWiseKafkaConsumerService = new TopicWiseKafkaConsumerService(pubSubConsumerAdapterFactory, properties, 1000L, 2, (EventThrottler) Mockito.mock(EventThrottler.class), (EventThrottler) Mockito.mock(EventThrottler.class), (KafkaClusterBasedRecordThrottler) Mockito.mock(KafkaClusterBasedRecordThrottler.class), metricsRepository, "test_kafka_cluster_alias", TimeUnit.MINUTES.toMillis(1L), (TopicExistenceChecker) Mockito.mock(TopicExistenceChecker.class), false, this.pubSubDeserializer, SystemTime.INSTANCE, (KafkaConsumerServiceStats) null, false);
        topicWiseKafkaConsumerService.start();
        String uniqueString = Utils.getUniqueString("test_consumer_service");
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(uniqueString, 1));
        Mockito.when(storeIngestionTask.getVersionTopic()).thenReturn(topic);
        Mockito.when(Boolean.valueOf(storeIngestionTask.isHybridMode())).thenReturn(true);
        SharedKafkaConsumer assignConsumerFor = topicWiseKafkaConsumerService.assignConsumerFor(topic, new PubSubTopicPartitionImpl(storeIngestionTask.getVersionTopic(), 0));
        Assert.assertEquals(topicWiseKafkaConsumerService.assignConsumerFor(topic, new PubSubTopicPartitionImpl(storeIngestionTask.getVersionTopic(), 0)), assignConsumerFor, "The 'getConsumer' function should be idempotent");
        String composeKafkaTopic = Version.composeKafkaTopic(uniqueString, 2);
        StoreIngestionTask storeIngestionTask2 = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic2 = this.pubSubTopicRepository.getTopic(composeKafkaTopic);
        Mockito.when(storeIngestionTask2.getVersionTopic()).thenReturn(topic2);
        Mockito.when(Boolean.valueOf(storeIngestionTask2.isHybridMode())).thenReturn(true);
        Assert.assertNotEquals(topicWiseKafkaConsumerService.assignConsumerFor(topic2, new PubSubTopicPartitionImpl(topic2, 0)), assignConsumerFor, "The 'getConsumer' function should return a different consumer from v1");
        PubSubTopic topic3 = this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(uniqueString, 3));
        StoreIngestionTask storeIngestionTask3 = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        Mockito.when(storeIngestionTask3.getVersionTopic()).thenReturn(topic3);
        Mockito.when(Boolean.valueOf(storeIngestionTask3.isHybridMode())).thenReturn(true);
        try {
            topicWiseKafkaConsumerService.assignConsumerFor(topic3, new PubSubTopicPartitionImpl(topic3, 0));
            Assert.fail("An exception should be thrown since all 2 consumers should be occupied by other versions");
        } catch (VeniceException e) {
        }
        topicWiseKafkaConsumerService.stop();
    }

    @Test
    public void testPartitionWiseGetConsumer() {
        ApacheKafkaConsumerAdapter apacheKafkaConsumerAdapter = (ApacheKafkaConsumerAdapter) Mockito.mock(ApacheKafkaConsumerAdapter.class);
        Mockito.when(Boolean.valueOf(apacheKafkaConsumerAdapter.hasAnySubscription())).thenReturn(true);
        PubSubConsumerAdapter pubSubConsumerAdapter = (ApacheKafkaConsumerAdapter) Mockito.mock(ApacheKafkaConsumerAdapter.class);
        Mockito.when(Boolean.valueOf(pubSubConsumerAdapter.hasAnySubscription())).thenReturn(true);
        String uniqueString = Utils.getUniqueString("test_consumer_service1");
        StoreIngestionTask storeIngestionTask = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(uniqueString, 1));
        Mockito.when(storeIngestionTask.getVersionTopic()).thenReturn(topic);
        Mockito.when(Boolean.valueOf(storeIngestionTask.isHybridMode())).thenReturn(true);
        PubSubTopic topic2 = this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(Utils.getUniqueString("test_consumer_service2"), 1));
        StoreIngestionTask storeIngestionTask2 = (StoreIngestionTask) Mockito.mock(StoreIngestionTask.class);
        Mockito.when(storeIngestionTask2.getVersionTopic()).thenReturn(topic2);
        Mockito.when(Boolean.valueOf(storeIngestionTask2.isHybridMode())).thenReturn(true);
        PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory = (PubSubConsumerAdapterFactory) Mockito.mock(PubSubConsumerAdapterFactory.class);
        Mockito.when(pubSubConsumerAdapterFactory.create((VeniceProperties) Mockito.any(), ArgumentMatchers.anyBoolean(), (PubSubMessageDeserializer) Mockito.any(), (String) Mockito.any())).thenReturn(apacheKafkaConsumerAdapter, new PubSubConsumerAdapter[]{pubSubConsumerAdapter});
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", "test_kafka_url");
        MetricsRepository metricsRepository = (MetricsRepository) Mockito.mock(MetricsRepository.class);
        ((MetricsRepository) Mockito.doReturn((Sensor) Mockito.mock(Sensor.class)).when(metricsRepository)).sensor(Mockito.anyString(), (Sensor[]) Mockito.any());
        PartitionWiseKafkaConsumerService partitionWiseKafkaConsumerService = new PartitionWiseKafkaConsumerService(pubSubConsumerAdapterFactory, properties, 1000L, 2, (EventThrottler) Mockito.mock(EventThrottler.class), (EventThrottler) Mockito.mock(EventThrottler.class), (KafkaClusterBasedRecordThrottler) Mockito.mock(KafkaClusterBasedRecordThrottler.class), metricsRepository, "test_kafka_cluster_alias", TimeUnit.MINUTES.toMillis(1L), (TopicExistenceChecker) Mockito.mock(TopicExistenceChecker.class), false, this.pubSubDeserializer, SystemTime.INSTANCE, (KafkaConsumerServiceStats) null, false);
        partitionWiseKafkaConsumerService.start();
        SharedKafkaConsumer assignConsumerFor = partitionWiseKafkaConsumerService.assignConsumerFor(topic, new PubSubTopicPartitionImpl(topic, 0));
        SharedKafkaConsumer assignConsumerFor2 = partitionWiseKafkaConsumerService.assignConsumerFor(topic, new PubSubTopicPartitionImpl(topic, 1));
        SharedKafkaConsumer assignConsumerFor3 = partitionWiseKafkaConsumerService.assignConsumerFor(topic, new PubSubTopicPartitionImpl(topic, 2));
        SharedKafkaConsumer assignConsumerFor4 = partitionWiseKafkaConsumerService.assignConsumerFor(topic, new PubSubTopicPartitionImpl(topic, 3));
        SharedKafkaConsumer assignConsumerFor5 = partitionWiseKafkaConsumerService.assignConsumerFor(topic2, new PubSubTopicPartitionImpl(topic2, 4));
        SharedKafkaConsumer assignConsumerFor6 = partitionWiseKafkaConsumerService.assignConsumerFor(topic2, new PubSubTopicPartitionImpl(topic2, 5));
        Assert.assertNotEquals(assignConsumerFor, assignConsumerFor2);
        Assert.assertNotEquals(assignConsumerFor5, assignConsumerFor6);
        Assert.assertEquals(assignConsumerFor, assignConsumerFor3);
        Assert.assertEquals(assignConsumerFor4, assignConsumerFor6);
    }
}
