package com.linkedin.venice.hadoop;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.etl.ETLValueSchemaTransformation;
import com.linkedin.venice.hadoop.InputDataInfoProvider;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.output.avro.ValidateSchemaAndBuildDictMapperOutput;
import com.linkedin.venice.schema.vson.VsonSchema;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.specific.SpecificRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/hadoop/ValidateSchemaAndBuildDictMapper.class */
/**
 * Hadoop mapper that validates the schema of every input file of a push job and, when requested,
 * produces a ZSTD compression dictionary.
 *
 * <p>Each call to {@link #map} receives one integer key:
 * <ul>
 *   <li>a file index: the file at that position in the listing of {@link #inputDirectory} is checked
 *       against the expected schema held by {@link #inputDataInfo}, and its length is added to
 *       {@link #inputFileDataSize};</li>
 *   <li>the sentinel {@code -1}: a single {@link ValidateSchemaAndBuildDictMapperOutput} record carrying
 *       the accumulated input size (and the dictionary, if one was built) is emitted.</li>
 * </ul>
 *
 * <p>Once any step fails, {@link #hasReportedFailure} is set and all later {@link #map} calls are no-ops.
 */
public class ValidateSchemaAndBuildDictMapper extends AbstractMapReduceTask implements Mapper<IntWritable, NullWritable, AvroWrapper<SpecificRecord>, NullWritable> {
    private static final Logger LOGGER = LogManager.getLogger(ValidateSchemaAndBuildDictMapper.class);

    /** Key value that triggers dictionary building and output persistence instead of file validation. */
    private static final int BUILD_DICTIONARY_SENTINEL_INDEX = -1;

    /** Dictionary training is skipped when fewer samples than this were collected. */
    private static final int MIN_SAMPLES_REQUIRED_TO_BUILD_ZSTD_DICTIONARY = 20;

    protected InputDataInfoProvider.InputDataInfo inputDataInfo;
    protected boolean isZstdDictCreationRequired;
    private long inputModificationTime;
    protected String inputDirectory;
    protected DefaultInputDataInfoProvider inputDataInfoProvider = null;
    protected VenicePushJob.PushJobSetting pushJobSetting = new VenicePushJob.PushJobSetting();
    protected VenicePushJob.StoreSetting storeSetting = new VenicePushJob.StoreSetting();
    protected boolean hasReportedFailure = false;
    private FileStatus[] fileStatuses = null;
    private FileSystem fileSystem = null;
    protected Long inputFileDataSize = 0L;

    /**
     * Processes one key. Does nothing if a failure was already reported; otherwise either bumps the
     * "successfully processed" counter or latches {@link #hasReportedFailure}.
     *
     * @param intWritable file index, or {@code -1} to build the dictionary and emit the output record
     * @param nullWritable unused
     * @param outputCollector sink for the single output record
     * @param reporter used to increment job counters
     * @throws IOException if reading a file header or emitting the output record fails
     */
    @Override // org.apache.hadoop.mapred.Mapper
    public void map(IntWritable intWritable, NullWritable nullWritable, OutputCollector<AvroWrapper<SpecificRecord>, NullWritable> outputCollector, Reporter reporter) throws IOException {
        if (this.hasReportedFailure) {
            return;
        }
        if (process(intWritable, outputCollector, reporter)) {
            MRJobCounterHelper.incrMapperNumRecordsSuccessfullyProcessedCount(reporter, 1L);
        } else {
            this.hasReportedFailure = true;
        }
    }

    /**
     * Dispatches on the key: the sentinel goes to {@link #buildDictionaryAndPersistOutput}, anything
     * else is treated as a file index and goes to {@link #processInput}.
     *
     * @return {@code true} if the step succeeded
     */
    private boolean process(IntWritable intWritable, OutputCollector<AvroWrapper<SpecificRecord>, NullWritable> outputCollector, Reporter reporter) throws IOException {
        int index = intWritable.get();
        if (index == BUILD_DICTIONARY_SENTINEL_INDEX) {
            return buildDictionaryAndPersistOutput(outputCollector, reporter);
        }
        return processInput(index, reporter);
    }

    /**
     * Validates the input file at position {@code i} of the directory listing.
     *
     * <p>Fails (returning {@code false} after incrementing the matching counter) when the index is out
     * of range, when the entry is a directory, or when the file header schema differs from the expected
     * one. On success the file length is added to {@link #inputFileDataSize}.
     *
     * @param i index into the listing of {@link #inputDirectory}
     * @param reporter used to increment job counters
     * @return {@code true} if the file passed validation
     * @throws IOException if the file header or the directory modification time cannot be read
     */
    protected boolean processInput(int i, Reporter reporter) throws IOException {
        LOGGER.info("Processing input file index {} in directory {}", i, this.inputDirectory);
        // Reject every out-of-range index (not only i == length) so that a bad split is reported
        // through the counter instead of surfacing as an ArrayIndexOutOfBoundsException.
        if (i < 0 || i >= this.fileStatuses.length) {
            MRJobCounterHelper.incrMapperInvalidInputIdxCount(reporter, 1L);
            checkLastModificationTimeAndLogError("validating schema and building dictionary", reporter);
            return false;
        }
        FileStatus fileStatus = this.fileStatuses[i];
        Path filePath = fileStatus.getPath();
        LOGGER.info("Processing input file {}", filePath.toString());
        if (fileStatus.isDirectory()) {
            MRJobCounterHelper.incrMapperInvalidInputFileCount(reporter, 1L);
            LOGGER.error("Error while trying to validate schema: Input directory: {}  should not have sub directory: {}", filePath.getParent().getName(), filePath.getName());
            return false;
        }
        if (this.inputDataInfo.getSchemaInfo().isAvro()) {
            LOGGER.info("Detected Avro input format.");
            Pair<Schema, Schema> avroFileHeader = this.inputDataInfoProvider.getAvroFileHeader(this.fileSystem, filePath, this.isZstdDictCreationRequired);
            if (!avroFileHeader.equals(this.inputDataInfo.getSchemaInfo().getAvroSchema())) {
                MRJobCounterHelper.incrMapperSchemaInconsistencyFailureCount(reporter, 1L);
                LOGGER.error("Error while trying to validate schema: Inconsistent file Avro schema found. File: {}. \nExpected file schema: {}.\n Real File schema: {}.", filePath.getName(), this.inputDataInfo.getSchemaInfo().getAvroSchema(), avroFileHeader);
                return false;
            }
        } else {
            LOGGER.info("Detected Vson input format, will convert to Avro automatically.");
            Pair<VsonSchema, VsonSchema> vsonFileHeader = this.inputDataInfoProvider.getVsonFileHeader(this.fileSystem, filePath, this.isZstdDictCreationRequired);
            if (!vsonFileHeader.equals(this.inputDataInfo.getSchemaInfo().getVsonSchema())) {
                MRJobCounterHelper.incrMapperSchemaInconsistencyFailureCount(reporter, 1L);
                LOGGER.error("Error while trying to validate schema: Inconsistent file vson schema found. File: {}. Expected file schema: {}. Real File schema: {}.", filePath.getName(), this.inputDataInfo.getSchemaInfo().getVsonSchema(), vsonFileHeader);
                return false;
            }
        }
        this.inputFileDataSize = this.inputFileDataSize + fileStatus.getLen();
        return true;
    }

    /**
     * Builds the ZSTD dictionary when required and emits the output record.
     *
     * <p>The output record always carries the accumulated input size and is emitted exactly once on
     * every exit path (success, skipped/failed training, or an exception), via {@code finally}.
     *
     * @return {@code false} if dictionary training was required but skipped (too few samples) or failed;
     *         {@code true} otherwise
     * @throws IOException if emitting the output record fails
     */
    private boolean buildDictionaryAndPersistOutput(OutputCollector<AvroWrapper<SpecificRecord>, NullWritable> outputCollector, Reporter reporter) throws IOException {
        ValidateSchemaAndBuildDictMapperOutput mapperOutput = new ValidateSchemaAndBuildDictMapperOutput();
        mapperOutput.put(VenicePushJob.KEY_INPUT_FILE_DATA_SIZE, this.inputFileDataSize);
        try {
            if (!this.isZstdDictCreationRequired) {
                LOGGER.info("No compression dictionary is generated");
                return true;
            }
            if (!this.inputDataInfo.hasRecords()) {
                LOGGER.info("No compression dictionary is generated as the input data doesn't contain any records");
                return true;
            }
            int collectedNumberOfSamples = this.inputDataInfoProvider.pushJobZstdConfig.getCollectedNumberOfSamples();
            if (collectedNumberOfSamples < MIN_SAMPLES_REQUIRED_TO_BUILD_ZSTD_DICTIONARY) {
                MRJobCounterHelper.incrMapperZstdDictTrainSkippedCount(reporter, 1L);
                LOGGER.error("Training ZSTD compression dictionary skipped: The sample size is too small. Collected number of samples: {}, Minimum number of required samples: {}", collectedNumberOfSamples, MIN_SAMPLES_REQUIRED_TO_BUILD_ZSTD_DICTIONARY);
                return false;
            }
            LOGGER.info("Creating ZSTD compression dictionary using {} number of samples with {} bytes", collectedNumberOfSamples, this.inputDataInfoProvider.pushJobZstdConfig.getFilledSize());
            try {
                ByteBuffer dictionary = ByteBuffer.wrap(this.inputDataInfoProvider.getZstdDictTrainSamples());
                mapperOutput.put(VenicePushJob.KEY_ZSTD_COMPRESSION_DICTIONARY, dictionary);
                MRJobCounterHelper.incrMapperZstdDictTrainSuccessCount(reporter, 1L);
                LOGGER.info("ZSTD compression dictionary size = {} bytes", dictionary.remaining());
                return true;
            } catch (Exception e) {
                MRJobCounterHelper.incrMapperZstdDictTrainFailureCount(reporter, 1L);
                LOGGER.error("Training ZSTD compression dictionary failed: The content might not be suitable for creating dictionary. ", e);
                return false;
            }
        } finally {
            // Single emission point: a failing collect() propagates as-is and is never retried.
            outputCollector.collect(new AvroWrapper<>(mapperOutput), NullWritable.get());
        }
    }

    /**
     * Logs {@code exc} for the given step without touching any counter.
     *
     * @param exc the failure to log; may be {@code null}
     * @param str description of the step that failed, inserted into the log message
     * @throws IOException propagated from the input-modification-time lookup
     */
    protected void checkLastModificationTimeAndLogError(Exception exc, String str) throws IOException {
        checkLastModificationTimeAndLogError(exc, str, null);
    }

    /**
     * Logs a failure of the given step that has no associated exception.
     *
     * @param str description of the step that failed, inserted into the log message
     * @param reporter counter sink; may be {@code null}
     * @throws IOException propagated from the input-modification-time lookup
     */
    protected void checkLastModificationTimeAndLogError(String str, Reporter reporter) throws IOException {
        checkLastModificationTimeAndLogError(null, str, reporter);
    }

    /**
     * Logs an error for the given step, distinguishing whether the input directory was modified after
     * the modification time recorded in the job configuration.
     *
     * <p>If the current modification time is newer than the recorded one, the message states that the
     * dataset changed and, when {@code reporter} is non-null, the "data modified during push job"
     * counter is incremented. Otherwise only a "maybe changed" message is logged.
     *
     * @param exc the failure to log; may be {@code null}
     * @param str description of the step that failed, inserted into the log message
     * @param reporter counter sink; may be {@code null}
     * @throws IOException propagated from the input-modification-time lookup
     */
    protected void checkLastModificationTimeAndLogError(Exception exc, String str, Reporter reporter) throws IOException {
        long currentModificationTime = this.inputDataInfoProvider.getInputLastModificationTime(this.inputDirectory);
        if (currentModificationTime <= this.inputModificationTime) {
            LOGGER.error("Error while {}: Maybe because Dataset changed during the push job.", str, exc);
            return;
        }
        LOGGER.error("Error while {}: Because Dataset changed during the push job. Rerun the job without dataset change.", str, exc);
        if (reporter != null) {
            MRJobCounterHelper.incrMapperErrorDataModifiedDuringPushJobCount(reporter, 1L);
        }
    }

    /**
     * Creates the input data provider, validates the input directory, optionally initializes the ZSTD
     * sampling configuration, and lists the input files.
     *
     * <p>Failures in validation or listing set {@link #hasReportedFailure} and are logged through
     * {@link #checkLastModificationTimeAndLogError(Exception, String)} rather than rethrown; that
     * logging helper itself may still throw.
     *
     * @param jobConf Hadoop job configuration used to obtain the file system
     * @param veniceProperties job properties handed to the input data provider
     * @throws Exception if the provider cannot be constructed or the error-logging helper fails
     */
    protected void initInputData(JobConf jobConf, VeniceProperties veniceProperties) throws Exception {
        this.inputDataInfoProvider = new DefaultInputDataInfoProvider(this.storeSetting, this.pushJobSetting, veniceProperties);
        try {
            this.inputDataInfo = this.inputDataInfoProvider.validateInputAndGetInfo(this.inputDirectory);
            if (this.isZstdDictCreationRequired) {
                this.inputDataInfoProvider.initZstdConfig(this.inputDataInfo.getNumInputFiles());
            }
            try {
                this.fileSystem = FileSystem.get(jobConf);
                this.fileStatuses = this.fileSystem.listStatus(new Path(this.inputDirectory), VenicePushJob.PATH_FILTER);
            } catch (IOException listingFailure) {
                this.hasReportedFailure = true;
                checkLastModificationTimeAndLogError(listingFailure, "listing input files");
            }
        } catch (Exception validationFailure) {
            this.hasReportedFailure = true;
            checkLastModificationTimeAndLogError(validationFailure, "validating schema");
        }
    }

    /**
     * Reads the task settings from the job properties and initializes the input data.
     *
     * <p>Any exception escaping {@link #initInputData} is logged and turned into
     * {@link #hasReportedFailure}, so that {@link #map} becomes a no-op instead of the task failing
     * during configuration.
     *
     * @param veniceProperties job properties
     * @param jobConf Hadoop job configuration
     */
    @Override // com.linkedin.venice.hadoop.AbstractMapReduceTask
    protected void configureTask(VeniceProperties veniceProperties, JobConf jobConf) {
        this.inputDirectory = veniceProperties.getString(VenicePushJob.INPUT_PATH_PROP);
        this.pushJobSetting.storeName = veniceProperties.getString(VenicePushJob.VENICE_STORE_NAME_PROP);
        this.pushJobSetting.isIncrementalPush = veniceProperties.getBoolean(VenicePushJob.INCREMENTAL_PUSH);
        this.pushJobSetting.etlValueSchemaTransformation = ETLValueSchemaTransformation.valueOf(veniceProperties.getString(VenicePushJob.ETL_VALUE_SCHEMA_TRANSFORMATION, ETLValueSchemaTransformation.NONE.name()));
        this.storeSetting.compressionStrategy = CompressionStrategy.valueOf(veniceProperties.getString(VenicePushJob.COMPRESSION_STRATEGY));
        this.inputModificationTime = veniceProperties.getLong(VenicePushJob.INPUT_PATH_LAST_MODIFIED_TIME);
        this.pushJobSetting.useMapperToBuildDict = veniceProperties.getBoolean(VenicePushJob.USE_MAPPER_TO_BUILD_DICTIONARY);
        this.pushJobSetting.compressionMetricCollectionEnabled = veniceProperties.getBoolean(VenicePushJob.COMPRESSION_METRIC_COLLECTION_ENABLED);
        this.isZstdDictCreationRequired = veniceProperties.getBoolean(VenicePushJob.ZSTD_DICTIONARY_CREATION_REQUIRED);
        try {
            initInputData(jobConf, veniceProperties);
        } catch (Exception e) {
            // Do not lose the cause: this path is reached when the provider constructor or the
            // error-logging helper itself throws, in which case nothing else has logged it.
            LOGGER.error("Failed to initialize input data for directory {}", this.inputDirectory, e);
            this.hasReportedFailure = true;
        }
    }

    /** Closes the input data provider if one was created. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.inputDataInfoProvider != null) {
            this.inputDataInfoProvider.close();
        }
    }
}
