package com.linkedin.venice.hadoop;

import com.github.luben.zstd.Zstd;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.D2ControllerClientFactory;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.RepushInfoResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.etl.ETLValueSchemaTransformation;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.heartbeat.DefaultPushJobHeartbeatSenderFactory;
import com.linkedin.venice.hadoop.heartbeat.NoOpPushJobHeartbeatSender;
import com.linkedin.venice.hadoop.heartbeat.NoOpPushJobHeartbeatSenderFactory;
import com.linkedin.venice.hadoop.heartbeat.PushJobHeartbeatSender;
import com.linkedin.venice.hadoop.heartbeat.PushJobHeartbeatSenderFactory;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputDictTrainer;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputFormat;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputFormatCombiner;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputKeyComparator;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputMRPartitioner;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputRecordReader;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputValueGroupingComparator;
import com.linkedin.venice.hadoop.input.kafka.VeniceKafkaInputMapper;
import com.linkedin.venice.hadoop.input.kafka.VeniceKafkaInputReducer;
import com.linkedin.venice.hadoop.input.kafka.ttl.TTLResolutionPolicy;
import com.linkedin.venice.hadoop.output.avro.ValidateSchemaAndBuildDictMapperOutput;
import com.linkedin.venice.hadoop.ssl.TempFileSSLConfigurator;
import com.linkedin.venice.hadoop.utils.VPJSSLUtils;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.status.BatchJobHeartbeatConfigs;
import com.linkedin.venice.status.PushJobDetailsStatus;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.status.protocol.PushJobDetailsStatusTuple;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.DictionaryUtils;
import com.linkedin.venice.utils.EncodingUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.io.FileReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/hadoop/VenicePushJob.class */
public class VenicePushJob implements AutoCloseable {
    // ---- Input record key/value field configuration ----
    // Legacy field-name keys. The constructor copies their values to "key.field" / "value.field"
    // when the newer keys are unset, and fails if both are set to different values.
    public static final String LEGACY_AVRO_KEY_FIELD_PROP = "avro.key.field";
    public static final String LEGACY_AVRO_VALUE_FIELD_PROP = "avro.value.field";
    public static final String KEY_FIELD_PROP = "key.field";
    public static final String VALUE_FIELD_PROP = "value.field";
    public static final String DEFAULT_KEY_FIELD_PROP = "key";
    public static final String DEFAULT_VALUE_FIELD_PROP = "value";
    // Default value for ENABLE_SSL.
    public static final boolean DEFAULT_SSL_ENABLED = false;
    public static final String SCHEMA_STRING_PROP = "schema";
    public static final String KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP = "kafka.source.key.schema";
    public static final String EXTENDED_SCHEMA_VALIDITY_CHECK_ENABLED = "extended.schema.validity.check.enabled";
    public static final boolean DEFAULT_EXTENDED_SCHEMA_VALIDITY_CHECK_ENABLED = true;
    public static final String FILE_KEY_SCHEMA = "key.schema";
    public static final String FILE_VALUE_SCHEMA = "value.schema";
    // ---- Push mode and controller interaction ----
    // INCREMENTAL_PUSH is rejected when SOURCE_KAFKA is enabled.
    public static final String INCREMENTAL_PUSH = "incremental.push";
    public static final String ALLOW_DUPLICATE_KEY = "allow.duplicate.key";
    public static final String POLL_STATUS_RETRY_ATTEMPTS = "poll.status.retry.attempts";
    public static final String CONTROLLER_REQUEST_RETRY_ATTEMPTS = "controller.request.retry.attempts";
    public static final String POLL_JOB_STATUS_INTERVAL_MS = "poll.job.status.interval.ms";
    public static final String JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS = "job.status.in.unknown.state.timeout.ms";
    public static final String SEND_CONTROL_MESSAGES_DIRECTLY = "send.control.messages.directly";
    // SOURCE_ETL is rejected when SOURCE_KAFKA is enabled.
    public static final String SOURCE_ETL = "source.etl";
    public static final String ETL_VALUE_SCHEMA_TRANSFORMATION = "etl.value.schema.transformation";
    // ---- Compression / ZSTD dictionary building ----
    // Enabling compression metric collection force-enables USE_MAPPER_TO_BUILD_DICTIONARY.
    public static final String COMPRESSION_METRIC_COLLECTION_ENABLED = "compression.metric.collection.enabled";
    public static final boolean DEFAULT_COMPRESSION_METRIC_COLLECTION_ENABLED = false;
    public static final String USE_MAPPER_TO_BUILD_DICTIONARY = "use.mapper.to.build.dictionary";
    public static final boolean DEFAULT_USE_MAPPER_TO_BUILD_DICTIONARY = false;
    public static final String ZSTD_DICTIONARY_CREATION_REQUIRED = "zstd.dictionary.creation.required";
    public static final String ZSTD_DICTIONARY_CREATION_SUCCESS = "zstd.dictionary.creation.success";
    public static final String VALIDATE_SCHEMA_AND_BUILD_DICT_MAPPER_OUTPUT_DIRECTORY = "validate.schema.and.build.dict.mapper.output.directory";
    public static final String MAPPER_OUTPUT_DIRECTORY = "mapper.output.directory";
    // Fallback for MAPPER_OUTPUT_DIRECTORY when the mapper builds the dictionary.
    protected static final String VALIDATE_SCHEMA_AND_BUILD_DICTIONARY_MAPPER_OUTPUT_PARENT_DIR_DEFAULT = "/tmp/veniceMapperOutput";
    private static final String VALIDATE_SCHEMA_AND_BUILD_DICTIONARY_MAPPER_OUTPUT_FILE_PREFIX = "mapper-output-";
    private static final String VALIDATE_SCHEMA_AND_BUILD_DICTIONARY_MAPPER_OUTPUT_FILE_EXTENSION = ".avro";
    public static final String KEY_ZSTD_COMPRESSION_DICTIONARY = "zstdDictionary";
    public static final String KEY_INPUT_FILE_DATA_SIZE = "inputFileDataSize";
    // ---- Kafka input (repush) ----
    // When SOURCE_KAFKA is enabled, duplicate keys are always allowed.
    public static final String SOURCE_KAFKA = "source.kafka";
    public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
    public static final String KAFKA_INPUT_FABRIC = "kafka.input.fabric";
    public static final String KAFKA_INPUT_BROKER_URL = "kafka.input.broker.url";
    public static final String KAFKA_INPUT_MAX_RECORDS_PER_MAPPER = "kafka.input.max.records.per.mapper";
    public static final String KAFKA_INPUT_COMBINER_ENABLED = "kafka.input.combiner.enabled";
    public static final String KAFKA_INPUT_COMPRESSION_BUILD_NEW_DICT_ENABLED = "kafka.input.compression.build.new.dict.enabled";
    public static final String KAFKA_INPUT_SOURCE_TOPIC_CHUNKING_ENABLED = "kafka.input.source.topic.chunking.enabled";
    // ---- Hybrid rewind overrides ----
    // REWIND_TIME_IN_SECONDS_OVERRIDE wins; the epoch override is consulted only when it is unset,
    // must not lie in the future, and gets REWIND_EPOCH_TIME_BUFFER_IN_SECONDS_OVERRIDE (default 60s) added.
    public static final String REWIND_TIME_IN_SECONDS_OVERRIDE = "rewind.time.in.seconds.override";
    public static final String REWIND_EPOCH_TIME_IN_SECONDS_OVERRIDE = "rewind.epoch.time.in.seconds.override";
    public static final String SUPPRESS_END_OF_PUSH_MESSAGE = "suppress.end.of.push.message";
    public static final String DEFER_VERSION_SWAP = "defer.version.swap";
    // ---- D2 / multi-region controller discovery ----
    // The full key is this prefix plus a region/fabric name.
    public static final String D2_ZK_HOSTS_PREFIX = "d2.zk.hosts.";
    public static final String PARENT_CONTROLLER_REGION_NAME = "parent.controller.region.name";
    public static final String REWIND_EPOCH_TIME_BUFFER_IN_SECONDS_OVERRIDE = "rewind.epoch.time.buffer.in.seconds.override";
    public static final String MULTI_REGION = "multi.region";
    // Controller URL; a "d2://" prefix switches the job to D2 routing.
    public static final String VENICE_DISCOVER_URL_PROP = "venice.discover.urls";
    public static final String SOURCE_GRID_FABRIC = "source.grid.fabric";
    // ---- General job configuration ----
    public static final String ENABLE_WRITE_COMPUTE = "venice.write.compute.enable";
    public static final String ENABLE_PUSH = "venice.push.enable";
    public static final String ENABLE_SSL = "venice.ssl.enable";
    public static final String VENICE_STORE_NAME_PROP = "venice.store.name";
    public static final String INPUT_PATH_PROP = "input.path";
    public static final String INPUT_PATH_LAST_MODIFIED_TIME = "input.path.last.modified.time";
    public static final String BATCH_NUM_BYTES_PROP = "batch.num.bytes";
    public static final String VALUE_SCHEMA_ID_PROP = "value.schema.id";
    public static final String DERIVED_SCHEMA_ID_PROP = "derived.schema.id";
    public static final String TOPIC_PROP = "venice.kafka.topic";
    // Key prefixes, presumably for Hadoop configuration pass-through (not used in the visible code).
    protected static final String HADOOP_PREFIX = "hadoop-conf.";
    protected static final String HADOOP_VALIDATE_SCHEMA_AND_BUILD_DICT_PREFIX = "hadoop-dict-build-conf.";
    public static final String SSL_PREFIX = "ssl";
    public static final String STORAGE_QUOTA_PROP = "storage.quota";
    public static final String STORAGE_ENGINE_OVERHEAD_RATIO = "storage_engine_overhead_ratio";

    /**
     * @deprecated Marked deprecated in the original code; no replacement is named here.
     */
    @Deprecated
    public static final String VSON_PUSH = "vson.push";
    public static final String KAFKA_SECURITY_PROTOCOL = "SSL";
    public static final String COMPRESSION_STRATEGY = "compression.strategy";
    public static final String KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY = "kafka.input.source.compression.strategy";
    // ---- SSL configurator keys ----
    public static final String SSL_CONFIGURATOR_CLASS_CONFIG = "ssl.configurator.class";
    public static final String SSL_KEY_STORE_PROPERTY_NAME = "ssl.key.store.property.name";
    public static final String SSL_TRUST_STORE_PROPERTY_NAME = "ssl.trust.store.property.name";
    public static final String SSL_KEY_STORE_PASSWORD_PROPERTY_NAME = "ssl.key.store.password.property.name";
    public static final String SSL_KEY_PASSWORD_PROPERTY_NAME = "ssl.key.password.property.name";
    // ---- Job execution metadata ----
    public static final String JOB_EXEC_URL = "job.execution.url";
    public static final String JOB_EXEC_ID = "job.execution.id";
    public static final String JOB_SERVER_NAME = "job.server.name";
    public static final String PUSH_JOB_STATUS_UPLOAD_ENABLE = "push.job.status.upload.enable";
    public static final String REDUCER_SPECULATIVE_EXECUTION_ENABLE = "reducer.speculative.execution.enable";
    public static final String TELEMETRY_MESSAGE_INTERVAL = "telemetry.message.interval";
    public static final String ZSTD_COMPRESSION_LEVEL = "zstd.compression.level";
    // Default for BATCH_NUM_BYTES_PROP (1,000,000 bytes).
    public static final int DEFAULT_BATCH_BYTES_SIZE = 1000000;
    public static final boolean SORTED = true;
    // 86400 seconds = one day.
    public static final long DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE = 86400;
    // ---- Repush TTL / replication metadata ----
    // REPUSH_TTL_ENABLE is only accepted together with SOURCE_KAFKA.
    public static final String REPUSH_TTL_ENABLE = "repush.ttl.enable";
    public static final String REPUSH_TTL_IN_SECONDS = "repush.ttl.seconds";
    public static final String REPUSH_TTL_POLICY = "repush.ttl.policy";
    public static final String RMD_SCHEMA_DIR = "rmd.schema.dir";
    private static final String TEMP_DIR_PREFIX = "/tmp/veniceRmdSchemas/";
    // ---- Sentinels and internal defaults ----
    // Sentinel meaning "not configured".
    public static final int NOT_SET = -1;
    public static final long INPUT_DATA_SIZE_FACTOR = 2;
    protected static final boolean DEFAULT_IS_DUPLICATED_KEY_ALLOWED = false;
    private static final int UNCREATED_VERSION_NUMBER = -1;
    // 300000 ms = 5 minutes; default for POLL_JOB_STATUS_INTERVAL_MS.
    private static final long DEFAULT_POLL_STATUS_INTERVAL_MS = 300000;
    // 1800000 ms = 30 minutes; default for JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS.
    private static final long DEFAULT_JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS = 1800000;
    private static final String NON_CRITICAL_EXCEPTION = "This exception does not fail the push job. ";
    // Job configuration, built in the constructor from the caller's Properties (after legacy-key normalization).
    protected final VeniceProperties props;
    // Job identifier supplied to the constructor.
    private final String jobId;
    // SSL properties derived from props; computed lazily on first access.
    private final Lazy<Properties> sslProperties;
    // Injectable via setVeniceWriter.
    private VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter;
    // Injectable via setControllerClient; used for store and repush-info lookups.
    private ControllerClient controllerClient;
    private ControllerClient kmeSchemaSystemStoreControllerClient;
    // Passed to the heartbeat sender factory when creating a heartbeat sender.
    private ControllerClient livenessHeartbeatStoreControllerClient;
    private RunningJob runningJob;
    // Injectable via setInputDataInfoProvider.
    protected InputDataInfoProvider inputDataInfoProvider;
    // Injectable via setValidateSchemaAndBuildDictMapperOutputReader.
    private ValidateSchemaAndBuildDictMapperOutputReader validateSchemaAndBuildDictMapperOutputReader;
    private long inputFileDataSize;
    private String inputDirectory;
    private boolean inputFileHasRecords;
    private long inputModificationTime;
    private long inputNumFiles;
    private long jobStartTimeMs;
    private Properties veniceWriterProperties;
    // Injectable via setJobClientWrapper.
    private JobClientWrapper jobClientWrapper;
    // Injectable via setSentPushJobDetailsTracker.
    private SentPushJobDetailsTracker sentPushJobDetailsTracker;
    private PushJobSchemaInfo pushJobSchemaInfo;
    private ValidateSchemaAndBuildDictMapperOutput validateSchemaAndBuildDictMapperOutput;
    private String validateSchemaAndBuildDictMapperOutputDirectory;
    // Built from props in the constructor.
    protected PushJobSetting pushJobSetting;
    // Its kafkaUrl is used when creating the heartbeat sender.
    private TopicInfo kafkaTopicInfo;
    // Created empty in the constructor.
    private final PushJobDetails pushJobDetails;
    protected StoreSetting storeSetting;
    private InputStorageQuotaTracker inputStorageQuotaTracker;
    // Default or no-op factory, chosen in the constructor from pushJobSetting.livenessHeartbeatEnabled.
    private PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
    private static final Logger LOGGER = LogManager.getLogger(VenicePushJob.class);
    /**
     * Input path filter that rejects any path whose final name component starts with {@code "_"}
     * or {@code "."}; every other path is accepted.
     */
    public static final PathFilter PATH_FILTER = path -> {
        String name = path.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    };
    // Job configuration for the schema-validation / dictionary-building map job.
    protected JobConf validateSchemaAndBuildDictJobConf = new JobConf();
    // Job configuration for the main map-reduce job.
    protected JobConf jobConf = new JobConf();
    // Overridable via setMapRedPartitionerClass; defaults to VeniceMRPartitioner.
    private Class<? extends Partitioner> mapRedPartitionerClass = VeniceMRPartitioner.class;
    private boolean isZstdDictCreationRequired = false;
    private boolean isZstdDictCreationSuccess = false;
    private final InternalAvroSpecificSerializer<PushJobDetails> pushJobDetailsSerializer = AvroProtocolDefinition.PUSH_JOB_DETAILS.getSerializer();
    // Presumably ensures the "status upload disabled" message is logged only once — confirm in run().
    private boolean pushJobStatusUploadDisabledHasBeenLogged = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/hadoop/VenicePushJob$ErrorMessage.class */
    /**
     * Immutable wrapper around a single error message string.
     */
    public static class ErrorMessage {
        /** The wrapped text, stored exactly as given (may be {@code null}). */
        private final String errorMessage;

        /**
         * @param message the text to wrap
         */
        ErrorMessage(String message) {
            errorMessage = message;
        }

        /**
         * @return the wrapped text
         */
        String getErrorMessage() {
            return errorMessage;
        }
    }

    /* loaded from: input_file:com/linkedin/venice/hadoop/VenicePushJob$PushJobCheckpoints.class */
    public enum PushJobCheckpoints {
        INITIALIZE_PUSH_JOB(0),
        NEW_VERSION_CREATED(1),
        START_MAP_REDUCE_JOB(2),
        MAP_REDUCE_JOB_COMPLETED(3),
        START_JOB_STATUS_POLLING(4),
        JOB_STATUS_POLLING_COMPLETED(5),
        START_VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB(6),
        VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED(7),
        QUOTA_EXCEEDED(-1),
        WRITE_ACL_FAILED(-2),
        DUP_KEY_WITH_DIFF_VALUE(-3),
        FILE_SCHEMA_VALIDATION_FAILED(-4),
        EXTENDED_FILE_SCHEMA_VALIDATION_FAILED(-5),
        RECORD_TOO_LARGE_FAILED(-6),
        CONCURRENT_BATCH_PUSH(-7),
        DATASET_CHANGED(-8),
        INVALID_INPUT_FILE(-9),
        ZSTD_DICTIONARY_CREATION_FAILED(-10);

        private final int value;

        PushJobCheckpoints(int i) {
            this.value = i;
        }

        public int getValue() {
            return this.value;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/linkedin/venice/hadoop/VenicePushJob$PushJobSetting.class */
    /**
     * Mutable holder for job-level settings.
     *
     * <p>Most fields are populated once from the job properties when the push job is constructed.
     * Fields marked "not set here" are not assigned anywhere in the visible part of this file.
     */
    public static class PushJobSetting {
        // ENABLE_PUSH, default true.
        boolean enablePush;
        // VENICE_DISCOVER_URL_PROP, read without a default; a "d2://" prefix enables D2 routing.
        String veniceControllerUrl;
        // VENICE_STORE_NAME_PROP, read without a default.
        String storeName;
        // Not set here.
        String clusterName;
        // SOURCE_GRID_FABRIC; null when the property is absent.
        String sourceGridFabric;
        // BATCH_NUM_BYTES_PROP, default 1000000.
        int batchNumBytes;
        // INCREMENTAL_PUSH, default false; rejected together with Kafka input.
        boolean isIncrementalPush;
        // Starts empty; not set here.
        Optional<String> incrementalPushVersion = Optional.empty();
        // ALLOW_DUPLICATE_KEY, default false; forced to true for Kafka input.
        boolean isDuplicateKeyAllowed;
        // PUSH_JOB_STATUS_UPLOAD_ENABLE, default false.
        boolean enablePushJobStatusUpload;
        // REDUCER_SPECULATIVE_EXECUTION_ENABLE, default false.
        boolean enableReducerSpeculativeExecution;
        // CONTROLLER_REQUEST_RETRY_ATTEMPTS, default 1; used as the retry count for controller requests.
        int controllerRetries;
        // POLL_STATUS_RETRY_ATTEMPTS, default 15.
        int controllerStatusPollRetries;
        // POLL_JOB_STATUS_INTERVAL_MS, default 300000 (5 minutes).
        long pollJobStatusIntervalMs;
        // JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS, default 1800000 (30 minutes).
        long jobStatusInUnknownStateTimeoutMs;
        // SEND_CONTROL_MESSAGES_DIRECTLY, default false.
        boolean sendControlMessagesDirectly;
        // SOURCE_ETL, default false; rejected together with Kafka input.
        boolean isSourceETL;
        // ENABLE_WRITE_COMPUTE, default false.
        boolean enableWriteCompute;
        // Not set here.
        ETLValueSchemaTransformation etlValueSchemaTransformation;
        // SOURCE_KAFKA, default false.
        boolean isSourceKafka;
        // Not set here.
        String kafkaInputBrokerUrl;
        // Not set here.
        String kafkaInputTopic;
        // Filled by the Kafka-input topic lookup when no explicit KAFKA_INPUT_TOPIC is configured.
        RepushInfoResponse repushInfoResponse;
        // REWIND_TIME_IN_SECONDS_OVERRIDE, or derived from REWIND_EPOCH_TIME_IN_SECONDS_OVERRIDE; -1 when neither is set.
        long rewindTimeInSecondsOverride;
        // KAFKA_INPUT_COMBINER_ENABLED, default false.
        boolean kafkaInputCombinerEnabled;
        // KAFKA_INPUT_COMPRESSION_BUILD_NEW_DICT_ENABLED, default true.
        boolean kafkaInputBuildNewDictEnabled;
        // Set to REWIND_FROM_SOP only when the rewind epoch override is used; otherwise left null.
        BufferReplayPolicy validateRemoteReplayPolicy;
        // SUPPRESS_END_OF_PUSH_MESSAGE, default false.
        boolean suppressEndOfPushMessage;
        // DEFER_VERSION_SWAP, default false.
        boolean deferVersionSwap;
        // EXTENDED_SCHEMA_VALIDITY_CHECK_ENABLED, default true.
        boolean extendedSchemaValidityCheckEnabled;
        // COMPRESSION_METRIC_COLLECTION_ENABLED, default false.
        boolean compressionMetricCollectionEnabled;
        // USE_MAPPER_TO_BUILD_DICTIONARY, default false; forced true when compression metrics are collected.
        boolean useMapperToBuildDict;
        // MAPPER_OUTPUT_DIRECTORY (default "/tmp/veniceMapperOutput"); only set when useMapperToBuildDict is true.
        String useMapperToBuildDictOutputPath;
        // REPUSH_TTL_ENABLE, default false; only allowed with Kafka input.
        boolean repushTTLEnabled;
        // Initialized to -1 when settings are built; not otherwise set here.
        long repushTTLInSeconds;
        // Not set here.
        String rmdSchemaDir;
        // Controller URL with the "d2://" prefix removed; null when not using D2.
        String controllerD2ServiceName;
        // D2 ZK hosts for the parent region; only set for multi-region D2 routing.
        String parentControllerRegionD2ZkHosts;
        // D2 ZK hosts for sourceGridFabric; only set for single-region D2 routing.
        String childControllerRegionD2ZkHosts;
        // Heartbeat-enabled config from BatchJobHeartbeatConfigs, default false.
        boolean livenessHeartbeatEnabled;
        // System store name of the BATCH_JOB_HEARTBEAT protocol.
        String livenessHeartbeatStoreName;
        // MULTI_REGION; required for D2 URLs, default false otherwise.
        boolean multiRegion;
        // True iff veniceControllerUrl starts with "d2://".
        boolean d2Routing;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/linkedin/venice/hadoop/VenicePushJob$StoreSetting.class */
    /**
     * Mutable holder for store-level settings.
     *
     * <p>NOTE(review): none of these fields is populated in the visible part of this file.
     * Presumably they are filled from the controller's store metadata before the job runs —
     * confirm against the code that assigns {@code storeSetting}.
     */
    public static class StoreSetting {
        boolean isChunkingEnabled;
        // Presumably chunking of replication metadata — confirm.
        boolean isRmdChunkingEnabled;
        // Presumably the store's storage quota; unit not established here.
        long storeStorageQuota;
        boolean isSchemaAutoRegisterFromPushJobEnabled;
        CompressionStrategy compressionStrategy;
        boolean isWriteComputeEnabled;
        boolean isIncrementalPushEnabled;
        // Presumably the source version when pushing from Kafka input — confirm.
        Version sourceKafkaInputVersionInfo;
        long storeRewindTimeInSeconds;
        Schema keySchema;
        HybridStoreConfig hybridStoreConfig;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/linkedin/venice/hadoop/VenicePushJob$TopicInfo.class */
    /**
     * Mutable holder describing the Kafka topic the job writes to.
     *
     * <p>NOTE(review): only {@code kafkaUrl} is read in the visible code (when creating the
     * heartbeat sender). The other fields are presumably filled from the version-creation
     * response — confirm against the code that assigns {@code kafkaTopicInfo}.
     */
    public static class TopicInfo {
        String topic;
        int version;
        int partitionCount;
        // Kafka URL handed to the push-job heartbeat sender factory.
        String kafkaUrl;
        boolean sslToKafka;
        CompressionStrategy compressionStrategy;
        String partitionerClass;
        Map<String, String> partitionerParams;
        int amplificationFactor;
        boolean chunkingEnabled;
        boolean rmdChunkingEnabled;

        // Restricted to this package and subclasses.
        protected TopicInfo() {
        }
    }

    /**
     * Creates a push job from the given configuration.
     *
     * <p>Legacy key/value field properties are normalized into {@code properties} in place before
     * the configuration is parsed. When SSL is enabled, the raw properties are validated eagerly.
     * The SSL properties themselves are resolved lazily on first use.
     *
     * @param jobId identifier for this job
     * @param properties job configuration; must not be null
     * @throws NullPointerException if {@code properties} is null
     * @throws VeniceException if the configuration is invalid
     */
    public VenicePushJob(String jobId, Properties properties) {
        this.jobId = jobId;
        this.props = getVenicePropsFromVanillaProps(Objects.requireNonNull(properties, "VPJ props cannot be null"));
        if (isSslEnabled()) {
            VPJSSLUtils.validateSslProperties(properties);
        }
        this.sslProperties = Lazy.of(() -> {
            try {
                return VPJSSLUtils.getSslProperties(this.props);
            } catch (IOException e) {
                // Keep the underlying I/O failure as the cause so the root problem is diagnosable.
                throw new VeniceException("Could not get user credential", e);
            }
        });
        LOGGER.info("Constructing {}: {}", VenicePushJob.class.getSimpleName(), this.props.toString(true));
        this.pushJobSetting = getPushJobSetting(this.props);
        LOGGER.info("Going to use controller URL: {}  to discover cluster.", this.pushJobSetting.veniceControllerUrl);
        this.pushJobDetails = new PushJobDetails();
        if (this.pushJobSetting.livenessHeartbeatEnabled) {
            LOGGER.info("Push job heartbeat is enabled.");
            this.pushJobHeartbeatSenderFactory = new DefaultPushJobHeartbeatSenderFactory();
        } else {
            LOGGER.info("Push job heartbeat is NOT enabled.");
            this.pushJobHeartbeatSenderFactory = new NoOpPushJobHeartbeatSenderFactory();
        }
    }

    /**
     * Returns the job-level settings parsed at construction time.
     *
     * @return the live (mutable) settings object, not a copy
     */
    PushJobSetting getPushJobSetting() {
        return pushJobSetting;
    }

    /**
     * Returns the job configuration built at construction time.
     *
     * @return the job's properties
     */
    VeniceProperties getVeniceProperties() {
        return props;
    }

    /**
     * Normalizes legacy key/value field keys and wraps the result in {@link VeniceProperties}.
     *
     * <p>Side effect: {@code properties} is modified in place when a legacy key's value is copied
     * to its replacement key.
     *
     * @param properties raw job configuration
     * @return the normalized configuration
     * @throws VeniceException if a legacy key and its replacement are both set to different values
     */
    private VeniceProperties getVenicePropsFromVanillaProps(Properties properties) {
        // Reference the named constants rather than duplicating their string literals.
        handleLegacyConfig(properties, LEGACY_AVRO_KEY_FIELD_PROP, KEY_FIELD_PROP, "key field");
        handleLegacyConfig(properties, LEGACY_AVRO_VALUE_FIELD_PROP, VALUE_FIELD_PROP, "value field");
        return new VeniceProperties(properties);
    }

    /**
     * Migrates a legacy configuration key to its replacement.
     *
     * <p>If only the legacy key is set, its value is copied to the replacement key (mutating
     * {@code properties}). If both are set to the same value, nothing happens. If both are set to
     * different values, the configuration is rejected.
     *
     * @param properties configuration to inspect and possibly update
     * @param legacyKey the old key name
     * @param currentKey the replacement key name
     * @param description human-readable name used in the error message
     * @throws VeniceException if both keys are present with different values
     */
    private void handleLegacyConfig(Properties properties, String legacyKey, String currentKey, String description) {
        String legacyValue = properties.getProperty(legacyKey);
        if (legacyValue == null) {
            return;
        }
        String currentValue = properties.getProperty(currentKey);
        if (currentValue == null) {
            properties.setProperty(currentKey, legacyValue);
            return;
        }
        if (currentValue.equals(legacyValue)) {
            return;
        }
        throw new VeniceException("Duplicate " + description + " config found! Both " + legacyKey + " and " + currentKey + " are set, but with different values! Use only: " + currentKey);
    }

    /**
     * Reports whether SSL is requested for this job.
     *
     * @return the value of {@code venice.ssl.enable}, defaulting to {@code false}
     */
    private boolean isSslEnabled() {
        return props.getBoolean(ENABLE_SSL, DEFAULT_SSL_ENABLED);
    }

    /**
     * Builds the job-level {@link PushJobSetting} from the job properties.
     *
     * <p>Reads every push-related property and applies its default. Also derives D2 routing
     * information from the controller URL, resolves the rewind-time overrides, and rejects
     * incompatible option combinations.
     *
     * @param veniceProperties the job configuration
     * @return a populated settings object
     * @throws VeniceException in any of these cases:
     *     <ul>
     *       <li>TTL repush is requested without Kafka input;
     *       <li>Kafka input is combined with incremental push or source ETL;
     *       <li>the rewind epoch override lies in the future;
     *       <li>a single-region D2 controller URL is used without {@code source.grid.fabric}.
     *     </ul>
     */
    private PushJobSetting getPushJobSetting(VeniceProperties veniceProperties) {
        PushJobSetting pushJobSetting = new PushJobSetting();
        pushJobSetting.veniceControllerUrl = veniceProperties.getString(VENICE_DISCOVER_URL_PROP);
        pushJobSetting.enablePush = veniceProperties.getBoolean(ENABLE_PUSH, true);
        if (veniceProperties.containsKey(SOURCE_GRID_FABRIC)) {
            pushJobSetting.sourceGridFabric = veniceProperties.getString(SOURCE_GRID_FABRIC);
        }
        pushJobSetting.batchNumBytes = veniceProperties.getInt(BATCH_NUM_BYTES_PROP, DEFAULT_BATCH_BYTES_SIZE);
        pushJobSetting.isIncrementalPush = veniceProperties.getBoolean(INCREMENTAL_PUSH, false);
        pushJobSetting.isDuplicateKeyAllowed = veniceProperties.getBoolean(ALLOW_DUPLICATE_KEY, DEFAULT_IS_DUPLICATED_KEY_ALLOWED);
        pushJobSetting.enablePushJobStatusUpload = veniceProperties.getBoolean(PUSH_JOB_STATUS_UPLOAD_ENABLE, false);
        pushJobSetting.enableReducerSpeculativeExecution = veniceProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false);
        pushJobSetting.controllerRetries = veniceProperties.getInt(CONTROLLER_REQUEST_RETRY_ATTEMPTS, 1);
        pushJobSetting.controllerStatusPollRetries = veniceProperties.getInt(POLL_STATUS_RETRY_ATTEMPTS, 15);
        pushJobSetting.pollJobStatusIntervalMs = veniceProperties.getLong(POLL_JOB_STATUS_INTERVAL_MS, DEFAULT_POLL_STATUS_INTERVAL_MS);
        pushJobSetting.jobStatusInUnknownStateTimeoutMs = veniceProperties.getLong(JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS, DEFAULT_JOB_STATUS_IN_UNKNOWN_STATE_TIMEOUT_MS);
        pushJobSetting.sendControlMessagesDirectly = veniceProperties.getBoolean(SEND_CONTROL_MESSAGES_DIRECTLY, false);
        pushJobSetting.enableWriteCompute = veniceProperties.getBoolean(ENABLE_WRITE_COMPUTE, false);
        pushJobSetting.isSourceETL = veniceProperties.getBoolean(SOURCE_ETL, false);
        pushJobSetting.isSourceKafka = veniceProperties.getBoolean(SOURCE_KAFKA, false);
        pushJobSetting.kafkaInputCombinerEnabled = veniceProperties.getBoolean(KAFKA_INPUT_COMBINER_ENABLED, false);
        pushJobSetting.kafkaInputBuildNewDictEnabled = veniceProperties.getBoolean(KAFKA_INPUT_COMPRESSION_BUILD_NEW_DICT_ENABLED, true);
        pushJobSetting.suppressEndOfPushMessage = veniceProperties.getBoolean(SUPPRESS_END_OF_PUSH_MESSAGE, false);
        pushJobSetting.deferVersionSwap = veniceProperties.getBoolean(DEFER_VERSION_SWAP, false);
        pushJobSetting.repushTTLEnabled = veniceProperties.getBoolean(REPUSH_TTL_ENABLE, false);
        pushJobSetting.repushTTLInSeconds = NOT_SET;
        if (pushJobSetting.repushTTLEnabled && !pushJobSetting.isSourceKafka) {
            throw new VeniceException("Repush with TTL is only supported while using Kafka Input Format");
        }
        populateD2RoutingSettings(pushJobSetting, veniceProperties);
        pushJobSetting.livenessHeartbeatEnabled = veniceProperties.getBoolean(BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName(), false);
        pushJobSetting.livenessHeartbeatStoreName = AvroProtocolDefinition.BATCH_JOB_HEARTBEAT.getSystemStoreName();
        if (pushJobSetting.isSourceKafka) {
            // Duplicate keys are always allowed for Kafka input, regardless of ALLOW_DUPLICATE_KEY.
            pushJobSetting.isDuplicateKeyAllowed = true;
            if (pushJobSetting.isIncrementalPush) {
                throw new VeniceException("Incremental push is not supported while using Kafka Input Format");
            }
            if (pushJobSetting.isSourceETL) {
                throw new VeniceException("Source ETL is not supported while using Kafka Input Format");
            }
        }
        pushJobSetting.storeName = veniceProperties.getString(VENICE_STORE_NAME_PROP);
        populateRewindTimeOverride(pushJobSetting, veniceProperties);
        pushJobSetting.extendedSchemaValidityCheckEnabled = veniceProperties.getBoolean(EXTENDED_SCHEMA_VALIDITY_CHECK_ENABLED, DEFAULT_EXTENDED_SCHEMA_VALIDITY_CHECK_ENABLED);
        pushJobSetting.compressionMetricCollectionEnabled = veniceProperties.getBoolean(COMPRESSION_METRIC_COLLECTION_ENABLED, DEFAULT_COMPRESSION_METRIC_COLLECTION_ENABLED);
        pushJobSetting.useMapperToBuildDict = veniceProperties.getBoolean(USE_MAPPER_TO_BUILD_DICTIONARY, DEFAULT_USE_MAPPER_TO_BUILD_DICTIONARY);
        if (pushJobSetting.compressionMetricCollectionEnabled && !pushJobSetting.useMapperToBuildDict) {
            // Compression metrics are gathered by the dictionary-building mapper, so it must run.
            LOGGER.warn("Force enabling \"{}\" to support \"{}\"", USE_MAPPER_TO_BUILD_DICTIONARY, COMPRESSION_METRIC_COLLECTION_ENABLED);
            pushJobSetting.useMapperToBuildDict = true;
        }
        if (pushJobSetting.useMapperToBuildDict) {
            pushJobSetting.useMapperToBuildDictOutputPath = veniceProperties.getString(MAPPER_OUTPUT_DIRECTORY, VALIDATE_SCHEMA_AND_BUILD_DICTIONARY_MAPPER_OUTPUT_PARENT_DIR_DEFAULT);
        }
        return pushJobSetting;
    }

    /**
     * Derives the D2 routing fields of {@code setting} from its controller URL.
     *
     * <p>For a {@code d2://} URL, {@code multi.region} is required. The ZooKeeper hosts are then
     * read from {@code d2.zk.hosts.<region>}, where the region is the parent controller region
     * (multi-region) or the source grid fabric (single region). For any other URL, D2 routing is
     * disabled and the D2 fields are cleared.
     */
    private static void populateD2RoutingSettings(PushJobSetting setting, VeniceProperties properties) {
        final String d2Scheme = "d2://";
        if (!setting.veniceControllerUrl.startsWith(d2Scheme)) {
            setting.d2Routing = false;
            setting.controllerD2ServiceName = null;
            setting.multiRegion = properties.getBoolean(MULTI_REGION, false);
            setting.parentControllerRegionD2ZkHosts = null;
            setting.childControllerRegionD2ZkHosts = null;
            return;
        }
        setting.d2Routing = true;
        setting.controllerD2ServiceName = setting.veniceControllerUrl.substring(d2Scheme.length());
        setting.multiRegion = properties.getBoolean(MULTI_REGION);
        if (setting.multiRegion) {
            setting.parentControllerRegionD2ZkHosts = properties.getString(D2_ZK_HOSTS_PREFIX + properties.getString(PARENT_CONTROLLER_REGION_NAME));
        } else {
            if (setting.sourceGridFabric == null) {
                // Without this check the lookup silently targets the bogus key "d2.zk.hosts.null".
                throw new VeniceException(String.format("Property %s must be set when %s is a D2 URL and %s is false", SOURCE_GRID_FABRIC, VENICE_DISCOVER_URL_PROP, MULTI_REGION));
            }
            setting.childControllerRegionD2ZkHosts = properties.getString(D2_ZK_HOSTS_PREFIX + setting.sourceGridFabric);
        }
    }

    /**
     * Resolves {@code setting.rewindTimeInSecondsOverride}.
     *
     * <p>An explicit {@code rewind.time.in.seconds.override} wins. Otherwise, when an epoch
     * override (in seconds) is given, the rewind time is "now minus epoch" plus a buffer (default
     * 60 s), and the remote replay policy is set to {@link BufferReplayPolicy#REWIND_FROM_SOP}. If
     * neither is set, the value stays {@link #NOT_SET}.
     *
     * @throws VeniceException if the epoch override lies in the future
     */
    private static void populateRewindTimeOverride(PushJobSetting setting, VeniceProperties properties) {
        setting.rewindTimeInSecondsOverride = properties.getLong(REWIND_TIME_IN_SECONDS_OVERRIDE, -1L);
        if (setting.rewindTimeInSecondsOverride != NOT_SET) {
            return;
        }
        long rewindEpochTimeInSeconds = properties.getLong(REWIND_EPOCH_TIME_IN_SECONDS_OVERRIDE, -1L);
        if (rewindEpochTimeInSeconds == NOT_SET) {
            return;
        }
        long nowInSeconds = System.currentTimeMillis() / 1000;
        if (rewindEpochTimeInSeconds > nowInSeconds) {
            // Use Formatter placeholders (%d/%s); SLF4J-style "{}" is not substituted by String.format.
            throw new VeniceException(String.format("Provided %d for %s. %s cannot be a timestamp in the future!! ", rewindEpochTimeInSeconds, REWIND_EPOCH_TIME_IN_SECONDS_OVERRIDE, REWIND_EPOCH_TIME_IN_SECONDS_OVERRIDE));
        }
        setting.rewindTimeInSecondsOverride = (nowInSeconds - rewindEpochTimeInSeconds) + properties.getLong(REWIND_EPOCH_TIME_BUFFER_IN_SECONDS_OVERRIDE, 60L);
        setting.validateRemoteReplayPolicy = BufferReplayPolicy.REWIND_FROM_SOP;
    }

    /**
     * Determines which Kafka topic a Kafka-input push should read from.
     *
     * <p>If {@code kafka.input.topic} is configured, it is validated against the store and returned.
     * Otherwise the controller is asked for repush info, optionally scoped to
     * {@code kafka.input.fabric}, and the topic of the returned version is used. Side effect:
     * the repush response is stored in {@code pushJobSetting.repushInfoResponse} in that case.
     *
     * @param storeName the store being pushed to
     * @param properties the job configuration
     * @return the source topic name
     * @throws VeniceException if the controller reports an error for the repush-info request
     */
    private String getSourceTopicNameForKafkaInput(String storeName, VeniceProperties properties) {
        String userProvidedTopic = properties.getString(KAFKA_INPUT_TOPIC, () -> null);
        if (userProvidedTopic != null) {
            return getUserProvidedTopicName(storeName, userProvidedTopic, this.pushJobSetting.controllerRetries);
        }
        Optional<String> sourceFabric = Optional.ofNullable(properties.getString(KAFKA_INPUT_FABRIC, () -> null));
        RepushInfoResponse repushInfoResponse = ControllerClient.retryableRequest(this.controllerClient, this.pushJobSetting.controllerRetries, client -> client.getRepushInfo(storeName, sourceFabric));
        this.pushJobSetting.repushInfoResponse = repushInfoResponse;
        if (repushInfoResponse.isError()) {
            throw new VeniceException("Could not get repush info for store " + storeName + " with error: " + repushInfoResponse.getError());
        }
        return Version.composeKafkaTopic(storeName, repushInfoResponse.getRepushInfo().getVersion().getNumber());
    }

    /**
     * Validates a user-supplied source topic against the store's current version(s).
     *
     * <p>The topic must belong to {@code userProvidedStoreName}. If the regions disagree on the
     * current version, the user's topic is trusted as-is. Otherwise it must equal the topic of the
     * single agreed-upon current version.
     *
     * @param userProvidedStoreName store name given by the user
     * @param userProvidedTopicName topic name given by the user
     * @param retryAttempts number of attempts for the controller store lookup
     * @return the validated topic name
     * @throws IllegalArgumentException if the topic does not belong to the store
     * @throws VeniceException if the store lookup fails or no current version is reported
     * @throws IllegalStateException if the topic does not match the current version's topic
     */
    private String getUserProvidedTopicName(String userProvidedStoreName, String userProvidedTopicName, int retryAttempts) {
        String derivedStoreName = Version.parseStoreFromKafkaTopicName(userProvidedTopicName);
        if (!Objects.equals(derivedStoreName, userProvidedStoreName)) {
            throw new IllegalArgumentException(String.format("Store user-provided name mismatch with the derived store name. Got user-provided store name %s and derived store name %s", userProvidedStoreName, derivedStoreName));
        }
        LOGGER.info("userProvidedStoreName: {}", userProvidedStoreName);
        StoreResponse storeResponse = ControllerClient.retryableRequest(this.controllerClient, retryAttempts, client -> client.getStore(userProvidedStoreName));
        if (storeResponse.isError()) {
            throw new VeniceException(String.format("Fail to get store information for store %s with error %s", userProvidedStoreName, storeResponse.getError()));
        }
        Map<String, Integer> currentStoreVersions = getCurrentStoreVersions(storeResponse);
        Set<Integer> distinctVersions = new HashSet<>(currentStoreVersions.values());
        if (distinctVersions.size() > 1) {
            LOGGER.info("Got current topic version mismatch across multiple colos {}. Use user-provided topic name: {}", currentStoreVersions, userProvidedTopicName);
            return userProvidedTopicName;
        }
        if (distinctVersions.isEmpty()) {
            // Previously this fell through to an unboxing NullPointerException.
            throw new VeniceException(String.format("No current version found for store %s", userProvidedStoreName));
        }
        // Every region reports the same current version, so any element of the set is that version.
        int currentVersion = distinctVersions.iterator().next();
        String discoveredTopicName = Version.composeKafkaTopic(userProvidedStoreName, currentVersion);
        if (Objects.equals(discoveredTopicName, userProvidedTopicName)) {
            return discoveredTopicName;
        }
        throw new IllegalStateException(String.format("Mismatch between user-provided topic name and auto discovered topic name. They are %s and %s respectively", userProvidedTopicName, discoveredTopicName));
    }

    /**
     * Supplies the controller client for this job. A client set here is kept by
     * {@code initControllerClient}, which only creates one when the field is still {@code null}.
     *
     * @param controllerClient the controller client to use
     */
    protected void setControllerClient(ControllerClient controllerClient) {
        this.controllerClient = controllerClient;
    }

    /**
     * Supplies the wrapper used to submit MapReduce jobs. When none is set, {@code runJobWithConfig}
     * falls back to a {@code DefaultJobClientWrapper}.
     *
     * @param jobClientWrapper the job submission wrapper to use
     */
    public void setJobClientWrapper(JobClientWrapper jobClientWrapper) {
        this.jobClientWrapper = jobClientWrapper;
    }

    /**
     * Supplies the provider of input-data metadata. When none is set, {@code getInputDataInfoProvider}
     * lazily creates a {@code DefaultInputDataInfoProvider}.
     *
     * @param inputDataInfoProvider the provider to use
     */
    protected void setInputDataInfoProvider(InputDataInfoProvider inputDataInfoProvider) {
        this.inputDataInfoProvider = inputDataInfoProvider;
    }

    /**
     * Replaces the {@code VeniceWriter} held by this job.
     *
     * @param veniceWriter the writer to store
     */
    protected void setVeniceWriter(VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter) {
        this.veniceWriter = veniceWriter;
    }

    /**
     * Replaces the tracker of push job details held by this job.
     *
     * @param sentPushJobDetailsTracker the tracker to store
     */
    protected void setSentPushJobDetailsTracker(SentPushJobDetailsTracker sentPushJobDetailsTracker) {
        this.sentPushJobDetailsTracker = sentPushJobDetailsTracker;
    }

    /**
     * Replaces the MapReduce partitioner class held by this job.
     *
     * @param cls the partitioner class to store
     */
    protected void setMapRedPartitionerClass(Class<? extends Partitioner> cls) {
        this.mapRedPartitionerClass = cls;
    }

    /**
     * Supplies the reader for the validate-schema/build-dictionary mapper output. A reader set here is
     * returned by {@code getValidateSchemaAndBuildDictMapperOutputReader} instead of creating a new one.
     *
     * @param validateSchemaAndBuildDictMapperOutputReader the reader to use
     * @throws Exception declared for signature compatibility; this setter itself does not throw
     */
    protected void setValidateSchemaAndBuildDictMapperOutputReader(ValidateSchemaAndBuildDictMapperOutputReader validateSchemaAndBuildDictMapperOutputReader) throws Exception {
        this.validateSchemaAndBuildDictMapperOutputReader = validateSchemaAndBuildDictMapperOutputReader;
    }

    /*
     * NOTE(review): the body of run() could not be decompiled (see the JADX notes below). As written, this
     * method does no work and always throws UnsupportedOperationException. Presumably it is the push job's
     * main entry point; the real implementation must be restored from the original source before run() can
     * be used.
     */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Removed duplicated region for block: B:104:0x0701  */
    /* JADX WARN: Removed duplicated region for block: B:107:0x0716  */
    /* JADX WARN: Removed duplicated region for block: B:109:? A[RETURN, SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            Method dump skipped, instructions count: 1884
            To view this dump add '--comments-level debug' option
        */
        // Placeholder left by the decompiler: always fails.
        throw new UnsupportedOperationException("Method not decompiled: com.linkedin.venice.hadoop.VenicePushJob.run():void");
    }

    /**
     * Builds the liveness heartbeat sender for this push job.
     *
     * <p>Any failure during creation is logged and recorded in
     * {@code pushJobDetails.sendLivenessHeartbeatFailureDetails}, and a no-op sender is returned instead.
     *
     * @param z whether the SSL properties should be handed to the heartbeat sender
     * @return the created sender, or a {@link NoOpPushJobHeartbeatSender} if creation failed
     */
    private PushJobHeartbeatSender createPushJobHeartbeatSender(boolean z) {
        try {
            // Argument evaluation order matches a single inline call; everything stays inside the try so any
            // failure (including a missing SSL configuration) triggers the no-op fallback.
            String kafkaUrl = this.kafkaTopicInfo.kafkaUrl;
            Optional<Properties> senderSslProperties = z ? Optional.of((Properties) this.sslProperties.get()) : Optional.empty();
            return this.pushJobHeartbeatSenderFactory.createHeartbeatSender(kafkaUrl, this.props, this.livenessHeartbeatStoreControllerClient, senderSslProperties);
        } catch (Exception creationFailure) {
            LOGGER.warn("Failed to create a push job heartbeat sender. Use the no-op push job heartbeat sender.", creationFailure);
            this.pushJobDetails.sendLivenessHeartbeatFailureDetails = creationFailure.getMessage();
            return new NoOpPushJobHeartbeatSender();
        }
    }

    /**
     * Records the sender's first heartbeat-send exception message in the push job details.
     *
     * <p>Nothing is recorded if the sender or the details object is {@code null}, or if a heartbeat failure
     * message has already been recorded.
     *
     * @param pushJobHeartbeatSender the sender whose first failure should be recorded; may be {@code null}
     */
    private void updatePushJobDetailsWithLivenessHeartbeatException(PushJobHeartbeatSender pushJobHeartbeatSender) {
        boolean shouldRecord = pushJobHeartbeatSender != null
            && this.pushJobDetails != null
            && this.pushJobDetails.sendLivenessHeartbeatFailureDetails == null;
        if (shouldRecord) {
            pushJobHeartbeatSender.getFirstSendHeartbeatException()
                .ifPresent(firstFailure -> this.pushJobDetails.sendLivenessHeartbeatFailureDetails = firstFailure.getMessage());
        }
    }

    /**
     * Validates that the input file schema is a valid Avro schema.
     *
     * <p>Strict validation is tried first. If it fails and extended validity checking is enabled, the job
     * fails immediately. Otherwise loose validation is attempted as a fallback.
     *
     * @param str the schema JSON to validate
     * @throws VeniceException wrapping the parse failure when validation fails; the job checkpoint is updated first
     */
    private void validateFileSchema(String str) {
        try {
            AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(str);
            return;
        } catch (Exception strictFailure) {
            if (this.pushJobSetting.extendedSchemaValidityCheckEnabled) {
                LOGGER.error("The schema of the input data failed strict Avro schema validation. Verify if the schema is a valid Avro schema.");
                updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.EXTENDED_FILE_SCHEMA_VALIDATION_FAILED);
                throw new VeniceException(strictFailure);
            }
        }
        // Strict validation failed but extended checking is off: fall back to loose validation.
        LOGGER.info("The schema of the input data failed strict Avro schema validation. Trying loose schema validation.");
        try {
            AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(str);
        } catch (Exception looseFailure) {
            LOGGER.error("The schema of the input data failed loose Avro schema validation. Verify if the schema is a valid Avro schema.");
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.FILE_SCHEMA_VALIDATION_FAILED);
            throw new VeniceException(looseFailure);
        }
    }

    /**
     * Runs the main MapReduce job, validates its counters, and updates the job checkpoints.
     *
     * @throws IOException if running the job or reading its counters fails
     * @throws VeniceException if counter validation fails, or the MR details report an error (e.g. quota, ACL)
     */
    private void runJobAndUpdateStatus() throws IOException {
        updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.START_MAP_REDUCE_JOB);
        this.runningJob = runJobWithConfig(this.jobConf);
        validateCountersAfterPush();
        // Any error surfaced while inspecting the MR details aborts the job before the completion checkpoint.
        updatePushJobDetailsWithMRDetails().ifPresent(errorMessage -> {
            throw new VeniceException(errorMessage.getErrorMessage());
        });
        updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED);
    }

    /**
     * Sanity-checks the counters of the completed main MapReduce job.
     *
     * <p>With no input records, all counters must be zero. Otherwise, for Kafka input the record count is
     * logged (plus the TTL-filtered count when TTL repush is enabled). If fewer reducers closed than there are
     * partitions, the job fails when the mappers sprayed all partitions. If they did not spray, only a warning
     * is logged. A Kafka-input job whose source had zero put/delete records skips this check.
     *
     * @throws IOException if the job counters cannot be retrieved
     * @throws VeniceException if the counters are inconsistent with a successful push
     */
    private void validateCountersAfterPush() throws IOException {
        if (!this.inputFileHasRecords) {
            verifyCountersWithZeroValues();
            return;
        }
        // Fetch the counters once: RunningJob#getCounters() is not guaranteed to be a cheap local read.
        Counters counters = this.runningJob.getCounters();
        long reducerClosedCount = MRJobCounterHelper.getReducerClosedCount(counters);
        long sourceKafkaRecordCount = 0;
        if (this.pushJobSetting.isSourceKafka) {
            sourceKafkaRecordCount = MRJobCounterHelper.getTotalPutOrDeleteRecordsCount(counters);
            LOGGER.info("Source kafka input topic : {} has {} records", this.pushJobSetting.kafkaInputTopic, sourceKafkaRecordCount);
            if (this.pushJobSetting.repushTTLEnabled) {
                LOGGER.info("Repush with ttl filtered out {} records", MRJobCounterHelper.getRepushTtlFilterCount(counters));
            }
        }
        if (reducerClosedCount >= this.kafkaTopicInfo.partitionCount) {
            return;
        }
        if (this.pushJobSetting.isSourceKafka && sourceKafkaRecordCount == 0) {
            // No put/delete records in the source topic: the reducer count is not checked.
            return;
        }
        if (MRJobCounterHelper.getMapperSprayAllPartitionsTriggeredCount(counters) != 0) {
            throw new VeniceException(String.format("MR job counter is not reliable since the reducer job closed count (%d) < the partition count (%d), while the input file data size is %d byte(s)", reducerClosedCount, this.kafkaTopicInfo.partitionCount, this.inputFileDataSize));
        }
        LOGGER.warn("'AbstractVeniceMapper#maybeSprayAllPartitions' is not invoked, so we couldn't decide whether the push job finished successfully or not purely based on the reducer job closed count ({}) < the partition count ({})", reducerClosedCount, this.kafkaTopicInfo.partitionCount);
    }

    /**
     * Runs the validate-schema/build-dictionary MapReduce job, validates its counters, loads its output,
     * and updates the job checkpoints.
     *
     * @param jobConf the configuration of the validate-schema/build-dictionary job
     * @throws Exception if running the job, validating counters, or reading its output fails
     */
    private void runValidateSchemaAndBuildDictJobAndUpdateStatus(JobConf jobConf) throws Exception {
        updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.START_VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB);
        this.runningJob = runJobWithConfig(jobConf);
        validateCountersAfterValidateSchemaAndBuildDict();
        // The mapper's output file is named after the job id.
        String completedJobId = this.runningJob.getID().toString();
        getValidateSchemaAndBuildDictMapperOutput(completedJobId);
        updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED);
    }

    /**
     * Builds a unique output directory path of the form {@code <str>/<str2>-<str3>-<unique suffix>}.
     *
     * @param str  the parent directory
     * @param str2 the first name component
     * @param str3 the second name component
     * @return the composed directory path
     */
    protected static String getValidateSchemaAndBuildDictionaryOutputDir(String str, String str2, String str3) {
        return String.format("%s/%s-%s-%s", str, str2, str3, Utils.getUniqueString());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the mapper output file name (without extension) for the given job id.
     *
     * @param str the job id
     * @return the output file prefix followed by {@code str}
     */
    public static String getValidateSchemaAndBuildDictionaryOutputFileNameNoExtension(String str) {
        String prefix = VALIDATE_SCHEMA_AND_BUILD_DICTIONARY_MAPPER_OUTPUT_FILE_PREFIX;
        return prefix + str;
    }

    /**
     * Returns the full mapper output file name (including extension) for the given job id.
     *
     * @param str the job id
     * @return the extension-less file name with the output file extension appended
     */
    protected static String getValidateSchemaAndBuildDictionaryOutputFileName(String str) {
        String baseName = getValidateSchemaAndBuildDictionaryOutputFileNameNoExtension(str);
        return baseName + VALIDATE_SCHEMA_AND_BUILD_DICTIONARY_MAPPER_OUTPUT_FILE_EXTENSION;
    }

    /**
     * Loads the validate-schema/build-dictionary mapper output for the given job, then derives
     * {@code inputFileDataSize} from it.
     *
     * <p>The reader is closed via try-with-resources before the size is computed, so a failure in the size
     * computation can no longer close the reader a second time.
     *
     * <p>NOTE(review): the reader comes from {@code getValidateSchemaAndBuildDictMapperOutputReader}, which
     * caches it in a field. After this method the cached instance is closed; confirm it is never reused.
     *
     * @param str the job id used to name the mapper output file
     * @throws Exception if opening, reading, or closing the mapper output fails
     */
    private void getValidateSchemaAndBuildDictMapperOutput(String str) throws Exception {
        String outputFileName = getValidateSchemaAndBuildDictionaryOutputFileName(str);
        try (ValidateSchemaAndBuildDictMapperOutputReader outputReader =
            getValidateSchemaAndBuildDictMapperOutputReader(this.validateSchemaAndBuildDictMapperOutputDirectory, outputFileName)) {
            this.validateSchemaAndBuildDictMapperOutput = outputReader.getOutput();
        }
        // NOTE(review): the reported input size is doubled; the rationale for the factor 2 is not visible here.
        this.inputFileDataSize = this.validateSchemaAndBuildDictMapperOutput.getInputFileDataSize() * 2;
    }

    /**
     * Logs (without failing) if the input dataset changed after the job captured its modification time.
     *
     * @throws IOException if the input's last modification time cannot be read
     */
    private void checkLastModificationTimeAndLog() throws IOException {
        final boolean throwOnDatasetChange = false;
        checkLastModificationTimeAndLog(throwOnDatasetChange);
    }

    /**
     * Detects whether the input dataset changed after the job recorded {@code inputModificationTime}.
     *
     * <p>On a change, the {@code DATASET_CHANGED} checkpoint is recorded and an error is logged. The job is
     * failed only if {@code z} is {@code true}.
     *
     * @param z whether to throw when a dataset change is detected
     * @throws IOException if the input's last modification time cannot be read
     * @throws VeniceException if the dataset changed and {@code z} is {@code true}
     */
    private void checkLastModificationTimeAndLog(boolean z) throws IOException {
        long latestInputModificationTime = getInputDataInfoProvider().getInputLastModificationTime(this.inputDirectory);
        if (latestInputModificationTime <= this.inputModificationTime) {
            return;
        }
        String datasetChangedMessage = "Dataset changed during the push job. Please check above logs to see if the change caused the MapReduce failure and rerun the job without dataset change.";
        updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.DATASET_CHANGED);
        LOGGER.error(datasetChangedMessage);
        if (z) {
            throw new VeniceException(datasetChangedMessage);
        }
    }

    /**
     * Decides whether a ZSTD compression dictionary should be built from the input data.
     *
     * <p>Returns {@code false} for Kafka input, empty input, incremental pushes, and cases where neither the
     * store's compression strategy ({@code ZSTD_WITH_DICT}) nor compression metric collection needs one.
     *
     * @param pushJobSetting the push job settings
     * @param storeSetting   the store settings (for the compression strategy)
     * @param z              whether the input contains any records
     * @return {@code true} if a dictionary should be built
     */
    protected static boolean shouldBuildZstdCompressionDictionary(PushJobSetting pushJobSetting, StoreSetting storeSetting, boolean z) {
        if (pushJobSetting.isSourceKafka) {
            // Kafka input is handled separately when the dictionary is obtained; no log here.
            return false;
        }
        if (!z) {
            LOGGER.info("No compression dictionary will be generated as there are no records");
            return false;
        }
        boolean strategyNeedsDictionary = storeSetting.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT;
        if (!(pushJobSetting.compressionMetricCollectionEnabled || strategyNeedsDictionary)) {
            LOGGER.info("No Compression dictionary will be generated with the compression strategy {} and compressionMetricCollectionEnabled is disabled", storeSetting.compressionStrategy);
            return false;
        }
        if (pushJobSetting.isIncrementalPush) {
            LOGGER.info("No compression dictionary will be generated as the push type is incremental push");
            return false;
        }
        String metricCollectionState = pushJobSetting.compressionMetricCollectionEnabled ? "Enabled" : "Disabled";
        LOGGER.info("Compression dictionary will be generated with the compression strategy {} and compressionMetricCollectionEnabled is {}", storeSetting.compressionStrategy, metricCollectionState);
        return true;
    }

    /**
     * Decides whether compression metrics should actually be collected for this push.
     *
     * <p>Collection must be enabled in the settings. It is also turned off for empty input, Kafka input
     * (repush), and incremental pushes; each of those cases is logged.
     *
     * @param pushJobSetting the push job settings
     * @param z              whether the input contains any records
     * @return {@code true} if compression metrics should be collected
     */
    protected static boolean evaluateCompressionMetricCollectionEnabled(PushJobSetting pushJobSetting, boolean z) {
        if (!pushJobSetting.compressionMetricCollectionEnabled) {
            return false;
        }
        if (!z) {
            LOGGER.info("No compression related metrics will be generated as there are no records");
            return false;
        }
        if (pushJobSetting.isSourceKafka) {
            LOGGER.info("No compression related metrics will be generated as the push type is repush");
            return false;
        }
        if (pushJobSetting.isIncrementalPush) {
            LOGGER.info("No compression related metrics will be generated as the push type is incremental push");
            return false;
        }
        return true;
    }

    /**
     * Validates the counters of the validate-schema/build-dictionary MapReduce job and records whether ZSTD
     * dictionary training succeeded ({@code isZstdDictCreationSuccess}).
     *
     * <p>Checks run in this order, each failing the job with a specific checkpoint and message: dataset
     * modified during the job, invalid input index, invalid input file (sub-directory), and inconsistent
     * schema. It then checks the processed-record count against {@code inputNumFiles}, and finally the
     * dictionary training outcome. Nothing is checked when the input has no records.
     *
     * @throws IOException if the job counters or the input modification time cannot be read
     * @throws VeniceException on any validation failure
     */
    private void validateCountersAfterValidateSchemaAndBuildDict() throws IOException {
        if (!this.inputFileHasRecords) {
            return;
        }
        Counters jobCounters = this.runningJob.getCounters();

        if (MRJobCounterHelper.getMapperErrorDataModifiedDuringPushJobCount(jobCounters) != 0) {
            String error = "Error while validating schema and building dictionary: Because Dataset changed during the push job. Rerun the job without dataset change";
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.DATASET_CHANGED);
            LOGGER.error(error);
            throw new VeniceException(error);
        }
        if (MRJobCounterHelper.getMapperInvalidInputIdxCount(jobCounters) != 0) {
            checkLastModificationTimeAndLog(true);
            String error = "Error while validating schema and building dictionary: Input file Idx is invalid, MR job counter is not reliable to point out the reason";
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.INVALID_INPUT_FILE);
            LOGGER.error(error);
            throw new VeniceException(error);
        }
        if (MRJobCounterHelper.getMapperInvalidInputFileCount(jobCounters) != 0) {
            String error = "Error while validating schema: Input directory should not have sub directory";
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.INVALID_INPUT_FILE);
            LOGGER.error(error);
            throw new VeniceException(error);
        }
        if (MRJobCounterHelper.getMapperSchemaInconsistencyFailureCount(jobCounters) != 0) {
            String error = "Error while validating schema: Inconsistent file schema found";
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.FILE_SCHEMA_VALIDATION_FAILED);
            LOGGER.error(error);
            throw new VeniceException(error);
        }

        long dictTrainFailures = MRJobCounterHelper.getMapperZstdDictTrainFailureCount(jobCounters);
        long dictTrainSuccesses = MRJobCounterHelper.getMapperZstdDictTrainSuccessCount(jobCounters);
        long dictTrainSkips = MRJobCounterHelper.getMapperZstdDictTrainSkippedCount(jobCounters);
        this.isZstdDictCreationSuccess = dictTrainSuccesses == 1;
        boolean dictTrainFailed = dictTrainFailures == 1;
        boolean dictTrainSkipped = dictTrainSkips == 1;

        long processedRecords = MRJobCounterHelper.getMapperNumRecordsSuccessfullyProcessedCount(jobCounters);
        // NOTE(review): one record beyond the file count presumably corresponds to the dictionary-training
        // record of ValidateSchemaAndBuildDictMapper; confirm against the mapper.
        boolean oneRecordBeyondFileCount = processedRecords == this.inputNumFiles + 1;
        if (oneRecordBeyondFileCount && (!this.isZstdDictCreationRequired || this.isZstdDictCreationSuccess)) {
            return;
        }
        if (oneRecordBeyondFileCount || processedRecords != this.inputNumFiles) {
            String error = "Error while validating schema: MR job counter is not reliable to point out the exact reason";
            checkLastModificationTimeAndLog(true);
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.INVALID_INPUT_FILE);
            LOGGER.error(error);
            throw new VeniceException(error);
        }
        // Exactly one record per file: dictionary training must have either failed or been skipped.
        if (!(dictTrainFailed || dictTrainSkipped)) {
            String error = "Error while validating schema: MR job counter is not reliable to point out the reason";
            checkLastModificationTimeAndLog(true);
            LOGGER.error(error);
            throw new VeniceException(error);
        }
        String dictionaryIssue = dictTrainFailed
            ? "Training ZSTD compression dictionary failed: The content might not be suitable for creating dictionary."
            : "Training ZSTD compression dictionary skipped: The sample size is too small.";
        if (this.storeSetting.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.ZSTD_DICTIONARY_CREATION_FAILED);
            LOGGER.error(dictionaryIssue);
            throw new VeniceException(dictionaryIssue);
        }
        LOGGER.warn(dictionaryIssue + " But as this job's configured compression strategy don't need dictionary, the job is not stopped");
    }

    /**
     * Supplies the controller client for the Kafka message envelope schema system store. A client set here
     * is kept by {@code initControllerClient}, which only creates one when the field is still {@code null}.
     *
     * @param controllerClient the controller client to use
     */
    void setKmeSchemaSystemStoreControllerClient(ControllerClient controllerClient) {
        this.kmeSchemaSystemStoreControllerClient = controllerClient;
    }

    /**
     * Asserts that every relevant counter of the completed job is zero, as expected for empty input.
     *
     * <p>Checks, in order: reducers closed, output records, write ACL failures, duplicate keys with distinct
     * values, total key size, and total value size. The first non-zero counter fails the job.
     *
     * @throws IOException if the job counters cannot be retrieved
     * @throws VeniceException naming the first non-zero counter and its value
     */
    private void verifyCountersWithZeroValues() throws IOException {
        Counters jobCounters = this.runningJob.getCounters();

        long closedReducers = MRJobCounterHelper.getReducerClosedCount(jobCounters);
        if (closedReducers != 0) {
            throw new VeniceException("Expect 0 reducer closed. Got count: " + closedReducers);
        }
        long emittedRecords = MRJobCounterHelper.getOutputRecordsCount(jobCounters);
        if (emittedRecords != 0) {
            throw new VeniceException("Expect 0 output record. Got count: " + emittedRecords);
        }
        long aclFailures = MRJobCounterHelper.getWriteAclAuthorizationFailureCount(jobCounters);
        if (aclFailures != 0) {
            throw new VeniceException("Expect 0 ACL authorization failure. Got count: " + aclFailures);
        }
        long conflictingDuplicates = MRJobCounterHelper.getDuplicateKeyWithDistinctCount(jobCounters);
        if (conflictingDuplicates != 0) {
            throw new VeniceException("Expect 0 duplicated key with distinct value. Got count: " + conflictingDuplicates);
        }
        long keyBytes = MRJobCounterHelper.getTotalKeySize(jobCounters);
        if (keyBytes != 0) {
            throw new VeniceException("Expect 0 byte for total key size. Got count: " + keyBytes);
        }
        long valueBytes = MRJobCounterHelper.getTotalValueSize(jobCounters);
        if (valueBytes != 0) {
            throw new VeniceException("Expect 0 byte for total value size. Got count: " + valueBytes);
        }
    }

    /**
     * Submits a MapReduce job through the configured {@code JobClientWrapper} and returns its handle.
     *
     * <p>A {@code DefaultJobClientWrapper} is created and stored when none was set. If the job fails and the
     * input is not Kafka, the input's modification time is checked and logged before the failure is rethrown.
     *
     * @param jobConf the job configuration
     * @return the running (or completed) job handle
     * @throws IOException if the job submission fails
     */
    private RunningJob runJobWithConfig(JobConf jobConf) throws IOException {
        JobClientWrapper wrapper = this.jobClientWrapper;
        if (wrapper == null) {
            wrapper = new DefaultJobClientWrapper();
            this.jobClientWrapper = wrapper;
        }
        try {
            return wrapper.runJobWithConfig(jobConf);
        } catch (Exception jobFailure) {
            boolean readsInputFiles = !this.pushJobSetting.isSourceKafka;
            if (readsInputFiles) {
                // Help diagnose the failure by reporting a concurrent dataset change.
                checkLastModificationTimeAndLog();
            }
            throw jobFailure;
        }
    }

    /**
     * Returns the input data info provider, creating a {@code DefaultInputDataInfoProvider} on first use
     * when none was set.
     *
     * @return the (possibly newly created) provider
     */
    protected InputDataInfoProvider getInputDataInfoProvider() {
        if (this.inputDataInfoProvider != null) {
            return this.inputDataInfoProvider;
        }
        this.inputDataInfoProvider = new DefaultInputDataInfoProvider(this.storeSetting, this.pushJobSetting, this.props);
        return this.inputDataInfoProvider;
    }

    /**
     * Returns the mapper output reader, creating and caching one for {@code str}/{@code str2} when none is
     * cached. Once a reader is cached, later calls return it regardless of the arguments.
     *
     * @param str  the output directory
     * @param str2 the output file name
     * @return the cached or newly created reader
     * @throws Exception if the reader cannot be created
     */
    protected ValidateSchemaAndBuildDictMapperOutputReader getValidateSchemaAndBuildDictMapperOutputReader(String str, String str2) throws Exception {
        if (this.validateSchemaAndBuildDictMapperOutputReader != null) {
            return this.validateSchemaAndBuildDictMapperOutputReader;
        }
        this.validateSchemaAndBuildDictMapperOutputReader = new ValidateSchemaAndBuildDictMapperOutputReader(str, str2);
        return this.validateSchemaAndBuildDictMapperOutputReader;
    }

    /**
     * Initializes the controller clients used by this job.
     *
     * <p>The main client and the KME schema system store client are created only if not already set. The
     * liveness heartbeat store client is created when liveness heartbeats are enabled and cleared otherwise.
     * The parent-region D2 ZK hosts are used for multi-region setups, the child-region hosts otherwise.
     *
     * @param str      the store name for the main controller client
     * @param optional the optional SSL factory
     */
    private void initControllerClient(String str, Optional<SSLFactory> optional) {
        String d2ZkHosts;
        if (this.pushJobSetting.multiRegion) {
            d2ZkHosts = this.pushJobSetting.parentControllerRegionD2ZkHosts;
        } else {
            d2ZkHosts = this.pushJobSetting.childControllerRegionD2ZkHosts;
        }
        boolean useD2 = this.pushJobSetting.d2Routing;
        String d2ServiceName = this.pushJobSetting.controllerD2ServiceName;
        int retries = this.pushJobSetting.controllerRetries;

        if (this.controllerClient != null) {
            LOGGER.info("Controller client has already been initialized");
        } else {
            this.controllerClient = getControllerClient(str, useD2, d2ServiceName, d2ZkHosts, optional, retries);
        }

        if (this.kmeSchemaSystemStoreControllerClient != null) {
            LOGGER.info("System store controller client has already been initialized");
        } else {
            String kmeStoreName = AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName();
            this.kmeSchemaSystemStoreControllerClient = getControllerClient(kmeStoreName, useD2, d2ServiceName, d2ZkHosts, optional, retries);
        }

        this.livenessHeartbeatStoreControllerClient = this.pushJobSetting.livenessHeartbeatEnabled
            ? getControllerClient(this.pushJobSetting.livenessHeartbeatStoreName, useD2, d2ServiceName, d2ZkHosts, optional, retries)
            : null;
    }

    /**
     * Resolves the Kafka input topic and broker URL for a Kafka-input (repush) job.
     *
     * <p>The broker URL comes from the controller's repush info when available, otherwise from the
     * {@code KAFKA_INPUT_BROKER_URL} property. The topic is resolved first because that call reads
     * {@code repushInfoResponse} and may populate it.
     */
    protected void initKIFRepushDetails() {
        this.pushJobSetting.kafkaInputTopic = getSourceTopicNameForKafkaInput(this.pushJobSetting.storeName, this.props);
        if (this.pushJobSetting.repushInfoResponse != null) {
            this.pushJobSetting.kafkaInputBrokerUrl = this.pushJobSetting.repushInfoResponse.getRepushInfo().getKafkaBrokerUrl();
        } else {
            this.pushJobSetting.kafkaInputBrokerUrl = this.props.getString(KAFKA_INPUT_BROKER_URL);
        }
    }

    /**
     * Discovers and constructs a controller client for a store, via D2 or the configured controller URL.
     *
     * @param str      the store name
     * @param z        whether to use D2 routing
     * @param str2     the controller D2 service name (D2 routing only)
     * @param str3     the D2 ZK hosts (D2 routing only)
     * @param optional the optional SSL factory
     * @param i        the number of discovery retries
     * @return the constructed controller client
     */
    private ControllerClient getControllerClient(String str, boolean z, String str2, String str3, Optional<SSLFactory> optional, int i) {
        if (!z) {
            return ControllerClient.discoverAndConstructControllerClient(str, this.pushJobSetting.veniceControllerUrl, optional, i);
        }
        return D2ControllerClientFactory.discoverAndConstructControllerClient(str, str2, str3, optional, i);
    }

    /**
     * Determines the ZSTD compression dictionary, if any, for this push.
     *
     * <p>Paths, checked in order:
     * <ol>
     *   <li>Kafka input (repush): only when the store uses {@code ZSTD_WITH_DICT}. Either trains a new
     *       dictionary from the input topic ({@code kafkaInputBuildNewDictEnabled}) or reads the existing
     *       dictionary from it.</li>
     *   <li>Empty push to a hybrid {@code ZSTD_WITH_DICT} store: best-effort training from the current
     *       version's topic reported by the controller's repush info. Any failure is logged and yields an
     *       empty result.</li>
     *   <li>Otherwise, when {@code isZstdDictCreationRequired}: trains from input samples directly, or reuses
     *       the dictionary built by {@link ValidateSchemaAndBuildDictMapper}.</li>
     * </ol>
     *
     * @return the dictionary, or empty if none was produced
     * @throws VeniceException if the mapper failed to build a dictionary while the store requires
     *                         {@code ZSTD_WITH_DICT}
     */
    private Optional<ByteBuffer> getCompressionDictionary() throws VeniceException {
        ByteBuffer byteBuffer = null;
        // 972800 bytes = 950 KiB: default dictionary size limit when the property is unset.
        KafkaInputDictTrainer.ParamBuilder dictSampleSize = new KafkaInputDictTrainer.ParamBuilder().setKeySchema(AvroCompatibilityHelper.toParsingForm(this.storeSetting.keySchema)).setSslProperties(isSslEnabled() ? (Properties) this.sslProperties.get() : new Properties()).setCompressionDictSize(this.props.getInt(DefaultInputDataInfoProvider.COMPRESSION_DICTIONARY_SIZE_LIMIT, 972800)).setDictSampleSize(this.props.getInt(DefaultInputDataInfoProvider.COMPRESSION_DICTIONARY_SAMPLE_SIZE, DefaultInputDataInfoProvider.DEFAULT_COMPRESSION_DICTIONARY_SAMPLE_SIZE));
        if (this.pushJobSetting.isSourceKafka) {
            boolean z = this.pushJobSetting.kafkaInputBuildNewDictEnabled;
            dictSampleSize.setSourceVersionChunkingEnabled(this.storeSetting.sourceKafkaInputVersionInfo.isChunkingEnabled());
            if (this.storeSetting.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
                if (z) {
                    LOGGER.info("Rebuild a new Zstd dictionary from the input topic: {}", this.pushJobSetting.kafkaInputTopic);
                    dictSampleSize.setKafkaInputBroker(this.pushJobSetting.kafkaInputBrokerUrl).setTopicName(this.pushJobSetting.kafkaInputTopic).setSourceVersionCompressionStrategy(this.storeSetting.sourceKafkaInputVersionInfo.getCompressionStrategy());
                    byteBuffer = ByteBuffer.wrap(new KafkaInputDictTrainer(dictSampleSize.build()).trainDict());
                } else {
                    LOGGER.info("Reading Zstd dictionary from input topic: {}", this.pushJobSetting.kafkaInputTopic);
                    Properties properties = new Properties();
                    if (isSslEnabled()) {
                        properties.putAll((Map) this.sslProperties.get());
                    }
                    properties.setProperty("kafka.bootstrap.servers", this.pushJobSetting.kafkaInputBrokerUrl);
                    byteBuffer = DictionaryUtils.readDictionaryFromKafka(this.pushJobSetting.kafkaInputTopic, new VeniceProperties(properties));
                }
            }
            return Optional.ofNullable(byteBuffer);
        }
        if (this.storeSetting.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT && !this.inputFileHasRecords && this.storeSetting.hybridStoreConfig != null) {
            String str = getPushJobSetting().storeName;
            try {
                RepushInfoResponse retryableRequest = ControllerClient.retryableRequest(this.controllerClient, this.pushJobSetting.controllerRetries, controllerClient -> {
                    return controllerClient.getRepushInfo(str, Optional.empty());
                });
                if (retryableRequest.isError()) {
                    throw new VeniceException("Could not get repush info for store " + str + " with error: " + retryableRequest.getError());
                }
                String composeKafkaTopic = Version.composeKafkaTopic(str, retryableRequest.getRepushInfo().getVersion().getNumber());
                LOGGER.info("Rebuild a new Zstd dictionary from the source topic: {} in Kafka: {}", composeKafkaTopic, retryableRequest.getRepushInfo().getKafkaBrokerUrl());
                dictSampleSize.setKafkaInputBroker(retryableRequest.getRepushInfo().getKafkaBrokerUrl()).setTopicName(composeKafkaTopic).setSourceVersionCompressionStrategy(retryableRequest.getRepushInfo().getVersion().getCompressionStrategy());
                return Optional.of(ByteBuffer.wrap(new KafkaInputDictTrainer(dictSampleSize.build()).trainDict()));
            } catch (Exception e) {
                // Deliberately best-effort (this also catches the VeniceException above): an empty result lets
                // the Controller fall back to its default dictionary.
                LOGGER.warn("Encountered an exception when trying to build a dict from an existing version for an empty push to a hybrid store: {}, so the push job will use a default dict built in the Controller", str, e);
                return Optional.empty();
            }
        }
        if (this.isZstdDictCreationRequired) {
            if (!this.pushJobSetting.useMapperToBuildDict) {
                LOGGER.info("Training Zstd dictionary");
                byteBuffer = ByteBuffer.wrap(getInputDataInfoProvider().getZstdDictTrainSamples());
                this.isZstdDictCreationSuccess = true;
            } else if (this.isZstdDictCreationSuccess) {
                LOGGER.info("Retrieving the Zstd dictionary trained by {}", ValidateSchemaAndBuildDictMapper.class.getSimpleName());
                byteBuffer = this.validateSchemaAndBuildDictMapperOutput.getZstdDictionary();
            } else if (this.storeSetting.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
                LOGGER.error("Dictionary creation failed for the configured ZSTD compression type");
                throw new VeniceException("Dictionary creation failed for the configured ZSTD compression type");
            }
        }
        if (byteBuffer != null) {
            LOGGER.info("Zstd dictionary size = {} bytes", byteBuffer.limit());
        } else {
            LOGGER.info("No Compression dictionary is generated with the compression strategy {} and compressionMetricCollectionEnabled is {}", this.storeSetting.compressionStrategy, this.pushJobSetting.compressionMetricCollectionEnabled ? "Enabled" : "Disabled");
        }
        return Optional.ofNullable(byteBuffer);
    }

    /**
     * Always throws: rethrows {@code th} if it is already a {@link VeniceException}, otherwise wraps it in one
     * (keeping it as the cause).
     *
     * @param th the throwable to rethrow or wrap
     * @throws VeniceException always
     */
    private void throwVeniceException(Throwable th) throws VeniceException {
        if (th instanceof VeniceException) {
            throw (VeniceException) th;
        }
        throw new VeniceException("Exception or error caught during VenicePushJob: " + th.getMessage(), th);
    }

    /**
     * Resolves the input URI: the latest path under the configured input directory, or an empty string for
     * Kafka input.
     *
     * @param veniceProperties the job properties (provides {@code INPUT_PATH_PROP})
     * @return the resolved input path, or {@code ""} for Kafka input
     * @throws Exception if the file system cannot be accessed or the path cannot be resolved
     */
    protected String getInputURI(VeniceProperties veniceProperties) throws Exception {
        if (!this.pushJobSetting.isSourceKafka) {
            String configuredInputPath = veniceProperties.getString(INPUT_PATH_PROP);
            FileSystem fileSystem = FileSystem.get(new Configuration());
            return getLatestPathOfInputDirectory(configuredInputPath, fileSystem).toString();
        }
        return "";
    }

    /**
     * Resets {@code pushJobDetails} to its initial state: a single {@code STARTED} status entry, placeholder
     * values for metrics not yet known, the {@code INITIALIZE_PUSH_JOB} checkpoint, and a config map holding
     * only the liveness heartbeat setting.
     */
    private void initPushJobDetails() {
        this.pushJobDetails.clusterName = this.pushJobSetting.clusterName;
        this.pushJobDetails.overallStatus = new ArrayList<>();
        this.pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.STARTED.getValue()));
        this.pushJobDetails.pushId = "";
        // NOTE(review): -1 presumably marks "not yet known"; the MR totals are overwritten in
        // updatePushJobDetailsWithMRCounters.
        this.pushJobDetails.partitionCount = -1;
        this.pushJobDetails.valueCompressionStrategy = CompressionStrategy.NO_OP.getValue();
        this.pushJobDetails.chunkingEnabled = false;
        this.pushJobDetails.jobDurationInMs = -1L;
        this.pushJobDetails.totalNumberOfRecords = -1L;
        this.pushJobDetails.totalKeyBytes = -1L;
        this.pushJobDetails.totalRawValueBytes = -1L;
        this.pushJobDetails.totalCompressedValueBytes = -1L;
        this.pushJobDetails.failureDetails = "";
        this.pushJobDetails.pushJobLatestCheckpoint = PushJobCheckpoints.INITIALIZE_PUSH_JOB.getValue();
        String heartbeatConfigName = BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName();
        this.pushJobDetails.pushJobConfigs = Collections.singletonMap(heartbeatConfigName, String.valueOf(this.pushJobSetting.livenessHeartbeatEnabled));
    }

    /**
     * Records {@code pushJobCheckpoints} as the latest checkpoint reached by this push job.
     *
     * @param pushJobCheckpoints the checkpoint to record
     */
    private void updatePushJobDetailsWithCheckpoint(PushJobCheckpoints pushJobCheckpoints) {
        Integer checkpointValue = pushJobCheckpoints.getValue();
        this.pushJobDetails.pushJobLatestCheckpoint = checkpointValue;
    }

    /**
     * Copies record/byte totals from the running job's counters into {@code pushJobDetails} and logs them.
     *
     * <p>When compression metric collection is enabled, the Gzip size and the Zstd-with-dictionary size
     * (or a dictionary creation failure) are also logged. This is best-effort: any exception is logged as
     * non-critical and swallowed.
     */
    private void updatePushJobDetailsWithMRCounters() {
        if (this.runningJob == null) {
            LOGGER.info("No running job to update push job details with MR counters");
            return;
        }
        try {
            // Fetch the counters once instead of six times: RunningJob#getCounters() is not guaranteed to be
            // a cheap local read.
            Counters counters = this.runningJob.getCounters();
            this.pushJobDetails.totalNumberOfRecords = MRJobCounterHelper.getOutputRecordsCount(counters);
            this.pushJobDetails.totalKeyBytes = MRJobCounterHelper.getTotalKeySize(counters);
            this.pushJobDetails.totalRawValueBytes = MRJobCounterHelper.getTotalUncompressedValueSize(counters);
            this.pushJobDetails.totalCompressedValueBytes = MRJobCounterHelper.getTotalValueSize(counters);
            this.pushJobDetails.totalGzipCompressedValueBytes = MRJobCounterHelper.getTotalGzipCompressedValueSize(counters);
            this.pushJobDetails.totalZstdWithDictCompressedValueBytes = MRJobCounterHelper.getTotalZstdWithDictCompressedValueSize(counters);
            LOGGER.info("pushJobDetails MR Counters: \n\tTotal number of records: {} \n\tSize of keys: {} Bytes \n\tsize of uncompressed value: {} Bytes \n\tConfigured value Compression Strategy: {} \n\tFinal data size stored in Venice based on this compression strategy: {} Bytes \n\tCompression Metrics collection is: {} ", Long.valueOf(this.pushJobDetails.totalNumberOfRecords), Long.valueOf(this.pushJobDetails.totalKeyBytes), Long.valueOf(this.pushJobDetails.totalRawValueBytes), CompressionStrategy.valueOf(this.pushJobDetails.valueCompressionStrategy).name(), Long.valueOf(this.pushJobDetails.totalCompressedValueBytes), this.pushJobSetting.compressionMetricCollectionEnabled ? "Enabled" : "Disabled");
            if (this.pushJobSetting.compressionMetricCollectionEnabled) {
                LOGGER.info("\tData size if compressed using Gzip: {} Bytes ", Long.valueOf(this.pushJobDetails.totalGzipCompressedValueBytes));
                if (this.isZstdDictCreationSuccess) {
                    LOGGER.info("\tData size if compressed using Zstd with Dictionary: {} Bytes", Long.valueOf(this.pushJobDetails.totalZstdWithDictCompressedValueBytes));
                } else {
                    LOGGER.info("\tZstd Dictionary creation Failed");
                }
            }
        } catch (Exception e) {
            // Best-effort reporting: failing to read counters must not fail the push job.
            LOGGER.warn("Exception caught while updating push job details with map reduce counters. {}", NON_CRITICAL_EXCEPTION, e);
        }
    }

    /**
     * Once the latest overall status is terminal, replaces {@code pushJobDetails.pushJobConfigs} with a copy
     * of all job properties. The copy always includes the liveness heartbeat setting. The method also resets
     * {@code producerConfigs} to an empty map. Best-effort: any exception is logged as non-critical and
     * swallowed.
     */
    private void updatePushJobDetailsWithConfigs() {
        try {
            int latestIndex = this.pushJobDetails.overallStatus.size() - 1;
            PushJobDetailsStatusTuple latestStatus = (PushJobDetailsStatusTuple) this.pushJobDetails.overallStatus.get(latestIndex);
            if (!PushJobDetailsStatus.isTerminal(latestStatus.status)) {
                return;
            }
            // Raw map kept deliberately: it is assigned to the pushJobDetails config field as-is.
            HashMap allConfigs = new HashMap();
            for (String propertyName : this.props.keySet()) {
                allConfigs.put(propertyName, this.props.getString(propertyName));
            }
            String heartbeatConfigName = BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName();
            if (!allConfigs.containsKey(heartbeatConfigName)) {
                allConfigs.put(heartbeatConfigName, String.valueOf(this.pushJobSetting.livenessHeartbeatEnabled));
            }
            this.pushJobDetails.pushJobConfigs = allConfigs;
            this.pushJobDetails.producerConfigs = new HashMap();
        } catch (Exception e) {
            LOGGER.warn("Exception caught while updating push job details with configs. {}", NON_CRITICAL_EXCEPTION, e);
        }
    }

    /**
     * Inspects the finished MR job's counters and converts the first failing condition into an
     * {@link ErrorMessage}, recording the matching checkpoint before returning.
     *
     * <p>Conditions are checked in this order:
     * <ol>
     *   <li>storage quota, against combined key and value bytes;</li>
     *   <li>write ACL authorization failures;</li>
     *   <li>duplicate keys with distinct values, only when duplicates are not allowed;</li>
     *   <li>records larger than the writer's max payload size.</li>
     * </ol>
     *
     * @return the error for the first failing check, or {@link Optional#empty()} when every check passes
     * @throws IOException presumably propagated from reading the job counters
     */
    private Optional<ErrorMessage> updatePushJobDetailsWithMRDetails() throws IOException {
        // Quota is compared against key bytes plus value bytes, not key bytes alone.
        long inputDataSize = MRJobCounterHelper.getTotalKeySize(this.runningJob.getCounters())
                + MRJobCounterHelper.getTotalValueSize(this.runningJob.getCounters());
        if (this.inputStorageQuotaTracker.exceedQuota(inputDataSize)) {
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.QUOTA_EXCEEDED);
            long storeQuota = this.inputStorageQuotaTracker.getStoreStorageQuota();
            String quotaMessage = String.format(
                    "Storage quota exceeded. Store quota %s, Input data size %s. Please request at least %s additional quota.",
                    ByteUtils.generateHumanReadableByteCountString(storeQuota),
                    ByteUtils.generateHumanReadableByteCountString(inputDataSize),
                    ByteUtils.generateHumanReadableByteCountString(inputDataSize - storeQuota));
            return Optional.of(new ErrorMessage(quotaMessage));
        }

        if (MRJobCounterHelper.getWriteAclAuthorizationFailureCount(this.runningJob.getCounters()) > 0) {
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.WRITE_ACL_FAILED);
            return Optional.of(new ErrorMessage("Insufficient ACLs to write to the store"));
        }

        if (!this.pushJobSetting.isDuplicateKeyAllowed) {
            long conflictingKeyCount = MRJobCounterHelper.getDuplicateKeyWithDistinctCount(this.runningJob.getCounters());
            if (conflictingKeyCount > 0) {
                updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.DUP_KEY_WITH_DIFF_VALUE);
                return Optional.of(new ErrorMessage(String.format(
                        "Input data has at least %d keys that appear more than once but have different values",
                        conflictingKeyCount)));
            }
        }

        long oversizedRecordCount = MRJobCounterHelper.getRecordTooLargeFailureCount(this.runningJob.getCounters());
        if (oversizedRecordCount > 0) {
            updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.RECORD_TOO_LARGE_FAILED);
            // getVeniceWriter lazily creates the shared writer if needed, only to read its payload size limit.
            return Optional.of(new ErrorMessage(String.format(
                    "Input data has at least %d records that exceed the maximum record limit of %s",
                    oversizedRecordCount,
                    ByteUtils.generateHumanReadableByteCountString(
                            getVeniceWriter(this.kafkaTopicInfo).getMaxSizeForUserPayloadPerMessageInBytes()))));
        }
        return Optional.empty();
    }

    /**
     * Appends per-region status transitions to {@code pushJobDetails.coloStatus}, creating the map on first use.
     *
     * <p>A region's history gets a new tuple only when its status differs from the last recorded one. Status
     * names are parsed with {@link PushJobDetailsStatus#valueOf}. Any exception, such as an unparseable name,
     * stops the update and is logged as non-critical rather than thrown. Entries processed before the failure
     * remain applied.
     *
     * @param map region name to status name, as reported by the controller
     * @param set regions to skip; the caller in this file passes regions already seen in a terminal, non-ERROR state
     */
    private void updatePushJobDetailsWithColoStatus(Map<String, String> map, Set<String> set) {
        try {
            if (this.pushJobDetails.coloStatus == null) {
                this.pushJobDetails.coloStatus = new HashMap<>();
            }
            for (Map.Entry<String, String> regionEntry : map.entrySet()) {
                String region = regionEntry.getKey();
                if (set.contains(region)) {
                    continue;
                }
                int statusValue = PushJobDetailsStatus.valueOf(regionEntry.getValue()).getValue();
                if (!this.pushJobDetails.coloStatus.containsKey(region)) {
                    List<PushJobDetailsStatusTuple> freshHistory = new ArrayList<>();
                    freshHistory.add(getPushJobDetailsStatusTuple(statusValue));
                    this.pushJobDetails.coloStatus.put(region, freshHistory);
                    continue;
                }
                List<PushJobDetailsStatusTuple> history = this.pushJobDetails.coloStatus.get(region);
                PushJobDetailsStatusTuple lastRecorded = history.get(history.size() - 1);
                if (lastRecorded.status != statusValue) {
                    history.add(getPushJobDetailsStatusTuple(statusValue));
                }
            }
        } catch (Exception e) {
            LOGGER.warn("Exception caught while updating push job details with colo status. {}", NON_CRITICAL_EXCEPTION, e);
        }
    }

    /**
     * Creates a status tuple for the given numeric status, stamped with the current wall-clock time.
     *
     * @param i numeric status value, as produced by {@code PushJobDetailsStatus#getValue()}
     * @return a new tuple holding {@code i} and {@link System#currentTimeMillis()}
     */
    private PushJobDetailsStatusTuple getPushJobDetailsStatusTuple(int i) {
        PushJobDetailsStatusTuple tuple = new PushJobDetailsStatusTuple();
        tuple.timestamp = System.currentTimeMillis();
        tuple.status = i;
        return tuple;
    }

    /**
     * Serializes {@code pushJobDetails} and sends it to the controller for monitoring. The send is best-effort.
     *
     * <p>Behavior:
     * <ul>
     *   <li>When status upload is disabled, a warning is logged once per job and nothing is sent.</li>
     *   <li>When the payload is missing, a warning is logged and nothing is sent.</li>
     *   <li>The version reported is {@code kafkaTopicInfo.version}, or -1 if no topic info exists yet.</li>
     *   <li>The sent details are recorded in the sent-details tracker whether or not the controller reported an
     *       error.</li>
     *   <li>Failures are logged, never thrown.</li>
     * </ul>
     */
    private void sendPushJobDetailsToController() {
        if (!this.pushJobSetting.enablePushJobStatusUpload) {
            if (!this.pushJobStatusUploadDisabledHasBeenLogged) {
                this.pushJobStatusUploadDisabledHasBeenLogged = true;
                LOGGER.warn("Unable to send push job details for monitoring purpose. Feature is disabled");
            }
            return;
        }
        if (this.pushJobDetails == null) {
            LOGGER.warn("Unable to send push job details for monitoring purpose. The payload was not populated properly");
            return;
        }
        try {
            this.pushJobDetails.reportTimestamp = System.currentTimeMillis();
            int reportedVersion = (this.kafkaTopicInfo != null) ? this.kafkaTopicInfo.version : -1;
            String storeName = this.pushJobSetting.storeName;
            ControllerResponse response = this.controllerClient.sendPushJobDetails(
                    storeName,
                    reportedVersion,
                    this.pushJobDetailsSerializer.serialize((String) null, this.pushJobDetails));
            getSentPushJobDetailsTracker().record(storeName, reportedVersion, this.pushJobDetails);
            if (response.isError()) {
                LOGGER.warn("Failed to send push job details. {} Details: {}", NON_CRITICAL_EXCEPTION, response.getError());
            }
        } catch (Exception e) {
            LOGGER.error("Exception caught while sending push job details. {}", NON_CRITICAL_EXCEPTION, e);
        }
    }

    /**
     * Returns the tracker that records sent push job details, lazily falling back to a no-op tracker when none
     * was configured.
     */
    private SentPushJobDetailsTracker getSentPushJobDetailsTracker() {
        SentPushJobDetailsTracker tracker = this.sentPushJobDetailsTracker;
        if (tracker == null) {
            tracker = new NoOpSentPushJobDetailsTracker();
            this.sentPushJobDetailsTracker = tracker;
        }
        return tracker;
    }

    /**
     * Logs the job id followed by a multi-line ASCII-art banner, at INFO level.
     * Lines are separated by {@code Utils.NEW_LINE_CHAR}; the banner text is purely cosmetic.
     */
    private void logGreeting() {
        // Single concatenated log call so the banner is emitted as one log event.
        LOGGER.info("Running VenicePushJob: " + this.jobId + Utils.NEW_LINE_CHAR + "  _    _           _                   " + Utils.NEW_LINE_CHAR + " | |  | |         | |                  " + Utils.NEW_LINE_CHAR + " | |__| | __ _  __| | ___   ___  _ __  " + Utils.NEW_LINE_CHAR + " |  __  |/ _` |/ _` |/ _ \\ / _ \\| '_ \\ " + Utils.NEW_LINE_CHAR + " | |  | | (_| | (_| | (_) | (_) | |_) |   " + Utils.NEW_LINE_CHAR + " |_|  |_|\\__,_|\\__,_|\\___/ \\___/| .__/" + Utils.NEW_LINE_CHAR + "                _______         | |     " + Utils.NEW_LINE_CHAR + "               |__   __|        |_|     " + Utils.NEW_LINE_CHAR + "                  | | ___               " + Utils.NEW_LINE_CHAR + "                  | |/ _ \\             " + Utils.NEW_LINE_CHAR + "     __      __   | | (_) |             " + Utils.NEW_LINE_CHAR + "     \\ \\    / /   |_|\\___/           " + Utils.NEW_LINE_CHAR + "      \\ \\  / /__ _ __  _  ___ ___     " + Utils.NEW_LINE_CHAR + "       \\ \\/ / _ | '_ \\| |/ __/ _ \\  " + Utils.NEW_LINE_CHAR + "        \\  |  __| | | | | (_|  __/     " + Utils.NEW_LINE_CHAR + "         \\/ \\___|_| |_|_|\\___\\___|  " + Utils.NEW_LINE_CHAR + "      ___        _     _                " + Utils.NEW_LINE_CHAR + "     |  _ \\     (_)   | |              " + Utils.NEW_LINE_CHAR + "     | |_) |_ __ _  __| | __ _  ___     " + Utils.NEW_LINE_CHAR + "     |  _ <| '__| |/ _` |/ _` |/ _ \\   " + Utils.NEW_LINE_CHAR + "     | |_) | |  | | (_| | (_| |  __/    " + Utils.NEW_LINE_CHAR + "     |____/|_|  |_|\\__,_|\\__, |\\___| " + Utils.NEW_LINE_CHAR + "                          __/ |         " + Utils.NEW_LINE_CHAR + "                         |___/          " + Utils.NEW_LINE_CHAR);
    }

    /**
     * Verifies that the key schema of the push input matches the store's key schema in Avro parsing canonical form.
     *
     * @param controllerClient used only to report the controller discovery URLs on mismatch
     * @param pushJobSetting supplies the store name for the error message
     * @param pushJobSchemaInfo holds the input's key schema string
     * @param storeSetting holds the store's key schema fetched from the controller
     * @throws VeniceException if the two schemas differ in parsing canonical form
     */
    private void validateKeySchema(ControllerClient controllerClient, PushJobSetting pushJobSetting, PushJobSchemaInfo pushJobSchemaInfo, StoreSetting storeSetting) {
        Schema storeKeySchema = storeSetting.keySchema;
        String inputKeySchemaStr = pushJobSchemaInfo.getKeySchemaString();
        boolean keySchemasMatch = AvroCompatibilityHelper.toParsingForm(storeKeySchema)
                .equals(AvroCompatibilityHelper.toParsingForm(AvroCompatibilityHelper.parse(inputKeySchemaStr)));
        if (!keySchemasMatch) {
            String mismatchMessage = "Key schema mis-match for store " + pushJobSetting.storeName;
            LOGGER.error(
                    "{}\n\t\tController URLs: {}\n\t\tschema defined in HDFS: \t{}\n\t\tschema defined in Venice: \t{}",
                    mismatchMessage,
                    controllerClient.getControllerDiscoveryUrls(),
                    inputKeySchemaStr,
                    storeKeySchema.toString());
            throw new VeniceException(mismatchMessage);
        }
    }

    /**
     * Validates the remote hybrid replay policy against this job's own {@code pushJobSetting}.
     * Overridable: subclasses may replace this check wholesale.
     */
    protected void validateRemoteHybridSettings() {
        PushJobSetting ownSetting = this.pushJobSetting;
        validateRemoteHybridSettings(ownSetting);
    }

    /**
     * When {@code pushJobSetting.validateRemoteReplayPolicy} is set, fetches the store from the controller and
     * verifies that its hybrid buffer replay policy equals the required one. Does nothing when no policy is required.
     *
     * @param pushJobSetting supplies the required replay policy, store name and controller retry count
     * @throws VeniceException if the store cannot be fetched, the store has no hybrid config, or the policies differ
     */
    protected void validateRemoteHybridSettings(PushJobSetting pushJobSetting) {
        if (pushJobSetting.validateRemoteReplayPolicy == null) {
            return;
        }
        StoreResponse storeResponse = ControllerClient.retryableRequest(this.controllerClient, pushJobSetting.controllerRetries, client -> client.getStore(pushJobSetting.storeName));
        if (storeResponse.isError()) {
            throw new VeniceException("Failed to get store information to validate push settings! Error: " + storeResponse.getError());
        }
        HybridStoreConfig hybridStoreConfig = storeResponse.getStore().getHybridStoreConfig();
        if (hybridStoreConfig == null) {
            // Previously this dereferenced null and surfaced as an NPE; fail with an actionable message instead.
            throw new VeniceException(String.format(
                    "Push settings require a remote rewind policy of %s, but store %s has no hybrid config. Please adjust hybrid settings or push job configuration!",
                    pushJobSetting.validateRemoteReplayPolicy,
                    pushJobSetting.storeName));
        }
        if (!pushJobSetting.validateRemoteReplayPolicy.equals(hybridStoreConfig.getBufferReplayPolicy())) {
            // String.format needs %s placeholders; the former {} placeholders were printed literally and the
            // policy values were silently dropped from the message.
            throw new VeniceException(String.format(
                    "Remote rewind policy is %s but push settings require a policy of %s. Please adjust hybrid settings or push job configuration!",
                    hybridStoreConfig.getBufferReplayPolicy(),
                    pushJobSetting.validateRemoteReplayPolicy));
        }
    }

    /**
     * Checks that the backend knows the KafkaMessageEnvelope protocol version used by this job.
     *
     * <p>The check looks up that version's value schema in the KME system store through
     * {@code kmeSchemaSystemStoreControllerClient}, retried per {@code pushJobSetting.controllerRetries}.
     *
     * @throws VeniceException if the lookup returns an error
     */
    private void validateKafkaMessageEnvelopeSchema(PushJobSetting pushJobSetting) {
        SchemaResponse kmeSchemaResponse = ControllerClient.retryableRequest(
                this.kmeSchemaSystemStoreControllerClient,
                pushJobSetting.controllerRetries,
                client -> client.getValueSchema(
                        AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
                        AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()));
        if (!kmeSchemaResponse.isError()) {
            return;
        }
        throw new VeniceException("KME protocol is upgraded in the push job but not in the Venice backend; Please contact Venice team. Error : " + kmeSchemaResponse.getError());
    }

    /**
     * Fetches a store's key schema from the controller and parses it with loose validation.
     *
     * @param controllerClient client to query
     * @param i number of controller retries
     * @param str store name
     * @return the parsed key schema
     * @throws VeniceException if the controller returns an error or no schema string
     */
    private Schema getKeySchemaFromController(ControllerClient controllerClient, int i, String str) {
        SchemaResponse keySchemaResponse = ControllerClient.retryableRequest(controllerClient, i, client -> client.getKeySchema(str));
        if (keySchemaResponse.isError()) {
            throw new VeniceException("Got an error in keySchemaResponse: " + keySchemaResponse);
        }
        String keySchemaStr = keySchemaResponse.getSchemaStr();
        if (keySchemaStr == null) {
            throw new VeniceException("Got a null schema in keySchemaResponse: " + keySchemaResponse);
        }
        return AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(keySchemaStr);
    }

    /**
     * Resolves the id of the input's value schema, and the derived schema id for write-compute pushes, and stores
     * the result in {@code pushJobSchemaInfo}.
     *
     * <p>If the schema is unknown to the controller and {@code z} is true, it is auto-registered. If it is unknown
     * and {@code z} is false, all registered value schemas are logged and the push fails.
     *
     * @param z whether auto-registering an unknown value schema is permitted
     * @throws VeniceException if the schema cannot be resolved and may not be registered, or registration fails
     */
    private void validateValueSchema(ControllerClient controllerClient, PushJobSetting pushJobSetting, PushJobSchemaInfo pushJobSchemaInfo, boolean z) {
        String storeName = pushJobSetting.storeName;
        String valueSchemaStr = pushJobSchemaInfo.getValueSchemaString();
        LOGGER.info("Validating value schema: {} for store: {}", valueSchemaStr, storeName);

        SchemaResponse lookupResponse;
        if (pushJobSetting.enableWriteCompute) {
            // Write-compute pushes may carry a derived (update) schema rather than a plain value schema.
            lookupResponse = ControllerClient.retryableRequest(controllerClient, pushJobSetting.controllerRetries, client -> client.getValueOrDerivedSchemaId(storeName, valueSchemaStr));
        } else {
            lookupResponse = ControllerClient.retryableRequest(controllerClient, pushJobSetting.controllerRetries, client -> client.getValueSchemaID(storeName, valueSchemaStr));
        }

        boolean lookupFailed = lookupResponse.isError();
        if (lookupFailed && !z) {
            MultiSchemaResponse allValueSchemas = controllerClient.getAllValueSchema(storeName);
            if (allValueSchemas.isError()) {
                LOGGER.error("Failed to fetch all value schemas, so they will not be printed.");
            } else {
                LOGGER.info("All currently registered value schemas:");
                for (MultiSchemaResponse.Schema registered : allValueSchemas.getSchemas()) {
                    LOGGER.info("Schema {}: {}", Integer.valueOf(registered.getId()), registered.getSchemaStr());
                }
            }
            throw new VeniceException("Failed to validate value schema for store: " + storeName + "\nError from the server: " + lookupResponse.getError() + "\nSchema for the data file: " + valueSchemaStr);
        }

        SchemaResponse resolvedResponse = lookupResponse;
        if (lookupFailed) {
            LOGGER.info("Auto registering value schema: {} for store: {}", valueSchemaStr, storeName);
            SchemaResponse registrationResponse = ControllerClient.retryableRequest(controllerClient, pushJobSetting.controllerRetries, client -> client.addValueSchema(storeName, valueSchemaStr));
            if (registrationResponse.isError()) {
                throw new VeniceException("Failed to auto-register value schema for store: " + storeName + "\nError from the server: " + registrationResponse.getError() + "\nSchema for the data file: " + valueSchemaStr);
            }
            resolvedResponse = registrationResponse;
        }
        setSchemaIdPropInSchemaInfo(pushJobSchemaInfo, resolvedResponse, pushJobSetting.enableWriteCompute);
        LOGGER.info("Got schema id: {} for value schema: {} of store: {}", Integer.valueOf(pushJobSchemaInfo.getValueSchemaId()), valueSchemaStr, storeName);
    }

    /**
     * Copies the value schema id, and the derived schema id when {@code z} is true, from a controller schema
     * response into {@code pushJobSchemaInfo}.
     *
     * @param z whether this is a write-compute push, in which case the derived schema id is also copied
     */
    private void setSchemaIdPropInSchemaInfo(PushJobSchemaInfo pushJobSchemaInfo, SchemaResponse schemaResponse, boolean z) {
        pushJobSchemaInfo.setValueSchemaId(schemaResponse.getId());
        if (!z) {
            return;
        }
        pushJobSchemaInfo.setDerivedSchemaId(schemaResponse.getDerivedSchemaId());
    }

    /**
     * Fetches the store's configuration from the controller (with retries) and derives the {@link StoreSetting}
     * for this push, validating that the requested push features are supported by the store.
     *
     * <p>Side effects:
     * <ul>
     *   <li>When write compute is requested and accepted, sets {@code pushJobSetting.sendControlMessagesDirectly}
     *       to {@code false}.</li>
     *   <li>For Kafka-input pushes, the storage quota is set to -1. NOTE(review): this presumably disables the
     *       quota check — confirm.</li>
     * </ul>
     *
     * @param controllerClient client used for the store and key-schema lookups
     * @param pushJobSetting settings of this push; every push-specific input is read from this argument
     * @return the populated store settings
     * @throws VeniceException if any of the following holds:
     *         <ul>
     *           <li>the store cannot be fetched;</li>
     *           <li>repush TTL is requested for a non-hybrid store;</li>
     *           <li>write compute is requested but unsupported;</li>
     *           <li>the Kafka source version cannot be found;</li>
     *           <li>the source version is chunked while the store is not.</li>
     *         </ul>
     */
    private StoreSetting getSettingsFromController(ControllerClient controllerClient, PushJobSetting pushJobSetting) {
        StoreSetting storeSetting = new StoreSetting();
        StoreResponse storeResponse = ControllerClient.retryableRequest(controllerClient, pushJobSetting.controllerRetries, client -> client.getStore(pushJobSetting.storeName));
        if (storeResponse.isError()) {
            throw new VeniceException("Can't get store info. " + storeResponse.getError());
        }
        storeSetting.storeStorageQuota = storeResponse.getStore().getStorageQuotaInByte();
        storeSetting.isSchemaAutoRegisterFromPushJobEnabled = storeResponse.getStore().isSchemaAutoRegisterFromPushJobEnabled();
        storeSetting.isChunkingEnabled = storeResponse.getStore().isChunkingEnabled();
        storeSetting.isRmdChunkingEnabled = storeResponse.getStore().isRmdChunkingEnabled();
        storeSetting.compressionStrategy = storeResponse.getStore().getCompressionStrategy();
        storeSetting.isWriteComputeEnabled = storeResponse.getStore().isWriteComputationEnabled();
        storeSetting.isIncrementalPushEnabled = storeResponse.getStore().isIncrementalPushEnabled();
        storeSetting.storeRewindTimeInSeconds = DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE;
        HybridStoreConfig hybridStoreConfig = storeResponse.getStore().getHybridStoreConfig();
        storeSetting.hybridStoreConfig = hybridStoreConfig;

        if (pushJobSetting.repushTTLEnabled) {
            if (hybridStoreConfig == null) {
                throw new VeniceException("Repush TTL is only supported for real-time only store.");
            }
            // Repush with TTL uses the store's own rewind time instead of the default override.
            storeSetting.storeRewindTimeInSeconds = hybridStoreConfig.getRewindTimeInSeconds();
        }

        if (pushJobSetting.enableWriteCompute) {
            if (!storeSetting.isWriteComputeEnabled) {
                throw new VeniceException("Store does not have write compute enabled.");
            }
            if (!storeSetting.isIncrementalPushEnabled || !pushJobSetting.isIncrementalPush) {
                throw new VeniceException("Write compute is only available for incremental push jobs.");
            }
            // Reaching here means the store has write compute enabled, so control messages go via the controller.
            pushJobSetting.sendControlMessagesDirectly = false;
        }

        storeSetting.keySchema = getKeySchemaFromController(controllerClient, pushJobSetting.controllerRetries, pushJobSetting.storeName);

        if (pushJobSetting.isSourceKafka) {
            // Read everything from the pushJobSetting argument (not this.pushJobSetting) so the method is
            // consistent with its own parameter.
            int sourceVersionNumber = Version.parseVersionFromKafkaTopicName(pushJobSetting.kafkaInputTopic);
            Optional<Version> sourceVersion = storeResponse.getStore().getVersion(sourceVersionNumber);
            if (!sourceVersion.isPresent()) {
                RepushInfoResponse repushInfoResponse = pushJobSetting.repushInfoResponse;
                if (repushInfoResponse == null || repushInfoResponse.getRepushInfo().getVersion().getNumber() != sourceVersionNumber) {
                    throw new VeniceException("Could not find version " + sourceVersionNumber + ", please provide input fabric to repush.");
                }
                LOGGER.warn("Could not find version {} in parent colo, fetching from child colo.", Integer.valueOf(sourceVersionNumber));
                sourceVersion = Optional.of(repushInfoResponse.getRepushInfo().getVersion());
            }
            Version sourceVersionInfo = sourceVersion.get();
            storeSetting.sourceKafkaInputVersionInfo = sourceVersionInfo;
            storeSetting.storeStorageQuota = -1L;
            if (sourceVersionInfo.isChunkingEnabled() && !storeResponse.getStore().isChunkingEnabled()) {
                throw new VeniceException("Source version has chunking enabled while chunking is disabled in store config.");
            }
        }
        return storeSetting;
    }

    /**
     * Returns the store's current version per region, as an unmodifiable map.
     *
     * <p>If the controller reports no per-region versions (null or empty map), the result maps the single key
     * {@code "unknown_single_colo"} to the store-level current version instead.
     */
    private Map<String, Integer> getCurrentStoreVersions(StoreResponse storeResponse) {
        Map<String, Integer> versionsByRegion = storeResponse.getStore().getColoToCurrentVersions();
        if (versionsByRegion != null && !versionsByRegion.isEmpty()) {
            return Collections.unmodifiableMap(versionsByRegion);
        }
        return Collections.singletonMap("unknown_single_colo", Integer.valueOf(storeResponse.getStore().getCurrentVersion()));
    }

    /**
     * Maps the push job setting to a controller push type: INCREMENTAL for incremental pushes, otherwise BATCH.
     */
    private Version.PushType getPushType(PushJobSetting pushJobSetting) {
        if (pushJobSetting.isIncrementalPush) {
            return Version.PushType.INCREMENTAL;
        }
        return Version.PushType.BATCH;
    }

    /**
     * Requests a new store version (topic) from the controller, with retries, and records the result in
     * {@code this.kafkaTopicInfo}. For Kafka-input (repush) jobs, it also re-fetches the store and checks that
     * the new version's chunking, RMD chunking and RMD version id are compatible with the source version.
     *
     * @param pushJobSetting store name, retry count, write-compute flag and other request parameters
     * @param j forwarded as the 2nd argument of requestTopicForWrites; presumably the input data size in bytes — confirm
     * @param controllerClient client used for the version request and the follow-up store lookup
     * @param str forwarded as the 4th argument of requestTopicForWrites; presumably the push id — confirm
     * @param veniceProperties source of "venice.partitioners" (defaults to DefaultVenicePartitioner)
     * @param optional compression dictionary; base64-encoded and forwarded only when control messages are not
     *        sent directly by the job
     * @throws VeniceException if the controller returns an error or version 0, or if the new version is
     *         incompatible with the Kafka source version
     */
    private void createNewStoreVersion(PushJobSetting pushJobSetting, long j, ControllerClient controllerClient, String str, VeniceProperties veniceProperties, Optional<ByteBuffer> optional) {
        Version.PushType pushType = getPushType(pushJobSetting);
        // NOTE(review): reads this.pushJobSetting rather than the pushJobSetting parameter; confirm they are always the same object.
        boolean z = !this.pushJobSetting.sendControlMessagesDirectly;
        String string = veniceProperties.getString("venice.partitioners", DefaultVenicePartitioner.class.getName());
        // The dictionary is only sent to the controller when the job does not send control messages itself.
        Optional map = z ? optional.map((v0) -> {
            return v0.array();
        }).map(EncodingUtils::base64EncodeToString) : Optional.empty();
        // z2/z3: write compute is requested only if both the store and the push enable it (z3 is the effectively-final copy for the lambda).
        boolean z2 = false;
        if (this.storeSetting.isWriteComputeEnabled && pushJobSetting.enableWriteCompute) {
            z2 = true;
        }
        boolean z3 = z2;
        // kafkaTopicInfo is replaced before the request, so after a failed request it holds a blank TopicInfo.
        this.kafkaTopicInfo = new TopicInfo();
        Optional optional2 = map;
        VersionCreationResponse retryableRequest = ControllerClient.retryableRequest(controllerClient, pushJobSetting.controllerRetries, controllerClient2 -> {
            return controllerClient2.requestTopicForWrites(pushJobSetting.storeName, j, pushType, str, z, true, z3, Optional.of(string), optional2, Optional.ofNullable(pushJobSetting.sourceGridFabric), pushJobSetting.livenessHeartbeatEnabled, pushJobSetting.rewindTimeInSecondsOverride, pushJobSetting.deferVersionSwap);
        });
        if (retryableRequest.isError()) {
            if (ErrorType.CONCURRENT_BATCH_PUSH.equals(retryableRequest.getErrorType())) {
                LOGGER.error("Unable to run this job since another batch push is running. See the error message for details.");
                updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.CONCURRENT_BATCH_PUSH);
            }
            throw new VeniceException("Failed to create new store version with urls: " + pushJobSetting.veniceControllerUrl + ", error: " + retryableRequest.getError());
        }
        if (retryableRequest.getVersion() == 0) {
            throw new VeniceException("Got version 0 from: " + retryableRequest);
        }
        LOGGER.info(retryableRequest.toString());
        this.kafkaTopicInfo.topic = retryableRequest.getKafkaTopic();
        this.kafkaTopicInfo.version = retryableRequest.getVersion();
        this.kafkaTopicInfo.kafkaUrl = retryableRequest.getKafkaBootstrapServers();
        this.kafkaTopicInfo.partitionCount = retryableRequest.getPartitions();
        this.kafkaTopicInfo.sslToKafka = retryableRequest.isEnableSSL();
        this.kafkaTopicInfo.compressionStrategy = retryableRequest.getCompressionStrategy();
        this.kafkaTopicInfo.partitionerClass = retryableRequest.getPartitionerClass();
        this.kafkaTopicInfo.partitionerParams = retryableRequest.getPartitionerParams();
        this.kafkaTopicInfo.amplificationFactor = retryableRequest.getAmplificationFactor();
        // Chunking is never used when writing to a real-time topic; RMD chunking additionally requires chunking.
        this.kafkaTopicInfo.chunkingEnabled = this.storeSetting.isChunkingEnabled && !Version.isRealTimeTopic(this.kafkaTopicInfo.topic);
        this.kafkaTopicInfo.rmdChunkingEnabled = this.kafkaTopicInfo.chunkingEnabled && this.storeSetting.isRmdChunkingEnabled;
        if (this.pushJobSetting.isSourceKafka) {
            // Repush: the newly created version must be able to hold what the source version contains.
            StoreResponse retryableRequest2 = ControllerClient.retryableRequest(controllerClient, pushJobSetting.controllerRetries, controllerClient3 -> {
                return controllerClient3.getStore(pushJobSetting.storeName);
            });
            if (retryableRequest2.isError()) {
                throw new VeniceException("Failed to retrieve store response with urls: " + pushJobSetting.veniceControllerUrl + ", error: " + retryableRequest2.getError());
            }
            int i = this.kafkaTopicInfo.version;
            Optional version = retryableRequest2.getStore().getVersion(i);
            if (!version.isPresent()) {
                throw new VeniceException("Couldn't fetch the newly created version: " + i + " for store: " + pushJobSetting.storeName + " with urls: " + pushJobSetting.veniceControllerUrl);
            }
            // version2 = newly created version, version3 = source version being repushed.
            Version version2 = (Version) version.get();
            Version version3 = this.storeSetting.sourceKafkaInputVersionInfo;
            if (version3.isChunkingEnabled() && !version2.isChunkingEnabled()) {
                throw new VeniceException("Chunking config mismatch between the source and the new version of store " + retryableRequest2.getStore().getName() + ". Source version: " + version3.getNumber() + " is using: " + version3.isChunkingEnabled() + ", new version: " + version2.getNumber() + " is using: " + version2.isChunkingEnabled());
            }
            if (version3.isRmdChunkingEnabled() && !version2.isRmdChunkingEnabled()) {
                throw new VeniceException("RMD Chunking config mismatch between the source and the new version of store " + retryableRequest2.getStore().getName() + ". Source version: " + version3.getNumber() + " is using: " + version3.isRmdChunkingEnabled() + ", new version: " + version2.getNumber() + " is using: " + version2.isRmdChunkingEnabled());
            }
            // RMD version ids must match only when both versions are active-active.
            if (version3.isActiveActiveReplicationEnabled() && version2.isActiveActiveReplicationEnabled() && version3.getRmdVersionId() != version2.getRmdVersionId()) {
                throw new VeniceException("Replication Metadata Version Id config mismatch between the source version and the new version is not supported by Kafka Input Format, source version: " + version3.getNumber() + " is using RMD ID: " + version3.getRmdVersionId() + ", new version: " + version2.getNumber() + " is using RMD ID: " + version2.getRmdVersionId());
            }
        }
    }

    /**
     * Returns the job's shared {@link VeniceWriter}, creating it on first call from {@code topicInfo}.
     * Synchronized so that concurrent callers see a single instance. Later calls ignore {@code topicInfo}
     * and return the cached writer.
     */
    private synchronized VeniceWriter<KafkaKey, byte[], byte[]> getVeniceWriter(TopicInfo topicInfo) {
        if (this.veniceWriter != null) {
            return this.veniceWriter;
        }
        VeniceWriterFactory writerFactory = new VeniceWriterFactory(getVeniceWriterProperties(topicInfo));
        Properties partitionerProps = new Properties();
        partitionerProps.putAll(topicInfo.partitionerParams);
        // Version topics are sized as partitionCount * amplificationFactor; other topics use partitionCount as-is.
        int writerPartitionCount = Version.isVersionTopic(topicInfo.topic)
                ? topicInfo.partitionCount * topicInfo.amplificationFactor
                : topicInfo.partitionCount;
        VeniceWriter<KafkaKey, byte[], byte[]> createdWriter = writerFactory.createVeniceWriter(
                new VeniceWriterOptions.Builder(topicInfo.topic)
                        .setUseKafkaKeySerializer(true)
                        .setPartitioner(PartitionUtils.getVenicePartitioner(topicInfo.partitionerClass, topicInfo.amplificationFactor, new VeniceProperties(partitionerProps)))
                        .setPartitionCount(Integer.valueOf(writerPartitionCount))
                        .build());
        LOGGER.info("Created VeniceWriter: {}", createdWriter);
        this.veniceWriter = createdWriter;
        return createdWriter;
    }

    /**
     * Returns the cached VeniceWriter properties, building them from {@code topicInfo}'s Kafka URL and SSL flag on
     * first use. Later calls ignore {@code topicInfo}.
     */
    private synchronized Properties getVeniceWriterProperties(TopicInfo topicInfo) {
        Properties cachedProps = this.veniceWriterProperties;
        if (cachedProps == null) {
            cachedProps = createVeniceWriterProperties(topicInfo.kafkaUrl, topicInfo.sslToKafka);
            this.veniceWriterProperties = cachedProps;
        }
        return cachedProps;
    }

    /**
     * Builds producer properties for a VeniceWriter.
     *
     * <p>The Kafka request timeout, retry count and delivery timeout are taken from the job properties when present
     * and default to {@link Integer#MAX_VALUE} otherwise. The writer close timeout is copied only when configured.
     * The max elapsed time per segment is always -1.
     *
     * @param str Kafka bootstrap servers
     * @param z whether SSL is used; if so, the job's SSL properties are merged in
     * @return a new, independent {@link Properties} instance
     */
    private Properties createVeniceWriterProperties(String str, boolean z) {
        Properties writerProps = new Properties();
        writerProps.put("kafka.bootstrap.servers", str);
        writerProps.put("venice.writer.max.elapsed.time.for.segment.in.ms", Integer.valueOf(-1));
        String closeTimeoutKey = "venice.writer.close.timeout.ms";
        if (this.props.containsKey(closeTimeoutKey)) {
            writerProps.put(closeTimeoutKey, Integer.valueOf(this.props.getInt(closeTimeoutKey)));
        }
        if (z) {
            writerProps.putAll((Map<?, ?>) this.sslProperties.get());
        }
        // Applied after the SSL properties, matching the original precedence.
        String unboundedDefault = Integer.toString(Integer.MAX_VALUE);
        String[] unboundedByDefaultKeys = {"kafka.request.timeout.ms", "kafka.retries", "kafka.delivery.timeout.ms"};
        for (String key : unboundedByDefaultKeys) {
            String value = this.props.containsKey(key) ? this.props.getString(key) : unboundedDefault;
            writerProps.setProperty(key, value);
        }
        return writerProps;
    }

    /**
     * Closes the shared VeniceWriter, if one was created, and clears the cached reference so a later
     * {@code getVeniceWriter} call creates a new one. If {@code close()} throws, the reference is left in place.
     */
    private synchronized void closeVeniceWriter() {
        VeniceWriter<KafkaKey, byte[], byte[]> writer = this.veniceWriter;
        if (writer == null) {
            return;
        }
        writer.close();
        this.veniceWriter = null;
    }

    /**
     * Polls the controller for the overall job status of the monitored topic until it becomes terminal.
     *
     * <p>Each iteration does the following:
     * <ol>
     *   <li>waits until {@code pushJobSetting.pollJobStatusIntervalMs} has passed since the previous poll;</li>
     *   <li>queries {@code queryOverallJobStatus}, retried per {@code controllerStatusPollRetries};</li>
     *   <li>logs changed details;</li>
     *   <li>records per-region status in push job details;</li>
     *   <li>while the job is not terminal, re-sends push job details to the controller.</li>
     * </ol>
     *
     * @param optional forwarded to queryOverallJobStatus; presumably the incremental push version — confirm
     * @param controllerClient client used for status queries
     * @param pushJobSetting polling interval, retry counts, unknown-state timeout and controller URL
     * @param topicInfo the pushed topic; used with pushJobSetting to choose the topic to monitor
     * @throws VeniceException in any of these cases:
     *         <ul>
     *           <li>polling is interrupted;</li>
     *           <li>a status query fails;</li>
     *           <li>the job ends in a terminal status other than COMPLETED / END_OF_INCREMENTAL_PUSH_RECEIVED;</li>
     *           <li>the number of regions seen in a non-ERROR terminal state differs from the number of regions in
     *               the final response;</li>
     *           <li>the status stays UNKNOWN for jobStatusInUnknownStateTimeoutMs.</li>
     *         </ul>
     */
    private void pollStatusUntilComplete(Optional<String> optional, ControllerClient controllerClient, PushJobSetting pushJobSetting, TopicInfo topicInfo) {
        // Regions observed in a terminal, non-ERROR status (accumulated across polls).
        HashSet hashSet = new HashSet();
        // Last logged per-region details, so printJobStatus only logs changes.
        HashMap hashMap = new HashMap();
        // Last logged overall details.
        String str = null;
        // Earliest time at which the next poll may run.
        long j = 0;
        // Start of the current UNKNOWN streak; 0 when the last status was not UNKNOWN.
        long j2 = 0;
        String topicToMonitor = getTopicToMonitor(topicInfo, pushJobSetting);
        // Overall terminal statuses that count as success.
        List asList = Arrays.asList(ExecutionStatus.COMPLETED, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
        while (true) {
            long currentTimeMillis = System.currentTimeMillis();
            // Utils.sleep returning false is treated as an interruption.
            if (currentTimeMillis < j && !Utils.sleep(j - currentTimeMillis)) {
                throw new VeniceException("Job status polling was interrupted!");
            }
            j = currentTimeMillis + pushJobSetting.pollJobStatusIntervalMs;
            JobStatusQueryResponse jobStatusQueryResponse = (JobStatusQueryResponse) ControllerClient.retryableRequest(controllerClient, pushJobSetting.controllerStatusPollRetries, controllerClient2 -> {
                return controllerClient2.queryOverallJobStatus(topicToMonitor, optional);
            });
            if (jobStatusQueryResponse.isError()) {
                throw new VeniceException("Failed to connect to: " + pushJobSetting.veniceControllerUrl + " to query job status, after " + pushJobSetting.controllerStatusPollRetries + " attempts. Error: " + jobStatusQueryResponse.getError());
            }
            str = printJobStatus(jobStatusQueryResponse, str, hashMap);
            ExecutionStatus valueOf = ExecutionStatus.valueOf(jobStatusQueryResponse.getStatus());
            Map<String, String> extraInfo = jobStatusQueryResponse.getExtraInfo();
            // Update colo status before growing hashSet, so a region's terminal status is still recorded on the poll it first appears.
            updatePushJobDetailsWithColoStatus(extraInfo, hashSet);
            // NOTE(review): printJobStatus null-checks extraInfo but this forEach does not; a null here would NPE.
            extraInfo.forEach((str2, str3) -> {
                ExecutionStatus valueOf2 = ExecutionStatus.valueOf(str3);
                if (!valueOf2.isTerminal() || valueOf2.equals(ExecutionStatus.ERROR)) {
                    return;
                }
                hashSet.add(str2);
            });
            if (valueOf.isTerminal()) {
                // Success requires a successful overall status and every reported region in a non-ERROR terminal state.
                if (hashSet.size() != extraInfo.size() || !asList.contains(valueOf)) {
                    throw new VeniceException("Push job error reported by controller: " + pushJobSetting.veniceControllerUrl + "\ncontroller response: " + jobStatusQueryResponse.toString());
                }
                LOGGER.info("Successfully pushed {}", topicInfo.topic);
                return;
            }
            // Track how long the overall status has continuously been UNKNOWN and fail after the configured timeout.
            if (!valueOf.equals(ExecutionStatus.UNKNOWN)) {
                j2 = 0;
            } else if (j2 == 0) {
                j2 = System.currentTimeMillis();
            } else {
                if (System.currentTimeMillis() >= j2 + pushJobSetting.jobStatusInUnknownStateTimeoutMs) {
                    throw new VeniceException("After waiting for " + (pushJobSetting.jobStatusInUnknownStateTimeoutMs / 60000) + " minutes; push job is still in unknown state.");
                }
                LOGGER.warn("Some data centers are still in unknown state after waiting for {} minutes", Double.valueOf((System.currentTimeMillis() - j2) / 60000.0d));
            }
            sendPushJobDetailsToController();
        }
    }

    /**
     * Logs a job status response, only logging details that changed since the previous poll.
     *
     * <p>The per-region status map is logged whenever it is non-empty. Overall details are logged when they differ
     * from {@code str}. Per-region extra details are logged when they differ from the entry in {@code map}, and
     * {@code map} is updated in place with the new value.
     *
     * @param str overall details logged on the previous poll, or {@code null}
     * @param map per-region details logged so far; mutated
     * @return the overall details to remember for the next poll (the new value if it changed, else {@code str})
     */
    private String printJobStatus(JobStatusQueryResponse jobStatusQueryResponse, String str, Map<String, String> map) {
        Map<String, String> regionStatuses = jobStatusQueryResponse.getExtraInfo();
        if (regionStatuses != null && !regionStatuses.isEmpty()) {
            LOGGER.info("Specific status: {}", regionStatuses);
        }

        String rememberedOverallDetails = str;
        Optional<String> overallDetails = jobStatusQueryResponse.getOptionalStatusDetails();
        if (overallDetails.isPresent() && detailsAreDifferent(str, overallDetails.get())) {
            rememberedOverallDetails = overallDetails.get();
            LOGGER.info("\t\tNew overall details: {}", rememberedOverallDetails);
        }

        jobStatusQueryResponse.getOptionalExtraDetails().ifPresent(regionDetails -> regionDetails.forEach((region, details) -> {
            if (detailsAreDifferent(map.get(region), details)) {
                LOGGER.info("\t\tNew specific details for {}: {}", region, details);
                map.put(region, details);
            }
        }));
        return rememberedOverallDetails;
    }

    /**
     * Tells whether two status-detail strings differ. Two {@code null}s count as equal; a
     * {@code null} and a non-{@code null} value count as different.
     *
     * @param previous the previously seen details, possibly {@code null}
     * @param current the newly received details, possibly {@code null}
     * @return {@code true} if the values are not equal
     */
    private boolean detailsAreDifferent(String previous, String current) {
        return !Objects.equals(previous, current);
    }

    /**
     * Builds the configuration of the main push MapReduce job. It applies the default job settings
     * first, then the input format / mapper settings, and finally the partitioner and reducer settings.
     *
     * @param jobConf configuration to populate
     * @param topicInfo topic the job writes to
     * @param pushJobSetting push job settings
     * @param pushJobSchemaInfo schema information of the input
     * @param storeSetting settings of the target store
     * @param veniceProperties user-supplied job properties
     * @param jobNamePrefix prefix used when composing the MapReduce job name
     * @param inputDirectory directory the file-based input is read from
     */
    protected void setupMRConf(JobConf jobConf, TopicInfo topicInfo, PushJobSetting pushJobSetting, PushJobSchemaInfo pushJobSchemaInfo, StoreSetting storeSetting, VeniceProperties veniceProperties, String jobNamePrefix, String inputDirectory) {
        // Order matters: later steps may rely on or refine what earlier steps configured.
        setupDefaultJobConf(jobConf, topicInfo, pushJobSetting, pushJobSchemaInfo, storeSetting, veniceProperties, jobNamePrefix);
        setupInputFormatConf(jobConf, pushJobSchemaInfo, inputDirectory);
        setupReducerConf(jobConf, pushJobSetting, topicInfo);
    }

    /**
     * Applies the settings shared by every MapReduce job launched here. These are:
     * <ul>
     *   <li>the delegation-token file (when {@code HADOOP_TOKEN_FILE_LOCATION} is set),</li>
     *   <li>the job name and jar,</li>
     *   <li>SSL settings (when SSL is enabled),</li>
     *   <li>the job class loader,</li>
     *   <li>a null output format,</li>
     *   <li>the compression-metric and ZSTD-dictionary flags.</li>
     * </ul>
     *
     * @param jobConf configuration to populate
     * @param str name given to the MapReduce job
     */
    private void setupCommonJobConf(JobConf jobConf, String str) {
        String tokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (tokenFileLocation != null) {
            jobConf.set("mapreduce.job.credentials.binary", tokenFileLocation);
        }
        jobConf.setJobName(str);
        jobConf.setJarByClass(getClass());
        if (isSslEnabled()) {
            jobConf.set(SSL_CONFIGURATOR_CLASS_CONFIG, this.props.getString(SSL_CONFIGURATOR_CLASS_CONFIG, TempFileSSLConfigurator.class.getName()));
            // These three keys have no default: getString is called for each, in this order.
            for (String sslKey : new String[] {SSL_KEY_STORE_PROPERTY_NAME, SSL_TRUST_STORE_PROPERTY_NAME, SSL_KEY_PASSWORD_PROPERTY_NAME}) {
                jobConf.set(sslKey, this.props.getString(sslKey));
            }
        }
        String classLoaderKey = "mapreduce.job.classloader";
        jobConf.setBoolean(classLoaderKey, true);
        LOGGER.info("{}: {}", classLoaderKey, jobConf.get(classLoaderKey));
        jobConf.setOutputKeyClass(NullWritable.class);
        jobConf.setOutputValueClass(NullWritable.class);
        jobConf.setOutputFormat(NullOutputFormat.class);
        jobConf.setBoolean(COMPRESSION_METRIC_COLLECTION_ENABLED, this.pushJobSetting.compressionMetricCollectionEnabled);
        jobConf.setBoolean(ZSTD_DICTIONARY_CREATION_REQUIRED, this.isZstdDictCreationRequired);
    }

    /**
     * Populates {@code jobConf} with the settings the push job needs to write into the target topic,
     * and then copies pass-through properties from {@code veniceProperties}.
     *
     * <p>All topic-level settings, including the chunking flags, are read from the {@code topicInfo}
     * argument. This keeps the configuration consistent with the topic stored under {@code TOPIC_PROP}.
     *
     * @param jobConf configuration to populate
     * @param topicInfo topic the job writes to
     * @param pushJobSetting push job settings
     * @param pushJobSchemaInfo schema information of the input (used for non-Kafka sources)
     * @param storeSetting settings of the target store
     * @param veniceProperties user-supplied job properties
     * @param str prefix used when composing the MapReduce job name
     * @throws VeniceException if one pass-through prefix is itself a prefix of another
     */
    protected void setupDefaultJobConf(JobConf jobConf, TopicInfo topicInfo, PushJobSetting pushJobSetting, PushJobSchemaInfo pushJobSchemaInfo, StoreSetting storeSetting, VeniceProperties veniceProperties, String str) {
        setupCommonJobConf(jobConf, str + ":venice_push_job-" + topicInfo.topic);
        jobConf.set(BATCH_NUM_BYTES_PROP, Integer.toString(pushJobSetting.batchNumBytes));
        jobConf.set(TOPIC_PROP, topicInfo.topic);
        jobConf.set("kafka.bootstrap.servers", topicInfo.kafkaUrl);
        jobConf.set("partitioner.class", topicInfo.partitionerClass);
        // JobConf only holds string values, so partitioner params are flattened into top-level keys.
        topicInfo.partitionerParams.forEach(jobConf::set);
        jobConf.setInt("amplification.factor", topicInfo.amplificationFactor);
        if (topicInfo.sslToKafka) {
            jobConf.set("security.protocol", KAFKA_SECURITY_PROTOCOL);
            for (String key : veniceProperties.keySet()) {
                if (key.toLowerCase().startsWith(SSL_PREFIX)) {
                    jobConf.set(key, veniceProperties.getString(key));
                }
            }
        }
        jobConf.setBoolean(ALLOW_DUPLICATE_KEY, pushJobSetting.isDuplicateKeyAllowed);
        // Read from the topicInfo argument (previously this.kafkaTopicInfo) so the chunking flags
        // describe the same topic as every other setting in this configuration.
        jobConf.setBoolean("venice.writer.chunking.enabled", topicInfo.chunkingEnabled);
        jobConf.setBoolean("venice.writer.replication.metadata.chunking.enabled", topicInfo.rmdChunkingEnabled);
        jobConf.set(STORAGE_QUOTA_PROP, Long.toString(storeSetting.storeStorageQuota));
        if (pushJobSetting.isSourceKafka) {
            jobConf.setInt(VALUE_SCHEMA_ID_PROP, -1);
            jobConf.set(KAFKA_INPUT_TOPIC, pushJobSetting.kafkaInputTopic);
            jobConf.set(KAFKA_INPUT_BROKER_URL, pushJobSetting.kafkaInputBrokerUrl);
            jobConf.setLong(REPUSH_TTL_IN_SECONDS, pushJobSetting.repushTTLInSeconds);
            if (pushJobSetting.repushTTLEnabled) {
                jobConf.setInt(REPUSH_TTL_POLICY, TTLResolutionPolicy.RT_WRITE_ONLY.getValue());
                jobConf.set(RMD_SCHEMA_DIR, pushJobSetting.rmdSchemaDir);
            }
            jobConf.set(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY, storeSetting.sourceKafkaInputVersionInfo.getCompressionStrategy().name());
            jobConf.set(KAFKA_INPUT_SOURCE_TOPIC_CHUNKING_ENABLED, Boolean.toString(storeSetting.sourceKafkaInputVersionInfo.isChunkingEnabled()));
        } else {
            jobConf.setInt(VALUE_SCHEMA_ID_PROP, pushJobSchemaInfo.getValueSchemaId());
            jobConf.setInt(DERIVED_SCHEMA_ID_PROP, pushJobSchemaInfo.getDerivedSchemaId());
        }
        jobConf.setBoolean(ENABLE_WRITE_COMPUTE, pushJobSetting.enableWriteCompute);
        // Unless the user overrides them, let the Kafka producer wait/retry effectively forever.
        if (!veniceProperties.containsKey("kafka.request.timeout.ms")) {
            jobConf.set("kafka.request.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        }
        if (!veniceProperties.containsKey("kafka.retries")) {
            jobConf.set("kafka.retries", Integer.toString(Integer.MAX_VALUE));
        }
        if (!veniceProperties.containsKey("kafka.delivery.timeout.ms")) {
            jobConf.set("kafka.delivery.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        }
        jobConf.set(TELEMETRY_MESSAGE_INTERVAL, veniceProperties.getString(TELEMETRY_MESSAGE_INTERVAL, "10000"));
        jobConf.set(ETL_VALUE_SCHEMA_TRANSFORMATION, pushJobSetting.etlValueSchemaTransformation.name());
        jobConf.setBoolean(EXTENDED_SCHEMA_VALIDITY_CHECK_ENABLED, pushJobSetting.extendedSchemaValidityCheckEnabled);
        jobConf.set(COMPRESSION_STRATEGY, topicInfo.compressionStrategy.toString());
        jobConf.set(ZSTD_COMPRESSION_LEVEL, veniceProperties.getString(ZSTD_COMPRESSION_LEVEL, String.valueOf(Zstd.maxCompressionLevel())));
        jobConf.setBoolean(ZSTD_DICTIONARY_CREATION_SUCCESS, this.isZstdDictCreationSuccess);

        List<String> passThroughPrefixes = Arrays.asList("venice.writer.", "kafka.", KafkaInputRecordReader.KIF_RECORD_READER_KAFKA_CONFIG_PREFIX);
        // A prefix nested inside another would make the pass-through matching below ambiguous.
        for (int i = 0; i < passThroughPrefixes.size(); i++) {
            for (int j = 0; j < passThroughPrefixes.size(); j++) {
                String prefix = passThroughPrefixes.get(i);
                String other = passThroughPrefixes.get(j);
                if (i != j && prefix.startsWith(other)) {
                    throw new VeniceException("Prefix: " + other + " shouldn't be a prefix of another prefix: " + prefix);
                }
            }
        }
        for (String key : veniceProperties.keySet()) {
            String lowerCaseKey = key.toLowerCase();
            // Hadoop-prefixed keys are forwarded with the prefix stripped.
            if (lowerCaseKey.startsWith(HADOOP_PREFIX)) {
                jobConf.set(key.substring(HADOOP_PREFIX.length()), veniceProperties.getString(key));
            }
            // Writer / Kafka / KIF keys are forwarded verbatim.
            if (passThroughPrefixes.stream().anyMatch(lowerCaseKey::startsWith)) {
                jobConf.set(key, veniceProperties.getString(key));
            }
        }
    }

    /**
     * Chooses the input format and mapper of the main push job. There are three cases:
     * <ul>
     *   <li>Kafka source: Kafka input format and mapper, plus the combiner when it is enabled.</li>
     *   <li>Vson file input: sequence-file input and the Vson mapper.</li>
     *   <li>Avro file input: Avro input and the Avro mapper.</li>
     * </ul>
     *
     * @param jobConf configuration to populate
     * @param pushJobSchemaInfo schema information of the file input
     * @param str input directory used for file-based input
     */
    protected void setupInputFormatConf(JobConf jobConf, PushJobSchemaInfo pushJobSchemaInfo, String str) {
        boolean kafkaSource = this.pushJobSetting.isSourceKafka;
        if (kafkaSource) {
            jobConf.set(KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP, AvroCompatibilityHelper.toParsingForm(this.storeSetting.keySchema));
            jobConf.setInputFormat(KafkaInputFormat.class);
            jobConf.setMapperClass(VeniceKafkaInputMapper.class);
            if (this.pushJobSetting.kafkaInputCombinerEnabled) {
                jobConf.setCombinerClass(KafkaInputFormatCombiner.class);
            }
        } else {
            FileInputFormat.setInputPaths(jobConf, new Path(str));
            jobConf.set("avro.mapred.ignore.inputs.without.extension", "false");
            jobConf.set("key.field", pushJobSchemaInfo.getKeyField());
            jobConf.set("value.field", pushJobSchemaInfo.getValueField());
            if (pushJobSchemaInfo.isAvro()) {
                String fileSchema = pushJobSchemaInfo.getFileSchemaString();
                jobConf.set(SCHEMA_STRING_PROP, fileSchema);
                jobConf.set("avro.input.schema", fileSchema);
                jobConf.setClass("avro.serialization.data.model", GenericData.class, GenericData.class);
                jobConf.setInputFormat(AvroInputFormat.class);
                jobConf.setMapperClass(VeniceAvroMapper.class);
                jobConf.setBoolean(VSON_PUSH, false);
            } else {
                jobConf.setInputFormat(VsonSequenceFileInputFormat.class);
                jobConf.setMapperClass(VeniceVsonMapper.class);
                jobConf.setBoolean(VSON_PUSH, true);
                jobConf.set("key.schema", pushJobSchemaInfo.getVsonFileKeySchema());
                jobConf.set("value.schema", pushJobSchemaInfo.getVsonFileValueSchema());
            }
        }
    }

    /**
     * Configures the partitioner, comparators, and reducer of the main push job. Kafka-sourced jobs
     * use the KafkaInput* classes; other jobs use the configured partitioner and {@code VeniceReducer}.
     * The number of reduce tasks is partition count times amplification factor.
     *
     * @param jobConf configuration to populate
     * @param pushJobSetting push job settings
     * @param topicInfo topic the job writes to
     */
    private void setupReducerConf(JobConf jobConf, PushJobSetting pushJobSetting, TopicInfo topicInfo) {
        boolean kafkaInput = pushJobSetting.isSourceKafka;
        if (kafkaInput) {
            jobConf.setOutputKeyComparatorClass(KafkaInputKeyComparator.class);
            jobConf.setOutputValueGroupingComparator(KafkaInputValueGroupingComparator.class);
            jobConf.setCombinerKeyGroupingComparator(KafkaInputValueGroupingComparator.class);
            jobConf.setPartitionerClass(KafkaInputMRPartitioner.class);
            jobConf.setReducerClass(VeniceKafkaInputReducer.class);
        } else {
            jobConf.setPartitionerClass(this.mapRedPartitionerClass);
            jobConf.setReducerClass(VeniceReducer.class);
        }
        int reducerCount = topicInfo.partitionCount * topicInfo.amplificationFactor;
        jobConf.setReduceSpeculativeExecution(pushJobSetting.enableReducerSpeculativeExecution);
        jobConf.setNumReduceTasks(reducerCount);
        jobConf.setMapOutputKeyClass(BytesWritable.class);
        jobConf.setMapOutputValueClass(BytesWritable.class);
    }

    /**
     * Configures and runs the MapReduce job that validates the input schema and builds the
     * compression dictionary.
     *
     * @param jobConf configuration to populate and run
     * @param pushJobSetting push job settings
     * @param pushJobSchemaInfo schema information of the input
     * @param storeSetting settings of the target store
     * @param veniceProperties user-supplied job properties
     * @param jobNamePrefix prefix used when composing the MapReduce job name
     * @param inputDirectory directory the input is read from
     * @throws Exception whatever running the job throws
     */
    private void validateSchemaAndBuildDict(JobConf jobConf, PushJobSetting pushJobSetting, PushJobSchemaInfo pushJobSchemaInfo, StoreSetting storeSetting, VeniceProperties veniceProperties, String jobNamePrefix, String inputDirectory) throws Exception {
        setupMRConfToValidateSchemaAndBuildDict(jobConf, pushJobSetting, pushJobSchemaInfo, storeSetting, veniceProperties, jobNamePrefix, inputDirectory);
        runValidateSchemaAndBuildDictJobAndUpdateStatus(jobConf);
    }

    /**
     * Builds the configuration of the schema-validation / dictionary-building job: default settings
     * first, then the input format and mapper.
     *
     * @param jobConf configuration to populate
     * @param pushJobSetting push job settings
     * @param pushJobSchemaInfo schema information of the input
     * @param storeSetting settings of the target store
     * @param veniceProperties user-supplied job properties
     * @param jobNamePrefix prefix used when composing the MapReduce job name
     * @param inputDirectory directory the input is read from
     */
    private void setupMRConfToValidateSchemaAndBuildDict(JobConf jobConf, PushJobSetting pushJobSetting, PushJobSchemaInfo pushJobSchemaInfo, StoreSetting storeSetting, VeniceProperties veniceProperties, String jobNamePrefix, String inputDirectory) {
        setupDefaultJobConfToValidateSchemaAndBuildDict(jobConf, pushJobSetting, storeSetting, veniceProperties, jobNamePrefix);
        setupInputFormatConfToValidateSchemaAndBuildDict(jobConf, pushJobSchemaInfo, inputDirectory);
    }

    /**
     * Populates the default settings of the map-only job that validates the input schema and builds
     * the compression dictionary. As a side effect, it records the mapper output directory in
     * {@code validateSchemaAndBuildDictMapperOutputDirectory}.
     *
     * @param jobConf configuration to populate
     * @param pushJobSetting push job settings
     * @param storeSetting settings of the target store
     * @param veniceProperties user-supplied job properties
     * @param str prefix used when composing the MapReduce job name
     */
    private void setupDefaultJobConfToValidateSchemaAndBuildDict(JobConf jobConf, PushJobSetting pushJobSetting, StoreSetting storeSetting, VeniceProperties veniceProperties, String str) {
        String storeName = pushJobSetting.storeName;
        setupCommonJobConf(jobConf, str + ":venice_push_job_validate_schema_and_build_dict-" + storeName);
        jobConf.set(VENICE_STORE_NAME_PROP, storeName);
        jobConf.set(ETL_VALUE_SCHEMA_TRANSFORMATION, pushJobSetting.etlValueSchemaTransformation.name());
        jobConf.setBoolean(INCREMENTAL_PUSH, pushJobSetting.isIncrementalPush);
        jobConf.setLong(INPUT_PATH_LAST_MODIFIED_TIME, this.inputModificationTime);
        // 972800 bytes == 950 KiB, the default dictionary size limit.
        int dictionarySizeLimit = veniceProperties.getInt(DefaultInputDataInfoProvider.COMPRESSION_DICTIONARY_SIZE_LIMIT, 972800);
        jobConf.setInt(DefaultInputDataInfoProvider.COMPRESSION_DICTIONARY_SIZE_LIMIT, dictionarySizeLimit);
        int dictionarySampleSize = veniceProperties.getInt(DefaultInputDataInfoProvider.COMPRESSION_DICTIONARY_SAMPLE_SIZE, DefaultInputDataInfoProvider.DEFAULT_COMPRESSION_DICTIONARY_SAMPLE_SIZE);
        jobConf.setInt(DefaultInputDataInfoProvider.COMPRESSION_DICTIONARY_SAMPLE_SIZE, dictionarySampleSize);
        jobConf.setBoolean(USE_MAPPER_TO_BUILD_DICTIONARY, pushJobSetting.useMapperToBuildDict);
        jobConf.set(MAPPER_OUTPUT_DIRECTORY, pushJobSetting.useMapperToBuildDictOutputPath);
        jobConf.set(COMPRESSION_STRATEGY, storeSetting.compressionStrategy.toString());
        String mapperOutputDir = getValidateSchemaAndBuildDictionaryOutputDir(pushJobSetting.useMapperToBuildDictOutputPath, storeName, veniceProperties.getString(JOB_EXEC_ID, ""));
        this.validateSchemaAndBuildDictMapperOutputDirectory = mapperOutputDir;
        jobConf.set(VALIDATE_SCHEMA_AND_BUILD_DICT_MAPPER_OUTPUT_DIRECTORY, mapperOutputDir);
        jobConf.setBoolean("venice.writer.chunking.enabled", false);
        // Keys carrying the job-specific Hadoop prefix are forwarded with that prefix stripped.
        int prefixLength = HADOOP_VALIDATE_SCHEMA_AND_BUILD_DICT_PREFIX.length();
        veniceProperties.keySet().stream()
            .filter(key -> key.toLowerCase().startsWith(HADOOP_VALIDATE_SCHEMA_AND_BUILD_DICT_PREFIX))
            .forEach(key -> jobConf.set(key.substring(prefixLength), veniceProperties.getString(key)));
        // Map-only job.
        jobConf.setNumReduceTasks(0);
    }

    /**
     * Sets the input path, input format, mapper, and Avro output of the schema-validation /
     * dictionary-building job.
     *
     * @param jobConf configuration to populate
     * @param pushJobSchemaInfo schema information of the input (key/value field names)
     * @param inputDirectory directory the input is read from
     */
    protected void setupInputFormatConfToValidateSchemaAndBuildDict(JobConf jobConf, PushJobSchemaInfo pushJobSchemaInfo, String inputDirectory) {
        jobConf.set(INPUT_PATH_PROP, inputDirectory);
        jobConf.setInputFormat(VeniceFileInputFormat.class);
        jobConf.setMapperClass(ValidateSchemaAndBuildDictMapper.class);
        // Keep setOutputFormat after AvroJob.setOutputSchema so the explicit output format is the
        // one that ends up in the configuration.
        AvroJob.setOutputSchema(jobConf, ValidateSchemaAndBuildDictMapperOutput.getClassSchema());
        jobConf.setOutputFormat(ValidateSchemaAndBuildDictOutputFormat.class);
        String keyField = pushJobSchemaInfo.getKeyField();
        String valueField = pushJobSchemaInfo.getValueField();
        jobConf.set("key.field", keyField);
        jobConf.set("value.field", valueField);
    }

    /**
     * Logs a multi-line summary of the push job's settings at INFO level.
     *
     * @param topicInfo topic the job writes to
     * @param pushJobSetting push job settings
     * @param pushJobSchemaInfo schema information, or {@code null} to omit schema lines
     * @param str input directory
     * @param j input data size in bytes
     */
    private void logPushJobProperties(TopicInfo topicInfo, PushJobSetting pushJobSetting, PushJobSchemaInfo pushJobSchemaInfo, String str, long j) {
        String summary = pushJobPropertiesToString(topicInfo, pushJobSetting, pushJobSchemaInfo, str, j);
        LOGGER.info(summary);
    }

    /**
     * Renders the push job's settings as one "label: value" line per setting, joined by
     * {@code Utils.NEW_LINE_CHAR}.
     *
     * @param topicInfo topic the job writes to
     * @param pushJobSetting push job settings
     * @param pushJobSchemaInfo schema information, or {@code null} to omit schema lines
     * @param str input directory
     * @param j input data size in bytes, rendered in MB
     * @return the rendered summary
     */
    private String pushJobPropertiesToString(TopicInfo topicInfo, PushJobSetting pushJobSetting, PushJobSchemaInfo pushJobSchemaInfo, String str, long j) {
        List<String> lines = new ArrayList<>();
        lines.add("Job ID: " + this.jobId);
        lines.add("Kafka URL: " + topicInfo.kafkaUrl);
        lines.add("Kafka Topic: " + topicInfo.topic);
        lines.add("Kafka topic partition count: " + topicInfo.partitionCount);
        lines.add("Kafka Queue Bytes: " + pushJobSetting.batchNumBytes);
        lines.add("Input Directory: " + str);
        lines.add("Venice Store Name: " + pushJobSetting.storeName);
        lines.add("Venice Cluster Name: " + pushJobSetting.clusterName);
        lines.add("Venice URL: " + pushJobSetting.veniceControllerUrl);
        if (pushJobSchemaInfo != null) {
            lines.add("File Schema: " + pushJobSchemaInfo.getFileSchemaString());
            lines.add("Avro key schema: " + pushJobSchemaInfo.getKeySchemaString());
            lines.add("Avro value schema: " + pushJobSchemaInfo.getValueSchemaString());
        }
        double inputSizeInMb = (j / 1024.0d) / 1024.0d;
        lines.add("Total input data file size: " + inputSizeInMb + " MB, estimated with a factor of 2");
        lines.add("Is incremental push: " + pushJobSetting.isIncrementalPush);
        lines.add("Is duplicated key allowed: " + pushJobSetting.isDuplicateKeyAllowed);
        lines.add("Is source ETL data: " + pushJobSetting.isSourceETL);
        lines.add("ETL value schema transformation : " + pushJobSetting.etlValueSchemaTransformation);
        boolean kafkaSource = pushJobSetting.isSourceKafka;
        lines.add("Is Kafka Input Format: " + kafkaSource);
        if (kafkaSource) {
            lines.add("Kafka Input broker urls: " + pushJobSetting.kafkaInputBrokerUrl);
            lines.add("Kafka Input topic name: " + pushJobSetting.kafkaInputTopic);
        }
        return String.join(Utils.NEW_LINE_CHAR, lines);
    }

    /**
     * Kills the running push and cleans it up, then sends the final push job details to the controller.
     *
     * <p>The final status is ERROR when topic info exists but its topic is empty. Otherwise, including
     * when {@code kafkaTopicInfo} is {@code null}, the status is KILLED.
     *
     * <p>NOTE(review): killJobAndCleanup() calls close(), which closes {@code controllerClient}, before
     * sendPushJobDetailsToController() runs here. Confirm that sending does not depend on a closed client.
     */
    public void cancel() {
        killJobAndCleanup(this.pushJobSetting, this.controllerClient, this.kafkaTopicInfo);
        boolean topicMissing = this.kafkaTopicInfo != null && StringUtils.isEmpty(this.kafkaTopicInfo.topic);
        PushJobDetailsStatus finalStatus = topicMissing ? PushJobDetailsStatus.ERROR : PushJobDetailsStatus.KILLED;
        this.pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(finalStatus.getValue()));
        long elapsedMs = System.currentTimeMillis() - this.jobStartTimeMs;
        this.pushJobDetails.jobDurationInMs = elapsedMs;
        updatePushJobDetailsWithConfigs();
        sendPushJobDetailsToController();
    }

    /**
     * Kills the MapReduce job and, for non-incremental pushes, asks the controller to kill the
     * offline push for the topic. This object is always closed afterwards.
     *
     * <p>If the topic name is still empty, it is polled up to 10 times, 10 ms apart, before giving up.
     * Presumably another thread may still be filling it in; verify against the callers.
     *
     * @param pushJobSetting push job settings (incremental flag, retries, store name)
     * @param controllerClient client used to reach the controller
     * @param topicInfo topic info of the push; may be {@code null}
     */
    private void killJobAndCleanup(PushJobSetting pushJobSetting, ControllerClient controllerClient, TopicInfo topicInfo) {
        try {
            killJob();
            if (!pushJobSetting.isIncrementalPush && topicInfo != null) {
                for (int attempt = 0; attempt < 10 && StringUtils.isEmpty(topicInfo.topic); attempt++) {
                    Utils.sleep(10L);
                }
                if (StringUtils.isEmpty(topicInfo.topic)) {
                    LOGGER.error("Could not find a store version to delete for store: {}", pushJobSetting.storeName);
                } else {
                    ControllerResponse response = ControllerClient.retryableRequest(controllerClient, pushJobSetting.controllerRetries, client -> client.killOfflinePushJob(topicInfo.topic));
                    // Previously the response was ignored and success was logged unconditionally.
                    if (response.isError()) {
                        LOGGER.error("Failed to kill offline push job for topic: {}, error: {}", topicInfo.topic, response.getError());
                    } else {
                        LOGGER.info("Offline push job has been killed, topic: {}", topicInfo.topic);
                    }
                }
            }
        } finally {
            // Release clients even if killing the job or contacting the controller throws.
            close();
        }
    }

    /**
     * Kills the running MapReduce job, if there is one and it has not completed yet. This is best
     * effort: any exception is logged and swallowed.
     */
    private void killJob() {
        if (this.runningJob == null) {
            LOGGER.warn("No op to kill a null running job");
            return;
        }
        try {
            if (this.runningJob.isComplete()) {
                LOGGER.warn("No op to kill a completed job with name {} and ID {}", this.runningJob.getJobName(), this.runningJob.getID().getId());
            } else {
                this.runningJob.killJob();
            }
        } catch (Exception e) {
            // WARN rather than INFO: a failed kill may leave the MapReduce job running.
            LOGGER.warn("Received exception while killing map-reduce job with name {} and ID {}", this.runningJob.getJobName(), this.runningJob.getID().getId(), e);
        }
    }

    /**
     * Resolves every {@code #LATEST} marker in {@code str}. Each marker is replaced by the path
     * returned by {@code getLatestPath} for the path accumulated so far. A trailing marker is resolved
     * against the fully accumulated path.
     *
     * @param str input path, possibly containing {@code #LATEST} markers
     * @param fileSystem file system used to list directories
     * @return the resolved path
     * @throws IOException if listing a directory fails
     */
    protected static Path getLatestPathOfInputDirectory(String str, FileSystem fileSystem) throws IOException {
        String[] segments = str.split("#LATEST");
        String resolved = segments[0];
        for (int idx = 1; idx < segments.length; idx++) {
            Path latest = getLatestPath(new Path(resolved), fileSystem);
            resolved = latest.toString() + segments[idx];
        }
        // split() drops trailing empty segments, so a trailing marker has to be handled here.
        if (str.endsWith("#LATEST")) {
            return getLatestPath(new Path(resolved), fileSystem);
        }
        return new Path(resolved);
    }

    /**
     * Returns the topic stored in this job's topic info.
     * Throws NullPointerException if {@code kafkaTopicInfo} has not been set yet.
     */
    public String getKafkaTopic() {
        TopicInfo info = this.kafkaTopicInfo;
        return info.topic;
    }

    /**
     * Returns the Kafka URL stored in this job's topic info.
     * Throws NullPointerException if {@code kafkaTopicInfo} has not been set yet.
     */
    public String getKafkaUrl() {
        TopicInfo info = this.kafkaTopicInfo;
        return info.kafkaUrl;
    }

    /** Returns the value of this job's {@code inputDirectory} field. */
    public String getInputDirectory() {
        return inputDirectory;
    }

    /** Returns the incremental push version from the push job settings, if one is present. */
    public Optional<String> getIncrementalPushVersion() {
        PushJobSetting setting = this.pushJobSetting;
        return setting.incrementalPushVersion;
    }

    /** Returns the schema information held by this push job (the {@code pushJobSchemaInfo} field). */
    public PushJobSchemaInfo getVeniceSchemaInfo() {
        return pushJobSchemaInfo;
    }

    /**
     * Returns the topic whose progress should be monitored for this push.
     *
     * @throws VeniceException if the topic info or push job settings are not initialized yet
     */
    public String getTopicToMonitor() {
        boolean initialized = this.kafkaTopicInfo != null && this.pushJobSetting != null;
        if (!initialized) {
            throw new VeniceException("The push job is not initialized yet");
        }
        return getTopicToMonitor(this.kafkaTopicInfo, this.pushJobSetting);
    }

    /**
     * Maps a real-time topic to the version topic composed from the store name and version.
     * Any other topic is returned unchanged.
     */
    private String getTopicToMonitor(TopicInfo topicInfo, PushJobSetting pushJobSetting) {
        String topic = topicInfo.topic;
        if (!Version.isRealTimeTopic(topic)) {
            return topic;
        }
        return Version.composeKafkaTopic(pushJobSetting.storeName, topicInfo.version);
    }

    /**
     * Returns the last directory, in sorted {@code FileStatus} order, among the entries of
     * {@code path} accepted by {@code PATH_FILTER}. Returns {@code path} itself if no such directory exists.
     *
     * @param path directory to list
     * @param fileSystem file system used for listing
     * @return the selected child directory, or {@code path}
     * @throws IOException if listing fails
     */
    private static Path getLatestPath(Path path, FileSystem fileSystem) throws IOException {
        FileStatus[] entries = fileSystem.listStatus(path, PATH_FILTER);
        Arrays.sort(entries);
        int idx = entries.length - 1;
        while (idx >= 0) {
            FileStatus candidate = entries[idx];
            if (candidate.isDirectory()) {
                return candidate.getPath();
            }
            idx--;
        }
        return path;
    }

    /**
     * Closes the Venice writer and then each controller client in turn. Each client is closed via
     * {@code Utils.closeQuietlyWithErrorLogged}.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        closeVeniceWriter();
        Utils.closeQuietlyWithErrorLogged(controllerClient);
        Utils.closeQuietlyWithErrorLogged(kmeSchemaSystemStoreControllerClient);
        Utils.closeQuietlyWithErrorLogged(livenessHeartbeatStoreControllerClient);
    }

    /**
     * Command-line entry point. It expects exactly one argument, the path of a properties file, loads
     * it, and runs a push job with it.
     *
     * @param strArr command-line arguments; {@code strArr[0]} is the config file path
     */
    public static void main(String[] strArr) {
        if (strArr.length != 1) {
            Utils.exit("USAGE: java -jar venice-push-job-all.jar <VPJ_config_file_path>");
        }
        Properties properties = new Properties();
        // try-with-resources: the reader used to be left open after loading.
        try (FileReader configReader = new FileReader(strArr[0])) {
            properties.load(configReader);
        } catch (IOException e) {
            e.printStackTrace();
            Utils.exit("Unable to read config file");
        }
        runPushJob("Venice Push Job", properties);
        Utils.exit("Venice Push Job Completed");
    }

    /**
     * Creates a push job, runs it, and always closes it. If running fails, a failure from close() is
     * attached as a suppressed exception to the original failure.
     *
     * @param str first argument passed to the {@code VenicePushJob} constructor
     * @param properties job properties
     */
    public static void runPushJob(String str, Properties properties) {
        try (VenicePushJob venicePushJob = new VenicePushJob(str, properties)) {
            venicePushJob.run();
        }
    }
}
