package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskFactory;
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.exceptions.VeniceTimeoutException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.utils.ExceptionCaptorNotifier;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BooleanSupplier;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/PushTimeoutTest.class */
public class PushTimeoutTest {
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    @Test
    public void testPushTimeoutForLeaderFollowerStores() {
        String uniqueString = Utils.getUniqueString("store");
        ExceptionCaptorNotifier exceptionCaptorNotifier = new ExceptionCaptorNotifier();
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.add(exceptionCaptorNotifier);
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        ((VeniceServerConfig) Mockito.doReturn(Object2IntMaps.emptyMap()).when(veniceServerConfig)).getKafkaClusterUrlToIdMap();
        AggHostLevelIngestionStats aggHostLevelIngestionStats = (AggHostLevelIngestionStats) Mockito.mock(AggHostLevelIngestionStats.class);
        ((AggHostLevelIngestionStats) Mockito.doReturn((HostLevelIngestionStats) Mockito.mock(HostLevelIngestionStats.class)).when(aggHostLevelIngestionStats)).getStoreStats(Mockito.anyString());
        StoreIngestionTaskFactory.Builder pubSubTopicRepository = TestUtils.getStoreIngestionTaskBuilder(uniqueString).setLeaderFollowerNotifiersQueue(arrayDeque).setServerConfig(veniceServerConfig).setHostLevelIngestionStats(aggHostLevelIngestionStats).setPubSubTopicRepository(this.pubSubTopicRepository);
        Store storeOrThrow = pubSubTopicRepository.getMetadataRepo().getStoreOrThrow(uniqueString);
        Version version = (Version) storeOrThrow.getVersion(1).get();
        Properties properties = (Properties) Mockito.mock(Properties.class);
        ((Properties) Mockito.doReturn("localhost").when(properties)).getProperty((String) Mockito.eq("kafka.bootstrap.servers"));
        VeniceStoreVersionConfig veniceStoreVersionConfig = (VeniceStoreVersionConfig) Mockito.mock(VeniceStoreVersionConfig.class);
        String kafkaTopicName = version.kafkaTopicName();
        ((VeniceStoreVersionConfig) Mockito.doReturn(kafkaTopicName).when(veniceStoreVersionConfig)).getStoreVersionName();
        LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = new LeaderFollowerStoreIngestionTask(pubSubTopicRepository, storeOrThrow, version, properties, (BooleanSupplier) Mockito.mock(BooleanSupplier.class), veniceStoreVersionConfig, 0, false, Optional.empty());
        leaderFollowerStoreIngestionTask.subscribePartition(new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(kafkaTopicName), 0), Optional.empty());
        leaderFollowerStoreIngestionTask.run();
        Exception latestException = exceptionCaptorNotifier.getLatestException();
        Assert.assertNotNull(latestException, "Latest exception should not be null.");
        Assert.assertTrue(latestException instanceof VeniceTimeoutException, "Should have caught an instance of " + VeniceTimeoutException.class.getSimpleName() + "but instead got: " + latestException.getClass().getSimpleName() + ".");
    }

    @Test
    public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() {
        String uniqueString = Utils.getUniqueString("store");
        ExceptionCaptorNotifier exceptionCaptorNotifier = new ExceptionCaptorNotifier();
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.add(exceptionCaptorNotifier);
        StorageMetadataService storageMetadataService = (StorageMetadataService) Mockito.mock(StorageMetadataService.class);
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        ((VeniceServerConfig) Mockito.doReturn(Object2IntMaps.emptyMap()).when(veniceServerConfig)).getKafkaClusterUrlToIdMap();
        AggHostLevelIngestionStats aggHostLevelIngestionStats = (AggHostLevelIngestionStats) Mockito.mock(AggHostLevelIngestionStats.class);
        ((AggHostLevelIngestionStats) Mockito.doReturn((HostLevelIngestionStats) Mockito.mock(HostLevelIngestionStats.class)).when(aggHostLevelIngestionStats)).getStoreStats(Mockito.anyString());
        StoreIngestionTaskFactory.Builder pubSubTopicRepository = TestUtils.getStoreIngestionTaskBuilder(uniqueString).setLeaderFollowerNotifiersQueue(arrayDeque).setStorageMetadataService(storageMetadataService).setServerConfig(veniceServerConfig).setHostLevelIngestionStats(aggHostLevelIngestionStats).setPubSubTopicRepository(this.pubSubTopicRepository);
        Store storeOrThrow = pubSubTopicRepository.getMetadataRepo().getStoreOrThrow(uniqueString);
        Version version = (Version) storeOrThrow.getVersion(1).get();
        Properties properties = (Properties) Mockito.mock(Properties.class);
        ((Properties) Mockito.doReturn("localhost").when(properties)).getProperty((String) Mockito.eq("kafka.bootstrap.servers"));
        VeniceStoreVersionConfig veniceStoreVersionConfig = (VeniceStoreVersionConfig) Mockito.mock(VeniceStoreVersionConfig.class);
        String kafkaTopicName = version.kafkaTopicName();
        ((VeniceStoreVersionConfig) Mockito.doReturn(kafkaTopicName).when(veniceStoreVersionConfig)).getStoreVersionName();
        OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
        ((OffsetRecord) Mockito.doReturn(Collections.emptyMap()).when(offsetRecord)).getProducerPartitionStateMap();
        ((OffsetRecord) Mockito.doReturn(true).when(offsetRecord)).isEndOfPushReceived();
        ((OffsetRecord) Mockito.doReturn(Version.composeRealTimeTopic(uniqueString)).when(offsetRecord)).getLeaderTopic();
        ((OffsetRecord) Mockito.doReturn(1L).when(offsetRecord)).getLocalVersionTopicOffset();
        ((StorageMetadataService) Mockito.doReturn(offsetRecord).when(storageMetadataService)).getLastOffset((String) Mockito.eq(kafkaTopicName), Mockito.eq(0));
        LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = new LeaderFollowerStoreIngestionTask(pubSubTopicRepository, storeOrThrow, version, properties, (BooleanSupplier) Mockito.mock(BooleanSupplier.class), veniceStoreVersionConfig, 0, false, Optional.empty());
        leaderFollowerStoreIngestionTask.subscribePartition(new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(kafkaTopicName), 0), Optional.empty());
        leaderFollowerStoreIngestionTask.run();
        Assert.assertNull(exceptionCaptorNotifier.getLatestException());
    }
}
