package com.linkedin.venice;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.etl.VeniceKafkaDecodedRecord;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.ByteUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Reads one partition of a Kafka topic and either logs per-record metadata or decodes the records
 * into an Avro file named {@code <topic>_<partition>.avro} under the configured parent directory.
 *
 * <p>The instance creates its own {@link KafkaConsumer} in the constructor and releases it, together
 * with the Avro file writer (when one was opened), in {@link #close()}.
 */
public class KafkaTopicDumper implements AutoCloseable {
    private static final Logger LOGGER = LogManager.getLogger(KafkaTopicDumper.class);
    public static final String VENICE_ETL_KEY_FIELD = "key";
    public static final String VENICE_ETL_VALUE_FIELD = "value";
    public static final String VENICE_ETL_OFFSET_FIELD = "offset";
    public static final String VENICE_ETL_DELETED_TS_FIELD = "DELETED_TS";
    public static final String VENICE_ETL_METADATA_FIELD = "metadata";
    public static final String VENICE_ETL_BROKER_TIMESTAMP_FIELD = "brokerTimestamp";
    public static final String VENICE_ETL_PRODUCER_TIMESTAMP_FIELD = "producerTimestamp";
    public static final String VENICE_ETL_PARTITION_FIELD = "partition";
    private static final String REGULAR_REC = "REG";
    private static final String CONTROL_REC = "CTRL";
    /** Timeout, in milliseconds, of a single consumer poll. */
    private static final long POLL_TIMEOUT_MS = 5000L;
    /** A progress line is logged once more than this many records were consumed since the last one. */
    private static final long PROGRESS_LOG_INTERVAL = 1000L;

    private final String storeName;
    private final String topicName;
    private final int partition;
    private final String keySchemaStr;
    private final String latestValueSchemaStr;
    private final Schema[] allValueSchemas;
    private final String parentDirectory;
    private final KafkaConsumer<KafkaKey, KafkaMessageEnvelope> consumer;
    private final long messageCount;
    private final long startOffset;
    private final long endOffset;
    private final int maxConsumeAttempts;
    private final boolean logMetadataOnly;
    private DataFileWriter<GenericRecord> dataFileWriter;
    private GenericDatumReader<Object> keyReader;
    private GenericDatumReader<Object>[] valueReaders;
    private DecoderFactory decoderFactory;
    private Schema outputSchema;

    /**
     * Creates the dumper, assigns the consumer to the requested partition and seeks to the first
     * offset to read. Unless {@code logMetadataOnly} is set, the store's key/value schemas are
     * fetched from the controller and the Avro output file is opened.
     *
     * @param controllerClient used to fetch the key schema and all value schemas of the store
     * @param consumerProps properties handed to the {@link KafkaConsumer} constructor
     * @param topic topic to read; the store name is parsed from it
     * @param partitionNumber partition of {@code topic} to read
     * @param startingOffset requested first offset; the partition's beginning offset is used when larger
     * @param messageCount maximum number of records to process; a negative value means "up to the end offset"
     * @param parentDir directory in which the Avro dump file is created
     * @param maxConsumeAttempts number of consecutive empty polls tolerated before giving up
     * @param logMetadataOnly when {@code true}, records are only logged and no file or schema is set up
     * @throws VeniceException if the store has no value schema or the dump file cannot be created
     */
    public KafkaTopicDumper(
            ControllerClient controllerClient,
            Properties consumerProps,
            String topic,
            int partitionNumber,
            long startingOffset,
            int messageCount,
            String parentDir,
            int maxConsumeAttempts,
            boolean logMetadataOnly) {
        this.maxConsumeAttempts = maxConsumeAttempts;
        this.storeName = Version.isVersionTopic(topic)
                ? Version.parseStoreFromKafkaTopicName(topic)
                : Version.parseStoreFromRealTimeTopic(topic);
        this.topicName = topic;
        this.partition = partitionNumber;
        this.parentDirectory = parentDir;
        this.logMetadataOnly = logMetadataOnly;
        if (logMetadataOnly) {
            this.keySchemaStr = null;
            this.latestValueSchemaStr = null;
            this.allValueSchemas = null;
        } else {
            this.keySchemaStr = controllerClient.getKeySchema(this.storeName).getSchemaStr();
            MultiSchemaResponse.Schema[] schemas = controllerClient.getAllValueSchema(this.storeName).getSchemas();
            if (schemas == null || schemas.length == 0) {
                // Fail with a descriptive error instead of an ArrayIndexOutOfBounds/NPE below.
                throw new VeniceException("No value schema found for store " + this.storeName);
            }
            LOGGER.info("Found {} value schemas for store {}", schemas.length, this.storeName);
            // The last entry returned by the controller is used as the "latest" value schema.
            this.latestValueSchemaStr = schemas[schemas.length - 1].getSchemaStr();
            this.allValueSchemas = new Schema[schemas.length];
            for (int index = 0; index < schemas.length; index++) {
                this.allValueSchemas[index] = Schema.parse(schemas[index].getSchemaStr());
            }
        }

        this.consumer = new KafkaConsumer<>(consumerProps);
        long firstOffset;
        long lastOffset;
        try {
            TopicPartition topicPartition = new TopicPartition(topic, partitionNumber);
            List<TopicPartition> assignment = Collections.singletonList(topicPartition);
            this.consumer.assign(assignment);
            firstOffset = Math.max(this.consumer.beginningOffsets(assignment).get(topicPartition), startingOffset);
            LOGGER.info("Starting from offset: {}", firstOffset);
            this.consumer.seek(topicPartition, firstOffset);
            lastOffset = this.consumer.endOffsets(assignment).get(topicPartition);
            LOGGER.info("End offset for partition {} is {}", topicPartition.partition(), lastOffset);
            if (!logMetadataOnly) {
                setupDumpFile();
            }
        } catch (RuntimeException e) {
            // The caller never receives the instance, so close() cannot be invoked: release the
            // consumer here to avoid leaking it.
            try {
                this.consumer.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.startOffset = firstOffset;
        this.endOffset = lastOffset;
        this.messageCount = messageCount < 0 ? lastOffset : messageCount;
    }

    /**
     * Polls the partition and hands every record to {@link #processRecord} until one of the
     * following holds: the configured message count was processed, the last processed offset is
     * within two of the end offset captured at construction, or {@code maxConsumeAttempts}
     * consecutive polls returned nothing.
     *
     * <p>When no record has been processed yet, empty polls are retried (within the attempt budget)
     * only if the start offset is below the end offset, i.e. there is known data to read; otherwise
     * the method returns after a single poll.
     */
    public void fetchAndProcess() {
        int remainingAttempts = this.maxConsumeAttempts;
        // long counters: messageCount is a long and may exceed Integer.MAX_VALUE.
        long consumedCount = 0L;
        long lastReportedCount = 0L;
        ConsumerRecord<KafkaKey, KafkaMessageEnvelope> lastRecord = null;
        do {
            ConsumerRecords<KafkaKey, KafkaMessageEnvelope> batch = this.consumer.poll(POLL_TIMEOUT_MS);
            Iterator<ConsumerRecord<KafkaKey, KafkaMessageEnvelope>> records = batch.iterator();
            while (records.hasNext() && consumedCount < this.messageCount) {
                consumedCount++;
                lastRecord = records.next();
                processRecord(lastRecord);
            }
            if (consumedCount - lastReportedCount > PROGRESS_LOG_INTERVAL) {
                // consumedCount > 0 here, so lastRecord is non-null.
                LOGGER.info(
                        "Consumed {} messages; last consumed message offset:{}",
                        consumedCount,
                        lastRecord.offset());
                lastReportedCount = consumedCount;
            }
            // An empty poll consumes one attempt; any non-empty poll resets the budget.
            remainingAttempts = batch.count() == 0 ? remainingAttempts - 1 : this.maxConsumeAttempts;

            boolean nothingToRead = lastRecord == null && this.startOffset >= this.endOffset;
            // NOTE(review): the margin of 2 below the end offset is kept from the original logic;
            // its rationale is not visible in this file.
            boolean reachedEnd = lastRecord != null && lastRecord.offset() >= this.endOffset - 2;
            if (nothingToRead || reachedEnd || consumedCount >= this.messageCount) {
                return;
            }
        } while (remainingAttempts > 0);
    }

    /**
     * Opens the Avro output file and prepares the schema and datum readers used to decode records.
     *
     * <p>The output schema copies the fields of {@link VeniceKafkaDecodedRecord}, replacing the
     * schema of the key field with the store's key schema and the schema of the value field with a
     * union of null and the latest value schema. One value reader is created per known value schema,
     * each resolving to the last schema in the list.
     *
     * @throws VeniceException if the Avro file cannot be created
     */
    public final void setupDumpFile() {
        String fileName = this.topicName + "_" + this.partition + ".avro";
        // File(parent, child) inserts the separator when the directory lacks a trailing one. An
        // empty parent would resolve against a system-dependent default directory, so fall back to
        // a path relative to the working directory in that case.
        File dumpFile = (this.parentDirectory == null || this.parentDirectory.isEmpty())
                ? new File(fileName)
                : new File(this.parentDirectory, fileName);

        Schema keySchema = Schema.parse(this.keySchemaStr);
        Schema nullableValueSchema = Schema.createUnion(
                Arrays.asList(Schema.create(Schema.Type.NULL), Schema.parse(this.latestValueSchemaStr)));

        List<Schema.Field> outputFields = new ArrayList<>();
        for (Schema.Field field : VeniceKafkaDecodedRecord.SCHEMA$.getFields()) {
            switch (field.name()) {
                case VENICE_ETL_KEY_FIELD:
                    outputFields.add(AvroCompatibilityHelper.newField(field).setSchema(keySchema).build());
                    break;
                case VENICE_ETL_VALUE_FIELD:
                    outputFields.add(AvroCompatibilityHelper.newField(field).setSchema(nullableValueSchema).build());
                    break;
                default:
                    outputFields.add(AvroCompatibilityHelper.newField(field).build());
                    break;
            }
        }
        this.outputSchema = Schema.createRecord("KafkaRecord", "", "none", false);
        this.outputSchema.setFields(outputFields);
        this.dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(this.outputSchema));
        try {
            this.dataFileWriter.create(this.outputSchema, dumpFile);
        } catch (IOException e) {
            throw new VeniceException("Failed on creating avro file", e);
        }

        this.keyReader = new GenericDatumReader<>(keySchema, keySchema);
        int schemaCount = this.allValueSchemas.length;
        Schema readerSchema = this.allValueSchemas[schemaCount - 1];
        @SuppressWarnings("unchecked") // generic array creation; only GenericDatumReader<Object> is stored
        GenericDatumReader<Object>[] readers = new GenericDatumReader[schemaCount];
        for (int index = 0; index < schemaCount; index++) {
            readers[index] = new GenericDatumReader<>(this.allValueSchemas[index], readerSchema);
        }
        this.valueReaders = readers;
        this.decoderFactory = new DecoderFactory();
    }

    /**
     * Logs a one-line summary of the record: record kind, message type, offset, producer metadata
     * and, when present, leader metadata ("-" is printed for absent leader metadata). Any exception
     * raised while building the line is logged and not propagated.
     *
     * @param consumerRecord record to describe
     */
    public void logRecordMetadata(ConsumerRecord<KafkaKey, KafkaMessageEnvelope> consumerRecord) {
        try {
            KafkaKey kafkaKey = consumerRecord.key();
            KafkaMessageEnvelope envelope = consumerRecord.value();
            ProducerMetadata producerMetadata = envelope.producerMetadata;
            boolean isControlMessage = kafkaKey.isControlMessage();
            String messageType = isControlMessage
                    ? ControlMessageType.valueOf((ControlMessage) envelope.payloadUnion).toString()
                    : MessageType.valueOf(envelope).toString();
            LeaderMetadata leaderMetadata = envelope.leaderMetadataFooter;
            Object leaderHost = leaderMetadata == null ? "-" : leaderMetadata.hostName;
            Object upstreamOffset = leaderMetadata == null ? "-" : Long.valueOf(leaderMetadata.upstreamOffset);
            Object upstreamClusterId =
                    leaderMetadata == null ? "-" : Integer.valueOf(leaderMetadata.upstreamKafkaClusterId);
            LOGGER.info(
                    "{} {} Offset:{} ProducerMd=(guid:{},seg:{},seq:{},mts:{},lts:{}) LeaderMd=(host:{},uo:{},ukcId:{})",
                    isControlMessage ? CONTROL_REC : REGULAR_REC,
                    messageType,
                    consumerRecord.offset(),
                    GuidUtils.getHexFromGuid(producerMetadata.producerGUID),
                    producerMetadata.segmentNumber,
                    producerMetadata.messageSequenceNumber,
                    producerMetadata.messageTimestamp,
                    producerMetadata.logicalTimestamp,
                    leaderHost,
                    upstreamOffset,
                    upstreamClusterId);
        } catch (Exception e) {
            LOGGER.error("Failed when building record for offset {}", consumerRecord.offset(), e);
        }
    }

    /** Routes the record to metadata logging or to the Avro file, depending on the configured mode. */
    private void processRecord(ConsumerRecord<KafkaKey, KafkaMessageEnvelope> consumerRecord) {
        if (this.logMetadataOnly) {
            logRecordMetadata(consumerRecord);
        } else {
            writeToFile(consumerRecord);
        }
    }

    /**
     * Decodes the record and appends it to the Avro file. Control messages are skipped. For PUT
     * messages the value is decoded with the reader matching the message's schema id; for DELETE
     * messages the record offset is stored in the {@code DELETED_TS} field; UPDATE messages are
     * appended without a value. Any failure is logged with the record offset and not propagated, so
     * one bad record does not stop the dump.
     */
    private void writeToFile(ConsumerRecord<KafkaKey, KafkaMessageEnvelope> consumerRecord) {
        try {
            KafkaKey kafkaKey = consumerRecord.key();
            KafkaMessageEnvelope envelope = consumerRecord.value();
            if (kafkaKey.isControlMessage()) {
                LOGGER.info("Found a control message, continue");
                return;
            }
            GenericRecord decoded = new GenericData.Record(this.outputSchema);
            decoded.put(VENICE_ETL_OFFSET_FIELD, consumerRecord.offset());
            decoded.put(
                    VENICE_ETL_KEY_FIELD,
                    this.keyReader.read(null, this.decoderFactory.binaryDecoder(kafkaKey.getKey(), (BinaryDecoder) null)));

            // The broker timestamp is only reported when it was assigned by the broker on append.
            long brokerTimestamp = TimestampType.LOG_APPEND_TIME.equals(consumerRecord.timestampType())
                    ? consumerRecord.timestamp()
                    : 0L;
            HashMap<String, String> metadata = new HashMap<>();
            metadata.put(VENICE_ETL_PARTITION_FIELD, String.valueOf(consumerRecord.partition()));
            metadata.put(VENICE_ETL_PRODUCER_TIMESTAMP_FIELD, String.valueOf(envelope.producerMetadata.messageTimestamp));
            metadata.put(VENICE_ETL_BROKER_TIMESTAMP_FIELD, String.valueOf(brokerTimestamp));
            decoded.put(VENICE_ETL_METADATA_FIELD, metadata);

            switch (MessageType.valueOf(envelope)) {
                case PUT:
                    Put put = (Put) envelope.payloadUnion;
                    // Readers are stored at index (schemaId - 1).
                    int readerIndex = put.schemaId - 1;
                    if (readerIndex < 0 || readerIndex >= this.valueReaders.length) {
                        throw new VeniceException(
                                "No value reader for schema id " + put.schemaId + "; known value schemas: "
                                        + this.valueReaders.length);
                    }
                    byte[] valueBytes = ByteUtils.extractByteArray(put.putValue);
                    decoded.put(
                            VENICE_ETL_VALUE_FIELD,
                            this.valueReaders[readerIndex]
                                    .read(null, this.decoderFactory.binaryDecoder(valueBytes, (BinaryDecoder) null)));
                    break;
                case DELETE:
                    decoded.put(VENICE_ETL_DELETED_TS_FIELD, consumerRecord.offset());
                    break;
                case UPDATE:
                    LOGGER.info("Found update message! continue");
                    break;
                default:
                    throw new VeniceException("How come?");
            }
            this.dataFileWriter.append(decoded);
        } catch (Exception e) {
            LOGGER.error("Failed when building record for offset {}", consumerRecord.offset(), e);
        }
    }

    /**
     * Closes the Avro file writer, if one was opened, and then the Kafka consumer. The consumer is
     * closed even when closing the writer fails.
     *
     * @throws Exception if closing the writer or the consumer fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.dataFileWriter != null) {
                this.dataFileWriter.close();
            }
        } finally {
            this.consumer.close();
        }
    }
}
