package com.linkedin.venice.hadoop;

import com.github.luben.zstd.Zstd;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.DictionaryUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Base class for mappers that turn an input record into a serialized key and a
 * (possibly compressed) serialized value, both emitted as {@link BytesWritable}.
 *
 * <p>Subclasses supply the {@link AbstractVeniceRecordReader} used to extract key and value
 * bytes from the input record. Before the first record is processed, the map task whose id is
 * {@link #TASK_ID_WHICH_SHOULD_SPRAY_ALL_PARTITIONS} emits one control record per partition
 * (empty key, 4-byte value holding the partition index).
 *
 * <p>When compression metric collection is enabled, every value is additionally compressed
 * with each other configured strategy purely to update size counters; only the output of the
 * configured strategy is emitted.
 *
 * @param <INPUT_KEY> key type handed to {@link #map} by the MapReduce framework
 * @param <INPUT_VALUE> value type handed to {@link #map} by the MapReduce framework
 */
public abstract class AbstractVeniceMapper<INPUT_KEY, INPUT_VALUE> extends AbstractMapReduceTask implements Mapper<INPUT_KEY, INPUT_VALUE, BytesWritable, BytesWritable> {
    private static final Logger LOGGER = LogManager.getLogger(AbstractVeniceMapper.class);

    /** Only the map task with this id emits the per-partition control records. */
    private static final int TASK_ID_WHICH_SHOULD_SPRAY_ALL_PARTITIONS = 0;

    /** Strategy used for the values that are actually emitted. */
    private CompressionStrategy compressionStrategy;
    /** When true, values are also compressed with the other available strategies for metrics. */
    private boolean compressionMetricCollectionEnabled;
    private boolean isZstdDictCreationSuccess;
    private boolean isZstdDictCreationRequired;
    private CompressorFactory compressorFactory;
    /** Compressors indexed by {@link CompressionStrategy#getValue()}; unavailable slots stay null. */
    private VeniceCompressor[] compressor;

    /** Key bytes of the most recent record; {@code null} until the first record is processed. */
    byte[] recordKey = null;
    /** Uncompressed value bytes of the most recent record (also used as spray scratch space). */
    byte[] recordValue = null;
    /** Reused output key container. */
    BytesWritable keyBW = new BytesWritable();
    /** Reused output value container. */
    BytesWritable valueBW = new BytesWritable();

    protected AbstractVeniceRecordReader<INPUT_KEY, INPUT_VALUE> veniceRecordReader;
    /**
     * Closed in {@link #close()}. NOTE(review): this class never assigns the field; presumably
     * subclasses populate it from {@link #getFilterChain} - confirm.
     */
    protected FilterChain<INPUT_VALUE> veniceFilterChain;

    /**
     * Processes one input record and, if it produced output, forwards the serialized key/value
     * pair to {@code outputCollector}. On the very first call (no key has been seen yet) it
     * first gives the task a chance to emit the per-partition control records.
     *
     * @throws IOException if the collector fails to accept a record
     */
    @Override // org.apache.hadoop.mapred.Mapper
    public void map(INPUT_KEY input_key, INPUT_VALUE input_value, OutputCollector<BytesWritable, BytesWritable> outputCollector, Reporter reporter) throws IOException {
        // recordKey is only null before the first successful key extraction in process().
        if (this.recordKey == null) {
            maybeSprayAllPartitions(outputCollector, reporter);
        }
        if (process(input_key, input_value, this.keyBW, this.valueBW, reporter)) {
            outputCollector.collect(this.keyBW, this.valueBW);
        }
    }

    /**
     * Emits one control record (empty key, 4-byte partition index as value) for every partition,
     * but only when running as map task {@link #TASK_ID_WHICH_SHOULD_SPRAY_ALL_PARTITIONS}.
     *
     * @throws IllegalStateException if the task id has not been set (is -1)
     * @throws IOException if the collector fails to accept a record
     */
    private void maybeSprayAllPartitions(OutputCollector<BytesWritable, BytesWritable> outputCollector, Reporter reporter) throws IOException {
        final int taskId = getTaskId();
        if (taskId == -1) {
            throw new IllegalStateException("attemptID not set!");
        }
        if (taskId != TASK_ID_WHICH_SHOULD_SPRAY_ALL_PARTITIONS) {
            return;
        }
        this.keyBW.setSize(0);
        this.recordValue = new byte[Integer.BYTES];
        for (int partition = 0; partition < getPartitionCount(); partition++) {
            ByteUtils.writeInt(this.recordValue, partition, 0);
            this.valueBW.set(this.recordValue, 0, Integer.BYTES);
            outputCollector.collect(this.keyBW, this.valueBW);
        }
        MRJobCounterHelper.incrMapperSprayAllPartitionsTriggeredCount(reporter, 1L);
        LOGGER.info("Map Task ID {} successfully sprayed all partitions, to ensure that all Reducers come up.", TASK_ID_WHICH_SHOULD_SPRAY_ALL_PARTITIONS);
    }

    /**
     * Extracts key and value bytes from the input record, compresses the value with the
     * configured strategy and stores both into the supplied writables. Size counters are
     * updated on {@code reporter} along the way.
     *
     * @param input_key input key as received by {@link #map}
     * @param input_value input value as received by {@link #map}
     * @param bytesWritable destination for the serialized key
     * @param bytesWritable2 destination for the compressed serialized value
     * @param reporter used to update job counters
     * @return {@code true} if the writables were populated and should be emitted;
     *         {@code false} if the record had a null value and was skipped
     * @throws VeniceException if the key is null, if no compressor is available for the
     *         configured strategy, or if compression with the configured strategy fails
     */
    protected boolean process(INPUT_KEY input_key, INPUT_VALUE input_value, BytesWritable bytesWritable, BytesWritable bytesWritable2, Reporter reporter) {
        this.recordKey = this.veniceRecordReader.getKeyBytes(input_key, input_value);
        this.recordValue = this.veniceRecordReader.getValueBytes(input_key, input_value);
        if (this.recordKey == null) {
            throw new VeniceException("Mapper received a empty key record");
        }
        if (this.recordValue == null) {
            LOGGER.warn("Received null record, skip.");
            MRJobCounterHelper.incrEmptyRecordCount(reporter, 1L);
            return false;
        }
        MRJobCounterHelper.incrTotalKeySize(reporter, this.recordKey.length);
        MRJobCounterHelper.incrTotalUncompressedValueSize(reporter, this.recordValue.length);

        // setupCompression() can legitimately leave a slot empty (e.g. ZSTD_WITH_DICT without a
        // usable dictionary). Fail with a descriptive error instead of a bare NullPointerException.
        final VeniceCompressor activeCompressor = this.compressor[this.compressionStrategy.getValue()];
        if (activeCompressor == null) {
            throw new VeniceException("No compressor is available for compression strategy: " + this.compressionStrategy.name());
        }

        try {
            final byte[] emittedValue = activeCompressor.compress(this.recordValue);
            MRJobCounterHelper.incrTotalValueSize(reporter, emittedValue.length);
            bytesWritable.set(this.recordKey, 0, this.recordKey.length);
            bytesWritable2.set(emittedValue, 0, emittedValue.length);
            if (!this.compressionMetricCollectionEnabled) {
                return true;
            }
            for (CompressionStrategy candidate : CompressionStrategy.values()) {
                if (candidate == CompressionStrategy.NO_OP || this.compressor[candidate.getValue()] == null) {
                    continue;
                }
                byte[] candidateValue;
                if (candidate == this.compressionStrategy) {
                    // Already compressed above; do not redo the work.
                    candidateValue = emittedValue;
                } else {
                    try {
                        candidateValue = this.compressor[candidate.getValue()].compress(this.recordValue);
                    } catch (IOException e) {
                        LOGGER.warn("Compression to collect metrics failed for compression strategy: {}", candidate.name(), e);
                        // Metrics are best-effort: there is no size to record for this strategy.
                        continue;
                    }
                }
                switch (candidate) {
                    case GZIP:
                        MRJobCounterHelper.incrTotalGzipCompressedValueSize(reporter, candidateValue.length);
                        break;
                    case ZSTD_WITH_DICT:
                        MRJobCounterHelper.incrTotalZstdCompressedValueSize(reporter, candidateValue.length);
                        break;
                    default:
                        throw new VeniceException("Support for compression Strategy: " + candidate.name() + " needs to be added");
                }
            }
            return true;
        } catch (IOException e2) {
            throw new VeniceException("Caught an IO exception while trying to to use compression strategy: " + activeCompressor.getCompressionStrategy().name(), e2);
        }
    }

    /**
     * Creates the record reader used to extract key and value bytes from input records.
     * Must not return {@code null}; {@link #configureTask} rejects a null reader.
     */
    protected abstract AbstractVeniceRecordReader<INPUT_KEY, INPUT_VALUE> getRecordReader(VeniceProperties veniceProperties);

    /** Creates the filter chain for input values. Not invoked by this class itself. */
    protected abstract FilterChain<INPUT_VALUE> getFilterChain(VeniceProperties veniceProperties);

    /**
     * Initializes the compressor factory, the record reader and the compressor table from the
     * job properties.
     *
     * @throws VeniceException if {@link #getRecordReader} returns {@code null}
     */
    @Override // com.linkedin.venice.hadoop.AbstractMapReduceTask
    protected void configureTask(VeniceProperties veniceProperties, JobConf jobConf) {
        this.compressorFactory = new CompressorFactory();
        this.veniceRecordReader = getRecordReader(veniceProperties);
        if (this.veniceRecordReader == null) {
            throw new VeniceException("Record reader not initialized");
        }
        this.compressor = new VeniceCompressor[CompressionStrategy.getCompressionStrategyTypesArrayLength()];
        setupCompression(veniceProperties);
    }

    /**
     * Reads the compression settings and fills the compressor table.
     *
     * <p>Without metric collection only the configured strategy's compressor is created. With
     * metric collection, GZIP and NO_OP compressors are always created, the ZSTD_WITH_DICT
     * compressor is created when a dictionary is flagged as required and successfully created,
     * and ZSTD is deliberately left unset.
     */
    private void setupCompression(VeniceProperties veniceProperties) {
        this.compressionStrategy = CompressionStrategy.valueOf(veniceProperties.getString(VenicePushJob.COMPRESSION_STRATEGY));
        this.compressionMetricCollectionEnabled = veniceProperties.getBoolean(VenicePushJob.COMPRESSION_METRIC_COLLECTION_ENABLED);
        this.isZstdDictCreationRequired = veniceProperties.getBoolean(VenicePushJob.ZSTD_DICTIONARY_CREATION_REQUIRED);
        this.isZstdDictCreationSuccess = veniceProperties.getBoolean(VenicePushJob.ZSTD_DICTIONARY_CREATION_SUCCESS);

        if (!this.compressionMetricCollectionEnabled) {
            if (this.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
                maybeSetupZstdDictCompressor(veniceProperties);
            } else {
                this.compressor[this.compressionStrategy.getValue()] = this.compressorFactory.getCompressor(this.compressionStrategy);
            }
            return;
        }

        for (CompressionStrategy strategy : CompressionStrategy.values()) {
            switch (strategy) {
                case GZIP:
                case NO_OP:
                    this.compressor[strategy.getValue()] = this.compressorFactory.getCompressor(strategy);
                    break;
                case ZSTD_WITH_DICT:
                    maybeSetupZstdDictCompressor(veniceProperties);
                    break;
                case ZSTD:
                    // No compressor is set up for plain ZSTD during metric collection.
                    break;
                default:
                    throw new VeniceException("Support for compression Strategy: " + strategy.name() + " needs to be added");
            }
        }
    }

    /** Installs the ZSTD_WITH_DICT compressor if a dictionary was both required and created. */
    private void maybeSetupZstdDictCompressor(VeniceProperties veniceProperties) {
        if (this.isZstdDictCreationRequired && this.isZstdDictCreationSuccess) {
            this.compressor[CompressionStrategy.ZSTD_WITH_DICT.getValue()] = getZstdCompressor(veniceProperties);
        }
    }

    /**
     * Fetches the compression dictionary for the given topic. Kept as an overridable
     * pass-through to {@link DictionaryUtils#readDictionaryFromKafka}.
     */
    protected ByteBuffer readDictionaryFromKafka(String str, VeniceProperties veniceProperties) {
        return DictionaryUtils.readDictionaryFromKafka(str, veniceProperties);
    }

    /**
     * Builds the dictionary-based ZSTD compressor for the topic named in the job properties.
     *
     * @return the compressor, or {@code null} when no dictionary bytes are available
     */
    private VeniceCompressor getZstdCompressor(VeniceProperties veniceProperties) {
        String topicName = veniceProperties.getString(VenicePushJob.TOPIC_PROP);
        ByteBuffer dictionary = readDictionaryFromKafka(topicName, veniceProperties);
        int compressionLevel = veniceProperties.getInt(VenicePushJob.ZSTD_COMPRESSION_LEVEL, Zstd.maxCompressionLevel());
        if (dictionary == null || !dictionary.hasRemaining()) {
            return null;
        }
        // Copy exactly the readable bytes. ByteBuffer.array() would ignore position, limit and
        // array offset, and throws for read-only or direct buffers. The duplicate keeps the
        // caller-visible position of the original buffer untouched.
        byte[] dictionaryBytes = new byte[dictionary.remaining()];
        dictionary.duplicate().get(dictionaryBytes);
        return this.compressorFactory.createVersionSpecificCompressorIfNotExist(CompressionStrategy.ZSTD_WITH_DICT, topicName, dictionaryBytes, compressionLevel);
    }

    /** Releases the compressor factory and the filter chain, logging (not throwing) on failure. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Utils.closeQuietlyWithErrorLogged(this.compressorFactory);
        Utils.closeQuietlyWithErrorLogged(this.veniceFilterChain);
    }
}
