package com.linkedin.davinci.kafka.consumer;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskFactory;
import com.linkedin.davinci.replication.RmdWithValueSchemaId;
import com.linkedin.davinci.replication.merge.MergeConflictResolver;
import com.linkedin.davinci.replication.merge.MergeConflictResolverFactory;
import com.linkedin.davinci.replication.merge.MergeConflictResult;
import com.linkedin.davinci.replication.merge.RmdSerDe;
import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.storage.chunking.ChunkingUtils;
import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.utils.ByteArrayKey;
import com.linkedin.venice.VeniceConstants;
import com.linkedin.venice.exceptions.PersistenceFailureException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceMessageException;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.Delete;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.TopicSwitch;
import com.linkedin.venice.kafka.protocol.Update;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.ChunkAwareCallback;
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.LeaderMetadataWrapper;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.class */
public class ActiveActiveStoreIngestionTask extends LeaderFollowerStoreIngestionTask {
    /** Class-wide log4j logger. */
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) ActiveActiveStoreIngestionTask.class);
    /** 16-byte array used only as the initial input when {@link ReusableObjects} constructs its {@link BinaryDecoder}. */
    private static final byte[] BINARY_DECODER_PARAM = new byte[16];
    /** RMD protocol version id, taken from {@code version.getRmdVersionId()} in the constructor. */
    private final int rmdProtocolVersionID;
    /** Resolver invoked for PUT / UPDATE / DELETE messages in {@code processMessageAndMaybeProduceToKafka}. */
    private final MergeConflictResolver mergeConflictResolver;
    /** Serializer/deserializer for replication metadata (RMD) records. */
    private final RmdSerDe rmdSerDe;
    /** Lazily created manager handing out the per-key locks used by {@code delegateConsumerRecord}. */
    private final Lazy<KeyLevelLocksManager> keyLevelLocksManager;
    /** Alias of the inherited {@code versionedIngestionStats}, assigned in the constructor. */
    private final AggVersionedIngestionStats aggVersionedIngestionStats;
    /** Repair service obtained from the builder; checked for {@code null} in {@code leaderExecuteTopicSwitch}. */
    private final RemoteIngestionRepairService remoteIngestionRepairService;
    /** Per-thread scratch objects handed to the chunking adapter in {@code getValueBytesForKey}. */
    private final ThreadLocal<ReusableObjects> threadLocalReusableObjects;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask$ReusableObjects.class */
    /**
     * Bundle of scratch objects (a byte buffer and an Avro binary decoder) that the enclosing task
     * keeps one copy of per thread and passes to the value lookup in {@code getValueBytesForKey}.
     */
    public static class ReusableObjects {
        /** 1 MiB heap buffer (1024 * 1024 bytes). */
        final ByteBuffer reusedByteBuffer = ByteBuffer.allocate(1024 * 1024);

        /** Decoder initially bound to the enclosing class's placeholder byte array. */
        final BinaryDecoder binaryDecoder = AvroCompatibilityHelper.newBinaryDecoder(
                ActiveActiveStoreIngestionTask.BINARY_DECODER_PARAM,
                0,
                ActiveActiveStoreIngestionTask.BINARY_DECODER_PARAM.length,
                null);

        /** Instances are only created by the enclosing class. */
        private ReusableObjects() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask$StorageOperationType.class */
    /**
     * Kind of storage-engine write selected by {@code getStorageOperationType} for an incoming
     * PUT or DELETE. The declaration order is kept as-is.
     */
    public enum StorageOperationType {
        /** Write (or delete) the value together with its replication metadata. */
        VALUE_AND_RMD,

        /** Write (or delete) the value only. */
        VALUE,

        /** Write only the replication metadata. */
        RMD_CHUNK,

        /** Perform no storage operation; returned when the partition has no consumption state. */
        NONE;
    }

    /**
     * Builds the task. All arguments are forwarded unchanged to the superclass constructor; on top
     * of that this constructor wires up the per-thread scratch objects, the lazily created key-level
     * locks manager, the RMD serializer and the merge conflict resolver.
     *
     * @param builder factory builder; also the source of the remote ingestion repair service
     * @param version store version; supplies the RMD protocol version id
     */
    public ActiveActiveStoreIngestionTask(StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties properties, BooleanSupplier booleanSupplier, VeniceStoreVersionConfig veniceStoreVersionConfig, int i, boolean z, Optional<ObjectCacheBackend> optional) {
        super(builder, store, version, properties, booleanSupplier, veniceStoreVersionConfig, i, z, optional);
        this.threadLocalReusableObjects = ThreadLocal.withInitial(ReusableObjects::new);
        this.rmdProtocolVersionID = version.getRmdVersionId();
        this.aggVersionedIngestionStats = this.versionedIngestionStats;

        // Sizing inputs for the locks manager, both derived from the number of configured Kafka clusters.
        // NOTE(review): the two values presumably are the initial and maximum lock-pool sizes — confirm
        // against KeyLevelLocksManager's constructor.
        final int kafkaClusterCount = this.serverConfig.getKafkaClusterIdToUrlMap().size();
        final int initialLockPoolSize = kafkaClusterCount + 1;
        final int consumersPerCluster = Math.min(this.storeVersionPartitionCount, this.serverConfig.getConsumerPoolSizePerKafkaCluster());
        final int maxLockPoolSize = (consumersPerCluster * kafkaClusterCount) + 1;
        this.keyLevelLocksManager = Lazy.of(
                () -> new KeyLevelLocksManager(getVersionTopic().getName(), initialLockPoolSize, maxLockPoolSize));

        // The schema cache is shared by the RMD serde and the conflict resolver.
        final StringAnnotatedStoreSchemaCache schemaCache = new StringAnnotatedStoreSchemaCache(this.storeName, this.schemaRepository);
        this.rmdSerDe = new RmdSerDe(schemaCache, this.rmdProtocolVersionID);
        this.mergeConflictResolver = MergeConflictResolverFactory.getInstance()
                .createMergeConflictResolver(schemaCache, this.rmdSerDe, getStoreName(), this.isWriteComputationEnabled);
        this.remoteIngestionRepairService = builder.getRemoteIngestionRepairService();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delegates a consumed record to the superclass, serializing the delegation per key for
     * records that come from a real-time topic.
     *
     * <p>Records from non-real-time topics are forwarded directly. For real-time records a lock
     * for the record key is obtained from the key-level locks manager and held for the duration of
     * the superclass call. The lock is unlocked and handed back to the manager in a {@code finally}
     * block so the cleanup runs exactly once whether the call returns or throws.
     *
     * @param pubSubMessage the consumed record
     * @return whatever the superclass implementation returns
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask, com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, int i, String str, int i2, long j, long j2) {
        if (!pubSubMessage.getTopicPartition().getPubSubTopic().isRealTime()) {
            return super.delegateConsumerRecord(pubSubMessage, i, str, i2, j, j2);
        }
        final ByteArrayKey lockKey = ByteArrayKey.wrap(pubSubMessage.getKey().getKey());
        final ReentrantLock keyLock = this.keyLevelLocksManager.get().acquireLockByKey(lockKey);
        // Acquire outside the try block: if lock() itself fails there is nothing to unlock.
        keyLock.lock();
        try {
            return super.delegateConsumerRecord(pubSubMessage, i, str, i2, j, j2);
        } finally {
            keyLock.unlock();
            this.keyLevelLocksManager.get().releaseLock(lockKey);
        }
    }

    /**
     * Persists a PUT into the storage engine, choosing between value-only, RMD-only and
     * value-plus-RMD writes according to {@code getStorageOperationType}. When the selected type is
     * {@code NONE} nothing is written. A {@link PersistenceFailureException} is handed to
     * {@code throwOrLogStorageFailureDependingIfStillSubscribed}.
     *
     * @param i    partition passed to the storage engine
     * @param bArr key bytes
     * @param put  the PUT payload (value, schema id and replication metadata)
     */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void putInStorageEngine(int i, byte[] bArr, Put put) {
        try {
            final StorageOperationType operation = getStorageOperationType(i, put.putValue, put.replicationMetadataPayload);
            switch (operation) {
                case VALUE:
                    this.storageEngine.put(i, bArr, put.putValue);
                    return;
                case RMD_CHUNK: {
                    final byte[] rmdWithSchemaId = prependReplicationMetadataBytesWithValueSchemaId(put.replicationMetadataPayload, put.schemaId);
                    this.storageEngine.putReplicationMetadata(i, bArr, rmdWithSchemaId);
                    return;
                }
                case VALUE_AND_RMD: {
                    final byte[] rmdWithSchemaId = prependReplicationMetadataBytesWithValueSchemaId(put.replicationMetadataPayload, put.schemaId);
                    this.storageEngine.putWithReplicationMetadata(i, bArr, put.putValue, rmdWithSchemaId);
                    return;
                }
                default:
                    // NONE: nothing to persist.
                    return;
            }
        } catch (PersistenceFailureException storageFailure) {
            throwOrLogStorageFailureDependingIfStillSubscribed(i, storageFailure);
        }
    }

    /**
     * Applies a DELETE to the storage engine. Depending on {@code getStorageOperationType} (called
     * with a {@code null} value payload) the key is deleted either together with new replication
     * metadata or on its own; any other operation type results in no storage call. A
     * {@link PersistenceFailureException} is handed to
     * {@code throwOrLogStorageFailureDependingIfStillSubscribed}.
     *
     * @param i      partition passed to the storage engine
     * @param bArr   key bytes
     * @param delete the DELETE payload (schema id and replication metadata)
     */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void removeFromStorageEngine(int i, byte[] bArr, Delete delete) {
        try {
            final StorageOperationType operation = getStorageOperationType(i, null, delete.replicationMetadataPayload);
            switch (operation) {
                case VALUE:
                    this.storageEngine.delete(i, bArr);
                    return;
                case VALUE_AND_RMD: {
                    final byte[] rmdWithSchemaId = prependReplicationMetadataBytesWithValueSchemaId(delete.replicationMetadataPayload, delete.schemaId);
                    this.storageEngine.deleteWithReplicationMetadata(i, bArr, rmdWithSchemaId);
                    return;
                }
                default:
                    // Any other type: nothing to remove.
                    return;
            }
        } catch (PersistenceFailureException storageFailure) {
            throwOrLogStorageFailureDependingIfStillSubscribed(i, storageFailure);
        }
    }

    /**
     * Decides which storage operation applies to a record.
     *
     * <ul>
     *   <li>{@code NONE} when the partition has no consumption state (the situation is logged);</li>
     *   <li>{@code VALUE} for Da Vinci clients, or when end-of-push has not been received and the
     *       replication metadata payload is empty;</li>
     *   <li>{@code VALUE_AND_RMD} when the value payload is {@code null} or non-empty;</li>
     *   <li>{@code RMD_CHUNK} when the value payload is present but empty.</li>
     * </ul>
     *
     * @param i           partition number
     * @param byteBuffer  value payload; callers pass {@code null} for deletes
     * @param byteBuffer2 replication metadata payload
     * @throws IllegalArgumentException if the replication metadata payload is {@code null} on a
     *         non-Da-Vinci host with a subscribed partition
     */
    private StorageOperationType getStorageOperationType(int i, ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        final PartitionConsumptionState consumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(i));
        if (consumptionState == null) {
            logStorageOperationWhileUnsubscribed(i);
            return StorageOperationType.NONE;
        }
        if (this.isDaVinciClient) {
            return StorageOperationType.VALUE;
        }
        if (byteBuffer2 == null) {
            throw new IllegalArgumentException("Replication metadata payload not found.");
        }
        if (!consumptionState.isEndOfPushReceived() && byteBuffer2.remaining() <= 0) {
            return StorageOperationType.VALUE;
        }
        if (byteBuffer == null || byteBuffer.remaining() > 0) {
            return StorageOperationType.VALUE_AND_RMD;
        }
        return StorageOperationType.RMD_CHUNK;
    }

    /**
     * Produces a byte array for the replication metadata with the value schema id in front of it.
     *
     * <p>The int header is added via {@code ByteUtils.prependIntHeaderToByteBuffer}; the buffer
     * position is then moved back by the size of an int for the extraction and moved forward
     * again afterwards.
     *
     * @param byteBuffer replication metadata payload
     * @param i          value schema id to prepend
     * @return the bytes extracted from the header-prefixed buffer
     */
    private byte[] prependReplicationMetadataBytesWithValueSchemaId(ByteBuffer byteBuffer, int i) {
        final ByteBuffer withHeader = ByteUtils.prependIntHeaderToByteBuffer(byteBuffer, i, false);
        // Step back over the int header so the extraction starts at the schema id.
        withHeader.position(withHeader.position() - Integer.BYTES);
        final byte[] rmdWithSchemaId = ByteUtils.extractByteArray(withHeader);
        // Undo the shift applied above.
        withHeader.position(withHeader.position() + Integer.BYTES);
        return rmdWithSchemaId;
    }

    /**
     * Looks up the replication metadata and value schema id for a key, preferring the partition's
     * transient record and falling back to the storage engine.
     *
     * @param partitionConsumptionState state holding the transient records
     * @param bArr key bytes
     * @param i    partition used for the storage lookup
     * @param j    value passed through to the metric-recording calls
     * @return the RMD with its value schema id, or {@code null} when neither source has an entry
     */
    RmdWithValueSchemaId getReplicationMetadataAndSchemaId(PartitionConsumptionState partitionConsumptionState, byte[] bArr, int i, long j) {
        final PartitionConsumptionState.TransientRecord cached = partitionConsumptionState.getTransientRecord(bArr);
        if (cached == null) {
            final byte[] storedRmd = getRmdWithValueSchemaByteBufferFromStorage(i, bArr, j);
            return storedRmd == null ? null : this.rmdSerDe.deserializeValueSchemaIdPrependedRmdBytes(storedRmd);
        }
        // Transient-record hit: count it and build the result without touching storage.
        getHostLevelIngestionStats().recordIngestionReplicationMetadataCacheHitCount(j);
        return new RmdWithValueSchemaId(cached.getValueSchemaId(), getRmdProtocolVersionID(), cached.getReplicationMetadataRecord());
    }

    /**
     * Reads the stored replication metadata for a key and records how long the lookup took.
     *
     * @param i    partition to read from
     * @param bArr key bytes
     * @param j    value passed through to the latency metric
     * @return the serialized record, or {@code null} if the storage engine has none
     */
    byte[] getRmdWithValueSchemaByteBufferFromStorage(int i, byte[] bArr, long j) {
        final long lookupStartNs = System.nanoTime();
        final ValueRecord storedRmd = SingleGetChunkingAdapter.getReplicationMetadata(getStorageEngine(), i, bArr, isChunked(), null);
        getHostLevelIngestionStats().recordIngestionReplicationMetadataLookUpLatency(LatencyUtils.getLatencyInMS(lookupStartNs), j);
        return storedRmd == null ? null : storedRmd.serialize();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs conflict resolution for a consumed record and, unless the update is ignored, forwards the
     * merged result to the view writers and produces it to Kafka.
     *
     * <p>Before end-of-push (or during data recovery with a topic switch present) the record is
     * handed straight to the superclass. Otherwise the existing RMD for the key is loaded, the
     * {@link MergeConflictResolver} is invoked for PUT / UPDATE / DELETE, and metrics are recorded.
     *
     * @param pubSubMessage             the consumed record
     * @param partitionConsumptionState consumption state of the partition
     * @param i   partition used for the RMD lookup and forwarded to the producer path
     * @param str forwarded to the producer path
     * @param i2  Kafka cluster id; passed twice to the resolver and used to look up the cluster URL
     * @param j   start timestamp used for the end-to-end processing latency metric
     * @param j2  value passed through to the metric-recording calls
     * @throws VeniceMessageException for message types other than PUT, UPDATE and DELETE
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
    public void processMessageAndMaybeProduceToKafka(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, int i, String str, int i2, long j, long j2) {
        final boolean beforeEndOfPush = !partitionConsumptionState.isEndOfPushReceived();
        if (beforeEndOfPush || (this.isDataRecovery && partitionConsumptionState.getTopicSwitch() != null)) {
            super.processMessageAndMaybeProduceToKafka(pubSubMessage, partitionConsumptionState, i, str, i2, j, j2);
            return;
        }

        final byte[] keyBytes = pubSubMessage.getKey().getKey();
        final KafkaMessageEnvelope envelope = pubSubMessage.getValue();
        final MessageType messageType = MessageType.valueOf(envelope.messageType);

        // Schema ids carried by the incoming message; -1 where the message type has none.
        final int incomingValueSchemaId;
        final int incomingUpdateSchemaId;
        switch (messageType) {
            case PUT:
                incomingValueSchemaId = ((Put) envelope.payloadUnion).schemaId;
                incomingUpdateSchemaId = -1;
                break;
            case UPDATE: {
                final Update updatePayload = (Update) envelope.payloadUnion;
                incomingValueSchemaId = updatePayload.schemaId;
                incomingUpdateSchemaId = updatePayload.updateSchemaId;
                break;
            }
            case DELETE:
                incomingValueSchemaId = -1;
                incomingUpdateSchemaId = -1;
                break;
            default:
                throw new VeniceMessageException(this.consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + envelope.messageType);
        }

        // The current value is only fetched if something actually asks for it.
        final Lazy<ByteBuffer> oldValueProvider = Lazy.of(
                () -> getValueBytesForKey(partitionConsumptionState, keyBytes, pubSubMessage.getTopicPartition(), j2));
        final RmdWithValueSchemaId existingRmd = getReplicationMetadataAndSchemaId(partitionConsumptionState, keyBytes, i, j2);
        final long writeTimestamp = getWriteTimestampFromKME(envelope);

        // Snapshot of the pre-operation RMD, used afterwards to detect regressions.
        final long offsetSumBefore;
        final List<Long> timestampsBefore;
        if (existingRmd == null) {
            offsetSumBefore = 0L;
            timestampsBefore = Collections.singletonList(0L);
        } else {
            offsetSumBefore = RmdUtils.extractOffsetVectorSumFromRmd(existingRmd.getRmdRecord());
            timestampsBefore = RmdUtils.extractTimestampFromRmd(existingRmd.getRmdRecord());
        }

        final long sourceOffset = pubSubMessage.getOffset().longValue();
        this.aggVersionedIngestionStats.recordTotalDCR(this.storeName, this.versionNumber);

        final MergeConflictResult mergeResult;
        switch (messageType) {
            case PUT:
                mergeResult = this.mergeConflictResolver.put(oldValueProvider, existingRmd, ((Put) envelope.payloadUnion).putValue, writeTimestamp, incomingValueSchemaId, sourceOffset, i2, i2);
                break;
            case UPDATE:
                mergeResult = this.mergeConflictResolver.update(oldValueProvider, existingRmd, ((Update) envelope.payloadUnion).updateValue, incomingValueSchemaId, incomingUpdateSchemaId, writeTimestamp, sourceOffset, i2, i2);
                break;
            case DELETE:
                mergeResult = this.mergeConflictResolver.delete(oldValueProvider, existingRmd, writeTimestamp, sourceOffset, i2, i2);
                break;
            default:
                throw new VeniceMessageException(this.consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + envelope.messageType);
        }
        this.aggVersionedIngestionStats.recordConsumedRecordEndToEndProcessingLatency(this.storeName, this.versionNumber, LatencyUtils.getLatencyInMS(j), j2);

        if (mergeResult.isUpdateIgnored()) {
            // Nothing to write; just remember the offset of the ignored record for this cluster.
            this.hostLevelIngestionStats.recordUpdateIgnoredDCR();
            partitionConsumptionState.updateLatestIgnoredUpstreamRTOffset(this.kafkaClusterIdToUrlMap.get(i2), sourceOffset);
            return;
        }

        validatePostOperationResultsAndRecord(mergeResult, Long.valueOf(offsetSumBefore), timestampsBefore);
        final int storedValueSchemaId = existingRmd != null ? existingRmd.getValueSchemaId() : incomingValueSchemaId;
        this.viewWriters.forEach((viewName, viewWriter) -> viewWriter.processRecord(
                mergeResult.getNewValue(),
                oldValueProvider.get(),
                keyBytes,
                this.versionNumber,
                incomingValueSchemaId,
                storedValueSchemaId,
                mergeResult.getRmdRecord()));
        producePutOrDeleteToKafka(mergeResult, partitionConsumptionState, keyBytes, pubSubMessage, i, str, i2, j);
    }

    /**
     * Picks the write timestamp for a message: the producer's logical timestamp when it is
     * non-negative, otherwise the producer's message timestamp.
     *
     * @param kafkaMessageEnvelope envelope whose producer metadata is read
     * @return the timestamp to use for conflict resolution
     */
    private long getWriteTimestampFromKME(KafkaMessageEnvelope kafkaMessageEnvelope) {
        if (kafkaMessageEnvelope.producerMetadata.logicalTimestamp >= 0) {
            return kafkaMessageEnvelope.producerMetadata.logicalTimestamp;
        }
        return kafkaMessageEnvelope.producerMetadata.messageTimestamp;
    }

    /**
     * Sanity-checks a merge result against the pre-operation replication metadata and records a
     * metric plus an error log whenever the offset-vector sum or any timestamp went backwards.
     * Does nothing for ignored updates.
     *
     * @param mergeConflictResult result of the conflict resolution
     * @param l    offset-vector sum before the operation
     * @param list timestamps before the operation, compared index by index with the new ones
     */
    private void validatePostOperationResultsAndRecord(MergeConflictResult mergeConflictResult, Long l, List<Long> list) {
        if (mergeConflictResult.isUpdateIgnored()) {
            return;
        }
        final GenericRecord newRmd = mergeConflictResult.getRmdRecord();

        final long offsetSumBefore = l.longValue();
        final long offsetSumAfter = RmdUtils.extractOffsetVectorSumFromRmd(newRmd);
        if (offsetSumBefore > offsetSumAfter) {
            this.hostLevelIngestionStats.recordOffsetRegressionDCRError();
            this.aggVersionedIngestionStats.recordOffsetRegressionDCRError(this.storeName, this.versionNumber);
            LOGGER.error("Offset vector found to have gone backwards!! New invalid replication metadata result: {}", newRmd);
        }

        final List<Long> timestampsAfter = RmdUtils.extractTimestampFromRmd(newRmd);
        final int previousCount = list.size();
        for (int idx = 0; idx < previousCount; idx++) {
            final long before = list.get(idx).longValue();
            final long after = timestampsAfter.get(idx).longValue();
            if (before <= after) {
                continue;
            }
            this.hostLevelIngestionStats.recordTimestampRegressionDCRError();
            this.aggVersionedIngestionStats.recordTimestampRegressionDCRError(this.storeName, this.versionNumber);
            LOGGER.error("Timestamp found to have gone backwards!! Invalid replication metadata result: {}", mergeConflictResult.getRmdRecord());
        }
    }

    /**
     * Fetches the current value bytes for a key, from the partition's transient record when one
     * exists and otherwise from the storage engine through the raw-bytes chunking adapter.
     *
     * @param partitionConsumptionState state holding the transient records
     * @param bArr                 key bytes
     * @param pubSubTopicPartition topic partition used to derive the sub-partition id
     * @param j                    value passed through to the metric-recording calls
     * @return the value bytes; {@code null} when the transient record holds no value (for the
     *         storage path the adapter's result is returned as-is)
     */
    private ByteBuffer getValueBytesForKey(PartitionConsumptionState partitionConsumptionState, byte[] bArr, PubSubTopicPartition pubSubTopicPartition, long j) {
        final PartitionConsumptionState.TransientRecord cached = partitionConsumptionState.getTransientRecord(bArr);
        if (cached != null) {
            this.hostLevelIngestionStats.recordIngestionValueBytesCacheHitCount(j);
            if (cached.getValue() == null) {
                return null;
            }
            return ByteBuffer.wrap(cached.getValue(), cached.getValueOffset(), cached.getValueLen());
        }

        // No transient record: read from storage using this thread's scratch buffer and decoder.
        final long lookupStartNs = System.nanoTime();
        final ReusableObjects scratch = this.threadLocalReusableObjects.get();
        final ByteBuffer storedValue = RawBytesChunkingAdapter.INSTANCE.get(
                this.storageEngine,
                getSubPartitionId(bArr, pubSubTopicPartition),
                ByteBuffer.wrap(bArr),
                this.isChunked,
                scratch.reusedByteBuffer,
                scratch.binaryDecoder,
                null,
                this.compressionStrategy,
                this.serverConfig.isComputeFastAvroEnabled(),
                this.schemaRepository,
                this.storeName,
                this.compressor.get());
        this.hostLevelIngestionStats.recordIngestionValueBytesLookUpLatency(LatencyUtils.getLatencyInMS(lookupStartNs), j);
        return storedValue;
    }

    /**
     * Turns a merge result into either a DELETE (when there is no new value) or a PUT, records it
     * as the key's transient record, and produces it via {@code produceToLocalKafka}.
     *
     * @param mergeConflictResult       outcome of conflict resolution (new value, schema id, RMD)
     * @param partitionConsumptionState consumption state receiving the transient record
     * @param bArr          key bytes
     * @param pubSubMessage the consumed record that triggered this write
     * @param i   forwarded to {@code produceToLocalKafka}
     * @param str forwarded to {@code produceToLocalKafka}
     * @param i2  Kafka cluster id stored with the transient record and the produced-record context
     * @param j   forwarded to {@code produceToLocalKafka}
     */
    private void producePutOrDeleteToKafka(MergeConflictResult mergeConflictResult, PartitionConsumptionState partitionConsumptionState, byte[] bArr, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, int i, String str, int i2, long j) {
        final ByteBuffer newValueBytes = maybeCompressData(pubSubMessage.getTopicPartition().getPartitionNumber(), mergeConflictResult.getNewValue(), partitionConsumptionState);
        final int valueSchemaId = mergeConflictResult.getValueSchemaId();
        final GenericRecord rmdRecord = mergeConflictResult.getRmdRecord();
        final ByteBuffer rmdBytes = this.rmdSerDe.serializeRmdRecord(valueSchemaId, rmdRecord);
        final long sourceOffset = pubSubMessage.getOffset().longValue();

        if (newValueBytes == null) {
            // No value left after the merge: emit a tombstone that still carries the RMD.
            this.hostLevelIngestionStats.recordTombstoneCreatedDCR();
            this.aggVersionedIngestionStats.recordTombStoneCreationDCR(this.storeName, this.versionNumber);
            partitionConsumptionState.setTransientRecord(i2, sourceOffset, bArr, valueSchemaId, rmdRecord);
            final Delete delete = new Delete();
            delete.schemaId = valueSchemaId;
            delete.replicationMetadataVersionId = this.rmdProtocolVersionID;
            delete.replicationMetadataPayload = rmdBytes;
            produceToLocalKafka(
                    pubSubMessage,
                    partitionConsumptionState,
                    LeaderProducedRecordContext.newDeleteRecord(i2, sourceOffset, bArr, delete),
                    // The key is passed as plain bytes; the decompiler's cast of the key to the
                    // writer type was invalid and has been removed.
                    (chunkAwareCallback, leaderMetadataWrapper) -> this.veniceWriter.get().delete(
                            bArr,
                            chunkAwareCallback,
                            leaderMetadataWrapper,
                            new DeleteMetadata(valueSchemaId, this.rmdProtocolVersionID, rmdBytes)),
                    i,
                    str,
                    i2,
                    j);
            return;
        }

        // Record the transient value before the int header is prepended to the buffer.
        partitionConsumptionState.setTransientRecord(i2, sourceOffset, bArr, newValueBytes.array(), newValueBytes.position(), newValueBytes.remaining(), valueSchemaId, rmdRecord);
        final Put put = new Put();
        put.putValue = ByteUtils.prependIntHeaderToByteBuffer(newValueBytes, valueSchemaId, mergeConflictResult.doesResultReuseInput());
        put.schemaId = valueSchemaId;
        put.replicationMetadataVersionId = this.rmdProtocolVersionID;
        put.replicationMetadataPayload = rmdBytes;

        // With chunking enabled the produced-record context uses the non-chunked serialized key.
        final byte[] contextKey = this.isChunked
                ? ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(bArr)
                : bArr;
        produceToLocalKafka(
                pubSubMessage,
                partitionConsumptionState,
                LeaderProducedRecordContext.newPutRecord(i2, sourceOffset, contextKey, put),
                getProduceToTopicFunction(bArr, newValueBytes, rmdBytes, valueSchemaId, mergeConflictResult.doesResultReuseInput()),
                i,
                str,
                i2,
                j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Produces through the superclass and then, for a hybrid leader partition consuming from a
     * real-time topic, records the record's offset as the latest one this task tried to produce
     * for the given {@code str} key.
     *
     * @param pubSubMessage             the consumed record
     * @param partitionConsumptionState consumption state to update
     * @param str key under which the offset is recorded (NOTE(review): presumably the source Kafka URL — confirm)
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
    public void produceToLocalKafka(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> biConsumer, int i, String str, int i2, long j) {
        super.produceToLocalKafka(pubSubMessage, partitionConsumptionState, leaderProducedRecordContext, biConsumer, i, str, i2, j);

        final boolean hybridLeader = partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER
                && partitionConsumptionState.isHybrid();
        if (!hybridLeader) {
            return;
        }
        if (pubSubMessage.getTopicPartition().getPubSubTopic().isRealTime()) {
            partitionConsumptionState.updateLatestRTOffsetTriedToProduceToVTMap(str, pubSubMessage.getOffset().longValue());
        }
    }

    /**
     * Promotes the partition to leader and subscribes to the leader topic on every source Kafka
     * address, starting from the leader offset recorded for that address.
     *
     * <p>The offset map is declared with explicit type arguments: with a raw map the lambda
     * parameters are typed {@code Object}, so the offset cannot be unboxed.
     *
     * @param partitionConsumptionState consumption state of the partition being promoted
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask, com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState) {
        final int partition = partitionConsumptionState.getPartition();
        final PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
        if (shouldNewLeaderSwitchToRemoteConsumption(partitionConsumptionState)) {
            partitionConsumptionState.setConsumeRemotely(true);
            LOGGER.info("{} enabled remote consumption from topic {} partition {}", this.consumerTaskId, leaderTopic, Integer.valueOf(partition));
        }
        partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.LEADER);

        // Resolve the starting offset for every source Kafka address.
        final Set<String> sourceKafkaUrls = getConsumptionSourceKafkaAddress(partitionConsumptionState);
        final HashMap<String, Long> leaderOffsetByKafkaUrl = new HashMap<>(sourceKafkaUrls.size());
        for (String kafkaUrl : sourceKafkaUrls) {
            leaderOffsetByKafkaUrl.put(kafkaUrl, Long.valueOf(partitionConsumptionState.getLeaderOffset(kafkaUrl, this.pubSubTopicRepository)));
        }
        LOGGER.info("{} is promoted to leader for partition {} and it is going to start consuming from topic {} with offset by Kafka URL mapping {}", this.consumerTaskId, Integer.valueOf(partition), leaderTopic, leaderOffsetByKafkaUrl);

        leaderOffsetByKafkaUrl.forEach((kafkaUrl, offset) ->
                consumerSubscribe(partitionConsumptionState.getSourceTopicPartition(leaderTopic), offset.longValue(), kafkaUrl));
        syncConsumedUpstreamRTOffsetMapIfNeeded(partitionConsumptionState, leaderOffsetByKafkaUrl);
        LOGGER.info("{}, as a leader, started consuming from topic {} partition {} with offset by Kafka URL mapping {}", this.consumerTaskId, leaderTopic, Integer.valueOf(partition), leaderOffsetByKafkaUrl);
    }

    /**
     * Computes the timestamp from which buffer replay should start: the start-of-push timestamp
     * for the {@code REWIND_FROM_SOP} policy, otherwise the end-of-push timestamp, minus the
     * rewind window.
     *
     * <p>The window is the configured rewind time in seconds multiplied by 1000. During data
     * recovery it is raised to at least {@code TopicManager.BUFFER_REPLAY_MINIMAL_SAFETY_MARGIN}.
     *
     * @param partitionConsumptionState source of the start/end-of-push timestamps
     * @return the rewind start timestamp
     */
    private long calculateRewindStartTime(PartitionConsumptionState partitionConsumptionState) {
        long rewindWindowMs = this.hybridStoreConfig.get().getRewindTimeInSeconds() * 1000;
        if (this.isDataRecovery) {
            rewindWindowMs = Math.max(TopicManager.BUFFER_REPLAY_MINIMAL_SAFETY_MARGIN, rewindWindowMs);
        }
        switch (this.hybridStoreConfig.get().getBufferReplayPolicy()) {
            case REWIND_FROM_SOP:
                return partitionConsumptionState.getStartOfPushTimestamp() - rewindWindowMs;
            case REWIND_FROM_EOP:
            default:
                return partitionConsumptionState.getEndOfPushTimestamp() - rewindWindowMs;
        }
    }

    /**
     * Executes a topic switch on the leader: determines a start offset for every source Kafka
     * server named in the control message, unsubscribes from the current leader topic, records the
     * new leader topic and offsets, and subscribes to the new topic on every reachable server.
     *
     * <p>For a server without a usable processed upstream offset, the offset is looked up from
     * the rewind start timestamp. If that lookup fails the server is marked unreachable, its
     * offset is set to -1, and a repair task is registered (or, with no repair service, the switch
     * is aborted). The whole switch is aborted only when <em>every</em> source server is
     * unreachable; servers that merely failed are skipped for subscription and left to the
     * repair task.
     *
     * @param partitionConsumptionState consumption state of the leader partition
     * @param topicSwitch               the topic switch control message
     * @param pubSubTopic               the topic to switch to
     * @throws VeniceException if the partition is not in LEADER state, the source server list is
     *         empty, a broker lookup fails with no repair service configured, or no source broker
     *         could be reached
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
    protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic pubSubTopic) {
        if (partitionConsumptionState.getLeaderFollowerState() != LeaderFollowerStateType.LEADER) {
            throw new VeniceException(String.format("Expect state %s but got %s", LeaderFollowerStateType.LEADER, partitionConsumptionState.toString()));
        }
        if (topicSwitch.sourceKafkaServers.isEmpty()) {
            throw new VeniceException("In the A/A mode, source Kafka URL list cannot be empty in Topic Switch control message.");
        }
        final int partition = partitionConsumptionState.getPartition();
        final PubSubTopic currentLeaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
        final PubSubTopicPartition sourceTopicPartition = partitionConsumptionState.getSourceTopicPartition(pubSubTopic);
        final HashMap<String, Long> upstreamOffsetByKafkaUrl = new HashMap<>(topicSwitch.sourceKafkaServers.size());
        final List<CharSequence> unreachableBrokers = new ArrayList<>();

        topicSwitch.sourceKafkaServers.forEach(sourceKafkaUrl -> {
            Long upstreamStartOffset = partitionConsumptionState.getLatestProcessedUpstreamRTOffsetWithNoDefault(sourceKafkaUrl.toString());
            if (upstreamStartOffset == null || upstreamStartOffset.longValue() < 0) {
                // No usable processed offset yet: derive one from the rewind start timestamp.
                final long rewindStartTimestamp;
                if (topicSwitch.rewindStartTimestamp == VeniceConstants.REWIND_TIME_DECIDED_BY_SERVER.longValue()) {
                    rewindStartTimestamp = calculateRewindStartTime(partitionConsumptionState);
                    LOGGER.info("{} leader calculated rewindStartTimestamp {} for {}", this.consumerTaskId, Long.valueOf(rewindStartTimestamp), sourceTopicPartition);
                } else {
                    rewindStartTimestamp = topicSwitch.rewindStartTimestamp;
                }
                if (rewindStartTimestamp > 0) {
                    try {
                        upstreamStartOffset = Long.valueOf(getTopicPartitionOffsetByKafkaURL(sourceKafkaUrl, new PubSubTopicPartitionImpl(pubSubTopic, sourceTopicPartition.getPartitionNumber()), rewindStartTimestamp));
                    } catch (Exception e) {
                        unreachableBrokers.add(sourceKafkaUrl);
                        upstreamStartOffset = -1L;
                        // The trailing throwable is logged with its stack trace.
                        LOGGER.error("Failed contacting broker {} when processing topic switch! for {}. Setting upstream start offset to {}", sourceKafkaUrl, sourceTopicPartition, (Object) (-1), e);
                        this.hostLevelIngestionStats.recordIngestionFailure();
                        if (this.remoteIngestionRepairService == null) {
                            // Nobody can retry later, so surface the failure (with its cause) now.
                            throw new VeniceException(String.format("Failed contacting broker (%s) and no repair service available!  Aborting topic switch processing for %s. Setting upstream start offset to %d", sourceKafkaUrl, sourceTopicPartition.toString(), -1), e);
                        }
                        this.remoteIngestionRepairService.registerRepairTask(this, buildRepairTask(sourceKafkaUrl.toString(), sourceTopicPartition, rewindStartTimestamp, partitionConsumptionState));
                    }
                } else {
                    upstreamStartOffset = -1L;
                }
            }
            upstreamOffsetByKafkaUrl.put(sourceKafkaUrl.toString(), upstreamStartOffset);
        });

        // Abort only when no source broker at all could be reached; partial failures are handled below.
        if (unreachableBrokers.size() >= topicSwitch.sourceKafkaServers.size()) {
            throw new VeniceException("Couldn't reach any broker!!  Aborting topic switch triggered consumer subscription!");
        }

        consumerUnSubscribe(currentLeaderTopic, partitionConsumptionState);
        waitForLastLeaderPersistFuture(partitionConsumptionState, String.format("Leader failed to produce the last message to version topic before switching feed topic from %s to %s on partition %s", currentLeaderTopic, pubSubTopic, Integer.valueOf(partition)));

        // Consumption stays local only when the single source server is the local Kafka server.
        final boolean localOnly = topicSwitch.sourceKafkaServers.size() == 1
                && Objects.equals(topicSwitch.sourceKafkaServers.get(0).toString(), this.localKafkaServer);
        if (!localOnly) {
            partitionConsumptionState.setConsumeRemotely(true);
            LOGGER.info("{} enabled remote consumption and switch to topic {} partition {} with offset by Kafka URL mapping {}", this.consumerTaskId, pubSubTopic, sourceTopicPartition, upstreamOffsetByKafkaUrl);
        }

        // Persist the new leader topic and the offsets of all servers, including unreachable ones.
        partitionConsumptionState.getOffsetRecord().setLeaderTopic(pubSubTopic);
        upstreamOffsetByKafkaUrl.forEach((kafkaUrl, offset) ->
                partitionConsumptionState.getOffsetRecord().setLeaderUpstreamOffset(kafkaUrl, offset.longValue()));

        if (!unreachableBrokers.isEmpty()) {
            LOGGER.warn("Failed to reach broker urls {}, will schedule retry to compute upstream offset and resubscribe!", unreachableBrokers.toString());
            // Do not subscribe to servers whose offset could not be determined.
            for (CharSequence unreachable : unreachableBrokers) {
                upstreamOffsetByKafkaUrl.remove(unreachable.toString());
            }
        }
        upstreamOffsetByKafkaUrl.forEach((kafkaUrl, offset) ->
                consumerSubscribe(partitionConsumptionState.getSourceTopicPartition(pubSubTopic), offset.longValue(), kafkaUrl));
        syncConsumedUpstreamRTOffsetMapIfNeeded(partitionConsumptionState, upstreamOffsetByKafkaUrl);
        LOGGER.info("{} leader successfully switch feed topic from {} to {} on partition {} with offset by Kafka URL mapping {}", this.consumerTaskId, currentLeaderTopic, pubSubTopic, Integer.valueOf(partition), upstreamOffsetByKafkaUrl);
        this.defaultReadyToServeChecker.apply(partitionConsumptionState);
    }

    /**
     * Handles a {@link TopicSwitch} control message for the given partition state.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If this replica is leader but {@code i} is not a leader sub-partition, it is demoted: the current
     *       leader topic is unsubscribed, the last leader persist future is awaited, remote consumption is
     *       turned off, the state becomes STANDBY and the local version topic is re-subscribed.</li>
     *   <li>The receipt of the topic switch is reported through {@code statusReportAdapter}.</li>
     *   <li>Unless running as a Da Vinci client, a start offset is resolved for every source Kafka URL in the
     *       message and stored in the offset record as the leader upstream offset.</li>
     *   <li>The topic switch and the resolved offsets are handed to
     *       {@code syncTopicSwitchToIngestionMetadataService}.</li>
     * </ol>
     *
     * @param controlMessage control message whose {@code controlMessageUnion} is a {@link TopicSwitch}
     * @param i partition id checked against {@code amplificationFactorAdapter.isLeaderSubPartition}
     * @param j not read by this implementation
     * @param partitionConsumptionState state of the partition being processed
     * @return {@code false} if this replica is leader after processing; otherwise {@code true}, after the new
     *     source topic has been recorded as the leader topic in the offset record
     * @throws VeniceException if not a Da Vinci client and no broker was successfully queried for an offset
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask, com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected boolean processTopicSwitch(ControlMessage controlMessage, int i, long j, PartitionConsumptionState partitionConsumptionState) {
        if (isLeader(partitionConsumptionState) && !this.amplificationFactorAdapter.isLeaderSubPartition(i)) {
            LOGGER.info("SubPartition: {} is demoted from LEADER to STANDBY.", Integer.valueOf(partitionConsumptionState.getPartition()));
            PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
            consumerUnSubscribe(leaderTopic, partitionConsumptionState);
            waitForLastLeaderPersistFuture(partitionConsumptionState, String.format("Leader failed to produce the last message to version topic before switching feed topic from %s to %s on partition %s", leaderTopic, this.kafkaVersionTopic, Integer.valueOf(i)));
            partitionConsumptionState.setConsumeRemotely(false);
            partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.STANDBY);
            consumerSubscribe(partitionConsumptionState.getSourceTopicPartition(this.versionTopic), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), this.localKafkaServer);
        }
        TopicSwitch topicSwitch = (TopicSwitch) controlMessage.controlMessageUnion;
        this.statusReportAdapter.reportTopicSwitchReceived(partitionConsumptionState);
        String newSourceTopicName = topicSwitch.sourceTopicName.toString();
        PubSubTopic newSourceTopic = this.pubSubTopicRepository.getTopic(newSourceTopicName);
        // Typed map: start offset per source Kafka URL (-1 means no offset was resolved for that URL).
        HashMap<String, Long> upstreamOffsetsByKafkaUrl = new HashMap<>(topicSwitch.sourceKafkaServers.size());
        if (!this.isDaVinciClient) {
            int sourceTopicPartitionNumber = partitionConsumptionState.getSourceTopicPartitionNumber(newSourceTopic);
            AtomicInteger contactedBrokers = new AtomicInteger(0);
            topicSwitch.sourceKafkaServers.forEach(sourceKafkaServer -> {
                String sourceKafkaUrl = sourceKafkaServer.toString();
                long rewindStartTimestamp;
                if (topicSwitch.rewindStartTimestamp == VeniceConstants.REWIND_TIME_DECIDED_BY_SERVER.longValue()) {
                    // Sentinel value: the rewind start time is computed locally instead of taken from the message.
                    rewindStartTimestamp = calculateRewindStartTime(partitionConsumptionState);
                    LOGGER.info("{} leader calculated rewindStartTimestamp {} for topic {} partition {}", this.consumerTaskId, Long.valueOf(rewindStartTimestamp), newSourceTopicName, Integer.valueOf(sourceTopicPartitionNumber));
                } else {
                    rewindStartTimestamp = topicSwitch.rewindStartTimestamp;
                }
                if (rewindStartTimestamp <= 0) {
                    upstreamOffsetsByKafkaUrl.put(sourceKafkaUrl, -1L);
                    return;
                }
                long upstreamStartOffset = -1;
                try {
                    upstreamStartOffset = getTopicManager(sourceKafkaUrl).getPartitionOffsetByTime(new PubSubTopicPartitionImpl(newSourceTopic, sourceTopicPartitionNumber), rewindStartTimestamp);
                    contactedBrokers.getAndIncrement();
                } catch (Exception e) {
                    // Best effort per broker: keep going, but log the cause so the failure can be diagnosed.
                    LOGGER.error("Failed to reach broker {} when trying to get partitionOffsetByTime for topic {} partitions {}", sourceKafkaUrl, newSourceTopicName, Integer.valueOf(sourceTopicPartitionNumber), e);
                }
                if (upstreamStartOffset != -1) {
                    // Store the offset just before the one returned for the rewind timestamp.
                    upstreamStartOffset--;
                }
                upstreamOffsetsByKafkaUrl.put(sourceKafkaUrl, Long.valueOf(upstreamStartOffset));
            });
            // NOTE(review): the counter only advances when a broker lookup succeeds, so this also throws when
            // the rewind timestamp is <= 0 for every URL and no lookup was attempted - confirm that is intended.
            if (contactedBrokers.get() == 0) {
                throw new VeniceException("Failed to query any broker for rewind!  Aborting topic switch processing!");
            }
            upstreamOffsetsByKafkaUrl.forEach((kafkaUrl, offset) -> {
                partitionConsumptionState.getOffsetRecord().setLeaderUpstreamOffset(kafkaUrl, offset.longValue());
            });
        }
        syncTopicSwitchToIngestionMetadataService(topicSwitch, partitionConsumptionState, upstreamOffsetsByKafkaUrl);
        if (isLeader(partitionConsumptionState)) {
            return false;
        }
        partitionConsumptionState.getOffsetRecord().setLeaderTopic(newSourceTopic);
        return true;
    }

    /**
     * Updates the in-memory processed offsets for a consumed record by delegating to
     * {@code updateOffsetsFromConsumerRecord} with the callbacks it needs:
     * an updater for the local version topic offset, an updater and a getter for the upstream offset
     * (both pick the real-time or version-topic variant based on {@code isRealTime()} of the topic they are
     * given), and a supplier of the upstream Kafka URL.
     *
     * @param partitionConsumptionState state whose offsets are updated; must not be {@code null}
     * @param pubSubMessage the consumed record
     * @param leaderProducedRecordContext passed through to {@code updateOffsetsFromConsumerRecord}
     * @param str Kafka URL forwarded to {@code getUpstreamKafkaUrl}
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask, com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, LeaderProducedRecordContext leaderProducedRecordContext, String str) {
        Objects.requireNonNull(partitionConsumptionState);
        updateOffsetsFromConsumerRecord(
            partitionConsumptionState,
            pubSubMessage,
            leaderProducedRecordContext,
            partitionConsumptionState::updateLatestProcessedLocalVersionTopicOffset,
            (sourceKafkaUrl, upstreamTopic, upstreamOffset) -> {
                if (!upstreamTopic.isRealTime()) {
                    partitionConsumptionState.updateLatestProcessedUpstreamVersionTopicOffset(upstreamOffset);
                } else {
                    partitionConsumptionState.updateLatestProcessedUpstreamRTOffset(sourceKafkaUrl, upstreamOffset);
                }
            },
            (sourceKafkaUrl, upstreamTopic) -> upstreamTopic.isRealTime() ? partitionConsumptionState.getLatestProcessedUpstreamRTOffset(sourceKafkaUrl) : partitionConsumptionState.getLatestProcessedUpstreamVersionTopicOffset(),
            () -> getUpstreamKafkaUrl(partitionConsumptionState, pubSubMessage, str));
    }

    /**
     * Resolves the upstream Kafka URL for a consumed record.
     *
     * @param partitionConsumptionState state used for the {@code isLeader} check
     * @param pubSubMessage the consumed record; its value is only read when this replica is not leader
     * @param str URL returned as-is when this replica is leader
     * @return {@code str} for a leader; otherwise the local Kafka server when the envelope has no
     *     {@code leaderMetadataFooter}, else the URL looked up from the envelope's footer
     */
    private String getUpstreamKafkaUrl(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, String str) {
        if (isLeader(partitionConsumptionState)) {
            return str;
        }
        KafkaMessageEnvelope envelope = pubSubMessage.getValue();
        if (envelope.leaderMetadataFooter == null) {
            return this.localKafkaServer;
        }
        return getUpstreamKafkaUrlFromKafkaValue(envelope);
    }

    /**
     * Tells whether the partition has received a topic switch whose new source topic is a real-time topic.
     *
     * @param partitionConsumptionState state holding the (possibly absent) topic switch
     * @return {@code false} when no topic switch is recorded; otherwise whether the new source topic is real-time
     * @throws VeniceException if a topic switch is recorded but lists no source Kafka servers
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask, com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState) {
        TopicSwitchWrapper switchWrapper = partitionConsumptionState.getTopicSwitch();
        if (switchWrapper != null) {
            boolean noSourceServers = switchWrapper.getTopicSwitch().sourceKafkaServers.isEmpty();
            if (noSourceServers) {
                throw new VeniceException("Got empty source Kafka URLs in Topic Switch.");
            }
            return switchWrapper.getNewSourceTopic().isRealTime();
        }
        return false;
    }

    /**
     * Returns the value of
     * {@code getLatestProcessedUpstreamRTOffsetWithIgnoredMessages(str)} from the given partition state.
     *
     * @param partitionConsumptionState state to read the offset from
     * @param str key passed to the lookup (presumably the source Kafka URL - confirm against callers)
     * @return the offset reported by the partition state
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
    protected long getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState partitionConsumptionState, String str) {
        final long processedUpstreamOffset = partitionConsumptionState.getLatestProcessedUpstreamRTOffsetWithIgnoredMessages(str);
        return processedUpstreamOffset;
    }

    /**
     * Returns the value of {@code getLeaderConsumedUpstreamRTOffset(str)} from the given partition state.
     *
     * @param partitionConsumptionState state to read the offset from
     * @param str key passed to the lookup (presumably the source Kafka URL - confirm against callers)
     * @return the offset reported by the partition state
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
    protected long getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState partitionConsumptionState, String str) {
        final long consumedUpstreamOffset = partitionConsumptionState.getLeaderConsumedUpstreamRTOffset(str);
        return consumedUpstreamOffset;
    }

    /**
     * Forwards the given offset to {@code updateLeaderConsumedUpstreamRTOffset(str, j)} on the partition state.
     *
     * @param partitionConsumptionState state to update
     * @param str key the offset is stored under (presumably the source Kafka URL - confirm against callers)
     * @param j offset value to record
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
    protected void updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState partitionConsumptionState, String str, long j) {
        partitionConsumptionState.updateLeaderConsumedUpstreamRTOffset(str, j);
    }

    /**
     * Looks up the Kafka URL for the upstream cluster id carried in the envelope's leader metadata footer.
     *
     * @param kafkaMessageEnvelope envelope whose {@code leaderMetadataFooter} must be set
     * @return the URL mapped to the footer's {@code upstreamKafkaClusterId}
     * @throws VeniceException if the footer is missing or the cluster id has no entry in the id-to-URL map
     */
    private String getUpstreamKafkaUrlFromKafkaValue(KafkaMessageEnvelope kafkaMessageEnvelope) {
        if (kafkaMessageEnvelope.leaderMetadataFooter == null) {
            throw new VeniceException("leaderMetadataFooter field in KME should have been set.");
        }
        int upstreamClusterId = kafkaMessageEnvelope.leaderMetadataFooter.upstreamKafkaClusterId;
        String upstreamKafkaUrl = this.kafkaClusterIdToUrlMap.get(upstreamClusterId);
        if (upstreamKafkaUrl != null) {
            return upstreamKafkaUrl;
        }
        throw new VeniceException(String.format("No Kafka cluster ID found in the cluster ID to Kafka URL map. Got cluster ID %d and ID to cluster URL map %s", Integer.valueOf(upstreamClusterId), this.kafkaClusterIdToUrlMap));
    }

    /**
     * Reports whether the transient record buffer is used.
     *
     * @return always {@code true} in this implementation
     */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected boolean isTransientRecordBufferUsed() {
        return true;
    }

    /**
     * Sums the real-time offset lag against one region over the partitions that pass
     * {@code LEADER_OFFSET_LAG_FILTER} and are leader sub-partitions.
     *
     * <p>A partition whose leader topic is missing or not real-time contributes 0. For the others the lag
     * comes from {@code getPartitionOffsetLag}; when that value is negative, it is recomputed as
     * (cached topic offset - 1) minus the leader-consumed upstream offset for the region's URL.
     *
     * @param i cluster id used to look up the region's Kafka URL
     * @return 0 when the store version state is absent, no partition is tracked, or the id maps to no URL;
     *     otherwise the summed lag passed through {@code minZeroLag}
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask, com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getRegionHybridOffsetLag(int i) {
        if (this.storageEngine.getStoreVersionState() == null) {
            return 0L;
        }
        if (this.partitionConsumptionStateMap.isEmpty()) {
            return 0L;
        }
        String regionKafkaUrl = this.kafkaClusterIdToUrlMap.get(i);
        if (regionKafkaUrl == null) {
            return 0L;
        }
        long totalLag = 0L;
        for (PartitionConsumptionState pcs : this.partitionConsumptionStateMap.values()) {
            if (!LeaderFollowerStoreIngestionTask.LEADER_OFFSET_LAG_FILTER.test(pcs)) {
                continue;
            }
            if (!this.amplificationFactorAdapter.isLeaderSubPartition(pcs.getPartition())) {
                continue;
            }
            PubSubTopic currentLeaderTopic = pcs.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
            if (currentLeaderTopic == null || !currentLeaderTopic.isRealTime()) {
                // Contributes zero lag.
                continue;
            }
            long lag = getPartitionOffsetLag(regionKafkaUrl, currentLeaderTopic, pcs.getUserPartition());
            if (lag < 0) {
                // Negative lag from the direct lookup: fall back to the cached end offset.
                lag = (this.cachedKafkaMetadataGetter.getOffset(getTopicManager(regionKafkaUrl), currentLeaderTopic, pcs.getUserPartition()) - 1) - pcs.getLeaderConsumedUpstreamRTOffset(regionKafkaUrl);
            }
            totalLag += lag;
        }
        return minZeroLag(totalLag);
    }

    /**
     * Measures real-time offset lag across several source regions according to the store's data
     * replication policy.
     *
     * <ul>
     *   <li>AGGREGATE: the smallest single-region lag over {@code set} ({@code Long.MAX_VALUE} if empty).</li>
     *   <li>ACTIVE_ACTIVE: the largest single-region lag over {@code set}; one failing region is logged and
     *       skipped, a second failure makes the method return {@code Long.MAX_VALUE}.</li>
     *   <li>Any other policy: the lag against the local Kafka server, which must be part of {@code set}.</li>
     * </ul>
     *
     * @param set source real-time Kafka URLs
     * @param partitionConsumptionState partition whose lag is measured
     * @param z flag forwarded unchanged to {@code measureRTOffsetLagForSingleRegion}
     * @return the lag computed as described above
     * @throws VeniceException for a policy other than AGGREGATE/ACTIVE_ACTIVE when {@code set} lacks the local URL
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
    protected long measureRTOffsetLagForMultiRegions(Set<String> set, PartitionConsumptionState partitionConsumptionState, boolean z) {
        DataReplicationPolicy replicationPolicy = this.hybridStoreConfig.get().getDataReplicationPolicy();
        if (replicationPolicy.equals(DataReplicationPolicy.AGGREGATE)) {
            long smallestLag = Long.MAX_VALUE;
            for (String regionKafkaUrl : set) {
                smallestLag = Math.min(smallestLag, measureRTOffsetLagForSingleRegion(regionKafkaUrl, partitionConsumptionState, z));
            }
            return smallestLag;
        }
        if (replicationPolicy.equals(DataReplicationPolicy.ACTIVE_ACTIVE)) {
            long largestLag = Long.MIN_VALUE;
            int unreachableRegions = 0;
            for (String regionKafkaUrl : set) {
                try {
                    largestLag = Math.max(measureRTOffsetLagForSingleRegion(regionKafkaUrl, partitionConsumptionState, z), largestLag);
                } catch (Exception e) {
                    LOGGER.error("Failed to measure RT offset lag for topic {} partition id {} in {}", partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository), Integer.valueOf(partitionConsumptionState.getPartition()), regionKafkaUrl, e);
                    unreachableRegions++;
                    if (unreachableRegions > 1) {
                        LOGGER.error("More than one regions are unreachable. Return {} as it is not ready-to-serve", (Object) Long.MAX_VALUE);
                        return Long.MAX_VALUE;
                    }
                }
            }
            return largestLag;
        }
        if (!set.contains(this.localKafkaServer)) {
            throw new VeniceException(String.format("Expect source RT Kafka URLs contains local Kafka URL. Got local Kafka URL %s and RT source Kafka URLs %s", this.localKafkaServer, set));
        }
        return measureRTOffsetLagForSingleRegion(this.localKafkaServer, partitionConsumptionState, z);
    }

    /**
     * Checks whether any partition that has already caught up is now lagging on its real-time sources.
     *
     * <p>Only partitions with {@code hasLagCaughtUp()} are inspected, and only when the configured
     * go-online lag threshold is non-negative. For such a partition the method returns {@code true} when
     * it has no real-time source URL, when the lag against any source exceeds the threshold, or when more
     * than one source cannot be measured. A single unmeasurable source is tolerated.
     *
     * @return {@code false} when the store has no hybrid config, no partition is tracked, or no inspected
     *     partition meets one of the conditions above; {@code true} otherwise
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask, com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public boolean isReadyToServeAnnouncedWithRTLag() {
        if (!this.hybridStoreConfig.isPresent() || this.partitionConsumptionStateMap.isEmpty()) {
            return false;
        }
        long offsetLagThresholdToGoOnline = this.hybridStoreConfig.get().getOffsetLagThresholdToGoOnline();
        for (PartitionConsumptionState partitionConsumptionState : this.partitionConsumptionStateMap.values()) {
            if (!partitionConsumptionState.hasLagCaughtUp() || offsetLagThresholdToGoOnline < 0) {
                continue;
            }
            Set<String> realTimeDataSourceKafkaAddress = getRealTimeDataSourceKafkaAddress(partitionConsumptionState);
            if (realTimeDataSourceKafkaAddress.isEmpty()) {
                return true;
            }
            int unreachableRegions = 0;
            for (String sourceKafkaUrl : realTimeDataSourceKafkaAddress) {
                try {
                    // The measurement must run inside the try block; otherwise the catch below is dead code
                    // and one unreachable region aborts the whole check with an exception.
                    if (measureRTOffsetLagForSingleRegion(sourceKafkaUrl, partitionConsumptionState, false) > offsetLagThresholdToGoOnline) {
                        return true;
                    }
                } catch (Exception e) {
                    // One unreachable region is tolerated; a second one is reported as lagging.
                    unreachableRegions++;
                    if (unreachableRegions > 1) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    /**
     * Builds a task that re-establishes consumption of one topic partition from one Kafka URL.
     *
     * <p>When run, the task resolves an offset through {@code getTopicPartitionOffsetByKafkaURL}, subscribes
     * the partition at that offset, pushes the single URL-to-offset entry to
     * {@code syncConsumedUpstreamRTOffsetMapIfNeeded}, and logs the result. Nothing happens until the
     * returned {@link Runnable} is executed.
     *
     * @param str Kafka URL to resolve the offset from and subscribe to
     * @param pubSubTopicPartition topic partition to subscribe
     * @param j value forwarded to {@code getTopicPartitionOffsetByKafkaURL} (presumably a rewind timestamp -
     *     confirm against that method)
     * @param partitionConsumptionState state passed to {@code syncConsumedUpstreamRTOffsetMapIfNeeded}
     * @return the repair task
     */
    Runnable buildRepairTask(String str, PubSubTopicPartition pubSubTopicPartition, long j, PartitionConsumptionState partitionConsumptionState) {
        return () -> {
            long topicPartitionOffsetByKafkaURL = getTopicPartitionOffsetByKafkaURL(str, pubSubTopicPartition, j);
            consumerSubscribe(pubSubTopicPartition, topicPartitionOffsetByKafkaURL, str);
            // Parameterized instead of a raw HashMap so the key/value types are checked by the compiler.
            HashMap<String, Long> upstreamOffsetByKafkaUrl = new HashMap<>();
            upstreamOffsetByKafkaUrl.put(str, Long.valueOf(topicPartitionOffsetByKafkaURL));
            syncConsumedUpstreamRTOffsetMapIfNeeded(partitionConsumptionState, upstreamOffsetByKafkaUrl);
            LOGGER.info("Successfully repaired consumption and subscribed to {} at offset {}", pubSubTopicPartition, Long.valueOf(topicPartitionOffsetByKafkaURL));
        };
    }

    /**
     * Returns the {@code rmdProtocolVersionID} field of this task.
     *
     * @return the stored RMD protocol version id
     */
    int getRmdProtocolVersionID() {
        return this.rmdProtocolVersionID;
    }

    /**
     * Builds the function that produces a put record through the Venice writer.
     *
     * <p>When invoked, the function first (only if {@code z} is set) casts the callback to
     * {@link ActiveActiveProducerCallback} and registers a completion function that calls
     * {@code ByteUtils.prependIntHeaderToByteBuffer} on {@code byteBuffer} with the header read from that
     * same buffer. It then calls {@code put} on the writer with the key, the bytes extracted from
     * {@code byteBuffer}, the schema id, the callback, the leader metadata, a fixed {@code -2L} argument
     * (presumably a "default logical timestamp" sentinel - confirm against the writer API) and a
     * {@code PutMetadata} built from the RMD protocol version id and {@code byteBuffer2}.
     *
     * @param bArr key bytes
     * @param byteBuffer value buffer
     * @param byteBuffer2 buffer placed into the {@code PutMetadata}
     * @param i schema id passed to {@code put}
     * @param z whether to register the completion function on the callback
     * @return the produce function
     */
    protected BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> getProduceToTopicFunction(byte[] bArr, ByteBuffer byteBuffer, ByteBuffer byteBuffer2, int i, boolean z) {
        return (producerCallback, leaderMetadata) -> {
            if (z) {
                ActiveActiveProducerCallback activeActiveCallback = (ActiveActiveProducerCallback) producerCallback;
                activeActiveCallback.setOnCompletionFunction(
                    () -> ByteUtils.prependIntHeaderToByteBuffer(byteBuffer, ByteUtils.getIntHeaderFromByteBuffer(byteBuffer), true));
            }
            getVeniceWriter().get().put(
                bArr,
                ByteUtils.extractByteArray(byteBuffer),
                i,
                producerCallback,
                leaderMetadata,
                -2L,
                new PutMetadata(getRmdProtocolVersionID(), byteBuffer2));
        };
    }

    /**
     * Creates the producer callback for a record; this implementation always returns an
     * {@link ActiveActiveProducerCallback} constructed from this task and the given arguments.
     *
     * @param pubSubMessage the consumed record
     * @param partitionConsumptionState state of the partition the record belongs to
     * @param leaderProducedRecordContext context passed to the callback
     * @param i passed through to the callback constructor
     * @param str passed through to the callback constructor
     * @param j passed through to the callback constructor
     * @return the new callback
     */
    @Override // com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
    protected LeaderProducerCallback createProducerCallback(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) {
        LeaderProducerCallback producerCallback =
            new ActiveActiveProducerCallback(this, pubSubMessage, partitionConsumptionState, leaderProducedRecordContext, i, str, j);
        return producerCallback;
    }
}
