package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceClusterConfig;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
import com.linkedin.davinci.listener.response.MetadataResponse;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStorageEngineTest;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.helix.HelixInstanceConfigRepository;
import com.linkedin.venice.helix.ResourceAssignment;
import com.linkedin.venice.meta.ClusterInfoProvider;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.ReadStrategy;
import com.linkedin.venice.meta.RoutingStrategy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.pubsub.api.PubSubClientsFactory;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test
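/*
 * Abstract TestNG suite for KafkaStoreIngestionService; concrete subclasses supply the
 * consumer assignment strategy via getConsumerAssignmentStrategy().
 * Recovered from class file: com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.class
 */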
public abstract class KafkaStoreIngestionServiceTest {
    private StorageEngineRepository mockStorageEngineRepository;
    private VeniceConfigLoader mockVeniceConfigLoader;
    private StorageMetadataService storageMetadataService;
    private ClusterInfoProvider mockClusterInfoProvider;
    private ReadOnlyStoreRepository mockMetadataRepo;
    private ReadOnlySchemaRepository mockSchemaRepo;
    private HelixCustomizedViewOfflinePushRepository mockCustomizedViewRepository;
    private HelixInstanceConfigRepository mockHelixInstanceConfigRepository;
    private ReadOnlyLiveClusterConfigRepository mockLiveClusterConfigRepo;
    private PubSubClientsFactory mockPubSubClientsFactory;
    private StorageEngineBackedCompressorFactory compressorFactory;
    private KafkaStoreIngestionService kafkaStoreIngestionService;

    /**
     * Creates the mocked collaborators shared by the tests of this class, wires the storage
     * engine repository to hand out a mocked engine for any store name, and finally builds
     * the mocked configuration through {@code setupMockConfig()}.
     */
    @BeforeClass
    public void setUp() {
        mockStorageEngineRepository = Mockito.mock(StorageEngineRepository.class);
        // Any lookup of a local storage engine yields the same mocked engine.
        Mockito.doReturn(Mockito.mock(AbstractStorageEngine.class))
                .when(mockStorageEngineRepository)
                .getLocalStorageEngine(ArgumentMatchers.anyString());

        storageMetadataService = Mockito.mock(StorageMetadataService.class);
        mockClusterInfoProvider = Mockito.mock(ClusterInfoProvider.class);
        mockMetadataRepo = Mockito.mock(ReadOnlyStoreRepository.class);
        mockSchemaRepo = Mockito.mock(ReadOnlySchemaRepository.class);
        mockCustomizedViewRepository = Mockito.mock(HelixCustomizedViewOfflinePushRepository.class);
        mockHelixInstanceConfigRepository = Mockito.mock(HelixInstanceConfigRepository.class);
        mockLiveClusterConfigRepo = Mockito.mock(ReadOnlyLiveClusterConfigRepository.class);
        mockPubSubClientsFactory = Mockito.mock(PubSubClientsFactory.class);

        // The compressor factory is a real object backed by the mocked metadata service.
        compressorFactory = new StorageEngineBackedCompressorFactory(storageMetadataService);

        setupMockConfig();
    }

    /**
     * Supplies the consumer assignment strategy that {@code setupMockConfig()} stubs as the
     * return value of {@code VeniceServerConfig#getSharedConsumerAssignmentStrategy()}.
     * Each concrete subclass picks the strategy it wants this suite to run against.
     *
     * @return the strategy to expose through the mocked server config
     */
    abstract KafkaConsumerService.ConsumerAssignmentStrategy getConsumerAssignmentStrategy();

    /**
     * Builds the mocked {@link VeniceConfigLoader} together with the mocked server and cluster
     * configs it returns, and stores the loader in {@code mockVeniceConfigLoader}.
     *
     * <p>The Kafka URL resolver is created as a typed {@link Function} before being handed to
     * {@code Mockito.doReturn(Object)}: a bare lambda has no functional-interface target type
     * there and would not compile.
     */
    private void setupMockConfig() {
        // Single dummy broker address reused for every Kafka-related stub below.
        final String dummyKafkaUrl = "localhost:16637";

        mockVeniceConfigLoader = Mockito.mock(VeniceConfigLoader.class);
        VeniceServerConfig serverConfig = Mockito.mock(VeniceServerConfig.class);

        // Fetch quotas are all stubbed to -1 (presumably "no quota" — confirm against VeniceServerConfig).
        Mockito.doReturn(-1L).when(serverConfig).getKafkaFetchQuotaBytesPerSecond();
        Mockito.doReturn(-1L).when(serverConfig).getKafkaFetchQuotaRecordPerSecond();
        Mockito.doReturn(-1L).when(serverConfig).getKafkaFetchQuotaUnorderedBytesPerSecond();
        Mockito.doReturn(-1L).when(serverConfig).getKafkaFetchQuotaUnorderedRecordPerSecond();

        Mockito.doReturn("").when(serverConfig).getDataBasePath();
        Mockito.doReturn(0.9d).when(serverConfig).getDiskFullThreshold();
        Mockito.doReturn(Int2ObjectMaps.emptyMap()).when(serverConfig).getKafkaClusterIdToAliasMap();
        Mockito.doReturn(Object2IntMaps.emptyMap()).when(serverConfig).getKafkaClusterUrlToIdMap();
        Mockito.doReturn(dummyKafkaUrl).when(serverConfig).getKafkaBootstrapServers();

        // Resolver that maps every URL to itself.
        Function<String, String> kafkaClusterUrlResolver = String::toString;
        Mockito.doReturn(kafkaClusterUrlResolver).when(serverConfig).getKafkaClusterUrlResolver();

        Mockito.doReturn(new VeniceProperties()).when(serverConfig).getKafkaConsumerConfigsForLocalConsumption();
        Mockito.doReturn(getConsumerAssignmentStrategy()).when(serverConfig).getSharedConsumerAssignmentStrategy();
        Mockito.doReturn(1).when(serverConfig).getConsumerPoolSizePerKafkaCluster();
        Mockito.doReturn(SecurityProtocol.PLAINTEXT).when(serverConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
        Mockito.doReturn(10).when(serverConfig).getKafkaMaxPollRecords();

        VeniceClusterConfig clusterConfig = Mockito.mock(VeniceClusterConfig.class);
        Properties rawClusterProperties = new Properties();
        rawClusterProperties.put("kafka.bootstrap.servers", dummyKafkaUrl);
        VeniceProperties clusterProperties = new VeniceProperties(rawClusterProperties);

        // Both config mocks expose the same cluster properties instance.
        Mockito.doReturn(clusterProperties).when(clusterConfig).getClusterProperties();
        Mockito.doReturn(clusterProperties).when(serverConfig).getClusterProperties();

        Mockito.doReturn(serverConfig).when(mockVeniceConfigLoader).getVeniceServerConfig();
        Mockito.doReturn(clusterConfig).when(mockVeniceConfigLoader).getVeniceClusterConfig();
    }

    /**
     * Checks {@code updateStatsEmission} against a topic-to-task map holding versions 1..3 of
     * store "test" plus one task of the similarly named store "testTest".
     *
     * <p>After the three-argument call, the highest-versioned task of "test" must have metrics
     * emission enabled, the older ones disabled, and the "testTest" task must be untouched.
     * After removing the highest version and calling the two-argument overload, the next
     * highest version must have emission enabled.
     */
    @Test
    public void testDisableMetricsEmission() {
        kafkaStoreIngestionService = new KafkaStoreIngestionService(
                mockStorageEngineRepository,
                mockVeniceConfigLoader,
                storageMetadataService,
                mockClusterInfoProvider,
                mockMetadataRepo,
                mockSchemaRepo,
                Optional.empty(),
                Optional.empty(),
                mockLiveClusterConfigRepo,
                new MetricsRepository(),
                Optional.empty(),
                Optional.empty(),
                AvroProtocolDefinition.PARTITION_STATE.getSerializer(),
                Optional.empty(),
                (ICProvider) null,
                false,
                compressorFactory,
                Optional.empty(),
                false,
                (RemoteIngestionRepairService) null,
                mockPubSubClientsFactory);

        final String storeName = "test";
        final String similarStoreName = "testTest";
        final int taskCount = 3;
        final int largestUsedVersion = 5;

        // Typed map: with a raw map the lambda parameters below would be plain Objects.
        ConcurrentSkipListMap<String, StoreIngestionTask> topicToTask = new ConcurrentSkipListMap<>();
        for (int version = 1; version <= taskCount; version++) {
            topicToTask.put(storeName + "_v" + version, Mockito.mock(StoreIngestionTask.class));
        }
        // A store whose name merely starts with the target store name must not be affected.
        topicToTask.put(similarStoreName + "_v1", Mockito.mock(StoreIngestionTask.class));

        Store store = Mockito.mock(Store.class);
        Mockito.doReturn(largestUsedVersion).when(store).getLargestUsedVersionNumber();
        Mockito.doReturn(store).when(mockMetadataRepo).getStore(storeName);

        kafkaStoreIngestionService.updateStatsEmission(topicToTask, storeName, largestUsedVersion);

        final String mostRecentTopic = storeName + "_v" + taskCount;
        topicToTask.forEach((topic, task) -> {
            if (!Version.parseStoreFromKafkaTopicName(topic).equals(storeName)) {
                Mockito.verify(task, Mockito.never()).enableMetricsEmission();
                Mockito.verify(task, Mockito.never()).disableMetricsEmission();
            } else if (topic.equals(mostRecentTopic)) {
                Mockito.verify(task).enableMetricsEmission();
            } else {
                Mockito.verify(task).disableMetricsEmission();
            }
        });

        // Drop the newest version; the next one down should now be the one emitting metrics.
        topicToTask.remove(mostRecentTopic);
        kafkaStoreIngestionService.updateStatsEmission(topicToTask, storeName);

        final String secondMostRecentTopic = storeName + "_v" + (taskCount - 1);
        StoreIngestionTask secondMostRecentTask = topicToTask.get(secondMostRecentTopic);
        // Look the task up directly so a missing entry fails the test instead of skipping the check.
        Assert.assertNotNull(secondMostRecentTask, "Missing ingestion task for topic " + secondMostRecentTopic);
        Mockito.verify(secondMostRecentTask).enableMetricsEmission();
    }

    /**
     * Exercises {@code getIngestingTopicsWithVersionStatusNotOnline()} as topics start
     * ingesting and their store versions change status, get deleted, or their store
     * becomes unavailable from the metadata repository.
     */
    @Test
    public void testGetIngestingTopicsNotWithOnlineVersion() {
        kafkaStoreIngestionService = new KafkaStoreIngestionService(
                mockStorageEngineRepository,
                mockVeniceConfigLoader,
                storageMetadataService,
                mockClusterInfoProvider,
                mockMetadataRepo,
                mockSchemaRepo,
                Optional.empty(),
                Optional.empty(),
                mockLiveClusterConfigRepo,
                new MetricsRepository(),
                Optional.empty(),
                Optional.empty(),
                AvroProtocolDefinition.PARTITION_STATE.getSerializer(),
                Optional.empty(),
                (ICProvider) null,
                false,
                compressorFactory,
                Optional.empty(),
                false,
                (RemoteIngestionRepairService) null,
                mockPubSubClientsFactory);

        final String topicV1 = "test-store_v1";
        final String topicV2 = "test-store_v2";
        final String invalidTopic = "invalid-store_v1";
        final String storeName = Version.parseStoreFromKafkaTopicName(topicV1);
        final String invalidStoreName = Version.parseStoreFromKafkaTopicName(invalidTopic);

        ZKStore store = new ZKStore(
                storeName,
                "unit-test",
                0L,
                PersistenceType.ROCKS_DB,
                RoutingStrategy.CONSISTENT_HASH,
                ReadStrategy.ANY_OF_ONLINE,
                OfflinePushStrategy.WAIT_ALL_REPLICAS,
                1);
        ZKStore invalidStore = new ZKStore(
                invalidStoreName,
                "unit-test",
                0L,
                PersistenceType.ROCKS_DB,
                RoutingStrategy.CONSISTENT_HASH,
                ReadStrategy.ANY_OF_ONLINE,
                OfflinePushStrategy.WAIT_ALL_REPLICAS,
                1);
        store.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
        invalidStore.addVersion(new VersionImpl(invalidStoreName, 1, "test-job-id"));

        // Initially both stores are resolvable through the metadata repository.
        Mockito.doReturn(store).when(mockMetadataRepo).getStore(storeName);
        Mockito.doReturn(invalidStore).when(mockMetadataRepo).getStore(invalidStoreName);
        Mockito.doReturn(store).when(mockMetadataRepo).getStoreOrThrow(storeName);
        Mockito.doReturn(invalidStore).when(mockMetadataRepo).getStoreOrThrow(invalidStoreName);
        Mockito.doReturn(new Pair<>(store, store.getVersion(1).get()))
                .when(mockMetadataRepo)
                .waitVersion(ArgumentMatchers.eq(storeName), ArgumentMatchers.eq(1), (Duration) ArgumentMatchers.any());
        Mockito.doReturn(new Pair<>(invalidStore, invalidStore.getVersion(1).get()))
                .when(mockMetadataRepo)
                .waitVersion(ArgumentMatchers.eq(invalidStoreName), ArgumentMatchers.eq(1), (Duration) ArgumentMatchers.any());

        VeniceProperties serverProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB);

        // Version 1 starts ingesting and has not been marked ONLINE yet.
        kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topicV1, serverProperties), 0);
        Assert.assertEquals(
                kafkaStoreIngestionService.getIngestingTopicsWithVersionStatusNotOnline().size(),
                1,
                "Unexpected number of ingesting topics with version status of not ONLINE");

        store.updateVersionStatus(1, VersionStatus.ONLINE);
        Assert.assertEquals(
                kafkaStoreIngestionService.getIngestingTopicsWithVersionStatusNotOnline().size(),
                0,
                "Expecting an empty set since all ingesting topics have version status of ONLINE");

        // Add a second, not-yet-online version plus a topic whose store then disappears.
        store.addVersion(new VersionImpl(storeName, 2, "test-job-id"));
        Mockito.doReturn(new Pair<>(store, store.getVersion(2).get()))
                .when(mockMetadataRepo)
                .waitVersion(ArgumentMatchers.eq(storeName), ArgumentMatchers.eq(2), (Duration) ArgumentMatchers.any());
        kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topicV2, serverProperties), 0);
        kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(invalidTopic, serverProperties), 0);
        Mockito.doThrow(new VeniceNoStoreException(invalidStoreName))
                .when(mockMetadataRepo)
                .getStoreOrThrow(invalidStoreName);
        Mockito.doReturn((Object) null).when(mockMetadataRepo).getStore(invalidStoreName);
        Assert.assertEquals(
                kafkaStoreIngestionService.getIngestingTopicsWithVersionStatusNotOnline().size(),
                2,
                "Invalid and in flight ingesting topics should be included in the returned set");

        // Version 2 goes ONLINE while version 1 is deleted from the store.
        store.updateVersionStatus(2, VersionStatus.ONLINE);
        store.deleteVersion(1);
        Set<String> notOnlineTopics = kafkaStoreIngestionService.getIngestingTopicsWithVersionStatusNotOnline();
        boolean exactlyInvalidAndRetired = notOnlineTopics.size() == 2
                && notOnlineTopics.contains(invalidTopic)
                && notOnlineTopics.contains(topicV1);
        Assert.assertTrue(
                exactlyInvalidAndRetired,
                "Invalid and retired ingesting topics should be included in the returned set");
    }

    /**
     * Verifies that shutting down the ingestion task of a topic removes it from the service,
     * and that a subsequent {@code startConsumption} for the same topic creates a fresh task
     * bound to the storage engine currently returned by the storage engine repository.
     */
    @Test
    public void testCloseStoreIngestionTask() {
        kafkaStoreIngestionService = new KafkaStoreIngestionService(
                mockStorageEngineRepository,
                mockVeniceConfigLoader,
                storageMetadataService,
                mockClusterInfoProvider,
                mockMetadataRepo,
                mockSchemaRepo,
                Optional.empty(),
                Optional.empty(),
                mockLiveClusterConfigRepo,
                new MetricsRepository(),
                Optional.empty(),
                Optional.empty(),
                AvroProtocolDefinition.PARTITION_STATE.getSerializer(),
                Optional.empty(),
                (ICProvider) null,
                false,
                compressorFactory,
                Optional.empty(),
                false,
                (RemoteIngestionRepairService) null,
                mockPubSubClientsFactory);

        final String topicName = "test-store_v1";
        final String storeName = Version.parseStoreFromKafkaTopicName(topicName);
        ZKStore store = new ZKStore(
                storeName,
                "unit-test",
                0L,
                PersistenceType.ROCKS_DB,
                RoutingStrategy.CONSISTENT_HASH,
                ReadStrategy.ANY_OF_ONLINE,
                OfflinePushStrategy.WAIT_ALL_REPLICAS,
                1);

        // Stub and return the SAME schema entry; stubbing one mock and returning another
        // would hand callers an entry whose getSchema() yields null.
        SchemaEntry keySchemaEntry = Mockito.mock(SchemaEntry.class);
        Mockito.when(keySchemaEntry.getSchema()).thenReturn(Schema.create(Schema.Type.STRING));
        Mockito.when(mockSchemaRepo.getKeySchema(topicName)).thenReturn(keySchemaEntry);

        AbstractStorageEngine firstEngine = Mockito.mock(AbstractStorageEngine.class);
        Mockito.when(mockStorageEngineRepository.getLocalStorageEngine(topicName)).thenReturn(firstEngine);

        store.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
        Mockito.doReturn(store).when(mockMetadataRepo).getStore(storeName);
        Mockito.doReturn(store).when(mockMetadataRepo).getStoreOrThrow(storeName);
        Mockito.doReturn(new Pair<>(store, store.getVersion(1).get()))
                .when(mockMetadataRepo)
                .waitVersion(ArgumentMatchers.eq(storeName), ArgumentMatchers.eq(1), (Duration) ArgumentMatchers.any());

        VeniceProperties serverProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB);

        kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topicName, serverProperties), 0);
        StoreIngestionTask firstTask = kafkaStoreIngestionService.getStoreIngestionTask(topicName);
        // Without this check a null first task would make the assertNotEquals below pass trivially.
        Assert.assertNotNull(firstTask, "No ingestion task was registered after the first startConsumption");

        kafkaStoreIngestionService.shutdownStoreIngestionTask(topicName);
        Assert.assertNull(kafkaStoreIngestionService.getStoreIngestionTask(topicName));

        // Swap the engine returned by the repository, then restart consumption for the same topic.
        AbstractStorageEngine secondEngine = Mockito.mock(AbstractStorageEngine.class);
        Mockito.when(mockStorageEngineRepository.getLocalStorageEngine(topicName)).thenReturn(secondEngine);
        kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topicName, serverProperties), 0);

        StoreIngestionTask secondTask = kafkaStoreIngestionService.getStoreIngestionTask(topicName);
        Assert.assertNotNull(secondTask);
        Assert.assertNotEquals(firstTask, secondTask);
        Assert.assertEquals(secondTask.getStorageEngine(), secondEngine);
    }

    /**
     * Checks what happens to a topic's ingestion task once consumption of its only subscribed
     * partition is stopped, and again when another partition state is still registered.
     *
     * @param z passed as the boolean constructor flag that follows the {@code ICProvider}
     *          argument; when {@code true} the task is expected to remain registered after
     *          the only partition is stopped, when {@code false} it is expected to be gone
     */
    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testStoreIngestionTaskShutdownLastPartition(boolean z) {
        final boolean expectTaskRetained = z;
        kafkaStoreIngestionService = new KafkaStoreIngestionService(
                mockStorageEngineRepository,
                mockVeniceConfigLoader,
                storageMetadataService,
                mockClusterInfoProvider,
                mockMetadataRepo,
                mockSchemaRepo,
                Optional.empty(),
                Optional.empty(),
                mockLiveClusterConfigRepo,
                new MetricsRepository(),
                Optional.empty(),
                Optional.empty(),
                AvroProtocolDefinition.PARTITION_STATE.getSerializer(),
                Optional.empty(),
                (ICProvider) null,
                z,
                compressorFactory,
                Optional.empty(),
                false,
                (RemoteIngestionRepairService) null,
                mockPubSubClientsFactory);

        final String topicName = "test-store_v1";
        final String storeName = Version.parseStoreFromKafkaTopicName(topicName);
        ZKStore store = new ZKStore(
                storeName,
                "unit-test",
                0L,
                PersistenceType.ROCKS_DB,
                RoutingStrategy.CONSISTENT_HASH,
                ReadStrategy.ANY_OF_ONLINE,
                OfflinePushStrategy.WAIT_ALL_REPLICAS,
                1);

        AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class);
        Mockito.when(mockStorageEngineRepository.getLocalStorageEngine(topicName)).thenReturn(storageEngine);

        store.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
        Mockito.doReturn(store).when(mockMetadataRepo).getStore(storeName);
        Mockito.doReturn(store).when(mockMetadataRepo).getStoreOrThrow(storeName);
        Mockito.doReturn(new Pair<>(store, store.getVersion(1).get()))
                .when(mockMetadataRepo)
                .waitVersion(ArgumentMatchers.eq(storeName), ArgumentMatchers.eq(1), (Duration) ArgumentMatchers.any());

        VeniceStoreVersionConfig storeVersionConfig =
                new VeniceStoreVersionConfig(topicName, AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB));

        // Round 1: partition 0 is the only partition, so stopping it leaves nothing subscribed.
        kafkaStoreIngestionService.startConsumption(storeVersionConfig, 0);
        kafkaStoreIngestionService.stopConsumptionAndWait(storeVersionConfig, 0, 1, 1);
        StoreIngestionTask taskAfterLastPartition = kafkaStoreIngestionService.getStoreIngestionTask(topicName);
        if (expectTaskRetained) {
            Assert.assertNotNull(taskAfterLastPartition);
        } else {
            Assert.assertNull(taskAfterLastPartition);
        }

        // Round 2: register an extra partition state (partition 1) before stopping partition 0.
        kafkaStoreIngestionService.startConsumption(storeVersionConfig, 0);
        StoreIngestionTask taskWithExtraPartition = kafkaStoreIngestionService.getStoreIngestionTask(topicName);
        // Assert before dereferencing so a missing task is reported as a test failure, not an NPE.
        Assert.assertNotNull(taskWithExtraPartition, "No ingestion task was registered after restarting consumption");
        taskWithExtraPartition.setPartitionConsumptionState(1, Mockito.mock(PartitionConsumptionState.class));
        kafkaStoreIngestionService.stopConsumptionAndWait(storeVersionConfig, 0, 1, 1);
        // NOTE(review): this checks the local reference captured above, not the service's
        // registry; it may have been meant to re-query getStoreIngestionTask(topicName) — confirm.
        Assert.assertNotNull(taskWithExtraPartition);
    }

    /**
     * Calls {@code getMetadata} for a store that has version 0 registered and checks the key
     * schema and routing info of the response.
     *
     * @param z when {@code true} the mocked routing data is keyed by the topic of version 0
     *          (the version added to the store) and an empty replica list is expected under
     *          routing key "0"; when {@code false} it is keyed by the topic of version 1 and
     *          the routing info is expected to be empty
     */
    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testGetMetadata(boolean z) {
        final boolean useRegisteredVersion = z;
        kafkaStoreIngestionService = new KafkaStoreIngestionService(
                mockStorageEngineRepository,
                mockVeniceConfigLoader,
                storageMetadataService,
                mockClusterInfoProvider,
                mockMetadataRepo,
                mockSchemaRepo,
                Optional.of(CompletableFuture.completedFuture(mockCustomizedViewRepository)),
                Optional.of(CompletableFuture.completedFuture(mockHelixInstanceConfigRepository)),
                mockLiveClusterConfigRepo,
                new MetricsRepository(),
                Optional.empty(),
                Optional.empty(),
                AvroProtocolDefinition.PARTITION_STATE.getSerializer(),
                Optional.empty(),
                (ICProvider) null,
                false,
                compressorFactory,
                Optional.empty(),
                false,
                (RemoteIngestionRepairService) null,
                mockPubSubClientsFactory);

        final String versionSuffix;
        if (useRegisteredVersion) {
            versionSuffix = "0";
        } else {
            versionSuffix = "1";
        }
        final String topicName = "test-store_v" + versionSuffix;
        final String storeName = Version.parseStoreFromKafkaTopicName(topicName);

        ZKStore store = new ZKStore(
                storeName,
                "unit-test",
                0L,
                PersistenceType.ROCKS_DB,
                RoutingStrategy.CONSISTENT_HASH,
                ReadStrategy.ANY_OF_ONLINE,
                OfflinePushStrategy.WAIT_ALL_REPLICAS,
                1);
        // Only version 0 is ever added to the store, regardless of the topic chosen above.
        store.addVersion(new VersionImpl(storeName, 0, "test-job-id"));

        ResourceAssignment resourceAssignment = new ResourceAssignment();
        resourceAssignment.setPartitionAssignment(topicName, (PartitionAssignment) null);

        // One-partition assignment whose single partition has no replica states.
        PartitionAssignment partitionAssignment = new PartitionAssignment(topicName, 1);
        partitionAssignment.addPartition(new Partition(0, Collections.emptyMap()));

        Mockito.doReturn(store).when(mockMetadataRepo).getStoreOrThrow(storeName);
        Mockito.when(mockSchemaRepo.getKeySchema(storeName))
                .thenReturn(new SchemaEntry(0, "{\"type\" : \"string\"}"));
        Mockito.when(mockSchemaRepo.getValueSchemas(storeName)).thenReturn(Collections.emptyList());
        Mockito.when(mockCustomizedViewRepository.getResourceAssignment()).thenReturn(resourceAssignment);
        Mockito.when(mockCustomizedViewRepository.getPartitionAssignments(topicName)).thenReturn(partitionAssignment);
        Mockito.when(mockCustomizedViewRepository.getReplicaStates(ArgumentMatchers.eq(topicName), ArgumentMatchers.anyInt()))
                .thenReturn(Collections.emptyList());
        Mockito.when(mockHelixInstanceConfigRepository.getInstanceGroupIdMapping()).thenReturn(Collections.emptyMap());

        MetadataResponse response = kafkaStoreIngestionService.getMetadata(storeName);
        Assert.assertNotNull(response);
        Assert.assertEquals(response.getResponseRecord().getKeySchema().get("0"), "\"string\"");

        if (!useRegisteredVersion) {
            Assert.assertTrue(response.getResponseRecord().getRoutingInfo().isEmpty());
        } else {
            Collection<?> replicasOfPartitionZero = (Collection<?>) response.getResponseRecord().getRoutingInfo().get("0");
            Assert.assertEquals(replicasOfPartitionZero, Collections.emptyList());
        }
    }
}
