package com.linkedin.venice.hadoop;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StorageEngineOverheadRatioResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.InputDataInfoProvider;
import com.linkedin.venice.hadoop.MRJobCounterHelper;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.output.avro.ValidateSchemaAndBuildDictMapperOutput;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.writer.VeniceWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/hadoop/TestVenicePushJobCheckpoints.class */
public class TestVenicePushJobCheckpoints {
    // Partition count for the mocked store. NOTE(review): not referenced by name in the visible part of this
    // class; the literal 10 used for the reducer-closed counter presumably corresponds to it — confirm.
    private static final int PARTITION_COUNT = 10;
    // NOTE(review): passed below as the input-file-count argument of the input data info mock and of
    // InputDataInfo; the name suggests a different purpose, possibly an artifact of constant merging — confirm.
    private static final int NUMBER_OF_FILES_TO_READ_AND_BUILD_DICT_COUNT = 1;
    // Cluster name for the tests; not referenced in the visible part of this class.
    private static final String TEST_CLUSTER_NAME = "some-cluster";
    // JSON record schema "User" (namespace example.avro) with string fields id, name, company and an int field age.
    private static final String SCHEMA_STR = "{  \"namespace\" : \"example.avro\",    \"type\": \"record\",     \"name\": \"User\",       \"fields\": [                  { \"name\": \"id\", \"type\": \"string\" },         { \"name\": \"name\", \"type\": \"string\" },         { \"name\": \"age\", \"type\": \"int\" },         { \"name\": \"company\", \"type\": \"string\" }    ]  } ";
    // JSON record schema "Type1" with a single string field named "something".
    private static final String SIMPLE_FILE_SCHEMA_STR = "{\n    \"type\": \"record\",\n    \"name\": \"Type1\",\n    \"fields\": [\n        {\n            \"name\": \"something\",\n            \"type\": \"string\"\n        }\n    ]\n}";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/hadoop/TestVenicePushJobCheckpoints$MockCounterInfo.class */
    /**
     * Immutable pairing of a counter identity (group name + counter name) with the value a
     * mocked job should report for that counter.
     */
    public static class MockCounterInfo {
        private final long counterValue;
        private final MRJobCounterHelper.GroupAndCounterNames groupAndCounterNames;

        /**
         * @param groupAndCounterNames identifies the counter (group and counter name)
         * @param j value to report for the counter
         */
        MockCounterInfo(MRJobCounterHelper.GroupAndCounterNames groupAndCounterNames, long j) {
            this.counterValue = j;
            this.groupAndCounterNames = groupAndCounterNames;
        }

        /** @return the counter value supplied at construction time */
        long getCounterValue() {
            return counterValue;
        }

        /** @return the group name of the wrapped counter identity */
        String getGroupName() {
            return groupAndCounterNames.getGroupName();
        }

        /** @return the counter name of the wrapped counter identity */
        String getCounterName() {
            return groupAndCounterNames.getCounterName();
        }
    }

    /**
     * With a total value size counter of 1001 and both the dictionary mapper and compression metric
     * collection disabled, the push must fail with a "Storage quota exceeded" error and report the
     * QUOTA_EXCEEDED checkpoint last.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Storage quota exceeded.*")
    public void testHandleQuotaExceeded() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1001L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.QUOTA_EXCEEDED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "false");
            props.setProperty("compression.metric.collection.enabled", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Quota-exceeded scenario with the dictionary-building mapper enabled: the
     * VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED checkpoint is expected before the new
     * version is created, and the push must still fail with "Storage quota exceeded".
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Storage quota exceeded.*")
    public void testHandleQuotaExceededWithMapperToBuildDict() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1001L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 2L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.QUOTA_EXCEEDED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "true");
            props.setProperty("compression.metric.collection.enabled", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Quota-exceeded scenario with compression metric collection enabled while the
     * "use.mapper.to.build.dictionary" property is set to false. The expected checkpoints still
     * include VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Storage quota exceeded.*")
    public void testHandleQuotaExceededWithCompressionCollectionEnabled() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1001L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 2L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_ZSTD_DICT_TRAIN_SUCCESS_GROUP_COUNTER_NAME, 1L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.QUOTA_EXCEEDED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "false");
            props.setProperty("compression.metric.collection.enabled", "true");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Quota-exceeded scenario with both the dictionary-building mapper and compression metric
     * collection enabled. Counters and expected checkpoints match the variant that only enables
     * compression metric collection.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Storage quota exceeded.*")
    public void testHandleQuotaExceededWithCompressionCollectionEnabledV1() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1001L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 2L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_ZSTD_DICT_TRAIN_SUCCESS_GROUP_COUNTER_NAME, 1L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.QUOTA_EXCEEDED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "true");
            props.setProperty("compression.metric.collection.enabled", "true");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Happy path with the dictionary-building mapper and compression metric collection both
     * disabled: no exception is expected and the run should end at JOB_STATUS_POLLING_COMPLETED.
     */
    @Test
    public void testWithNoMapperToBuildDictionary() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "false");
            props.setProperty("compression.metric.collection.enabled", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Happy path with the dictionary-building mapper enabled and compression metric collection
     * disabled: the VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED checkpoint is expected in
     * addition to the regular successful sequence.
     */
    @Test
    public void testWithMapperToBuildDictionary() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 2L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "true");
            props.setProperty("compression.metric.collection.enabled", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Successful run with compression metric collection (and the dictionary-building mapper)
     * switched off; expects the four-checkpoint sequence ending in JOB_STATUS_POLLING_COMPLETED.
     */
    @Test
    public void testWithCompressionCollectionDisabled() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "false");
            props.setProperty("compression.metric.collection.enabled", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Successful run with compression metric collection enabled and the
     * "use.mapper.to.build.dictionary" property set to false; the dictionary training success
     * counter is reported as 1 and the full five-checkpoint sequence is expected.
     */
    @Test
    public void testWithCompressionCollectionEnabled() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 2L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_ZSTD_DICT_TRAIN_SUCCESS_GROUP_COUNTER_NAME, 1L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "false");
            props.setProperty("compression.metric.collection.enabled", "true");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Successful run with both the dictionary-building mapper and compression metric collection
     * enabled; expects the full five-checkpoint sequence.
     */
    @Test
    public void testWithCompressionCollectionEnabledV1() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 2L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_ZSTD_DICT_TRAIN_SUCCESS_GROUP_COUNTER_NAME, 1L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "true");
            props.setProperty("compression.metric.collection.enabled", "true");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * A dictionary training failure counter of 1 is reported, but no compression strategy is set
     * by this test: no exception is expected and the full successful checkpoint sequence should
     * be recorded.
     */
    @Test
    public void testHandlingFailureWithCompressionCollectionEnabled() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_ZSTD_DICT_TRAIN_FAILURE_GROUP_COUNTER_NAME, 1L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "true");
            props.setProperty("compression.metric.collection.enabled", "true");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Same dictionary training failure counter as the non-ZSTD variant, but with the compression
     * strategy set to ZSTD_WITH_DICT: the push must fail with "Training ZSTD compression
     * dictionary failed" and report ZSTD_DICTIONARY_CREATION_FAILED right after initialization.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Training ZSTD compression dictionary failed.*")
    public void testHandlingFailureWithCompressionCollectionEnabledAndZstdCompression() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_ZSTD_DICT_TRAIN_FAILURE_GROUP_COUNTER_NAME, 1L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.ZSTD_DICTIONARY_CREATION_FAILED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "true");
            props.setProperty("compression.metric.collection.enabled", "true");
            props.setProperty("compression.strategy", CompressionStrategy.ZSTD_WITH_DICT.toString());
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * A dictionary training skipped counter of 1 is reported, but no compression strategy is set
     * by this test: no exception is expected and the full successful checkpoint sequence should
     * be recorded.
     */
    @Test
    public void testHandlingSkippedWithCompressionCollectionEnabled() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_ZSTD_DICT_TRAIN_SKIPPED_GROUP_COUNTER_NAME, 1L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "true");
            props.setProperty("compression.metric.collection.enabled", "true");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Same dictionary training skipped counter as the non-ZSTD variant, but with the compression
     * strategy set to ZSTD_WITH_DICT: the push must fail with "Training ZSTD compression
     * dictionary skipped" and report ZSTD_DICTIONARY_CREATION_FAILED right after initialization.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Training ZSTD compression dictionary skipped.*")
    public void testHandlingSkippedWithCompressionCollectionEnabledAndZstdCompression() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_NUM_RECORDS_SUCCESSFULLY_PROCESSED_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.MAPPER_ZSTD_DICT_TRAIN_SKIPPED_GROUP_COUNTER_NAME, 1L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.ZSTD_DICTIONARY_CREATION_FAILED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("use.mapper.to.build.dictionary", "true");
            props.setProperty("compression.metric.collection.enabled", "true");
            props.setProperty("compression.strategy", CompressionStrategy.ZSTD_WITH_DICT.toString());
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * A write ACL failure counter of 1 must fail the push with "Insufficient ACLs to write to the
     * store" and report WRITE_ACL_FAILED as the last checkpoint.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Insufficient ACLs to write to the store")
    public void testHandleWriteAclFailed() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.WRITE_ACL_FAILED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("compression.metric.collection.enabled", "false");
            props.setProperty("use.mapper.to.build.dictionary", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * A duplicate-key-with-distinct-value counter of 1 must fail the push with an "Input data has
     * at least 1 keys that appear more than once" error and report DUP_KEY_WITH_DIFF_VALUE last.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Input data has at least 1 keys that appear more than once.*")
    public void testHandleDuplicatedKeyWithDistinctValue() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.DUP_KEY_WITH_DIFF_VALUE);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("compression.metric.collection.enabled", "false");
            props.setProperty("use.mapper.to.build.dictionary", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * With the spray-all-partitions counter at 1 and a reducer closed count of 0, the push must
     * fail with a message mentioning "reducer job closed count (0)"; the last reported checkpoint
     * is expected to be START_MAP_REDUCE_JOB.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*reducer job closed count \\(0\\).*")
    public void testHandleZeroClosedReducersFailure() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.MAPPER_SPRAY_ALL_PARTITIONS_TRIGGERED_COUNT_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 0L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.START_MAP_REDUCE_JOB);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("compression.metric.collection.enabled", "false");
            props.setProperty("use.mapper.to.build.dictionary", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, 10L, propertyOverrides);
    }

    /**
     * The input data info mock is built with the boolean flag set to true while the total value
     * size counter is 0; the push must fail with "MR job counter is not reliable" and stop at the
     * START_MAP_REDUCE_JOB checkpoint.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "MR job counter is not reliable.*")
    public void testUnreliableMapReduceCounter() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.MAPPER_SPRAY_ALL_PARTITIONS_TRIGGERED_COUNT_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 0L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.START_MAP_REDUCE_JOB);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("compression.metric.collection.enabled", "false");
            props.setProperty("use.mapper.to.build.dictionary", "false");
        };
        testHandleErrorsInCounter(
                counters,
                expectedCheckpoints,
                10L,
                NUMBER_OF_FILES_TO_READ_AND_BUILD_DICT_COUNT,
                true,
                propertyOverrides);
    }

    /**
     * The input data info mock is built with the boolean flag set to false and every counter is 0:
     * no exception is expected and the regular successful checkpoint sequence should be reported.
     */
    @Test
    public void testHandleZeroClosedReducersWithNoRecordInputDataFile() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 0L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("compression.metric.collection.enabled", "false");
            props.setProperty("use.mapper.to.build.dictionary", "false");
        };
        testHandleErrorsInCounter(
                counters,
                expectedCheckpoints,
                10L,
                NUMBER_OF_FILES_TO_READ_AND_BUILD_DICT_COUNT,
                false,
                propertyOverrides);
    }

    /**
     * Constructing an InputDataInfo with 0 as its second argument is expected to be rejected with
     * an IllegalArgumentException stating that the input data file size must be positive.
     */
    @Test(expectedExceptions = {IllegalArgumentException.class}, expectedExceptionsMessageRegExp = "The input data file size is expected to be positive. Got: 0")
    public void testInitInputDataInfoWithIllegalSize() {
        PushJobSchemaInfo schemaInfo = new PushJobSchemaInfo();
        long nowMillis = System.currentTimeMillis();
        new InputDataInfoProvider.InputDataInfo(
                schemaInfo,
                0L,
                NUMBER_OF_FILES_TO_READ_AND_BUILD_DICT_COUNT,
                false,
                nowMillis);
    }

    /**
     * Constructing an InputDataInfo with 0 as its third argument is expected to be rejected with
     * an IllegalArgumentException stating that the number of input files must be positive.
     */
    @Test(expectedExceptions = {IllegalArgumentException.class}, expectedExceptionsMessageRegExp = "The Number of Input files is expected to be positive. Got: 0")
    public void testInitInputDataInfoWithIllegalNumInputFiles() {
        PushJobSchemaInfo schemaInfo = new PushJobSchemaInfo();
        long nowMillis = System.currentTimeMillis();
        new InputDataInfoProvider.InputDataInfo(schemaInfo, 10L, 0, false, nowMillis);
    }

    /**
     * With the spray-all-partitions counter at 1 and a reducer closed count of 9, the push must
     * fail with a message mentioning "reducer job closed count (9)"; the last reported checkpoint
     * is expected to be START_MAP_REDUCE_JOB.
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*reducer job closed count \\(9\\).*")
    public void testHandleInsufficientClosedReducersFailure() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.MAPPER_SPRAY_ALL_PARTITIONS_TRIGGERED_COUNT_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 9L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.START_MAP_REDUCE_JOB);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("compression.metric.collection.enabled", "false");
            props.setProperty("use.mapper.to.build.dictionary", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * With the spray-all-partitions counter at 0, a reducer closed count of 9 is not treated as an
     * error by this test: no exception is expected and the successful checkpoint sequence should
     * be reported.
     */
    @Test
    public void testCounterValidationWhenSprayAllPartitionsNotTriggeredButWithMismatchedReducerCount() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.MAPPER_SPRAY_ALL_PARTITIONS_TRIGGERED_COUNT_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 9L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("compression.metric.collection.enabled", "false");
            props.setProperty("use.mapper.to.build.dictionary", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Baseline scenario: the ACL failure and duplicate key counters are 0 and the reducer closed
     * count is 10, so no exception is expected and the successful checkpoint sequence should be
     * reported.
     */
    @Test
    public void testHandleNoErrorInCounters() throws Exception {
        List<MockCounterInfo> counters = Arrays.asList(
                new MockCounterInfo(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME, 1L),
                new MockCounterInfo(MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME, 0L),
                new MockCounterInfo(MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME, 10L));
        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED,
                VenicePushJob.PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED);
        Consumer<Properties> propertyOverrides = props -> {
            props.setProperty("compression.metric.collection.enabled", "false");
            props.setProperty("use.mapper.to.build.dictionary", "false");
        };
        testHandleErrorsInCounter(counters, expectedCheckpoints, propertyOverrides);
    }

    /**
     * Runs the push job with a job client whose {@code runJobWithConfig} throws an
     * {@link IOException} and an input data provider whose reported last modification time is
     * slightly in the future, for every combination of the two boolean flags. A
     * {@link VeniceException} is expected, with DATASET_CHANGED as the last reported checkpoint.
     *
     * @param z value for "compression.metric.collection.enabled"
     * @param z2 value for "use.mapper.to.build.dictionary"
     */
    @Test(expectedExceptions = {VeniceException.class}, dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testHandleMRFailureAndDatasetChange(boolean z, boolean z2) throws Exception {
        final boolean compressionMetricCollectionEnabled = z;
        final boolean useMapperToBuildDict = z2;

        JobClientWrapper jobClientWrapper = Mockito.mock(JobClientWrapper.class);
        Mockito.when(jobClientWrapper.runJobWithConfig((JobConf) Mockito.any())).thenThrow(new IOException("Job failed!"));

        InputDataInfoProvider inputDataInfoProviderMock =
                getInputDataInfoProviderMock(10L, NUMBER_OF_FILES_TO_READ_AND_BUILD_DICT_COUNT, true);
        Mockito.when(inputDataInfoProviderMock.getInputLastModificationTime(Mockito.anyString()))
                .thenReturn(System.currentTimeMillis() + 10);

        // When either flag is on, no NEW_VERSION_CREATED checkpoint is expected before DATASET_CHANGED.
        List<VenicePushJob.PushJobCheckpoints> checkpointsWithoutNewVersion = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.DATASET_CHANGED);
        List<VenicePushJob.PushJobCheckpoints> checkpointsWithNewVersion = Arrays.asList(
                VenicePushJob.PushJobCheckpoints.INITIALIZE_PUSH_JOB,
                VenicePushJob.PushJobCheckpoints.NEW_VERSION_CREATED,
                VenicePushJob.PushJobCheckpoints.DATASET_CHANGED);

        // Key layout: (use.mapper.to.build.dictionary, compression.metric.collection.enabled).
        HashMap<Pair<Boolean, Boolean>, List<VenicePushJob.PushJobCheckpoints>> expectedCheckpointsByFlags = new HashMap<>();
        expectedCheckpointsByFlags.put(new Pair<>(true, true), checkpointsWithoutNewVersion);
        expectedCheckpointsByFlags.put(new Pair<>(false, true), checkpointsWithoutNewVersion);
        expectedCheckpointsByFlags.put(new Pair<>(true, false), checkpointsWithoutNewVersion);
        expectedCheckpointsByFlags.put(new Pair<>(false, false), checkpointsWithNewVersion);

        List<VenicePushJob.PushJobCheckpoints> expectedCheckpoints =
                expectedCheckpointsByFlags.get(new Pair<>(useMapperToBuildDict, compressionMetricCollectionEnabled));

        runJobAndAssertCheckpoints(jobClientWrapper, inputDataInfoProviderMock, props -> {
            props.setProperty("compression.metric.collection.enabled", String.valueOf(compressionMetricCollectionEnabled));
            props.setProperty("use.mapper.to.build.dictionary", String.valueOf(useMapperToBuildDict));
        }, expectedCheckpoints);
    }

    /**
     * Convenience overload that delegates to the four-argument variant with a size argument of 10.
     *
     * @param list counters the mocked job should report
     * @param list2 checkpoints expected to be reported, in order
     * @param consumer optional customization of the job properties
     */
    private void testHandleErrorsInCounter(List<MockCounterInfo> list, List<VenicePushJob.PushJobCheckpoints> list2, Consumer<Properties> consumer) throws Exception {
        final long defaultSizeArgument = 10L;
        testHandleErrorsInCounter(list, list2, defaultSizeArgument, consumer);
    }

    /**
     * Convenience overload that delegates to the six-argument variant, using
     * NUMBER_OF_FILES_TO_READ_AND_BUILD_DICT_COUNT as the count argument and deriving the boolean
     * flag from whether {@code j} is positive.
     *
     * @param list counters the mocked job should report
     * @param list2 checkpoints expected to be reported, in order
     * @param j size argument forwarded to the input data info mock (presumably the input data size in bytes — confirm)
     * @param consumer optional customization of the job properties
     */
    private void testHandleErrorsInCounter(List<MockCounterInfo> list, List<VenicePushJob.PushJobCheckpoints> list2, long j, Consumer<Properties> consumer) throws Exception {
        boolean positiveSize = j > 0;
        testHandleErrorsInCounter(
                list,
                list2,
                j,
                NUMBER_OF_FILES_TO_READ_AND_BUILD_DICT_COUNT,
                positiveSize,
                consumer);
    }

    /**
     * Builds the job client mock from the given counters and the input data info provider mock
     * from {@code j}, {@code i} and {@code z}, then runs the push job and checks the reported
     * checkpoints.
     *
     * @param list counters the mocked job should report
     * @param list2 checkpoints expected to be reported, in order
     * @param j first argument of the input data info provider mock (presumably the data size — confirm)
     * @param i second argument of the input data info provider mock (presumably the file count — confirm)
     * @param z third argument of the input data info provider mock (presumably whether the input has records — confirm)
     * @param consumer optional customization of the job properties
     */
    private void testHandleErrorsInCounter(List<MockCounterInfo> list, List<VenicePushJob.PushJobCheckpoints> list2, long j, int i, boolean z, Consumer<Properties> consumer) throws Exception {
        JobClientWrapper jobClientMock = createJobClientWrapperMock(list);
        InputDataInfoProvider inputDataInfoMock = getInputDataInfoProviderMock(j, i, z);
        runJobAndAssertCheckpoints(jobClientMock, inputDataInfoMock, consumer, list2);
    }

    /**
     * Creates a {@link VenicePushJob} wired with the given mocks, runs it, and verifies that the
     * checkpoints recorded by the push job details tracker match the expected sequence. The
     * verification happens whether or not {@code run()} throws, so tests that expect an exception
     * still have their checkpoints checked; any exception from {@code run()} propagates unless
     * the checkpoint assertion itself fails.
     *
     * @param jobClientWrapper job client to install on the push job
     * @param inputDataInfoProvider input data info provider to install on the push job
     * @param consumer optional hook to customize the job properties; may be {@code null}
     * @param list checkpoints expected to be reported, in order
     */
    private void runJobAndAssertCheckpoints(JobClientWrapper jobClientWrapper, InputDataInfoProvider inputDataInfoProvider, Consumer<Properties> consumer, List<VenicePushJob.PushJobCheckpoints> list) {
        Properties vpjProps = getVPJProps();
        if (consumer != null) {
            consumer.accept(vpjProps);
        }
        ControllerClient controllerClient = Mockito.mock(ControllerClient.class);
        configureControllerClientMock(controllerClient, vpjProps);
        configureClusterDiscoverControllerClient(controllerClient);

        // try-with-resources closes the job exactly once and attaches a close() failure as a
        // suppressed exception when the body has already failed.
        try (VenicePushJob venicePushJob = new VenicePushJob("job-id", vpjProps)) {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.setKmeSchemaSystemStoreControllerClient(controllerClient);
            venicePushJob.setJobClientWrapper(jobClientWrapper);
            venicePushJob.setInputDataInfoProvider(inputDataInfoProvider);
            venicePushJob.setVeniceWriter(createVeniceWriterMock());
            SentPushJobDetailsTrackerImpl pushJobDetailsTracker = new SentPushJobDetailsTrackerImpl();
            venicePushJob.setSentPushJobDetailsTracker(pushJobDetailsTracker);
            try {
                venicePushJob.setValidateSchemaAndBuildDictMapperOutputReader(getValidateSchemaAndBuildDictMapperOutputReaderMock());
            } catch (Exception e) {
                // Best effort: the job still runs without the reader mock; the failure is only printed.
                e.printStackTrace();
            }

            try {
                venicePushJob.run();
            } finally {
                // Compare checkpoints on both the success and the failure path.
                List<Integer> actualCheckpointValues = pushJobDetailsTracker.getRecordedPushJobDetails()
                        .stream()
                        .map(details -> details.pushJobLatestCheckpoint)
                        .collect(Collectors.toList());
                List<Integer> expectedCheckpointValues = list.stream()
                        .map(VenicePushJob.PushJobCheckpoints::getValue)
                        .collect(Collectors.toList());
                Assert.assertEquals(actualCheckpointValues, expectedCheckpointValues);
            }
        }
    }

    /**
     * Creates the baseline properties used to construct the push job under test.
     *
     * <p>Every entry is stored through {@link Properties#setProperty(String, String)} so that all
     * values are strings. Values inserted with {@code put} as non-strings (for example an
     * {@code Integer}) are reported as {@code null} by {@link Properties#getProperty(String)}.
     *
     * @return a new, mutable {@link Properties} instance; callers may add or override entries
     */
    private Properties getVPJProps() {
        Properties properties = new Properties();
        properties.setProperty("multi.region", "false");
        properties.setProperty("source.grid.fabric", "child_region");
        properties.setProperty("d2.zk.hosts.child_region", "child.zk.com:1234");
        properties.setProperty("venice.discover.urls", "venice-urls");
        properties.setProperty("venice.store.name", "store-name");
        properties.setProperty("input.path", "input-path");
        properties.setProperty("key.field", "id");
        properties.setProperty("value.field", "name");
        // Numeric settings are written as strings so getProperty() can see them.
        properties.setProperty("venice.writer.close.timeout.ms", "500");
        properties.setProperty("poll.job.status.interval.ms", "1000");
        properties.setProperty("ssl.key.store.property.name", "test");
        properties.setProperty("ssl.trust.store.property.name", "test");
        properties.setProperty("ssl.key.store.password.property.name", "test");
        properties.setProperty("ssl.key.password.property.name", "test");
        properties.setProperty("push.job.status.upload.enable", "true");
        return properties;
    }

    /**
     * Creates a reader mock whose {@code getOutput()} returns a fixed
     * {@link ValidateSchemaAndBuildDictMapperOutput} built from {@code 10L} and the bytes of
     * {@code "Test"}.
     *
     * @return the stubbed reader mock
     */
    private ValidateSchemaAndBuildDictMapperOutputReader getValidateSchemaAndBuildDictMapperOutputReaderMock() {
        ByteBuffer wrappedBytes = ByteBuffer.wrap("Test".getBytes());
        ValidateSchemaAndBuildDictMapperOutput cannedOutput = new ValidateSchemaAndBuildDictMapperOutput(10L, wrappedBytes);
        ValidateSchemaAndBuildDictMapperOutputReader readerMock = Mockito.mock(ValidateSchemaAndBuildDictMapperOutputReader.class);
        Mockito.when(readerMock.getOutput()).thenReturn(cannedOutput);
        return readerMock;
    }

    /**
     * Creates an {@link InputDataInfoProvider} mock whose {@code validateInputAndGetInfo} returns,
     * for any string argument, an {@code InputDataInfo} built from a fixed schema description and
     * the supplied values.
     *
     * @param j passed as the second constructor argument of {@code InputDataInfo}
     * @param i passed as the third constructor argument of {@code InputDataInfo}
     * @param z passed as the fourth constructor argument of {@code InputDataInfo}
     * @return the stubbed provider mock
     * @throws Exception declared because the stubbed call appears inside this method
     */
    private InputDataInfoProvider getInputDataInfoProviderMock(long j, int i, boolean z) throws Exception {
        PushJobSchemaInfo schemaInfo = new PushJobSchemaInfo();
        schemaInfo.setKeySchemaString(SCHEMA_STR);
        schemaInfo.setValueSchemaString(SCHEMA_STR);
        schemaInfo.setKeyField("key-field");
        schemaInfo.setValueField("value-field");
        schemaInfo.setFileSchemaString(SIMPLE_FILE_SCHEMA_STR);

        InputDataInfoProvider providerMock = Mockito.mock(InputDataInfoProvider.class);
        // The last constructor argument is stamped with the current wall-clock time.
        InputDataInfoProvider.InputDataInfo cannedInfo = new InputDataInfoProvider.InputDataInfo(schemaInfo, j, i, z, System.currentTimeMillis());
        Mockito.when(providerMock.validateInputAndGetInfo(Mockito.anyString())).thenReturn(cannedInfo);
        return providerMock;
    }

    /**
     * Creates a {@link JobStatusQueryResponse} mock that reports no error, a status of
     * {@link ExecutionStatus#COMPLETED}, an empty extra-info map and no optional details.
     *
     * @return the stubbed response mock
     */
    private JobStatusQueryResponse createJobStatusQueryResponseMock() {
        JobStatusQueryResponse completedResponse = Mockito.mock(JobStatusQueryResponse.class);
        Mockito.when(completedResponse.isError()).thenReturn(false);
        Mockito.when(completedResponse.getStatus()).thenReturn(ExecutionStatus.COMPLETED.toString());
        Mockito.when(completedResponse.getExtraInfo()).thenReturn(Collections.emptyMap());
        Mockito.when(completedResponse.getOptionalStatusDetails()).thenReturn(Optional.empty());
        Mockito.when(completedResponse.getOptionalExtraDetails()).thenReturn(Optional.empty());
        return completedResponse;
    }

    /**
     * Stubs every {@link ControllerClient} call made through the given mock in this test: cluster
     * name, store lookup, storage overhead ratio, key/value schema lookups, topic creation, job
     * status polling and push-job-details upload.
     *
     * <p>Every object handed to {@code thenReturn} is fully built <em>before</em> the matching
     * {@code Mockito.when(...)} call. Mockito rejects stubbing another mock while a {@code when}
     * is still waiting for its {@code thenReturn} and reports it as an
     * {@code UnfinishedStubbingException}, so helpers that stub their own mocks must not be
     * invoked inline as a {@code thenReturn} argument.
     *
     * @param controllerClient the mock to configure
     * @param properties job properties; {@code compression.strategy} (default {@code NO_OP})
     *     selects the compression strategy the mocked store reports
     */
    private void configureControllerClientMock(ControllerClient controllerClient, Properties properties) {
        // Resolve plain values and helper-built mocks up front, outside of any open stubbing.
        CompressionStrategy storeCompressionStrategy = CompressionStrategy.valueOf(properties.getProperty("compression.strategy", CompressionStrategy.NO_OP.toString()));
        SchemaResponse unstubbedValueSchemaResponse = Mockito.mock(SchemaResponse.class);
        VersionCreationResponse versionCreationResponse = createVersionCreationResponse();
        JobStatusQueryResponse jobStatusQueryResponse = createJobStatusQueryResponseMock();

        Mockito.when(controllerClient.getClusterName()).thenReturn(TEST_CLUSTER_NAME);
        Mockito.when(controllerClient.getValueSchema(Mockito.anyString(), Mockito.anyInt())).thenReturn(unstubbedValueSchemaResponse);

        // Store lookup: fixed quota, schema auto-registration off, compression taken from the job properties.
        StoreInfo storeInfo = Mockito.mock(StoreInfo.class);
        Mockito.when(storeInfo.getStorageQuotaInByte()).thenReturn(1000L);
        Mockito.when(storeInfo.isSchemaAutoRegisterFromPushJobEnabled()).thenReturn(false);
        Mockito.when(storeInfo.getCompressionStrategy()).thenReturn(storeCompressionStrategy);
        StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
        Mockito.when(storeResponse.getStore()).thenReturn(storeInfo);
        Mockito.when(controllerClient.getStore(Mockito.anyString())).thenReturn(storeResponse);

        // Storage engine overhead ratio.
        StorageEngineOverheadRatioResponse storageEngineOverheadRatioResponse = Mockito.mock(StorageEngineOverheadRatioResponse.class);
        Mockito.when(storageEngineOverheadRatioResponse.isError()).thenReturn(false);
        Mockito.when(storageEngineOverheadRatioResponse.getStorageEngineOverheadRatio()).thenReturn(1.0d);
        Mockito.when(controllerClient.getStorageEngineOverheadRatio(Mockito.anyString())).thenReturn(storageEngineOverheadRatioResponse);

        // Key schema lookup returns the shared test schema string.
        SchemaResponse keySchemaResponse = Mockito.mock(SchemaResponse.class);
        Mockito.when(keySchemaResponse.isError()).thenReturn(false);
        Mockito.when(keySchemaResponse.getSchemaStr()).thenReturn(SCHEMA_STR);
        Mockito.when(controllerClient.getKeySchema(Mockito.anyString())).thenReturn(keySchemaResponse);

        // Value schema id lookup returns a fixed id.
        SchemaResponse valueSchemaIdResponse = Mockito.mock(SchemaResponse.class);
        Mockito.when(valueSchemaIdResponse.isError()).thenReturn(false);
        Mockito.when(valueSchemaIdResponse.getId()).thenReturn(12345);
        Mockito.when(controllerClient.getValueSchemaID(Mockito.anyString(), Mockito.anyString())).thenReturn(valueSchemaIdResponse);

        // Topic creation and job status polling.
        Mockito.when(controllerClient.requestTopicForWrites(Mockito.anyString(), Mockito.anyLong(), (Version.PushType) Mockito.any(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(), (Optional) Mockito.any(), (Optional) Mockito.any(), (Optional) Mockito.any(), Mockito.anyBoolean(), Mockito.anyLong(), Mockito.anyBoolean(), (String) Mockito.any())).thenReturn(versionCreationResponse);
        Mockito.when(controllerClient.queryOverallJobStatus(Mockito.anyString(), (Optional) Mockito.any(), (String) Mockito.any())).thenReturn(jobStatusQueryResponse);

        // Push job details upload always succeeds.
        ControllerResponse pushJobDetailsResponse = Mockito.mock(ControllerResponse.class);
        Mockito.when(pushJobDetailsResponse.isError()).thenReturn(false);
        Mockito.when(controllerClient.sendPushJobDetails(Mockito.anyString(), Mockito.anyInt(), (byte[]) Mockito.any(byte[].class))).thenReturn(pushJobDetailsResponse);
    }

    /**
     * Stubs {@code discoverCluster} on the given client so that any store name resolves to
     * {@code TEST_CLUSTER_NAME} without error.
     *
     * @param controllerClient the mock to configure
     */
    private void configureClusterDiscoverControllerClient(ControllerClient controllerClient) {
        D2ServiceDiscoveryResponse discoveryResponse = Mockito.mock(D2ServiceDiscoveryResponse.class);
        Mockito.when(discoveryResponse.getCluster()).thenReturn(TEST_CLUSTER_NAME);
        Mockito.when(discoveryResponse.isError()).thenReturn(false);
        Mockito.when(controllerClient.discoverCluster(Mockito.anyString())).thenReturn(discoveryResponse);
    }

    /**
     * Creates a {@link VersionCreationResponse} mock describing a successfully created version:
     * no error, fixed Kafka topic and bootstrap server, SSL disabled, {@code NO_OP} compression,
     * Da Vinci push status store disabled, and a placeholder partitioner with no parameters.
     *
     * @return the stubbed response mock
     */
    private VersionCreationResponse createVersionCreationResponse() {
        VersionCreationResponse response = Mockito.mock(VersionCreationResponse.class);
        Mockito.when(response.isError()).thenReturn(false);

        // Version and partitioning.
        Mockito.when(response.getVersion()).thenReturn(Integer.valueOf(NUMBER_OF_FILES_TO_READ_AND_BUILD_DICT_COUNT));
        Mockito.when(response.getPartitions()).thenReturn(Integer.valueOf(PARTITION_COUNT));
        Mockito.when(response.getPartitionerClass()).thenReturn("PartitionerClass");
        Mockito.when(response.getPartitionerParams()).thenReturn(Collections.emptyMap());

        // Kafka connectivity.
        Mockito.when(response.getKafkaTopic()).thenReturn("kafka-topic");
        Mockito.when(response.getKafkaBootstrapServers()).thenReturn("kafka-bootstrap-server");
        Mockito.when(response.isEnableSSL()).thenReturn(false);

        // Store-level features.
        Mockito.when(response.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
        Mockito.when(response.isDaVinciPushStatusStoreEnabled()).thenReturn(false);
        return response;
    }

    /**
     * Creates a {@link JobClientWrapper} mock whose {@code runJobWithConfig} returns a running-job
     * mock exposing the supplied counter values.
     *
     * <p>One {@link Counters.Group} mock is created per distinct group name and shared by all
     * counters of that group.
     *
     * @param list the counters (group name, counter name, value) the running job should report
     * @return the stubbed job client mock
     * @throws IOException declared because the stubbed {@code runJobWithConfig} call appears here
     */
    private JobClientWrapper createJobClientWrapperMock(List<MockCounterInfo> list) throws IOException {
        Counters countersMock = Mockito.mock(Counters.class);
        HashMap<String, Counters.Group> groupMocksByName = new HashMap<>();
        for (MockCounterInfo counterInfo : list) {
            String groupName = counterInfo.getGroupName();
            Counters.Group groupMock = groupMocksByName.get(groupName);
            if (groupMock == null) {
                // First counter seen for this group: create the group mock and remember it.
                groupMock = Mockito.mock(Counters.Group.class);
                groupMocksByName.put(groupName, groupMock);
            }
            Mockito.when(groupMock.getCounter(counterInfo.getCounterName())).thenReturn(Long.valueOf(counterInfo.getCounterValue()));
            Mockito.when(countersMock.getGroup(groupName)).thenReturn(groupMock);
        }

        JobID jobIdMock = Mockito.mock(JobID.class);
        Mockito.when(jobIdMock.toString()).thenReturn("temp");

        RunningJob runningJobMock = Mockito.mock(RunningJob.class);
        Mockito.when(runningJobMock.getCounters()).thenReturn(countersMock);
        Mockito.when(runningJobMock.getID()).thenReturn(jobIdMock);

        JobClientWrapper jobClientMock = Mockito.mock(JobClientWrapper.class);
        Mockito.when(jobClientMock.runJobWithConfig((JobConf) Mockito.any())).thenReturn(runningJobMock);
        return jobClientMock;
    }

    /**
     * Creates an unstubbed {@link VeniceWriter} mock.
     *
     * @return a new writer mock with Mockito's default behavior for every method
     */
    @SuppressWarnings("unchecked") // Mockito can only produce a raw VeniceWriter from a class literal.
    private VeniceWriter<KafkaKey, byte[], byte[]> createVeniceWriterMock() {
        VeniceWriter<KafkaKey, byte[], byte[]> writerMock = Mockito.mock(VeniceWriter.class);
        return writerMock;
    }
}
