package com.linkedin.venice.hadoop.input.kafka;

import com.github.luben.zstd.ZstdDictTrainer;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.DefaultInputDataInfoProvider;
import com.linkedin.venice.hadoop.PushJobZstdConfig;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/KafkaInputDictTrainer.class */
/**
 * Trains a Zstd compression dictionary from records sampled out of every partition of a Kafka source topic.
 *
 * <p>The dictionary produced by a successful {@link #trainDict()} call is cached in this instance; later calls
 * return the cached array without reading the topic again. A failed call caches nothing.
 */
public class KafkaInputDictTrainer {
    private static final Logger LOGGER = LogManager.getLogger(KafkaInputDictTrainer.class);
    private final VeniceProperties props;
    private final JobConf jobConf;
    private final String sourceTopicName;
    // Cached result of trainDict(); null until training succeeds.
    private byte[] dict;
    private final KafkaInputFormat kafkaInputFormat;
    // When present, this trainer is used instead of the one derived from PushJobZstdConfig (test hook).
    private final Optional<ZstdDictTrainer> trainerSupplier;
    private final CompressionStrategy sourceVersionCompressionStrategy;
    private final CompressorBuilder compressorBuilder;

    /**
     * Builds the compressor used to decode values read from the source topic. It is injectable so tests can
     * avoid real compressor construction.
     */
    @FunctionalInterface
    public interface CompressorBuilder {
        VeniceCompressor getCompressor(
            CompressorFactory compressorFactory,
            CompressionStrategy compressionStrategy,
            String kafkaBrokerUrl,
            String topicName,
            VeniceProperties props);
    }

    /** Immutable training parameters; create instances through {@link ParamBuilder}. */
    public static class Param {
        private final String kafkaInputBroker;
        private final String topicName;
        private final String keySchema;
        private final Properties sslProperties;
        private final int compressionDictSize;
        private final int dictSampleSize;
        private final CompressionStrategy sourceVersionCompressionStrategy;
        private final boolean sourceVersionChunkingEnabled;

        Param(ParamBuilder paramBuilder) {
            this.kafkaInputBroker = paramBuilder.kafkaInputBroker;
            this.topicName = paramBuilder.topicName;
            this.keySchema = paramBuilder.keySchema;
            this.sslProperties = paramBuilder.sslProperties;
            this.compressionDictSize = paramBuilder.compressionDictSize;
            this.dictSampleSize = paramBuilder.dictSampleSize;
            this.sourceVersionCompressionStrategy = paramBuilder.sourceVersionCompressionStrategy;
            this.sourceVersionChunkingEnabled = paramBuilder.sourceVersionChunkingEnabled;
        }
    }

    /** Fluent builder for {@link Param}; every setter returns this builder. */
    public static class ParamBuilder {
        private String kafkaInputBroker;
        private String topicName;
        private String keySchema;
        private Properties sslProperties;
        private int compressionDictSize;
        private int dictSampleSize;
        private CompressionStrategy sourceVersionCompressionStrategy;
        private boolean sourceVersionChunkingEnabled;

        public ParamBuilder setKafkaInputBroker(String kafkaInputBroker) {
            this.kafkaInputBroker = kafkaInputBroker;
            return this;
        }

        public ParamBuilder setTopicName(String topicName) {
            this.topicName = topicName;
            return this;
        }

        public ParamBuilder setKeySchema(String keySchema) {
            this.keySchema = keySchema;
            return this;
        }

        public ParamBuilder setSslProperties(Properties sslProperties) {
            this.sslProperties = sslProperties;
            return this;
        }

        public ParamBuilder setCompressionDictSize(int compressionDictSize) {
            this.compressionDictSize = compressionDictSize;
            return this;
        }

        public ParamBuilder setDictSampleSize(int dictSampleSize) {
            this.dictSampleSize = dictSampleSize;
            return this;
        }

        public ParamBuilder setSourceVersionCompressionStrategy(CompressionStrategy sourceVersionCompressionStrategy) {
            this.sourceVersionCompressionStrategy = sourceVersionCompressionStrategy;
            return this;
        }

        public ParamBuilder setSourceVersionChunkingEnabled(boolean sourceVersionChunkingEnabled) {
            this.sourceVersionChunkingEnabled = sourceVersionChunkingEnabled;
            return this;
        }

        public Param build() {
            return new Param(this);
        }
    }

    /** Creates a trainer backed by a real {@link KafkaInputFormat} and {@code KafkaInputUtils::getCompressor}. */
    public KafkaInputDictTrainer(Param param) {
        this(new KafkaInputFormat(), Optional.empty(), param, KafkaInputUtils::getCompressor);
    }

    /**
     * Creates a trainer with injectable collaborators.
     *
     * @param kafkaInputFormat  source of input splits and record readers
     * @param trainerSupplier   optional trainer overriding the one built from {@link PushJobZstdConfig}
     * @param param             topic, broker, schema, SSL and dictionary-size settings
     * @param compressorBuilder builds the compressor used to decode source-topic values
     */
    protected KafkaInputDictTrainer(KafkaInputFormat kafkaInputFormat, Optional<ZstdDictTrainer> trainerSupplier, Param param, CompressorBuilder compressorBuilder) {
        this.kafkaInputFormat = kafkaInputFormat;
        this.trainerSupplier = trainerSupplier;
        this.sourceVersionCompressionStrategy = param.sourceVersionCompressionStrategy;
        this.sourceTopicName = param.topicName;
        this.compressorBuilder = compressorBuilder;

        // Insertion order matters: SSL properties may override the three entries before them, and the
        // dictionary/chunking entries after them always win.
        Properties properties = new Properties();
        properties.setProperty(VenicePushJob.KAFKA_INPUT_BROKER_URL, param.kafkaInputBroker);
        properties.setProperty(VenicePushJob.KAFKA_INPUT_TOPIC, param.topicName);
        properties.setProperty(VenicePushJob.KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP, param.keySchema);
        // SSL settings are optional; a builder that never called setSslProperties leaves them null.
        if (param.sslProperties != null) {
            properties.putAll(param.sslProperties);
        }
        properties.setProperty(DefaultInputDataInfoProvider.COMPRESSION_DICTIONARY_SIZE_LIMIT, Integer.toString(param.compressionDictSize));
        properties.setProperty(DefaultInputDataInfoProvider.COMPRESSION_DICTIONARY_SAMPLE_SIZE, Integer.toString(param.dictSampleSize));
        properties.setProperty(VenicePushJob.KAFKA_INPUT_SOURCE_TOPIC_CHUNKING_ENABLED, Boolean.toString(param.sourceVersionChunkingEnabled));

        this.props = new VeniceProperties(properties);
        this.jobConf = new JobConf();
        properties.forEach((key, value) -> this.jobConf.set((String) key, (String) value));
    }

    /**
     * Samples records from every partition of the source topic and trains a Zstd dictionary from them.
     *
     * <p>Each partition contributes samples until its byte budget
     * ({@link PushJobZstdConfig#getMaxBytesPerFile()}) would be exceeded. The sample that would cross the
     * budget is not added, and reading moves on to the next partition.
     *
     * @return the trained dictionary; cached, so repeated calls return the same array
     * @throws VeniceException if the topic yields no samples or reading the topic fails
     */
    public synchronized byte[] trainDict() {
        if (this.dict != null) {
            return this.dict;
        }
        KafkaInputSplit[] splits = (KafkaInputSplit[]) this.kafkaInputFormat.getSplitsByRecordsPerSplit(this.jobConf, Long.MAX_VALUE);
        // Visit partitions in a fixed order so the same input produces the same sample sequence.
        Arrays.sort(splits, Comparator.comparingInt(inputSplit -> inputSplit.getTopicPartition().partition()));
        PushJobZstdConfig zstdConfig = new PushJobZstdConfig(this.props, splits.length);
        ZstdDictTrainer trainer = this.trainerSupplier.orElseGet(zstdConfig::getZstdDictTrainer);
        int maxBytesPerPartition = zstdConfig.getMaxBytesPerFile();

        CompressorFactory compressorFactory = new CompressorFactory();
        try {
            // Built inside the try so the factory is closed even if compressor construction fails.
            VeniceCompressor compressor = this.compressorBuilder.getCompressor(
                compressorFactory,
                this.sourceVersionCompressionStrategy,
                this.jobConf.get(VenicePushJob.KAFKA_INPUT_BROKER_URL),
                this.jobConf.get(VenicePushJob.KAFKA_INPUT_TOPIC),
                this.props);
            boolean sourceIsUncompressed = compressor.getCompressionStrategy().equals(CompressionStrategy.NO_OP);

            // Key/value holders are created once and reused for every record of every split.
            KafkaInputMapperKey mapperKey = null;
            KafkaInputMapperValue mapperValue = null;
            long totalSampledRecords = 0;
            for (KafkaInputSplit split : splits) {
                RecordReader<KafkaInputMapperKey, KafkaInputMapperValue> recordReader =
                    this.kafkaInputFormat.getRecordReader(split, this.jobConf, Reporter.NULL);
                long sampledBytes = 0;
                long sampledRecords = 0;
                try {
                    if (mapperKey == null) {
                        mapperKey = recordReader.createKey();
                    }
                    if (mapperValue == null) {
                        mapperValue = recordReader.createValue();
                    }
                    while (recordReader.next(mapperKey, mapperValue)) {
                        byte[] sample = toSample(mapperValue, compressor, sourceIsUncompressed);
                        if (sample == null) {
                            continue;
                        }
                        sampledBytes += sample.length;
                        if (sampledBytes > maxBytesPerPartition) {
                            break;
                        }
                        trainer.addSample(sample);
                        sampledRecords++;
                    }
                } finally {
                    // Always release the reader, even when reading or decompression fails.
                    recordReader.close();
                }
                totalSampledRecords += sampledRecords;
                LOGGER.info("Added {} samples into dict from partition: {}", sampledRecords, split.getTopicPartition().partition());
            }
            if (totalSampledRecords == 0) {
                throw new VeniceException("No record in the source topic: " + this.sourceTopicName + ", can't train the dict");
            }
            LOGGER.info("Added total {} records from {} partitions into dict", totalSampledRecords, splits.length);
            this.dict = trainer.trainSamples();
            LOGGER.info("Successfully finished training dict");
            return this.dict;
        } catch (IOException e) {
            throw new VeniceException("Encountered exception while reading source topic: " + this.sourceTopicName, e);
        } finally {
            compressorFactory.close();
        }
    }

    /**
     * Converts one source-topic value into a dictionary sample.
     *
     * @return the uncompressed value bytes, or {@code null} when the record must be skipped
     */
    private static byte[] toSample(KafkaInputMapperValue mapperValue, VeniceCompressor compressor, boolean sourceIsUncompressed) throws IOException {
        if (sourceIsUncompressed) {
            return ByteUtils.extractByteArray(mapperValue.value);
        }
        if (mapperValue.schemaId > 0) {
            return ByteUtils.extractByteArray(compressor.decompress(mapperValue.value));
        }
        // Non-positive schema ids are not regular values (NOTE(review): presumably chunks / chunk manifests —
        // confirm). They cannot be decompressed on their own, so skip them rather than reuse a stale sample.
        return null;
    }
}
