package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.writer.ChunkAwareCallback;
import java.nio.ByteBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.class */
public class LeaderProducerCallback implements ChunkAwareCallback {
    private static final Logger LOGGER = LogManager.getLogger(LeaderFollowerStoreIngestionTask.class);
    private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter();
    protected static final ChunkedValueManifestSerializer CHUNKED_VALUE_MANIFEST_SERIALIZER = new ChunkedValueManifestSerializer(false);
    protected static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
    protected final LeaderFollowerStoreIngestionTask ingestionTask;
    private final PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sourceConsumerRecord;
    private final PartitionConsumptionState partitionConsumptionState;
    private final int subPartition;
    private final String kafkaUrl;
    protected final LeaderProducedRecordContext leaderProducedRecordContext;
    private final long produceTimeNs;
    private final long beforeProcessingRecordTimestampNs;
    private byte[] key = null;
    private ChunkedValueManifest chunkedValueManifest = null;
    private ByteBuffer[] valueChunks = null;
    protected ChunkedValueManifest chunkedRmdManifest = null;
    private ByteBuffer[] rmdChunks = null;

    public LeaderProducerCallback(LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) {
        this.ingestionTask = leaderFollowerStoreIngestionTask;
        this.sourceConsumerRecord = pubSubMessage;
        this.partitionConsumptionState = partitionConsumptionState;
        this.subPartition = i;
        this.kafkaUrl = str;
        this.leaderProducedRecordContext = leaderProducedRecordContext;
        this.produceTimeNs = leaderFollowerStoreIngestionTask.isUserSystemStore() ? 0L : System.nanoTime();
        this.beforeProcessingRecordTimestampNs = j;
    }

    public void onCompletion(PubSubProduceResult pubSubProduceResult, Exception exc) {
        int i;
        int length;
        if (exc != null) {
            this.ingestionTask.getVersionedDIVStats().recordLeaderProducerFailure(this.ingestionTask.getStoreName(), this.ingestionTask.versionNumber);
            if (REDUNDANT_LOGGING_FILTER.isRedundantException(exc + " - TP: " + this.sourceConsumerRecord.getTopicName() + "/" + this.sourceConsumerRecord.getPartition())) {
                return;
            }
            LOGGER.error("Leader failed to send out message to version topic when consuming {}", this.sourceConsumerRecord.getTopicPartition(), exc);
            return;
        }
        if (this.partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER && pubSubProduceResult.getPartition() != this.partitionConsumptionState.getPartition()) {
            this.leaderProducedRecordContext.completePersistedToDBFuture(null);
            return;
        }
        if (this.chunkedValueManifest != null) {
            if (this.valueChunks == null) {
                throw new IllegalStateException("Value chunking info not initialized.");
            }
            if (this.chunkedValueManifest.keysWithChunkIdSuffix.size() != this.valueChunks.length) {
                throw new IllegalStateException("keysWithChunkIdSuffix in chunkedValueManifest is not in sync with value chunks.");
            }
        }
        if (this.chunkedRmdManifest != null) {
            if (this.rmdChunks == null) {
                throw new IllegalStateException("RMD chunking info not initialized.");
            }
            if (this.chunkedRmdManifest.keysWithChunkIdSuffix.size() != this.rmdChunks.length) {
                throw new IllegalStateException("keysWithChunkIdSuffix in chunkedRmdManifest is not in sync with RMD chunks.");
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        this.leaderProducedRecordContext.setProducedTimestampMs(currentTimeMillis);
        if (!this.ingestionTask.isUserSystemStore()) {
            this.ingestionTask.getVersionedDIVStats().recordLeaderProducerCompletionTime(this.ingestionTask.getStoreName(), this.ingestionTask.versionNumber, LatencyUtils.getLatencyInMS(this.produceTimeNs), currentTimeMillis);
            if (this.ingestionTask.isHybridMode() && this.sourceConsumerRecord.getTopicPartition().getPubSubTopic().isRealTime() && this.partitionConsumptionState.hasLagCaughtUp()) {
                this.ingestionTask.getVersionIngestionStats().recordNearlineProducerToLocalBrokerLatency(this.ingestionTask.getStoreName(), this.ingestionTask.versionNumber, currentTimeMillis - ((KafkaMessageEnvelope) this.sourceConsumerRecord.getValue()).producerMetadata.messageTimestamp, currentTimeMillis);
            }
        }
        try {
            if (this.chunkedValueManifest == null) {
                if (this.key != null) {
                    this.leaderProducedRecordContext.setKeyBytes(this.key);
                }
                this.leaderProducedRecordContext.setProducedOffset(pubSubProduceResult.getOffset());
                this.ingestionTask.produceToStoreBufferService(this.sourceConsumerRecord, this.leaderProducedRecordContext, this.subPartition, this.kafkaUrl, this.beforeProcessingRecordTimestampNs, currentTimeMillis);
                i = 0 + 1;
                length = Math.max(0, pubSubProduceResult.getSerializedSize());
            } else {
                int produceChunksToStoreBufferService = (int) (0 + produceChunksToStoreBufferService(this.chunkedValueManifest, this.valueChunks, false, currentTimeMillis));
                int size = 0 + this.chunkedValueManifest.keysWithChunkIdSuffix.size();
                if (this.chunkedRmdManifest != null) {
                    produceChunksToStoreBufferService = (int) (produceChunksToStoreBufferService + produceChunksToStoreBufferService(this.chunkedRmdManifest, this.rmdChunks, true, currentTimeMillis));
                    size += this.chunkedRmdManifest.keysWithChunkIdSuffix.size();
                }
                ByteBuffer serialize = CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize(this.chunkedValueManifest);
                serialize.position(4);
                Put instantiateManifestPut = instantiateManifestPut();
                instantiateManifestPut.putValue = serialize;
                instantiateManifestPut.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
                LeaderProducedRecordContext newPutRecordWithFuture = LeaderProducedRecordContext.newPutRecordWithFuture(this.leaderProducedRecordContext.getConsumedKafkaClusterId(), this.leaderProducedRecordContext.getConsumedOffset(), this.key, instantiateManifestPut, this.leaderProducedRecordContext.getPersistedToDBFuture());
                newPutRecordWithFuture.setProducedOffset(pubSubProduceResult.getOffset());
                this.ingestionTask.produceToStoreBufferService(this.sourceConsumerRecord, newPutRecordWithFuture, this.subPartition, this.kafkaUrl, this.beforeProcessingRecordTimestampNs, currentTimeMillis);
                i = size + 1;
                length = produceChunksToStoreBufferService + this.key.length + serialize.remaining();
            }
            recordProducerStats(length, i);
        } catch (Exception e) {
            boolean isEndOfPushReceived = this.partitionConsumptionState.isEndOfPushReceived();
            LOGGER.error("{} received exception in kafka callback thread; EOP received: {}, {}, Offset: {}", this.ingestionTask.consumerTaskId, Boolean.valueOf(isEndOfPushReceived), this.sourceConsumerRecord.getTopicPartition(), this.sourceConsumerRecord.getOffset(), e);
            if (!isEndOfPushReceived) {
                try {
                    this.ingestionTask.setIngestionException(this.sourceConsumerRecord.getTopicPartition().getPartitionNumber(), e);
                } catch (VeniceException e2) {
                    this.ingestionTask.setLastStoreIngestionException(e2);
                }
            }
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    public void setChunkingInfo(byte[] bArr, ByteBuffer[] byteBufferArr, ChunkedValueManifest chunkedValueManifest, ByteBuffer[] byteBufferArr2, ChunkedValueManifest chunkedValueManifest2) {
        this.key = bArr;
        this.chunkedValueManifest = chunkedValueManifest;
        this.valueChunks = byteBufferArr;
        this.chunkedRmdManifest = chunkedValueManifest2;
        this.rmdChunks = byteBufferArr2;
    }

    private void recordProducerStats(int i, int i2) {
        this.ingestionTask.getVersionIngestionStats().recordLeaderProduced(this.ingestionTask.getStoreName(), this.ingestionTask.versionNumber, i, i2);
        this.ingestionTask.getHostLevelIngestionStats().recordTotalLeaderBytesProduced(i);
        this.ingestionTask.getHostLevelIngestionStats().recordTotalLeaderRecordsProduced(i2);
    }

    protected Put instantiateValueChunkPut() {
        return new Put();
    }

    protected Put instantiateRmdChunkPut() {
        return new Put();
    }

    protected Put instantiateManifestPut() {
        return new Put();
    }

    private long produceChunksToStoreBufferService(ChunkedValueManifest chunkedValueManifest, ByteBuffer[] byteBufferArr, boolean z, long j) throws InterruptedException {
        Put instantiateValueChunkPut;
        long j2 = 0;
        for (int i = 0; i < chunkedValueManifest.keysWithChunkIdSuffix.size(); i++) {
            ByteBuffer byteBuffer = (ByteBuffer) chunkedValueManifest.keysWithChunkIdSuffix.get(i);
            ByteBuffer byteBuffer2 = byteBufferArr[i];
            if (z) {
                instantiateValueChunkPut = instantiateRmdChunkPut();
                instantiateValueChunkPut.replicationMetadataPayload = byteBuffer2;
            } else {
                instantiateValueChunkPut = instantiateValueChunkPut();
                instantiateValueChunkPut.putValue = byteBuffer2;
            }
            instantiateValueChunkPut.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion();
            LeaderProducedRecordContext newChunkPutRecord = LeaderProducedRecordContext.newChunkPutRecord(ByteUtils.extractByteArray(byteBuffer), instantiateValueChunkPut);
            newChunkPutRecord.setProducedOffset(-1L);
            this.ingestionTask.produceToStoreBufferService(this.sourceConsumerRecord, newChunkPutRecord, this.subPartition, this.kafkaUrl, this.beforeProcessingRecordTimestampNs, j);
            j2 += byteBuffer.remaining() + byteBuffer2.remaining();
        }
        return j2;
    }
}
