package com.linkedin.venice.hadoop.input.kafka;

import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.utils.VeniceProperties;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/KafkaInputFormat.class */
public class KafkaInputFormat implements InputFormat<KafkaInputMapperKey, KafkaInputMapperValue> {
    public static final long DEFAULT_KAFKA_INPUT_MAX_RECORDS_PER_MAPPER = 5000000;
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    protected Map<TopicPartition, Long> getLatestOffsets(JobConf jobConf) {
        VeniceProperties consumerProperties = KafkaInputUtils.getConsumerProperties(jobConf);
        TopicManagerRepository build = TopicManagerRepository.builder().setPubSubProperties(str -> {
            return consumerProperties;
        }).setLocalKafkaBootstrapServers(jobConf.get(VenicePushJob.KAFKA_INPUT_BROKER_URL)).setPubSubTopicRepository(this.pubSubTopicRepository).setPubSubAdminAdapterFactory(new ApacheKafkaAdminAdapterFactory()).setPubSubConsumerAdapterFactory(new ApacheKafkaConsumerAdapterFactory()).build();
        try {
            TopicManager topicManager = build.getTopicManager();
            try {
                String str2 = jobConf.get(VenicePushJob.KAFKA_INPUT_TOPIC);
                Int2LongMap topicLatestOffsets = topicManager.getTopicLatestOffsets(this.pubSubTopicRepository.getTopic(str2));
                HashMap hashMap = new HashMap(topicLatestOffsets.size());
                topicLatestOffsets.forEach((num, l) -> {
                    hashMap.put(new TopicPartition(str2, num.intValue()), l);
                });
                if (topicManager != null) {
                    topicManager.close();
                }
                if (build != null) {
                    build.close();
                }
                return hashMap;
            } finally {
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.hadoop.mapred.InputFormat
    public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
        long j = jobConf.getLong(VenicePushJob.KAFKA_INPUT_MAX_RECORDS_PER_MAPPER, DEFAULT_KAFKA_INPUT_MAX_RECORDS_PER_MAPPER);
        if (j < 1) {
            throw new IllegalArgumentException("Invalid kafka.input.max.records.per.mapper value [" + j + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END);
        }
        return getSplitsByRecordsPerSplit(jobConf, j);
    }

    public InputSplit[] getSplitsByRecordsPerSplit(JobConf jobConf, long j) {
        Map<TopicPartition, Long> latestOffsets = getLatestOffsets(jobConf);
        LinkedList linkedList = new LinkedList();
        latestOffsets.forEach((topicPartition, l) -> {
            long j2 = 0;
            while (true) {
                long j3 = j2;
                if (j3 >= l.longValue()) {
                    return;
                }
                long min = Math.min(j3 + j, l.longValue());
                linkedList.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), j3, min));
                j2 = min;
            }
        });
        return (InputSplit[]) linkedList.toArray(new KafkaInputSplit[linkedList.size()]);
    }

    @Override // org.apache.hadoop.mapred.InputFormat
    public RecordReader<KafkaInputMapperKey, KafkaInputMapperValue> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) {
        return new KafkaInputRecordReader(inputSplit, jobConf, reporter);
    }
}
