package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.unit.kafka.InMemoryKafkaBroker;
import com.linkedin.venice.utils.TestMockTime;
import com.linkedin.venice.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/KafkaClusterBasedRecordThrottlerTest.class */
/**
 * Exercises {@link KafkaClusterBasedRecordThrottler#poll} with two Kafka clusters ("local" and
 * "remote"), each mapped to its own {@link EventThrottler}, and checks that throttling one
 * cluster does not affect polls against the other.
 */
public class KafkaClusterBasedRecordThrottlerTest {
    /** Number of partitions created for the test topic on each in-memory broker. */
    private static final int PARTITION_COUNT = 2;

    /** Number of mocked messages each mocked consumer hands back per poll. */
    private static final int MESSAGES_PER_POLL = 10;

    /**
     * Quota handed to the local throttler. NOTE(review): a negative quota presumably means
     * "no limit" in EventThrottler -- confirm against that class.
     */
    private static final long LOCAL_QUOTA = -1L;

    /** Initial (and later restored) quota reported by the remote throttler's supplier. */
    private static final long REMOTE_QUOTA = 10L;

    /** Time-window argument passed to both throttlers; also the amount mock time is advanced. */
    private static final long TIME_WINDOW_MS = 1000L;

    /** Timeout argument forwarded to every {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /**
     * Polls both clusters three times:
     * <ol>
     *   <li>with the remote quota at {@link #REMOTE_QUOTA}: both polls return the consumer's map;</li>
     *   <li>with the remote quota set to 0: the local poll still returns the consumer's map, the
     *       remote poll returns a different, empty map;</li>
     *   <li>with the remote quota restored and mock time advanced by {@link #TIME_WINDOW_MS}:
     *       both polls return the consumer's map again.</li>
     * </ol>
     *
     * @throws ExecutionException declared for compatibility with the helpers invoked here
     * @throws InterruptedException declared for compatibility with the helpers invoked here
     */
    @Test
    public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, InterruptedException {
        String topic = Utils.getUniqueString("topic");
        InMemoryKafkaBroker localBroker = new InMemoryKafkaBroker("local");
        localBroker.createTopic(topic, PARTITION_COUNT);
        InMemoryKafkaBroker remoteBroker = new InMemoryKafkaBroker("remote");
        remoteBroker.createTopic(topic, PARTITION_COUNT);
        String localUrl = localBroker.getKafkaBootstrapServer();
        String remoteUrl = remoteBroker.getKafkaBootstrapServer();

        // The remote quota is read through a supplier so the test can change it between polls.
        AtomicLong remoteQuota = new AtomicLong(REMOTE_QUOTA);
        TestMockTime mockTime = new TestMockTime();
        EventThrottler localThrottler = new EventThrottler(
                mockTime, LOCAL_QUOTA, TIME_WINDOW_MS, "local_throttler", true, EventThrottler.REJECT_STRATEGY);
        EventThrottler remoteThrottler = new EventThrottler(
                mockTime, remoteQuota::get, TIME_WINDOW_MS, "remote_throttler", true, EventThrottler.REJECT_STRATEGY);

        Map<String, EventThrottler> throttlerByKafkaUrl = new HashMap<>();
        throttlerByKafkaUrl.put(localUrl, localThrottler);
        throttlerByKafkaUrl.put(remoteUrl, remoteThrottler);
        KafkaClusterBasedRecordThrottler recordThrottler = new KafkaClusterBasedRecordThrottler(throttlerByKafkaUrl);

        // Both mocked consumers return the very same map instance, so identity assertions can
        // tell "records passed through" apart from "records replaced by the throttler".
        Map<PubSubTopicPartition, List<PubSubMessage>> consumerRecords = newConsumerRecords();
        PubSubConsumerAdapter localConsumer = consumerReturning(consumerRecords);
        PubSubConsumerAdapter remoteConsumer = consumerReturning(consumerRecords);

        // Phase 1: remote quota is REMOTE_QUOTA.
        Map<?, ?> localResult = recordThrottler.poll(localConsumer, localUrl, POLL_TIMEOUT_MS);
        Map<?, ?> remoteResult = recordThrottler.poll(remoteConsumer, remoteUrl, POLL_TIMEOUT_MS);
        Assert.assertSame(localResult, consumerRecords, "local poll should pass records through");
        Assert.assertSame(remoteResult, consumerRecords, "remote poll should pass records through");

        // Phase 2: remote quota dropped to zero.
        remoteQuota.set(0L);
        localResult = recordThrottler.poll(localConsumer, localUrl, POLL_TIMEOUT_MS);
        remoteResult = recordThrottler.poll(remoteConsumer, remoteUrl, POLL_TIMEOUT_MS);
        Assert.assertSame(localResult, consumerRecords, "local poll must be unaffected by the remote quota");
        Assert.assertNotSame(remoteResult, consumerRecords, "throttled remote poll must not return the consumer's records");
        Assert.assertTrue(remoteResult.isEmpty(), "throttled remote poll should return an empty map");

        // Phase 3: remote quota restored and mock time advanced by one time window.
        remoteQuota.set(REMOTE_QUOTA);
        mockTime.sleep(TIME_WINDOW_MS);
        localResult = recordThrottler.poll(localConsumer, localUrl, POLL_TIMEOUT_MS);
        remoteResult = recordThrottler.poll(remoteConsumer, remoteUrl, POLL_TIMEOUT_MS);
        Assert.assertSame(localResult, consumerRecords, "local poll should pass records through");
        Assert.assertSame(remoteResult, consumerRecords, "remote poll should pass records through after quota is restored");
    }

    /** Builds a single-partition map holding {@link #MESSAGES_PER_POLL} mocked messages. */
    private static Map<PubSubTopicPartition, List<PubSubMessage>> newConsumerRecords() {
        List<PubSubMessage> messages = new ArrayList<>(MESSAGES_PER_POLL);
        for (int i = 0; i < MESSAGES_PER_POLL; i++) {
            messages.add(Mockito.mock(PubSubMessage.class));
        }
        Map<PubSubTopicPartition, List<PubSubMessage>> records = new HashMap<>();
        records.put(Mockito.mock(PubSubTopicPartition.class), messages);
        return records;
    }

    /** Creates a mocked consumer whose {@code poll} returns {@code records} for any timeout. */
    private static PubSubConsumerAdapter consumerReturning(Map<PubSubTopicPartition, List<PubSubMessage>> records) {
        PubSubConsumerAdapter consumer = Mockito.mock(PubSubConsumerAdapter.class);
        Mockito.doReturn(records).when(consumer).poll(Mockito.anyLong());
        return consumer;
    }
}
