package com.linkedin.venice.kafka.consumer;

import com.linkedin.venice.exceptions.UnsubscribedTopicPartitionException;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/kafka/consumer/ApacheKafkaPubSubConsumerAdapterTest.class */
public class ApacheKafkaPubSubConsumerAdapterTest {
    private ApacheKafkaConsumerAdapter apacheKafkaConsumerWithOffsetTrackingDisabled;
    private ApacheKafkaConsumerAdapter apacheKafkaConsumerWithOffsetTrackingEnabled;
    private KafkaConsumer<byte[], byte[]> delegateKafkaConsumer;
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    @BeforeMethod
    public void initConsumer() {
        this.delegateKafkaConsumer = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
        Properties properties = new Properties();
        properties.put("key.deserializer", ByteArrayDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);
        properties.setProperty("kafka.bootstrap.servers", "broker address");
        KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer = (KafkaPubSubMessageDeserializer) Mockito.mock(KafkaPubSubMessageDeserializer.class);
        this.apacheKafkaConsumerWithOffsetTrackingDisabled = new ApacheKafkaConsumerAdapter(this.delegateKafkaConsumer, new VeniceProperties(properties), false, kafkaPubSubMessageDeserializer);
        this.apacheKafkaConsumerWithOffsetTrackingEnabled = new ApacheKafkaConsumerAdapter(this.delegateKafkaConsumer, new VeniceProperties(properties), true, kafkaPubSubMessageDeserializer);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testApacheKafkaConsumer(boolean z) {
        ApacheKafkaConsumerAdapter apacheKafkaConsumerAdapter = z ? this.apacheKafkaConsumerWithOffsetTrackingEnabled : this.apacheKafkaConsumerWithOffsetTrackingDisabled;
        PubSubTopic topic = this.pubSubTopicRepository.getTopic("test_topic_v1");
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 1);
        TopicPartition topicPartition = new TopicPartition(topic.getName(), pubSubTopicPartitionImpl.getPartitionNumber());
        Assert.assertThrows(UnsubscribedTopicPartitionException.class, () -> {
            apacheKafkaConsumerAdapter.resetOffset(pubSubTopicPartitionImpl);
        });
        apacheKafkaConsumerAdapter.subscribe(pubSubTopicPartitionImpl, -1L);
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer)).assign(Collections.singletonList(topicPartition));
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer)).seekToBeginning(Collections.singletonList(topicPartition));
        ((KafkaConsumer) Mockito.doReturn(Collections.singleton(topicPartition)).when(this.delegateKafkaConsumer)).assignment();
        Assert.assertTrue(apacheKafkaConsumerAdapter.hasAnySubscription());
        apacheKafkaConsumerAdapter.pause(pubSubTopicPartitionImpl);
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer)).pause(Collections.singletonList(topicPartition));
        apacheKafkaConsumerAdapter.resume(pubSubTopicPartitionImpl);
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer)).resume(Collections.singletonList(topicPartition));
        apacheKafkaConsumerAdapter.resetOffset(pubSubTopicPartitionImpl);
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer, Mockito.times(2))).seekToBeginning(Collections.singletonList(topicPartition));
        apacheKafkaConsumerAdapter.unSubscribe(pubSubTopicPartitionImpl);
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer)).assign(Collections.EMPTY_LIST);
        ((KafkaConsumer) Mockito.doReturn(Collections.EMPTY_SET).when(this.delegateKafkaConsumer)).assignment();
        Assert.assertFalse(apacheKafkaConsumerAdapter.hasAnySubscription());
        apacheKafkaConsumerAdapter.subscribe(pubSubTopicPartitionImpl, 0);
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer)).seek(topicPartition, 0 + 1);
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        HashSet hashSet4 = new HashSet();
        PubSubTopic topic2 = this.pubSubTopicRepository.getTopic("test_topic_v2");
        for (int i = 0; i < 5; i++) {
            PubSubTopicPartitionImpl pubSubTopicPartitionImpl2 = new PubSubTopicPartitionImpl(topic, i);
            hashSet.add(pubSubTopicPartitionImpl2);
            hashSet4.add(new TopicPartition(topic.getName(), i));
            hashSet3.add(pubSubTopicPartitionImpl2);
            apacheKafkaConsumerAdapter.subscribe(pubSubTopicPartitionImpl2, -1L);
        }
        for (int i2 = 0; i2 < 3; i2++) {
            PubSubTopicPartitionImpl pubSubTopicPartitionImpl3 = new PubSubTopicPartitionImpl(topic2, i2);
            TopicPartition topicPartition2 = new TopicPartition(topic2.getName(), i2);
            hashSet2.add(topicPartition2);
            hashSet4.add(topicPartition2);
            hashSet3.add(pubSubTopicPartitionImpl3);
            apacheKafkaConsumerAdapter.subscribe(pubSubTopicPartitionImpl3, -1L);
        }
        ((KafkaConsumer) Mockito.doReturn(hashSet4).when(this.delegateKafkaConsumer)).assignment();
        Assert.assertTrue(apacheKafkaConsumerAdapter.hasSubscription(pubSubTopicPartitionImpl));
        Assert.assertEquals(apacheKafkaConsumerAdapter.getAssignment(), hashSet3);
        apacheKafkaConsumerAdapter.batchUnsubscribe(hashSet);
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer)).assign(hashSet2);
        apacheKafkaConsumerAdapter.close();
        ((KafkaConsumer) Mockito.verify(this.delegateKafkaConsumer)).close((Duration) Mockito.eq(Duration.ZERO));
    }
}
