package com.linkedin.venice;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.etl.VeniceKafkaDecodedRecord;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcherImpl;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/KafkaTopicDumper.class */
public class KafkaTopicDumper implements AutoCloseable {
    private static final Logger LOGGER = LogManager.getLogger(KafkaTopicDumper.class);
    public static final String VENICE_ETL_KEY_FIELD = "key";
    public static final String VENICE_ETL_VALUE_FIELD = "value";
    public static final String VENICE_ETL_OFFSET_FIELD = "offset";
    public static final String VENICE_ETL_DELETED_TS_FIELD = "DELETED_TS";
    public static final String VENICE_ETL_METADATA_FIELD = "metadata";
    public static final String VENICE_ETL_BROKER_TIMESTAMP_FIELD = "brokerTimestamp";
    public static final String VENICE_ETL_PRODUCER_TIMESTAMP_FIELD = "producerTimestamp";
    public static final String VENICE_ETL_PARTITION_FIELD = "partition";
    private final String storeName;
    private final String topicName;
    private final int partition;
    private final String keySchemaStr;
    private final String latestValueSchemaStr;
    private final Schema[] allValueSchemas;
    private final String parentDirectory;
    private final PubSubConsumerAdapter consumer;
    private final long messageCount;
    private final long endOffset;
    private final int maxConsumeAttempts;
    private final boolean logMetadataOnly;
    private DataFileWriter<GenericRecord> dataFileWriter;
    private GenericDatumReader<Object> keyReader;
    private GenericDatumReader<Object>[] valueReaders;
    private DecoderFactory decoderFactory;
    private Schema outputSchema;
    private static final String REGULAR_REC = "REG";
    private static final String CONTROL_REC = "CTRL";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.venice.KafkaTopicDumper$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/venice/KafkaTopicDumper$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType = new int[MessageType.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.PUT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public KafkaTopicDumper(ControllerClient controllerClient, PubSubConsumerAdapter pubSubConsumerAdapter, String str, int i, long j, int i2, String str2, int i3, boolean z) {
        this.consumer = pubSubConsumerAdapter;
        this.maxConsumeAttempts = i3;
        if (Version.isVersionTopic(str)) {
            this.storeName = Version.parseStoreFromKafkaTopicName(str);
        } else {
            this.storeName = Version.parseStoreFromRealTimeTopic(str);
        }
        this.topicName = str;
        this.partition = i;
        this.parentDirectory = str2;
        this.logMetadataOnly = z;
        if (z) {
            this.keySchemaStr = null;
            this.latestValueSchemaStr = null;
            this.allValueSchemas = null;
        } else {
            this.keySchemaStr = controllerClient.getKeySchema(this.storeName).getSchemaStr();
            MultiSchemaResponse.Schema[] schemas = controllerClient.getAllValueSchema(this.storeName).getSchemas();
            LOGGER.info("Found {} value schemas for store {}", Integer.valueOf(schemas.length), this.storeName);
            this.latestValueSchemaStr = schemas[schemas.length - 1].getSchemaStr();
            this.allValueSchemas = new Schema[schemas.length];
            int i4 = 0;
            for (MultiSchemaResponse.Schema schema : schemas) {
                this.allValueSchemas[i4] = Schema.parse(schema.getSchemaStr());
                i4++;
            }
        }
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(new PubSubTopicRepository().getTopic(this.topicName), i);
        long max = Math.max(pubSubConsumerAdapter.beginningOffset(pubSubTopicPartitionImpl, PartitionOffsetFetcherImpl.DEFAULT_KAFKA_OFFSET_API_TIMEOUT).longValue(), j);
        LOGGER.info("Starting from offset: {}", Long.valueOf(max));
        pubSubConsumerAdapter.subscribe(pubSubTopicPartitionImpl, max - 1);
        this.endOffset = pubSubConsumerAdapter.endOffset(pubSubTopicPartitionImpl).longValue();
        LOGGER.info("End offset for partition {} is {}", pubSubTopicPartitionImpl, Long.valueOf(this.endOffset));
        if (i2 < 0) {
            this.messageCount = this.endOffset;
        } else {
            this.messageCount = i2;
        }
        if (z) {
            return;
        }
        setupDumpFile();
    }

    public int fetchAndProcess() {
        int i = this.maxConsumeAttempts;
        int i2 = 0;
        int i3 = 0;
        PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage = null;
        do {
            Map poll = this.consumer.poll(5000L);
            Iterator iterateOnMapOfLists = Utils.iterateOnMapOfLists(poll);
            while (iterateOnMapOfLists.hasNext() && i2 < this.messageCount) {
                i2++;
                PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage2 = (PubSubMessage) iterateOnMapOfLists.next();
                pubSubMessage = pubSubMessage2;
                processRecord(pubSubMessage2);
            }
            if (i2 - i3 > 1000) {
                LOGGER.info("Consumed {} messages; last consumed message offset:{}", Integer.valueOf(i2), pubSubMessage.getOffset());
                i3 = i2;
            }
            i = poll.isEmpty() ? i - 1 : this.maxConsumeAttempts;
            if (pubSubMessage == null || ((Long) pubSubMessage.getOffset()).longValue() >= this.endOffset - 2 || i2 >= this.messageCount) {
                break;
            }
        } while (i > 0);
        return i2;
    }

    public final void setupDumpFile() {
        File file = new File(this.parentDirectory + this.topicName + "_" + this.partition + ".avro");
        ArrayList arrayList = new ArrayList();
        for (Schema.Field field : VeniceKafkaDecodedRecord.SCHEMA$.getFields()) {
            if (field.name().equals(VENICE_ETL_KEY_FIELD)) {
                arrayList.add(AvroCompatibilityHelper.newField(field).setSchema(Schema.parse(this.keySchemaStr)).build());
            } else if (field.name().equals(VENICE_ETL_VALUE_FIELD)) {
                arrayList.add(AvroCompatibilityHelper.newField(field).setSchema(Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.parse(this.latestValueSchemaStr)))).build());
            } else {
                arrayList.add(AvroCompatibilityHelper.newField(field).build());
            }
        }
        this.outputSchema = Schema.createRecord("KafkaRecord", "", "none", false);
        this.outputSchema.setFields(arrayList);
        this.dataFileWriter = new DataFileWriter<>(new GenericDatumWriter(this.outputSchema));
        try {
            this.dataFileWriter.create(this.outputSchema, file);
            Schema parse = Schema.parse(this.keySchemaStr);
            this.keyReader = new GenericDatumReader<>(parse, parse);
            int length = this.allValueSchemas.length;
            this.valueReaders = new GenericDatumReader[length];
            for (int i = 0; i < length; i++) {
                this.valueReaders[i] = new GenericDatumReader<>(this.allValueSchemas[i], this.allValueSchemas[length - 1]);
            }
            this.decoderFactory = new DecoderFactory();
        } catch (IOException e) {
            throw new VeniceException("Failed on creating avro file", e);
        }
    }

    public void logRecordMetadata(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) {
        try {
            KafkaKey kafkaKey = (KafkaKey) pubSubMessage.getKey();
            KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
            ProducerMetadata producerMetadata = kafkaMessageEnvelope.producerMetadata;
            String controlMessageType = kafkaKey.isControlMessage() ? ControlMessageType.valueOf((ControlMessage) kafkaMessageEnvelope.payloadUnion).toString() : MessageType.valueOf(kafkaMessageEnvelope).toString();
            LeaderMetadata leaderMetadata = kafkaMessageEnvelope.leaderMetadataFooter;
            Logger logger = LOGGER;
            Object[] objArr = new Object[11];
            objArr[0] = kafkaKey.isControlMessage() ? CONTROL_REC : REGULAR_REC;
            objArr[1] = controlMessageType;
            objArr[2] = pubSubMessage.getOffset();
            objArr[3] = GuidUtils.getHexFromGuid(producerMetadata.producerGUID);
            objArr[4] = Integer.valueOf(producerMetadata.segmentNumber);
            objArr[5] = Integer.valueOf(producerMetadata.messageSequenceNumber);
            objArr[6] = Long.valueOf(producerMetadata.messageTimestamp);
            objArr[7] = Long.valueOf(producerMetadata.logicalTimestamp);
            objArr[8] = leaderMetadata == null ? "-" : leaderMetadata.hostName;
            objArr[9] = leaderMetadata == null ? "-" : Long.valueOf(leaderMetadata.upstreamOffset);
            objArr[10] = leaderMetadata == null ? "-" : Integer.valueOf(leaderMetadata.upstreamKafkaClusterId);
            logger.info("{} {} Offset:{} ProducerMd=(guid:{},seg:{},seq:{},mts:{},lts:{}) LeaderMd=(host:{},uo:{},ukcId:{})", objArr);
        } catch (Exception e) {
            LOGGER.error("Failed when building record for offset {}", pubSubMessage.getOffset(), e);
        }
    }

    private void processRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) {
        if (this.logMetadataOnly) {
            logRecordMetadata(pubSubMessage);
        } else {
            writeToFile(pubSubMessage);
        }
    }

    private void writeToFile(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) {
        try {
            KafkaKey kafkaKey = (KafkaKey) pubSubMessage.getKey();
            KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
            if (kafkaKey.isControlMessage()) {
                LOGGER.info("Found a control message, continue");
                return;
            }
            GenericData.Record record = new GenericData.Record(this.outputSchema);
            record.put(VENICE_ETL_OFFSET_FIELD, pubSubMessage.getOffset());
            record.put(VENICE_ETL_KEY_FIELD, this.keyReader.read((Object) null, this.decoderFactory.binaryDecoder(kafkaKey.getKey(), (BinaryDecoder) null)));
            HashMap hashMap = new HashMap();
            hashMap.put(VENICE_ETL_PARTITION_FIELD, String.valueOf(pubSubMessage.getPartition()));
            hashMap.put(VENICE_ETL_PRODUCER_TIMESTAMP_FIELD, String.valueOf(kafkaMessageEnvelope.producerMetadata.messageTimestamp));
            hashMap.put(VENICE_ETL_BROKER_TIMESTAMP_FIELD, String.valueOf(pubSubMessage.getPubSubMessageTime()));
            record.put(VENICE_ETL_METADATA_FIELD, hashMap);
            switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.valueOf(kafkaMessageEnvelope).ordinal()]) {
                case 1:
                    Put put = (Put) kafkaMessageEnvelope.payloadUnion;
                    ByteBuffer byteBuffer = put.putValue;
                    int i = put.schemaId;
                    record.put(VENICE_ETL_VALUE_FIELD, this.valueReaders[i - 1].read((Object) null, this.decoderFactory.binaryDecoder(ByteUtils.extractByteArray(byteBuffer), (BinaryDecoder) null)));
                    break;
                case 2:
                    record.put(VENICE_ETL_DELETED_TS_FIELD, pubSubMessage.getOffset());
                    break;
                case 3:
                    LOGGER.info("Found update message! continue");
                    break;
                default:
                    throw new VeniceException("How come?");
            }
            this.dataFileWriter.append(record);
        } catch (Exception e) {
            LOGGER.error("Failed when building record for offset {}", pubSubMessage.getOffset(), e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.dataFileWriter != null) {
            this.dataFileWriter.close();
        }
    }
}
