package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.kafka.consumer.CachedKafkaMetadataGetter;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicDoesNotExistException;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.stats.StatsErrorCode;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/CachedKafkaMetadataGetterTest.class */
public class CachedKafkaMetadataGetterTest {
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    @Test
    public void testGetEarliestOffset() {
        CachedKafkaMetadataGetter cachedKafkaMetadataGetter = new CachedKafkaMetadataGetter(1000L);
        TopicManager topicManager = (TopicManager) Mockito.mock(TopicManager.class);
        PubSubTopic topic = this.pubSubTopicRepository.getTopic("test_v1");
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 0);
        Mockito.when(topicManager.getKafkaBootstrapServers()).thenReturn("I_Am_A_Broker_dot_com.com");
        Mockito.when(Long.valueOf(topicManager.getPartitionEarliestOffsetAndRetry((PubSubTopicPartition) ArgumentMatchers.any(), ArgumentMatchers.anyInt()))).thenReturn(1L);
        Assert.assertEquals(Long.valueOf(cachedKafkaMetadataGetter.getEarliestOffset(topicManager, pubSubTopicPartitionImpl)), 1L);
        TopicManager topicManager2 = (TopicManager) Mockito.mock(TopicManager.class);
        Mockito.when(topicManager2.getKafkaBootstrapServers()).thenReturn("I_Am_A_Broker_dot_com.com");
        Mockito.when(Long.valueOf(topicManager2.getPartitionEarliestOffsetAndRetry((PubSubTopicPartition) ArgumentMatchers.any(), ArgumentMatchers.anyInt()))).thenThrow(TopicDoesNotExistException.class);
        Assert.assertEquals(Long.valueOf(cachedKafkaMetadataGetter.getEarliestOffset(topicManager2, pubSubTopicPartitionImpl)), 1L);
        Assert.assertEquals(cachedKafkaMetadataGetter.getEarliestOffset(topicManager2, new PubSubTopicPartitionImpl(topic, 0 + 1)), StatsErrorCode.LAG_MEASUREMENT_FAILURE.code);
    }

    @Test
    public void testCacheWillResetStatusWhenExceptionIsThrown() {
        CachedKafkaMetadataGetter cachedKafkaMetadataGetter = new CachedKafkaMetadataGetter(1000L);
        CachedKafkaMetadataGetter.KafkaMetadataCacheKey kafkaMetadataCacheKey = new CachedKafkaMetadataGetter.KafkaMetadataCacheKey("server", new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic")), 1));
        VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
        veniceConcurrentHashMap.put(kafkaMetadataCacheKey, new CachedKafkaMetadataGetter.ValueAndExpiryTime(1L, System.nanoTime()));
        TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals((Long) cachedKafkaMetadataGetter.fetchMetadata(kafkaMetadataCacheKey, veniceConcurrentHashMap, () -> {
                return 2L;
            }), 2L);
        });
        TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
            Assert.assertThrows(VeniceException.class, () -> {
                cachedKafkaMetadataGetter.fetchMetadata(kafkaMetadataCacheKey, veniceConcurrentHashMap, () -> {
                    throw new VeniceException("dummy exception");
                });
            });
        });
        veniceConcurrentHashMap.put(kafkaMetadataCacheKey, new CachedKafkaMetadataGetter.ValueAndExpiryTime(1L, System.nanoTime()));
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, false, true, () -> {
            Assert.assertEquals((Long) cachedKafkaMetadataGetter.fetchMetadata(kafkaMetadataCacheKey, veniceConcurrentHashMap, () -> {
                if (atomicBoolean.compareAndSet(false, true)) {
                    throw new VeniceException("do not throw this exception!");
                }
                return 2L;
            }), 2L);
        });
    }
}
