package com.linkedin.venice.hadoop.input.kafka;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.MRJobCounterHelper;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.hadoop.input.kafka.avro.MapperValueType;
import com.linkedin.venice.hadoop.input.kafka.chunk.ChunkKeyValueTransformer;
import com.linkedin.venice.hadoop.input.kafka.chunk.ChunkKeyValueTransformerImpl;
import com.linkedin.venice.hadoop.input.kafka.chunk.RawKeyBytesAndChunkedKeySuffix;
import com.linkedin.venice.kafka.protocol.Delete;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.class */
/**
 * Hadoop {@link RecordReader} that reads the offset range described by a {@link KafkaInputSplit}
 * from a single topic partition and turns every PUT / DELETE message into a
 * {@link KafkaInputMapperKey} / {@link KafkaInputMapperValue} pair. Control messages are skipped.
 *
 * <p>The reader closes the consumer it was constructed with when {@link #close()} is called.
 */
public class KafkaInputRecordReader implements RecordReader<KafkaInputMapperKey, KafkaInputMapperValue>, AutoCloseable {
    public static final String KIF_RECORD_READER_KAFKA_CONFIG_PREFIX = "kif.record.reader.kafka.";

    private static final Logger LOGGER = LogManager.getLogger(KafkaInputRecordReader.class);
    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
    private static final PubSubTopicRepository PUBSUB_TOPIC_REPOSITORY = new PubSubTopicRepository();

    /** A progress line is logged every time the job-wide PUT/DELETE counter hits a multiple of this. */
    private static final long LOG_RECORD_INTERVAL = 100_000L;
    /** Timeout, in milliseconds, handed to every {@code consumer.poll} call. */
    private static final long CONSUMER_POLL_TIMEOUT = TimeUnit.SECONDS.toMillis(1);
    /** Number of consecutive empty polls tolerated before {@link #loadRecords()} gives up. */
    private static final int CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES = 12;
    /** Pause, in milliseconds, after each empty poll. */
    private static final long EMPTY_POLL_SLEEP_TIME_MS = TimeUnit.SECONDS.toMillis(5);

    private final PubSubConsumerAdapter consumer;
    private final TopicPartition topicPartition;
    private final PubSubTopicPartition pubSubTopicPartition;
    private final long maxNumberOfRecords;
    private final long startingOffset;
    private long currentOffset;
    private final long endingOffset;
    private final boolean isSourceVersionChunkingEnabled;
    private final Schema keySchema;
    /** Created lazily by {@link #splitCompositeKey}; only needed when source chunking is enabled. */
    private ChunkKeyValueTransformer chunkKeyValueTransformer;
    private Iterator<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> recordIterator;
    private final Reporter reporter;

    /**
     * Creates a reader backed by a freshly created Kafka consumer configured from {@code jobConf}.
     *
     * @param inputSplit must be a {@link KafkaInputSplit}
     * @param jobConf job configuration supplying consumer properties, the chunking flag and the key schema
     * @param reporter used for record counters; may be {@code null} or {@link Reporter#NULL}
     * @throws VeniceException if the split has the wrong type or the key schema property is missing
     */
    public KafkaInputRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) {
        this(inputSplit, jobConf, reporter, createDefaultConsumer(jobConf), PUBSUB_TOPIC_REPOSITORY);
    }

    /**
     * Creates a reader on top of the supplied consumer and subscribes it to the split's partition.
     *
     * @param inputSplit must be a {@link KafkaInputSplit}
     * @param jobConf job configuration supplying the chunking flag and the key schema
     * @param reporter used for record counters; may be {@code null} or {@link Reporter#NULL}
     * @param pubSubConsumerAdapter consumer to read from; closed by {@link #close()}
     * @param pubSubTopicRepository used to resolve the split's topic name
     * @throws VeniceException if the split has the wrong type or the key schema property is missing
     */
    KafkaInputRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter, PubSubConsumerAdapter pubSubConsumerAdapter, PubSubTopicRepository pubSubTopicRepository) {
        if (!(inputSplit instanceof KafkaInputSplit)) {
            throw new VeniceException("InputSplit for RecordReader is not valid split type.");
        }
        KafkaInputSplit kafkaInputSplit = (KafkaInputSplit) inputSplit;
        this.consumer = pubSubConsumerAdapter;
        this.topicPartition = kafkaInputSplit.getTopicPartition();
        this.pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicPartition.topic()), topicPartition.partition());
        this.startingOffset = kafkaInputSplit.getStartingOffset();
        // Positioned one before the first offset of the split; next() advances it to each record it reads.
        this.currentOffset = kafkaInputSplit.getStartingOffset() - 1;
        this.endingOffset = kafkaInputSplit.getEndingOffset();
        this.isSourceVersionChunkingEnabled = jobConf.getBoolean(VenicePushJob.KAFKA_INPUT_SOURCE_TOPIC_CHUNKING_ENABLED, false);
        String keySchemaString = jobConf.get(VenicePushJob.KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP);
        if (keySchemaString == null) {
            throw new VeniceException("Expect a value for the config property: " + VenicePushJob.KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP);
        }
        this.keySchema = Schema.parse(keySchemaString);
        this.maxNumberOfRecords = endingOffset - startingOffset;
        // NOTE(review): subscribing at startingOffset - 1 presumably means "last consumed offset" — confirm
        // against PubSubConsumerAdapter#subscribe.
        this.consumer.subscribe(pubSubTopicPartition, currentOffset);
        this.reporter = reporter;
        LOGGER.info("KafkaInputRecordReader started for TopicPartition: {} starting offset: {} ending offset: {}", topicPartition, startingOffset, endingOffset);
    }

    /** Builds the Kafka consumer used by the public constructor. */
    private static PubSubConsumerAdapter createDefaultConsumer(JobConf jobConf) {
        // NOTE(review): "create2" and the explicit casts look like decompiler output (the factory method is
        // presumably named "create") — confirm against ApacheKafkaConsumerAdapterFactory before building.
        return new ApacheKafkaConsumerAdapterFactory().create2(
                KafkaInputUtils.getConsumerProperties(jobConf),
                false,
                (PubSubMessageDeserializer) new KafkaPubSubMessageDeserializer(
                        new OptimizedKafkaValueSerializer(),
                        new LandFillObjectPool<>(KafkaMessageEnvelope::new),
                        new LandFillObjectPool<>(KafkaMessageEnvelope::new)),
                (String) null);
    }

    /**
     * Reads forward until a PUT or DELETE message is found and copies it into the supplied key and value.
     *
     * @return {@code true} if a record was written into the arguments, {@code false} once the split is exhausted
     * @throws IOException if no message can be read, the thread is interrupted while waiting for data, or a
     *         message of an unexpected type is encountered
     */
    @Override
    public boolean next(KafkaInputMapperKey kafkaInputMapperKey, KafkaInputMapperValue kafkaInputMapperValue) throws IOException {
        while (hasPendingData()) {
            PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message = fetchNextMessage();
            currentOffset = message.getOffset();
            KafkaKey kafkaKey = message.getKey();
            if (kafkaKey.isControlMessage()) {
                // Control messages carry no user data; keep scanning.
                continue;
            }
            KafkaMessageEnvelope envelope = message.getValue();
            MessageType messageType = MessageType.valueOf(envelope);

            kafkaInputMapperKey.offset = currentOffset;
            if (isSourceVersionChunkingEnabled) {
                RawKeyBytesAndChunkedKeySuffix keyParts = splitCompositeKey(kafkaKey.getKey(), messageType, getSchemaIdFromValue(envelope));
                kafkaInputMapperKey.key = keyParts.getRawKeyBytes();
                kafkaInputMapperValue.chunkedKeySuffix = keyParts.getChunkedKeySuffixBytes();
            } else {
                kafkaInputMapperKey.key = ByteBuffer.wrap(kafkaKey.getKey(), 0, kafkaKey.getKeyLength());
            }
            kafkaInputMapperValue.offset = currentOffset;
            populateValue(messageType, envelope, kafkaInputMapperValue, currentOffset);
            reportRecordProcessed();
            return true;
        }
        return false;
    }

    /** Returns the next raw message, polling the consumer first if the local batch is used up. */
    private PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> fetchNextMessage() throws IOException {
        try {
            loadRecords();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IOException("Got interrupted while loading records from topic partition: " + topicPartition + " with current offset: " + currentOffset, e);
        }
        if (!recordIterator.hasNext()) {
            throw new IOException("Unable to read additional data from Kafka. See logs for details. Partition " + topicPartition + " Current Offset: " + currentOffset + " End Offset: " + endingOffset);
        }
        return recordIterator.next();
    }

    /** Copies the PUT or DELETE payload of {@code envelope} into {@code target}. */
    private void populateValue(MessageType messageType, KafkaMessageEnvelope envelope, KafkaInputMapperValue target, long offset) throws IOException {
        switch (messageType) {
            case PUT:
                Put put = (Put) envelope.payloadUnion;
                target.valueType = MapperValueType.PUT;
                target.value = put.putValue;
                target.schemaId = put.schemaId;
                target.replicationMetadataPayload = put.replicationMetadataPayload;
                target.replicationMetadataVersionId = put.replicationMetadataVersionId;
                break;
            case DELETE:
                Delete delete = (Delete) envelope.payloadUnion;
                target.valueType = MapperValueType.DELETE;
                target.value = EMPTY_BYTE_BUFFER;
                target.replicationMetadataPayload = delete.replicationMetadataPayload;
                target.replicationMetadataVersionId = delete.replicationMetadataVersionId;
                target.schemaId = delete.schemaId;
                break;
            default:
                throw new IOException("Unexpected '" + messageType + "' message from Kafka topic partition: " + topicPartition + " with offset: " + offset);
        }
    }

    /** Bumps the PUT/DELETE counter and logs progress at every {@link #LOG_RECORD_INTERVAL} records. */
    private void reportRecordProcessed() {
        if (reporter == null || reporter.equals(Reporter.NULL)) {
            return;
        }
        MRJobCounterHelper.incrTotalPutOrDeleteRecordCount(reporter, 1L);
        long processedRecords = MRJobCounterHelper.getTotalPutOrDeleteRecordsCount(reporter);
        if (processedRecords % LOG_RECORD_INTERVAL == 0) {
            LOGGER.info("KafkaInputRecordReader for TopicPartition: {} has processed {} records", topicPartition, processedRecords);
        }
    }

    /**
     * Returns the schema id carried by a PUT, or {@code -1} for a DELETE.
     *
     * @throws IOException for any other message type
     */
    private int getSchemaIdFromValue(KafkaMessageEnvelope kafkaMessageEnvelope) throws IOException {
        MessageType messageType = MessageType.valueOf(kafkaMessageEnvelope);
        switch (messageType) {
            case PUT:
                return ((Put) kafkaMessageEnvelope.payloadUnion).schemaId;
            case DELETE:
                return -1;
            default:
                throw new IOException("Unexpected '" + messageType + "' message from Kafka topic partition: " + topicPartition);
        }
    }

    /**
     * Splits a composite key from a chunking-enabled topic into the raw key bytes and the chunked key suffix.
     * The key type is chosen from the schema id: the CHUNK and CHUNKED_VALUE_MANIFEST protocol versions map
     * to their own key types; everything else is treated as a full value.
     *
     * @throws VeniceException if the schema id is not positive and the message is not a DELETE
     */
    private RawKeyBytesAndChunkedKeySuffix splitCompositeKey(byte[] compositeKeyBytes, MessageType messageType, int schemaId) {
        if (chunkKeyValueTransformer == null) {
            chunkKeyValueTransformer = new ChunkKeyValueTransformerImpl(keySchema);
        }
        ChunkKeyValueTransformer.KeyType keyType;
        if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
            keyType = ChunkKeyValueTransformer.KeyType.WITH_VALUE_CHUNK;
        } else if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
            keyType = ChunkKeyValueTransformer.KeyType.WITH_CHUNK_MANIFEST;
        } else if (schemaId > 0 || messageType == MessageType.DELETE) {
            keyType = ChunkKeyValueTransformer.KeyType.WITH_FULL_VALUE;
        } else {
            throw new VeniceException("Cannot categorize key type with schema ID: " + schemaId);
        }
        return chunkKeyValueTransformer.splitChunkedKey(compositeKeyBytes, keyType);
    }

    /** @return a new, empty mapper key for {@link #next} to fill in */
    @Override
    public KafkaInputMapperKey createKey() {
        return new KafkaInputMapperKey();
    }

    /** @return a new, empty mapper value for {@link #next} to fill in */
    @Override
    public KafkaInputMapperValue createValue() {
        return new KafkaInputMapperValue();
    }

    /** @return the offset of the last message read, or {@code startingOffset - 1} before the first read */
    @Override
    public long getPos() {
        return currentOffset;
    }

    /** Closes the underlying consumer. */
    @Override
    public void close() {
        consumer.close();
    }

    /**
     * Estimates progress as the fraction of the split's offset range that has been passed.
     *
     * @return a value in {@code [0.0, 1.0]}; an empty split reports {@code 1.0}
     */
    @Override
    public float getProgress() {
        if (maxNumberOfRecords <= 0) {
            // Empty split: there is nothing to read, and dividing by zero would yield NaN.
            return 1.0f;
        }
        float consumedFraction = ((float) (currentOffset - startingOffset + 1)) / ((float) maxNumberOfRecords);
        return Math.max(0.0f, Math.min(1.0f, consumedFraction));
    }

    /** The split is exhausted once the record at {@code endingOffset - 1} has been reached. */
    private boolean hasPendingData() {
        return currentOffset < endingOffset - 1;
    }

    /**
     * Refills {@link #recordIterator} from the consumer when it is missing or drained. Polls up to
     * {@link #CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES} times, sleeping after every empty poll.
     *
     * @throws InterruptedException if interrupted while sleeping between polls
     * @throws VeniceException if every poll attempt returned an empty result
     */
    private void loadRecords() throws InterruptedException {
        if (recordIterator != null && recordIterator.hasNext()) {
            return;
        }
        for (int attempt = 0; attempt < CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES; attempt++) {
            Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> polledMessages = consumer.poll(CONSUMER_POLL_TIMEOUT);
            if (!polledMessages.isEmpty()) {
                recordIterator = Utils.iterateOnMapOfLists(polledMessages);
                return;
            }
            Thread.sleep(EMPTY_POLL_SLEEP_TIME_MS);
        }
        throw new VeniceException(
                "Consumer#poll still returns empty result after retrying " + CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES + " times, "
                        + "topic partition: " + topicPartition + " and current offset: " + currentOffset);
    }
}
