package com.linkedin.venice.kafka;

import com.github.benmanes.caffeine.cache.Cache;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService;
import com.linkedin.davinci.kafka.consumer.KafkaClusterBasedRecordThrottler;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
import com.linkedin.davinci.kafka.consumer.StorePartitionDataReceiver;
import com.linkedin.davinci.kafka.consumer.TopicExistenceChecker;
import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.EndOfPush;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapter;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.KafkaKeySerializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestMockTime;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/kafka/KafkaConsumptionTest.class */
/**
 * Integration test that drives a single {@link AggKafkaConsumerService} against two independent pub-sub brokers
 * (referred to here as "local" and "remote") and checks that the data receiver registered for each broker observes
 * exactly the records that were produced to that broker.
 */
public class KafkaConsumptionTest {
    /** Upper bound, in seconds, for a freshly created topic to be reported as fully online. */
    private static final int WAIT_TIME_IN_SECONDS = 10;
    /** 24 hours, in milliseconds; handed to the topic manager repository as its third numeric argument. */
    private static final long MIN_COMPACTION_LAG = 86_400_000L;
    /** Upper bound, in milliseconds, for each "all records received" wait in the test. */
    private static final long RECORDS_WAIT_TIMEOUT_MS = 1000L;
    /** Time budget, in milliseconds, granted to a producer when it is closed after sending one message. */
    private static final int PRODUCER_CLOSE_TIMEOUT_MS = 5000;

    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    private PubSubBrokerWrapper localKafka;
    private PubSubBrokerWrapper remoteKafka;
    private TopicManager topicManager;
    private TopicManager remoteTopicManager;
    private TestMockTime mockTime;
    private TestMockTime remoteMockTime;
    private PubSubTopic versionTopic;

    /**
     * Creates a uniquely named single-partition topic on both brokers and waits until each broker reports it online.
     *
     * <p>The topic name is derived from the name of the method that calls this one (stack frame index 2), so this
     * method must be invoked directly from the test method and not through another helper or a lambda.
     *
     * @return the topic that now exists on both the local and the remote broker
     */
    private PubSubTopic getTopic() {
        String callerMethodName = Thread.currentThread().getStackTrace()[2].getMethodName();
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString(callerMethodName));
        createTopicAndWaitUntilOnline(this.topicManager, topic);
        createTopicAndWaitUntilOnline(this.remoteTopicManager, topic);
        return topic;
    }

    /** Creates {@code topic} through {@code manager} and blocks until all of its partitions are reported online. */
    private static void createTopicAndWaitUntilOnline(TopicManager manager, PubSubTopic topic) {
        manager.createTopic(topic, 1, 1, false);
        TestUtils.waitForNonDeterministicAssertion(
                WAIT_TIME_IN_SECONDS,
                TimeUnit.SECONDS,
                () -> Assert.assertTrue(manager.containsTopicAndAllPartitionsAreOnline(topic)));
    }

    /** Starts the local and remote brokers, each with its own mock clock, and a topic manager for each of them. */
    @BeforeClass
    public void setUp() {
        this.mockTime = new TestMockTime();
        this.localKafka = ServiceFactory.getPubSubBroker(new PubSubBrokerConfigs.Builder().setMockTime(this.mockTime).build());
        this.topicManager = createTopicManager(this.localKafka);

        this.remoteMockTime = new TestMockTime();
        this.remoteKafka = ServiceFactory.getPubSubBroker(new PubSubBrokerConfigs.Builder().setMockTime(this.remoteMockTime).build());
        this.remoteTopicManager = createTopicManager(this.remoteKafka);
    }

    /**
     * Builds a topic manager bound to {@code broker} whose topic-config cache is a mock that never returns an entry.
     *
     * @param broker the broker the topic manager should talk to
     * @return the configured topic manager
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private TopicManager createTopicManager(PubSubBrokerWrapper broker) {
        TopicManager manager = IntegrationTestPushUtils
                .getTopicManagerRepo(30000L, 100L, MIN_COMPACTION_LAG, broker.getAddress(), this.pubSubTopicRepository)
                .getTopicManager();
        // The cache type parameters are declared by TopicManager and are not visible here, hence the raw type.
        Cache alwaysMissCache = Mockito.mock(Cache.class);
        Mockito.when(alwaysMissCache.getIfPresent(Mockito.any())).thenReturn((Object) null);
        manager.setTopicConfigCache(alwaysMissCache);
        return manager;
    }

    /**
     * Releases the topic managers and brokers created in {@link #setUp()}.
     *
     * <p>Every resource is null-checked and closed inside its own {@code finally} so that a partially failed
     * {@code setUp} or a failing {@code close()} cannot prevent the remaining resources from being released.
     */
    @AfterClass
    public void cleanUp() {
        try {
            if (this.topicManager != null) {
                this.topicManager.close();
            }
        } finally {
            try {
                if (this.localKafka != null) {
                    this.localKafka.close();
                }
            } finally {
                try {
                    if (this.remoteTopicManager != null) {
                        this.remoteTopicManager.close();
                    }
                } finally {
                    if (this.remoteKafka != null) {
                        this.remoteKafka.close();
                    }
                }
            }
        }
    }

    /**
     * Subscribes one partition of the same version topic on both brokers through a single
     * {@link AggKafkaConsumerService}, produces a different number of records to each broker, and waits until each
     * broker's data receiver reports exactly the number of records produced to it.
     *
     * @param isTopicWiseSharedConsumerAssignmentStrategy {@code true} to run with the topic-wise shared consumer
     *        assignment strategy, {@code false} to run with the partition-wise one
     */
    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 10000)
    public void testLocalAndRemoteConsumption(boolean isTopicWiseSharedConsumerAssignmentStrategy) throws ExecutionException, InterruptedException {
        String localAddress = this.localKafka.getAddress();
        String remoteAddress = this.remoteKafka.getAddress();

        EventThrottler bandwidthThrottler = Mockito.mock(EventThrottler.class);
        EventThrottler recordsThrottler = Mockito.mock(EventThrottler.class);
        KafkaClusterBasedRecordThrottler clusterBasedThrottler = new KafkaClusterBasedRecordThrottler(new HashMap<>());
        MetricsRepository metricsRepository = TehutiUtils.getMetricsRepository(getClass().getName());
        VeniceServerConfig serverConfig =
                mockServerConfig(isTopicWiseSharedConsumerAssignmentStrategy, localAddress, remoteAddress);
        KafkaPubSubMessageDeserializer deserializer = new KafkaPubSubMessageDeserializer(
                new OptimizedKafkaValueSerializer(),
                new LandFillObjectPool<>(KafkaMessageEnvelope::new),
                new LandFillObjectPool<>(KafkaMessageEnvelope::new));

        // NOTE(review): this service is never stopped by the test; confirm whether it needs an explicit shutdown.
        AggKafkaConsumerService aggKafkaConsumerService = new AggKafkaConsumerService(
                IntegrationTestPushUtils.getVeniceConsumerFactory(),
                clusterUrl -> new VeniceProperties(),
                serverConfig,
                bandwidthThrottler,
                recordsThrottler,
                clusterBasedThrottler,
                metricsRepository,
                Mockito.mock(TopicExistenceChecker.class),
                deserializer);

        // getTopic() names the topic after its direct caller, so it has to be called from this method itself.
        this.versionTopic = getTopic();
        PubSubTopicPartitionImpl topicPartition = new PubSubTopicPartitionImpl(this.versionTopic, 0);
        StoreIngestionTask storeIngestionTask = Mockito.mock(StoreIngestionTask.class);
        Mockito.doReturn(this.versionTopic).when(storeIngestionTask).getVersionTopic();

        StorePartitionDataReceiver localDataReceiver =
                subscribe(aggKafkaConsumerService, localAddress, storeIngestionTask, topicPartition);
        StorePartitionDataReceiver remoteDataReceiver =
                subscribe(aggKafkaConsumerService, remoteAddress, storeIngestionTask, topicPartition);

        // Local broker: 10 data records followed by 3 control messages.
        long localExpectedRecordsCount = produceRecords(localAddress, 10, 3);
        TestUtils.waitForNonDeterministicCompletion(
                RECORDS_WAIT_TIMEOUT_MS,
                TimeUnit.MILLISECONDS,
                () -> localDataReceiver.receivedRecordsCount() == localExpectedRecordsCount);

        // Remote broker: 5 data records followed by 4 control messages.
        long remoteExpectedRecordsCount = produceRecords(remoteAddress, 5, 4);
        TestUtils.waitForNonDeterministicCompletion(
                RECORDS_WAIT_TIMEOUT_MS,
                TimeUnit.MILLISECONDS,
                () -> remoteDataReceiver.receivedRecordsCount() == remoteExpectedRecordsCount);
    }

    /**
     * Builds a mocked server config exposing the two brokers (each aliased to its own address, ids 0 and 1) and the
     * requested shared consumer assignment strategy.
     */
    private static VeniceServerConfig mockServerConfig(boolean topicWiseStrategy, String localAddress, String remoteAddress) {
        VeniceServerConfig serverConfig = Mockito.mock(VeniceServerConfig.class);
        Mockito.doReturn(10L).when(serverConfig).getKafkaReadCycleDelayMs();
        Mockito.doReturn(2).when(serverConfig).getConsumerPoolSizePerKafkaCluster();
        Mockito.doReturn(10L).when(serverConfig).getSharedConsumerNonExistingTopicCleanupDelayMS();
        Mockito.doReturn(true).when(serverConfig).isLiveConfigBasedKafkaThrottlingEnabled();

        KafkaConsumerService.ConsumerAssignmentStrategy strategy = topicWiseStrategy
                ? KafkaConsumerService.ConsumerAssignmentStrategy.TOPIC_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY
                : KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY;
        Mockito.doReturn(strategy).when(serverConfig).getSharedConsumerAssignmentStrategy();

        HashMap<String, String> clusterUrlToAlias = new HashMap<>();
        clusterUrlToAlias.put(localAddress, localAddress);
        clusterUrlToAlias.put(remoteAddress, remoteAddress);
        Mockito.doReturn(clusterUrlToAlias).when(serverConfig).getKafkaClusterUrlToAliasMap();

        Object2IntOpenHashMap<String> clusterUrlToId = new Object2IntOpenHashMap<>(2);
        clusterUrlToId.put(localAddress, 0);
        clusterUrlToId.put(remoteAddress, 1);
        Mockito.doReturn(clusterUrlToId).when(serverConfig).getKafkaClusterUrlToIdMap();
        return serverConfig;
    }

    /**
     * Registers a consumer service for the broker at {@code brokerAddress}, subscribes {@code topicPartition} on it
     * and asserts that a consumer has been assigned.
     *
     * @return the data receiver returned by the subscription
     */
    private StorePartitionDataReceiver subscribe(
            AggKafkaConsumerService aggKafkaConsumerService,
            String brokerAddress,
            StoreIngestionTask storeIngestionTask,
            PubSubTopicPartitionImpl topicPartition) {
        Properties consumerProperties = new Properties();
        consumerProperties.put("kafka.bootstrap.servers", brokerAddress);
        consumerProperties.put("key.deserializer", ByteArrayDeserializer.class);
        consumerProperties.put("value.deserializer", ByteArrayDeserializer.class);
        consumerProperties.put("receive.buffer.bytes", 1048576);
        aggKafkaConsumerService.createKafkaConsumerService(consumerProperties);

        // NOTE(review): -1 is passed as the last subscription argument; presumably "no previously consumed offset" — confirm.
        StorePartitionDataReceiver dataReceiver =
                aggKafkaConsumerService.subscribeConsumerFor(brokerAddress, storeIngestionTask, topicPartition, -1L);
        Assert.assertTrue(aggKafkaConsumerService.hasConsumerAssignedFor(brokerAddress, this.versionTopic, topicPartition));
        return dataReceiver;
    }

    /**
     * Produces {@code dataRecordCount} PUT records followed by {@code controlMessageCount} control messages to the
     * current version topic on the given broker. Producer timestamps start one second after "now" and advance by
     * one second per message.
     *
     * @return the total number of messages produced
     */
    private long produceRecords(String brokerAddress, int dataRecordCount, int controlMessageCount)
            throws ExecutionException, InterruptedException {
        String topicName = this.versionTopic.getName();
        long producerTimestamp = System.currentTimeMillis();
        for (int i = 0; i < dataRecordCount; i++) {
            producerTimestamp += 1000;
            produceToKafka(topicName, true, producerTimestamp, brokerAddress);
        }
        for (int i = 0; i < controlMessageCount; i++) {
            producerTimestamp += 1000;
            produceToKafka(topicName, false, producerTimestamp, brokerAddress);
        }
        return (long) dataRecordCount + controlMessageCount;
    }

    /**
     * Synchronously sends one message to {@code topicName} on the broker at {@code brokerAddress} using a dedicated
     * producer, which is closed again before this method returns.
     *
     * @param topicName the topic to produce to
     * @param isDataRecord {@code true} to send a PUT record, {@code false} to send an END_OF_PUSH control message
     * @param producerTimestamp the message timestamp stored in the producer metadata
     * @param brokerAddress bootstrap address of the target broker
     * @throws ExecutionException if the send fails
     * @throws InterruptedException if the thread is interrupted while waiting for the send to complete
     */
    private void produceToKafka(String topicName, boolean isDataRecord, long producerTimestamp, String brokerAddress) throws ExecutionException, InterruptedException {
        byte[] keyBytes = {0, 1};
        KafkaKey recordKey = new KafkaKey(isDataRecord ? MessageType.PUT : MessageType.CONTROL_MESSAGE, keyBytes);
        KafkaMessageEnvelope recordValue = buildEnvelope(isDataRecord, producerTimestamp, keyBytes);

        Properties producerProperties = new Properties();
        producerProperties.put("kafka.key.serializer", KafkaKeySerializer.class.getName());
        producerProperties.put("kafka.value.serializer", KafkaValueSerializer.class.getName());
        producerProperties.put("kafka.bootstrap.servers", brokerAddress);
        ApacheKafkaProducerAdapter producerAdapter = new ApacheKafkaProducerAdapter(new ApacheKafkaProducerConfig(producerProperties));
        try {
            producerAdapter
                    .sendMessage(topicName, (Integer) null, recordKey, recordValue, (PubSubMessageHeaders) null, (PubSubProducerCallback) null)
                    .get();
        } finally {
            // One producer is created per message, so it must be closed here or it is leaked. No flush is requested
            // because get() above has already waited for the only message this producer sent.
            // NOTE(review): relies on PubSubProducerAdapter#close(int closeTimeOutMs, boolean doFlush) — confirm
            // the signature against the Venice version in use.
            producerAdapter.close(PRODUCER_CLOSE_TIMEOUT_MS, false);
        }
    }

    /**
     * Builds the message envelope for {@link #produceToKafka}: a PUT whose replication metadata payload wraps
     * {@code keyBytes}, or an END_OF_PUSH control message with empty debug info.
     */
    private static KafkaMessageEnvelope buildEnvelope(boolean isDataRecord, long producerTimestamp, byte[] keyBytes) {
        KafkaMessageEnvelope envelope = new KafkaMessageEnvelope();
        envelope.producerMetadata = new ProducerMetadata();
        envelope.producerMetadata.producerGUID = new GUID();
        envelope.producerMetadata.messageTimestamp = producerTimestamp;
        envelope.leaderMetadataFooter = new LeaderMetadata();
        envelope.leaderMetadataFooter.hostName = "localhost";
        if (isDataRecord) {
            Put put = new Put();
            put.putValue = ByteBuffer.wrap(new byte[]{0, 1});
            put.replicationMetadataPayload = ByteBuffer.wrap(keyBytes);
            envelope.payloadUnion = put;
        } else {
            ControlMessage controlMessage = new ControlMessage();
            controlMessage.controlMessageType = ControlMessageType.END_OF_PUSH.getValue();
            controlMessage.controlMessageUnion = new EndOfPush();
            controlMessage.debugInfo = Collections.emptyMap();
            envelope.payloadUnion = controlMessage;
        }
        return envelope;
    }
}
