package com.linkedin.venice.hadoop;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.UndefinedPropertyException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.InputDataInfoProvider;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.status.BatchJobHeartbeatConfigs;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.hadoop.mapred.JobConf;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link VenicePushJob}. Most tests wrap the job in a Mockito spy and drive it
 * against a mocked {@link ControllerClient} that returns canned responses.
 */
public class VenicePushJobTest {
    // Identifier passed as the first constructor argument of the spied VenicePushJob and as the
    // third constructor argument of the VersionImpl fixtures.
    private static final String TEST_PUSH = "test_push";
    // Placeholder values handed to TestWriteUtils when building the default push-job properties.
    private static final String TEST_URL = "test_url";
    private static final String TEST_PATH = "test_path";
    // Store name that the mocked controller client is stubbed for.
    private static final String TEST_STORE = "test_store";
    // Cluster and D2 service returned by the mocked cluster-discovery call.
    private static final String TEST_CLUSTER = "test_cluster";
    private static final String TEST_SERVICE = "test_venice";
    // Store version number used by the repush fixtures.
    private static final int REPUSH_VERSION = 1;
    // ZooKeeper addresses and D2 service names used by the D2-routing tests.
    private static final String TEST_PARENT_ZK_ADDRESS = "localhost:2180";
    private static final String TEST_ZK_ADDRESS = "localhost:2181";
    private static final String TEST_PARENT_CONTROLLER_D2_SERVICE = "ParentController";
    private static final String TEST_CHILD_CONTROLLER_D2_SERVICE = "ChildController";
    // Job id used by the tests that construct a VenicePushJob directly (no spy).
    private static final String PUSH_JOB_ID = "push_job_number_101";
    // Discovery URL with a "d2://" scheme; testGetPushJobSetting expects D2 routing to be on with it.
    private static final String DISCOVERY_URL = "d2://d2Clusters/venice-discovery";
    private static final String PARENT_REGION_NAME = "dc-parent";

    /**
     * Running a job with repush TTL enabled but no Kafka input configured is expected to fail
     * with a {@link VeniceException} carrying the "only supported while using Kafka Input Format"
     * message.
     */
    @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Repush with TTL is only supported while using Kafka Input Format.*")
    public void testRepushTTLJobWithNonKafkaInput() {
        Properties ttlOnlyProps = new Properties();
        ttlOnlyProps.setProperty("repush.ttl.enable", "true");
        // A null client makes the helper fall back to its default mocked controller client.
        VenicePushJob pushJob = getSpyVenicePushJob(ttlOnlyProps, null);
        pushJob.run();
    }

    /**
     * Runs a TTL repush against the default mocked store, for which no hybrid store config is
     * set, and expects the "real-time only store" {@link VeniceException}.
     */
    @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Repush TTL is only supported for real-time only store.*")
    public void testRepushTTLJobWithBatchStore() {
        Properties repushProps = getRepushReadyProps();
        ControllerClient defaultClient = getClient();
        VenicePushJob pushJob = getSpyVenicePushJob(repushProps, defaultClient);
        pushJob.run();
    }

    /**
     * Runs a TTL repush against a mocked store that has write compute enabled and a hybrid
     * config set, and expects the "write compute enabled" {@link VeniceException}.
     */
    @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Repush TTL is not supported when the store has write compute enabled.*")
    public void testRepushTTLJobWithWC() {
        Properties repushProps = getRepushReadyProps();
        Consumer<StoreInfo> writeComputeStore = storeInfo -> {
            storeInfo.setWriteComputationEnabled(true);
            storeInfo.setVersions(Collections.singletonList(new VersionImpl(TEST_STORE, REPUSH_VERSION, TEST_PUSH)));
            storeInfo.setHybridStoreConfig(new HybridStoreConfigImpl(0L, 0L, 0L, (DataReplicationPolicy) null, (BufferReplayPolicy) null));
        };
        VenicePushJob pushJob = getSpyVenicePushJob(repushProps, getClient(writeComputeStore));
        pushJob.run();
    }

    /**
     * Checks the D2-related fields of the push job setting for two property sets built by
     * {@code TestWriteUtils.defaultVPJPropsWithD2Routing}: one without parent-region arguments
     * (child controller service and child ZK hosts expected) and one with them (parent
     * controller service and parent ZK hosts expected).
     */
    @Test
    public void testPushJobSettingWithD2Routing() {
        Consumer<StoreInfo> storeCustomizer = storeInfo -> {
            storeInfo.setWriteComputationEnabled(true);
            storeInfo.setVersions(Collections.singletonList(new VersionImpl(TEST_STORE, REPUSH_VERSION, TEST_PUSH)));
            storeInfo.setHybridStoreConfig(new HybridStoreConfigImpl(0L, 0L, 0L, (DataReplicationPolicy) null, (BufferReplayPolicy) null));
        };
        ControllerClient client = getClient(storeCustomizer);

        // Props built without parent-region arguments: the child controller is the target.
        VenicePushJob singleRegionJob = getSpyVenicePushJobWithD2Routing(new Properties(), client);
        VenicePushJob.PushJobSetting singleRegionSetting = singleRegionJob.getPushJobSetting();
        Assert.assertTrue(singleRegionSetting.d2Routing);
        Assert.assertEquals(singleRegionSetting.controllerD2ServiceName, TEST_CHILD_CONTROLLER_D2_SERVICE);
        Assert.assertEquals(singleRegionSetting.childControllerRegionD2ZkHosts, TEST_ZK_ADDRESS);

        // Props built with parent-region arguments: the parent controller is the target.
        VenicePushJob multiRegionJob = getSpyVenicePushJobWithMultiRegionD2Routing(new Properties(), client);
        VenicePushJob.PushJobSetting multiRegionSetting = multiRegionJob.getPushJobSetting();
        Assert.assertTrue(multiRegionSetting.d2Routing);
        Assert.assertEquals(multiRegionSetting.controllerD2ServiceName, TEST_PARENT_CONTROLLER_D2_SERVICE);
        Assert.assertEquals(multiRegionSetting.parentControllerRegionD2ZkHosts, TEST_PARENT_ZK_ADDRESS);
    }

    /**
     * Setting the heartbeat-enabled config to "true" must be reflected in
     * {@code PushJobSetting.livenessHeartbeatEnabled}.
     */
    @Test
    public void testPushJobSettingWithLivenessHeartbeat() {
        Properties heartbeatProps = new Properties();
        heartbeatProps.setProperty(BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName(), "true");
        Consumer<StoreInfo> storeCustomizer = storeInfo -> {
            storeInfo.setWriteComputationEnabled(true);
            storeInfo.setVersions(Collections.singletonList(new VersionImpl(TEST_STORE, REPUSH_VERSION, TEST_PUSH)));
            storeInfo.setHybridStoreConfig(new HybridStoreConfigImpl(0L, 0L, 0L, (DataReplicationPolicy) null, (BufferReplayPolicy) null));
        };
        VenicePushJob pushJob = getSpyVenicePushJob(heartbeatProps, getClient(storeCustomizer));
        VenicePushJob.PushJobSetting setting = pushJob.getPushJobSetting();
        Assert.assertTrue(setting.livenessHeartbeatEnabled);
    }

    /**
     * Polls job status against a controller client that always reports "UNKNOWN", with the
     * unknown-state timeout set to 10 ms and the store's bootstrap-to-online timeout set to
     * 0 hours, and expects {@code pollStatusUntilComplete} to throw a {@link VeniceException}
     * with the "still running after 0 hours" message.
     */
    @Test
    public void testPushJobPollStatus() {
        Properties heartbeatProps = new Properties();
        heartbeatProps.setProperty(BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName(), "true");

        // Controller client whose overall-status query always answers "UNKNOWN".
        ControllerClient statusClient = Mockito.mock(ControllerClient.class);
        JobStatusQueryResponse unknownStatus = Mockito.mock(JobStatusQueryResponse.class);
        Mockito.doReturn("UNKNOWN").when(unknownStatus).getStatus();
        Mockito.doReturn(unknownStatus).when(statusClient).queryOverallJobStatus(ArgumentMatchers.anyString(), (Optional) ArgumentMatchers.eq(Optional.empty()), (String) ArgumentMatchers.eq((Object) null));

        VenicePushJob pushJob = getSpyVenicePushJob(heartbeatProps, statusClient);
        VenicePushJob.PushJobSetting jobSetting = pushJob.getPushJobSetting();
        jobSetting.jobStatusInUnknownStateTimeoutMs = 10L;
        Assert.assertTrue(jobSetting.livenessHeartbeatEnabled);

        VenicePushJob.TopicInfo topicInfo = new VenicePushJob.TopicInfo();
        topicInfo.version = REPUSH_VERSION;
        topicInfo.topic = "abc";

        // Store response for store "abc" with a zero-hour bootstrap-to-online timeout.
        StoreInfo storeInfo = new StoreInfo();
        storeInfo.setBootstrapToOnlineTimeoutInHours(0);
        StoreResponse storeResponse = new StoreResponse();
        storeResponse.setName("abc");
        storeResponse.setStore(storeInfo);
        VenicePushJob.StoreSetting storeSetting = new VenicePushJob.StoreSetting();
        storeSetting.storeResponse = storeResponse;
        pushJob.storeSetting = storeSetting;

        VeniceException thrown = Assert.expectThrows(VeniceException.class, () -> {
            pushJob.pollStatusUntilComplete(Optional.empty(), statusClient, jobSetting, topicInfo);
        });
        Assert.assertEquals(thrown.getMessage(), "Failing push-job for store abc which is still running after 0 hours.");
    }

    /**
     * Builds the property set used by the TTL repush tests: repush TTL enabled, Kafka as the
     * source, and the Kafka input topic derived from {@code TEST_STORE} and
     * {@code REPUSH_VERSION}.
     *
     * @return a fresh, mutable {@link Properties} instance
     */
    private Properties getRepushReadyProps() {
        String inputTopic = Version.composeKafkaTopic(TEST_STORE, REPUSH_VERSION);
        Properties repushProps = new Properties();
        repushProps.setProperty("repush.ttl.enable", "true");
        repushProps.setProperty("source.kafka", "true");
        repushProps.setProperty("kafka.input.topic", inputTopic);
        repushProps.setProperty("kafka.input.broker.url", "localhost");
        repushProps.setProperty("kafka.input.max.records.per.mapper", "5");
        return repushProps;
    }

    /**
     * Creates a spied {@link VenicePushJob} from the default test properties overlaid with
     * {@code properties}.
     *
     * @param properties extra properties applied on top of the defaults
     * @param controllerClient controller client to inject; may be {@code null}, in which case
     *        the internal helper substitutes a default mock
     * @return the spied push job
     */
    private VenicePushJob getSpyVenicePushJob(Properties properties, ControllerClient controllerClient) {
        Properties defaultProps = TestWriteUtils.defaultVPJProps(TEST_URL, TEST_PATH, TEST_STORE);
        return getSpyVenicePushJobInternal(defaultProps, properties, controllerClient);
    }

    /**
     * Creates a spied {@link VenicePushJob} from D2-routing properties that carry only the
     * child ("dc-0") ZK address and the child controller D2 service; the parent-related
     * arguments are passed as {@code null}.
     *
     * @param properties extra properties applied on top of the D2-routing defaults
     * @param controllerClient controller client to inject; may be {@code null}
     * @return the spied push job
     */
    private VenicePushJob getSpyVenicePushJobWithD2Routing(Properties properties, ControllerClient controllerClient) {
        Properties d2Props = TestWriteUtils.defaultVPJPropsWithD2Routing((String) null, (String) null, Collections.singletonMap("dc-0", TEST_ZK_ADDRESS), (String) null, TEST_CHILD_CONTROLLER_D2_SERVICE, TEST_PATH, TEST_STORE);
        return getSpyVenicePushJobInternal(d2Props, properties, controllerClient);
    }

    /**
     * Creates a spied {@link VenicePushJob} from D2-routing properties that carry both the
     * parent region (name, ZK address, controller D2 service) and the child ("dc-0") region.
     *
     * @param properties extra properties applied on top of the D2-routing defaults
     * @param controllerClient controller client to inject; may be {@code null}
     * @return the spied push job
     */
    private VenicePushJob getSpyVenicePushJobWithMultiRegionD2Routing(Properties properties, ControllerClient controllerClient) {
        Properties multiRegionProps = TestWriteUtils.defaultVPJPropsWithD2Routing(PARENT_REGION_NAME, TEST_PARENT_ZK_ADDRESS, Collections.singletonMap("dc-0", TEST_ZK_ADDRESS), TEST_PARENT_CONTROLLER_D2_SERVICE, TEST_CHILD_CONTROLLER_D2_SERVICE, TEST_PATH, TEST_STORE);
        return getSpyVenicePushJobInternal(multiRegionProps, properties, controllerClient);
    }

    /**
     * Shared factory behind the {@code getSpyVenicePushJob*} helpers.
     *
     * <p>Mutates {@code properties} in place: sets the controller retry-attempts property to 1
     * and then copies every entry of {@code properties2} over it, so test-specific values win.
     *
     * @param properties base properties; modified by this call
     * @param properties2 overrides copied onto {@code properties}
     * @param controllerClient client to inject; when {@code null} a default mock from
     *        {@link #getClient()} is used
     * @return a Mockito spy of the new push job with the client set as both its controller
     *         client and its KME schema system store controller client
     */
    private VenicePushJob getSpyVenicePushJobInternal(Properties properties, Properties properties2, ControllerClient controllerClient) {
        properties.put("controller.request.retry.attempts", 1);
        properties.putAll(properties2);

        ControllerClient effectiveClient;
        if (controllerClient != null) {
            effectiveClient = controllerClient;
        } else {
            effectiveClient = getClient();
        }

        VenicePushJob spiedJob = Mockito.spy(new VenicePushJob(TEST_PUSH, properties));
        spiedJob.setControllerClient(effectiveClient);
        spiedJob.setKmeSchemaSystemStoreControllerClient(effectiveClient);
        return spiedJob;
    }

    /**
     * Returns the default mocked {@link ControllerClient}, i.e. one whose store info is left
     * exactly as {@code getStoreInfo} builds it.
     */
    private ControllerClient getClient() {
        Consumer<StoreInfo> noCustomization = storeInfo -> {
        };
        return getClient(noCustomization);
    }

    /**
     * Builds a Mockito-mocked {@link ControllerClient} with canned responses for the calls
     * stubbed below: cluster discovery, value/key schema lookup, store lookup, topic request,
     * end-of-push and push-job-details reporting.
     *
     * @param consumer hook that may adjust the {@link StoreInfo} returned by {@code getStore}
     * @return the stubbed mock
     */
    private ControllerClient getClient(Consumer<StoreInfo> consumer) {
        ControllerClient mockClient = Mockito.mock(ControllerClient.class);

        // Cluster discovery for the test store.
        D2ServiceDiscoveryResponse discovery = new D2ServiceDiscoveryResponse();
        discovery.setCluster(TEST_CLUSTER);
        discovery.setD2Service(TEST_SERVICE);
        Mockito.doReturn(discovery).when(mockClient).discoverCluster(TEST_STORE);

        // Any value-schema lookup gets a default-constructed response.
        Mockito.doReturn(new SchemaResponse()).when(mockClient).getValueSchema(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt());

        // Store lookup returns the (optionally customized) store info.
        StoreResponse storeResponse = new StoreResponse();
        storeResponse.setStore(getStoreInfo(consumer));
        Mockito.doReturn(storeResponse).when(mockClient).getStore(TEST_STORE);

        // Key schema: id 1, Avro string.
        SchemaResponse keySchema = new SchemaResponse();
        keySchema.setId(1);
        keySchema.setSchemaStr(Schema.create(Schema.Type.STRING).toString());
        Mockito.doReturn(keySchema).when(mockClient).getKeySchema(TEST_STORE);

        // Response to any topic request.
        VersionCreationResponse versionCreation = new VersionCreationResponse();
        versionCreation.setKafkaTopic("kafka-topic");
        versionCreation.setVersion(REPUSH_VERSION);
        versionCreation.setKafkaBootstrapServers("kafka-bootstrap-server");
        versionCreation.setPartitions(1);
        versionCreation.setEnableSSL(false);
        versionCreation.setCompressionStrategy(CompressionStrategy.NO_OP);
        versionCreation.setDaVinciPushStatusStoreEnabled(false);
        versionCreation.setPartitionerClass("PartitionerClass");
        versionCreation.setPartitionerParams(Collections.emptyMap());
        Mockito.doReturn(versionCreation).when(mockClient).requestTopicForWrites(ArgumentMatchers.anyString(), ArgumentMatchers.anyLong(), (Version.PushType) ArgumentMatchers.any(), ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(), (Optional) ArgumentMatchers.any(), (Optional) ArgumentMatchers.any(), (Optional) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean(), (String) ArgumentMatchers.any());

        // The same default-constructed response serves both reporting calls.
        ControllerResponse genericResponse = new ControllerResponse();
        Mockito.doReturn(genericResponse).when(mockClient).writeEndOfPush(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt());
        Mockito.doReturn(genericResponse).when(mockClient).sendPushJobDetails(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (byte[]) ArgumentMatchers.any(byte[].class));
        return mockClient;
    }

    /**
     * Builds the baseline {@link StoreInfo} served by the mocked controller client and then
     * lets the caller adjust it.
     *
     * <p>Baseline: current version {@code REPUSH_VERSION}, incremental push / chunking /
     * write compute / schema auto-registration all disabled, {@code NO_OP} compression,
     * storage quota of 1 byte, native replication source fabric "dc-0", and a single version.
     *
     * @param consumer hook applied last, so it can override any baseline value
     * @return the configured store info
     */
    private StoreInfo getStoreInfo(Consumer<StoreInfo> consumer) {
        StoreInfo storeInfo = new StoreInfo();
        storeInfo.setCurrentVersion(REPUSH_VERSION);
        // Set once; the original repeated this call a second time with the same value.
        storeInfo.setIncrementalPushEnabled(false);
        storeInfo.setStorageQuotaInByte(1L);
        storeInfo.setSchemaAutoRegisterFromPushJobEnabled(false);
        storeInfo.setChunkingEnabled(false);
        storeInfo.setCompressionStrategy(CompressionStrategy.NO_OP);
        storeInfo.setWriteComputationEnabled(false);
        storeInfo.setNativeReplicationSourceFabric("dc-0");
        storeInfo.setVersions(Collections.singletonList(new VersionImpl(TEST_STORE, REPUSH_VERSION, TEST_PUSH)));
        consumer.accept(storeInfo);
        return storeInfo;
    }

    /**
     * Creates a mocked {@link InputDataInfoProvider} whose {@code validateInputAndGetInfo}
     * returns a fixed {@code InputDataInfo} for any input string. The schema info it carries
     * is marked as non-Avro.
     *
     * @return the stubbed provider
     * @throws Exception declared because the stubbed method is invoked while stubbing
     */
    private InputDataInfoProvider getMockInputDataInfoProvider() throws Exception {
        InputDataInfoProvider provider = Mockito.mock(InputDataInfoProvider.class);
        PushJobSchemaInfo schemaInfo = new PushJobSchemaInfo();
        schemaInfo.setAvro(false);
        // NOTE(review): the meaning of the numeric/boolean arguments is defined by InputDataInfo's constructor.
        InputDataInfoProvider.InputDataInfo cannedInfo = new InputDataInfoProvider.InputDataInfo(schemaInfo, 10L, 1, false, System.currentTimeMillis());
        Mockito.doReturn(cannedInfo).when(provider).validateInputAndGetInfo(ArgumentMatchers.anyString());
        return provider;
    }

    /** Constructing a {@link VenicePushJob} with null properties must throw a {@link NullPointerException}. */
    @Test(expectedExceptions = NullPointerException.class)
    public void testVenicePushJobThrowsNpeIfVpjPropertiesIsNull() {
        Properties missingProps = null;
        new VenicePushJob(PUSH_JOB_ID, missingProps);
    }

    /**
     * Removes each required property in turn and verifies that constructing the push job
     * without it throws {@link UndefinedPropertyException}.
     */
    @Test
    public void testGetPushJobSettingThrowsUndefinedPropertyException() {
        Properties requiredProps = getVpjRequiredProperties();
        for (Object requiredKey : requiredProps.keySet()) {
            // Work on a copy so every iteration is missing exactly one key.
            Properties incompleteProps = new Properties();
            incompleteProps.putAll(requiredProps);
            incompleteProps.remove(requiredKey);
            try {
                new VenicePushJob(PUSH_JOB_ID, incompleteProps);
                Assert.fail("Should throw UndefinedPropertyException for missing property: " + requiredKey);
            } catch (UndefinedPropertyException expected) {
                // This is the outcome under test; nothing further to check.
            }
        }
    }

    /**
     * Returns the property set treated as required by these tests:
     * {@code testGetPushJobSettingThrowsUndefinedPropertyException} expects construction to
     * fail when any single one of these keys is removed.
     *
     * @return a fresh, mutable {@link Properties} instance
     */
    private Properties getVpjRequiredProperties() {
        Properties requiredProps = new Properties();
        requiredProps.put("venice.discover.urls", DISCOVERY_URL);
        requiredProps.put("multi.region", true);
        requiredProps.put("parent.controller.region.name", PARENT_REGION_NAME);
        requiredProps.put("d2.zk.hosts.dc-parent", TEST_PARENT_ZK_ADDRESS);
        requiredProps.put("venice.store.name", TEST_STORE);
        return requiredProps;
    }

    /**
     * Supplying both "avro.key.field" and "key.field" with different values must be rejected
     * with a "Duplicate key field config found" {@link VeniceException}.
     */
    @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Duplicate key field config found.*")
    public void testVenicePushJobCanHandleLegacyFieldsThrowsExceptionIfDuplicateKeysButValuesDiffer() {
        Properties conflictingProps = getVpjRequiredProperties();
        conflictingProps.put("avro.key.field", "id");
        conflictingProps.put("key.field", "message");
        new VenicePushJob(PUSH_JOB_ID, conflictingProps);
    }

    /**
     * Values supplied under the "avro.key.field" / "avro.value.field" names must be readable
     * from the job's {@link VeniceProperties} as "key.field" / "value.field".
     */
    @Test
    public void testVenicePushJobCanHandleLegacyFields() {
        Properties legacyProps = getVpjRequiredProperties();
        legacyProps.put("avro.key.field", "id");
        legacyProps.put("avro.value.field", "message");

        VenicePushJob pushJob = new VenicePushJob(PUSH_JOB_ID, legacyProps);
        VeniceProperties resolvedProps = pushJob.getVeniceProperties();

        Assert.assertNotNull(resolvedProps);
        Assert.assertEquals(resolvedProps.getString("key.field"), "id");
        Assert.assertEquals(resolvedProps.getString("value.field"), "message");
    }

    /** With the required properties (d2:// discovery URL) the job setting must report D2 routing. */
    @Test
    public void testGetPushJobSetting() {
        Properties requiredProps = getVpjRequiredProperties();
        VenicePushJob pushJob = new VenicePushJob(PUSH_JOB_ID, requiredProps);
        VenicePushJob.PushJobSetting setting = pushJob.getPushJobSetting();
        Assert.assertNotNull(setting);
        Assert.assertTrue(setting.d2Routing);
    }

    /** Replacing the discovery URL with an http:// URL must switch D2 routing off. */
    @Test
    public void testGetPushJobSettingShouldNotUseD2RoutingIfControllerUrlDoesNotStartWithD2() {
        Properties httpProps = getVpjRequiredProperties();
        httpProps.put("venice.discover.urls", "http://venice.db:9898");

        VenicePushJob pushJob = new VenicePushJob(PUSH_JOB_ID, httpProps);
        VenicePushJob.PushJobSetting setting = pushJob.getPushJobSetting();

        Assert.assertNotNull(setting);
        Assert.assertFalse(setting.d2Routing);
    }

    /** Combining Kafka input with incremental push must be rejected at construction time. */
    @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "Incremental push is not supported while using Kafka Input Format")
    public void testGetPushJobSettingShouldThrowExceptionIfSourceIsKafkaAndJobIsIncPush() {
        Properties kafkaIncPushProps = getVpjRequiredProperties();
        kafkaIncPushProps.put("source.kafka", true);
        kafkaIncPushProps.put("incremental.push", true);
        new VenicePushJob(PUSH_JOB_ID, kafkaIncPushProps);
    }

    /** Combining Kafka input with source ETL must be rejected at construction time. */
    @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "Source ETL is not supported while using Kafka Input Format")
    public void testGetPushJobSettingShouldThrowExceptionWhenBothSourceKafkaAndEtlAreSet() {
        Properties kafkaEtlProps = getVpjRequiredProperties();
        kafkaEtlProps.put("source.kafka", true);
        kafkaEtlProps.put("source.etl", true);
        new VenicePushJob(PUSH_JOB_ID, kafkaEtlProps);
    }

    /**
     * Walks {@code VenicePushJob.shouldBuildZstdCompressionDictionary} through the flag
     * combinations below and checks the expected decision for each. The calls are made in
     * the same order as the settings are mutated, since the setting objects are shared.
     */
    @Test
    public void testShouldBuildDictionary() {
        VenicePushJob.PushJobSetting jobSetting = new VenicePushJob.PushJobSetting();
        VenicePushJob.StoreSetting storeSetting = new VenicePushJob.StoreSetting();
        CompressionStrategy[] strategies = {CompressionStrategy.NO_OP, CompressionStrategy.GZIP, CompressionStrategy.ZSTD_WITH_DICT};

        // Third argument false: no dictionary.
        Assert.assertFalse(VenicePushJob.shouldBuildZstdCompressionDictionary(jobSetting, storeSetting, false));

        // Kafka source: no dictionary.
        jobSetting.isSourceKafka = true;
        Assert.assertFalse(VenicePushJob.shouldBuildZstdCompressionDictionary(jobSetting, storeSetting, true));

        // Incremental push: no dictionary for any metric-collection flag or strategy.
        jobSetting.isSourceKafka = false;
        jobSetting.isIncrementalPush = true;
        for (boolean metricCollection : new boolean[] {true, false}) {
            jobSetting.compressionMetricCollectionEnabled = metricCollection;
            for (CompressionStrategy strategy : strategies) {
                storeSetting.compressionStrategy = strategy;
                Assert.assertFalse(VenicePushJob.shouldBuildZstdCompressionDictionary(jobSetting, storeSetting, true));
            }
        }

        // Non-incremental push with metric collection: dictionary for every strategy.
        jobSetting.isIncrementalPush = false;
        jobSetting.compressionMetricCollectionEnabled = true;
        for (CompressionStrategy strategy : strategies) {
            storeSetting.compressionStrategy = strategy;
            Assert.assertTrue(VenicePushJob.shouldBuildZstdCompressionDictionary(jobSetting, storeSetting, true));
        }

        // Non-incremental push without metric collection: dictionary only for ZSTD_WITH_DICT.
        jobSetting.compressionMetricCollectionEnabled = false;
        for (CompressionStrategy strategy : strategies) {
            storeSetting.compressionStrategy = strategy;
            boolean shouldBuild = VenicePushJob.shouldBuildZstdCompressionDictionary(jobSetting, storeSetting, true);
            if (strategy == CompressionStrategy.ZSTD_WITH_DICT) {
                Assert.assertTrue(shouldBuild);
            } else {
                Assert.assertFalse(shouldBuild);
            }
        }
    }

    /**
     * Checks {@code VenicePushJob.evaluateCompressionMetricCollectionEnabled}: the result is
     * expected to be true only when metric collection is enabled, the source is not Kafka,
     * the push is not incremental, and the second argument is true.
     */
    @Test
    public void testEvaluateCompressionMetricCollectionEnabled() {
        VenicePushJob.PushJobSetting setting = new VenicePushJob.PushJobSetting();
        boolean[] secondArgValues = {true, false};

        // Metric collection disabled: always false.
        setting.compressionMetricCollectionEnabled = false;
        for (boolean secondArg : secondArgValues) {
            Assert.assertFalse(VenicePushJob.evaluateCompressionMetricCollectionEnabled(setting, secondArg));
        }

        // Enabled, but second argument false: still false.
        setting.compressionMetricCollectionEnabled = true;
        Assert.assertFalse(VenicePushJob.evaluateCompressionMetricCollectionEnabled(setting, false));

        // Kafka source: always false.
        setting.isSourceKafka = true;
        for (boolean secondArg : secondArgValues) {
            Assert.assertFalse(VenicePushJob.evaluateCompressionMetricCollectionEnabled(setting, secondArg));
        }

        // Incremental push: always false.
        setting.isSourceKafka = false;
        setting.isIncrementalPush = true;
        for (boolean secondArg : secondArgValues) {
            Assert.assertFalse(VenicePushJob.evaluateCompressionMetricCollectionEnabled(setting, secondArg));
        }

        // Plain batch push: the result follows the second argument.
        setting.isIncrementalPush = false;
        Assert.assertTrue(VenicePushJob.evaluateCompressionMetricCollectionEnabled(setting, true));
        Assert.assertFalse(VenicePushJob.evaluateCompressionMetricCollectionEnabled(setting, false));
    }

    /**
     * Validates two illegal targeted-region-push configurations:
     * <ol>
     *   <li>a region list supplied while targeted region push is disabled, which must be
     *       rejected when the job is constructed;</li>
     *   <li>targeted region push combined with incremental push, which must be rejected
     *       when the job runs.</li>
     * </ol>
     *
     * @throws Exception if stubbing the spied job fails
     */
    @Test
    public void testTargetedRegionPushConfigValidation() throws Exception {
        Properties vpjRequiredProperties = getVpjRequiredProperties();
        vpjRequiredProperties.put("targeted.region.push.enabled", false);
        vpjRequiredProperties.put("targeted.region.push.list", "dc-0");
        // try-with-resources ensures the job is closed even though Assert.fail always throws;
        // the previous hand-written close() call after Assert.fail could never be reached.
        try (VenicePushJob venicePushJob = new VenicePushJob(PUSH_JOB_ID, vpjRequiredProperties)) {
            Assert.fail("Test should fail, but doesn't.");
        } catch (VeniceException e) {
            Assert.assertEquals(e.getMessage(), "Targeted region push list is only supported when targeted region push is enabled");
        }

        vpjRequiredProperties.put("targeted.region.push.enabled", true);
        vpjRequiredProperties.remove("targeted.region.push.list");
        vpjRequiredProperties.put("incremental.push", true);
        VenicePushJob spyVenicePushJob = getSpyVenicePushJob(vpjRequiredProperties, getClient());
        skipVPJValidation(spyVenicePushJob);
        try {
            spyVenicePushJob.run();
            Assert.fail("Test should fail, but doesn't.");
        } catch (VeniceException e2) {
            Assert.assertEquals(e2.getMessage(), "Incremental push is not supported while using targeted region push mode");
        }
    }

    /**
     * Targeted region push with no explicit region list: fails when the store's native
     * replication source fabric is empty. With an explicit list and the default store, the
     * run succeeds and the job setting keeps the configured list verbatim.
     *
     * @throws Exception if stubbing the spied job fails
     */
    @Test
    public void testTargetedRegionPushConfigOverride() throws Exception {
        Properties targetedProps = getVpjRequiredProperties();
        targetedProps.put("targeted.region.push.enabled", true);

        // Case 1: no region list and a store with an empty source fabric.
        ControllerClient noFabricClient = getClient(storeInfo -> storeInfo.setNativeReplicationSourceFabric(""));
        VenicePushJob noFabricJob = getSpyVenicePushJob(targetedProps, noFabricClient);
        mockJobStatusQuery(noFabricClient);
        skipVPJValidation(noFabricJob);
        try {
            noFabricJob.run();
            Assert.fail("Test should fail, but doesn't.");
        } catch (VeniceException expected) {
            Assert.assertTrue(expected.getMessage().contains("The store either does not have native replication mode enabled or set up default source fabric."));
        }

        // Case 2: explicit region list with the default mocked store.
        targetedProps.put("targeted.region.push.list", "dc-0, dc-1");
        ControllerClient defaultClient = getClient();
        VenicePushJob explicitListJob = getSpyVenicePushJob(targetedProps, defaultClient);
        mockJobStatusQuery(defaultClient);
        skipVPJValidation(explicitListJob);
        explicitListJob.run();
        Assert.assertEquals(explicitListJob.pushJobSetting.targetedRegions, "dc-0, dc-1");
    }

    /**
     * Runs a targeted region push twice against a stubbed job-status response. The first run
     * sees both regions COMPLETED and must succeed; before the second run "dc-0" is flipped
     * to NOT_STARTED in the same map, and the run must fail with a "Push job error" message.
     *
     * @throws Exception if stubbing the spied job fails
     */
    @Test
    public void testTargetedRegionPushReporting() throws Exception {
        Properties vpjRequiredProperties = getVpjRequiredProperties();
        vpjRequiredProperties.put("targeted.region.push.enabled", true);
        vpjRequiredProperties.put("targeted.region.push.list", "dc-0, dc-1");
        ControllerClient client = getClient();
        VenicePushJob spyVenicePushJob = getSpyVenicePushJob(vpjRequiredProperties, client);
        skipVPJValidation(spyVenicePushJob);

        JobStatusQueryResponse jobStatusQueryResponse = new JobStatusQueryResponse();
        jobStatusQueryResponse.setStatus(ExecutionStatus.COMPLETED.toString());
        jobStatusQueryResponse.setStatusDetails("nothing");
        jobStatusQueryResponse.setVersion(REPUSH_VERSION);
        jobStatusQueryResponse.setName(TEST_STORE);
        jobStatusQueryResponse.setCluster(TEST_CLUSTER);
        // Per-region status map, typed instead of a raw HashMap.
        HashMap<String, String> regionStatuses = new HashMap<>();
        regionStatuses.put("dc-0", ExecutionStatus.COMPLETED.toString());
        regionStatuses.put("dc-1", ExecutionStatus.COMPLETED.toString());
        jobStatusQueryResponse.setExtraInfo(regionStatuses);
        ((ControllerClient) Mockito.doReturn(jobStatusQueryResponse).when(client)).queryOverallJobStatus(ArgumentMatchers.anyString(), (Optional) ArgumentMatchers.any(), ArgumentMatchers.anyString());

        // All regions completed: the run is expected to succeed.
        spyVenicePushJob.run();

        // NOTE(review): relies on the response keeping a reference to this map rather than a copy.
        regionStatuses.put("dc-0", ExecutionStatus.NOT_STARTED.toString());
        try {
            spyVenicePushJob.run();
            Assert.fail("Test should fail, but doesn't.");
        } catch (VeniceException e) {
            Assert.assertTrue(e.getMessage().contains("Push job error"));
        }
    }

    /**
     * Stubs {@code queryOverallJobStatus} on the given mocked client so that any query
     * returns a COMPLETED response whose extra info reports both "dc-0" and "dc-1" as
     * COMPLETED.
     *
     * @param controllerClient Mockito mock (or spy) to stub
     */
    private void mockJobStatusQuery(ControllerClient controllerClient) {
        JobStatusQueryResponse jobStatusQueryResponse = new JobStatusQueryResponse();
        jobStatusQueryResponse.setStatus(ExecutionStatus.COMPLETED.toString());
        jobStatusQueryResponse.setStatusDetails("nothing");
        jobStatusQueryResponse.setVersion(REPUSH_VERSION);
        jobStatusQueryResponse.setName(TEST_STORE);
        jobStatusQueryResponse.setCluster(TEST_CLUSTER);
        // Per-region status map, typed instead of a raw HashMap.
        HashMap<String, String> regionStatuses = new HashMap<>();
        regionStatuses.put("dc-0", ExecutionStatus.COMPLETED.toString());
        regionStatuses.put("dc-1", ExecutionStatus.COMPLETED.toString());
        jobStatusQueryResponse.setExtraInfo(regionStatuses);
        ((ControllerClient) Mockito.doReturn(jobStatusQueryResponse).when(controllerClient)).queryOverallJobStatus(ArgumentMatchers.anyString(), (Optional) ArgumentMatchers.any(), ArgumentMatchers.anyString());
    }

    /**
     * Stubs out the parts of a spied {@link VenicePushJob} that these tests do not exercise:
     * input data inspection returns a mocked provider, and key/value schema validation,
     * MapReduce configuration setup and the job run itself become no-ops.
     *
     * @param venicePushJob Mockito spy to stub
     * @throws Exception if building the mocked input data provider fails
     */
    private void skipVPJValidation(VenicePushJob venicePushJob) throws Exception {
        InputDataInfoProvider mockedProvider = getMockInputDataInfoProvider();
        Mockito.doReturn(mockedProvider).when(venicePushJob).getInputDataInfoProvider();
        Mockito.doNothing().when(venicePushJob).validateKeySchema((ControllerClient) ArgumentMatchers.any(), (VenicePushJob.PushJobSetting) ArgumentMatchers.any(), (PushJobSchemaInfo) ArgumentMatchers.any(), (VenicePushJob.StoreSetting) ArgumentMatchers.any());
        Mockito.doNothing().when(venicePushJob).validateValueSchema((ControllerClient) ArgumentMatchers.any(), (VenicePushJob.PushJobSetting) ArgumentMatchers.any(), (PushJobSchemaInfo) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean());
        Mockito.doNothing().when(venicePushJob).setupMRConf((JobConf) ArgumentMatchers.any(), (VenicePushJob.TopicInfo) ArgumentMatchers.any(), (VenicePushJob.PushJobSetting) ArgumentMatchers.any(), (PushJobSchemaInfo) ArgumentMatchers.any(), (VenicePushJob.StoreSetting) ArgumentMatchers.any(), (VeniceProperties) ArgumentMatchers.any(), ArgumentMatchers.anyString(), ArgumentMatchers.anyString());
        Mockito.doNothing().when(venicePushJob).runJobAndUpdateStatus();
    }
}
