package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskFactory;
import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.davinci.storage.chunking.ChunkingUtils;
import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceMessageException;
import com.linkedin.venice.exceptions.VeniceTimeoutException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.TopicSwitch;
import com.linkedin.venice.kafka.protocol.Update;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.merge.CollectionTimestampMergeRecordHelper;
import com.linkedin.venice.stats.StatsErrorCode;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.ChunkAwareCallback;
import com.linkedin.venice.writer.LeaderMetadataWrapper;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.class */
public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
    private final long newLeaderInactiveTime;
    private final StoreWriteComputeProcessor storeWriteComputeHandler;
    private final boolean isNativeReplicationEnabled;
    private final String nativeReplicationSourceVersionTopicKafkaURL;
    private final Set<String> nativeReplicationSourceVersionTopicKafkaURLSingletonSet;
    private final VeniceWriterFactory veniceWriterFactory;
    private final KafkaDataIntegrityValidator kafkaDataIntegrityValidatorForLeaders;
    protected final Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriter;
    protected final Int2ObjectMap<String> kafkaClusterIdToUrlMap;
    private long dataRecoveryCompletionTimeLagThresholdInMs;
    protected final Map<String, VeniceViewWriter> viewWriters;
    private final Predicate<? super PartitionConsumptionState> FOLLOWER_OFFSET_LAG_FILTER;
    private final Predicate<? super PartitionConsumptionState> BATCH_FOLLOWER_OFFSET_LAG_FILTER;
    private final Predicate<? super PartitionConsumptionState> HYBRID_FOLLOWER_OFFSET_LAG_FILTER;
    private static final Logger LOGGER = LogManager.getLogger(LeaderFollowerStoreIngestionTask.class);
    private static final Predicate<? super PartitionConsumptionState> BATCH_REPLICATION_LAG_FILTER = partitionConsumptionState -> {
        return !partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.consumeRemotely() && partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER);
    };
    public static final Predicate<? super PartitionConsumptionState> LEADER_OFFSET_LAG_FILTER = partitionConsumptionState -> {
        return partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER);
    };
    private static final Predicate<? super PartitionConsumptionState> BATCH_LEADER_OFFSET_LAG_FILTER = partitionConsumptionState -> {
        return !partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER);
    };
    private static final Predicate<? super PartitionConsumptionState> HYBRID_LEADER_OFFSET_LAG_FILTER = partitionConsumptionState -> {
        return partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.isHybrid() && partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER);
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType = new int[ControlMessageType.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.START_OF_PUSH.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.END_OF_PUSH.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.START_OF_SEGMENT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.END_OF_SEGMENT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.START_OF_INCREMENTAL_PUSH.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.END_OF_INCREMENTAL_PUSH.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.TOPIC_SWITCH.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.VERSION_SWAP.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType = new int[MessageType.values().length];
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.PUT.ordinal()] = 1;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e11) {
            }
            $SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType = new int[LeaderFollowerStateType.values().length];
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType[LeaderFollowerStateType.PAUSE_TRANSITION_FROM_STANDBY_TO_LEADER.ordinal()] = 1;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType[LeaderFollowerStateType.IN_TRANSITION_FROM_STANDBY_TO_LEADER.ordinal()] = 2;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType[LeaderFollowerStateType.LEADER.ordinal()] = 3;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType[LeaderFollowerStateType.STANDBY.ordinal()] = 4;
            } catch (NoSuchFieldError e15) {
            }
            $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType = new int[ConsumerActionType.values().length];
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType[ConsumerActionType.STANDBY_TO_LEADER.ordinal()] = 1;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType[ConsumerActionType.LEADER_TO_STANDBY.ordinal()] = 2;
            } catch (NoSuchFieldError e17) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask$GetLastKnownUpstreamTopicOffset.class */
    public interface GetLastKnownUpstreamTopicOffset {
        long apply(String str, PubSubTopic pubSubTopic);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask$UpdateUpstreamTopicOffset.class */
    public interface UpdateUpstreamTopicOffset {
        void apply(String str, PubSubTopic pubSubTopic, long j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask$UpdateVersionTopicOffset.class */
    public interface UpdateVersionTopicOffset {
        void apply(long j);
    }

    public LeaderFollowerStoreIngestionTask(StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties properties, BooleanSupplier booleanSupplier, VeniceStoreVersionConfig veniceStoreVersionConfig, int i, boolean z, Optional<ObjectCacheBackend> optional) {
        super(builder, store, version, properties, booleanSupplier, veniceStoreVersionConfig, i, z, optional, builder.getLeaderFollowerNotifiers());
        this.dataRecoveryCompletionTimeLagThresholdInMs = 0L;
        this.FOLLOWER_OFFSET_LAG_FILTER = partitionConsumptionState -> {
            return (partitionConsumptionState.getLatestProcessedUpstreamRTOffset("") == -1 || partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER)) ? false : true;
        };
        this.BATCH_FOLLOWER_OFFSET_LAG_FILTER = partitionConsumptionState2 -> {
            return (partitionConsumptionState2.isEndOfPushReceived() || partitionConsumptionState2.getLatestProcessedUpstreamRTOffset("") == -1 || partitionConsumptionState2.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER)) ? false : true;
        };
        this.HYBRID_FOLLOWER_OFFSET_LAG_FILTER = partitionConsumptionState3 -> {
            return partitionConsumptionState3.isEndOfPushReceived() && partitionConsumptionState3.isHybrid() && partitionConsumptionState3.getLatestProcessedUpstreamRTOffset("") != -1 && !partitionConsumptionState3.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER);
        };
        if (isUserSystemStore()) {
            this.newLeaderInactiveTime = this.serverConfig.getServerSystemStorePromotionToLeaderReplicaDelayMs();
        } else {
            this.newLeaderInactiveTime = this.serverConfig.getServerPromotionToLeaderReplicaDelayMs();
        }
        this.storeWriteComputeHandler = new StoreWriteComputeProcessor(this.storeName, this.schemaRepository, new CollectionTimestampMergeRecordHelper());
        this.isNativeReplicationEnabled = version.isNativeReplicationEnabled();
        if (version.getDataRecoveryVersionConfig() == null) {
            this.nativeReplicationSourceVersionTopicKafkaURL = version.getPushStreamSourceAddress();
            this.isDataRecovery = false;
        } else {
            this.nativeReplicationSourceVersionTopicKafkaURL = (String) this.serverConfig.getKafkaClusterIdToUrlMap().get(this.serverConfig.getKafkaClusterAliasToIdMap().getInt(version.getDataRecoveryVersionConfig().getDataRecoverySourceFabric()));
            if (this.nativeReplicationSourceVersionTopicKafkaURL == null) {
                throw new VeniceException("Unable to get data recovery source kafka url from the provided source fabric:" + version.getDataRecoveryVersionConfig().getDataRecoverySourceFabric());
            }
            this.isDataRecovery = true;
            if (isHybridMode()) {
                this.dataRecoveryCompletionTimeLagThresholdInMs = 86400000L;
                LOGGER.info("Data recovery info for topic: {}, source kafka url: {}, time lag threshold for completion: {}", getVersionTopic(), this.nativeReplicationSourceVersionTopicKafkaURL, Long.valueOf(this.dataRecoveryCompletionTimeLagThresholdInMs));
            }
        }
        this.nativeReplicationSourceVersionTopicKafkaURLSingletonSet = Collections.singleton(this.nativeReplicationSourceVersionTopicKafkaURL);
        LOGGER.info("Native replication source version topic kafka url set to: {} for topic: {}", this.nativeReplicationSourceVersionTopicKafkaURL, getVersionTopic());
        this.veniceWriterFactory = builder.getVeniceWriterFactory();
        VeniceWriterOptions build = new VeniceWriterOptions.Builder(getVersionTopic().getName()).setPartitioner(this.venicePartitioner).setChunkingEnabled(this.isChunked).setRmdChunkingEnabled(version.isRmdChunkingEnabled()).setPartitionCount(Integer.valueOf(this.storeVersionPartitionCount * this.amplificationFactor)).build();
        this.veniceWriter = Lazy.of(() -> {
            return this.veniceWriterFactory.createVeniceWriter(build);
        });
        this.kafkaClusterIdToUrlMap = this.serverConfig.getKafkaClusterIdToUrlMap();
        this.kafkaDataIntegrityValidatorForLeaders = new KafkaDataIntegrityValidator(this.kafkaVersionTopic);
        if (builder.getVeniceViewWriterFactory() == null || store.getViewConfigs().isEmpty()) {
            this.viewWriters = Collections.emptyMap();
        } else {
            this.viewWriters = builder.getVeniceViewWriterFactory().buildStoreViewWriters(store, version.getNumber(), this.schemaRepository.getKeySchema(store.getName()).getSchema());
        }
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void closeVeniceWriters(boolean z) {
        if (this.veniceWriter.isPresent()) {
            ((VeniceWriter) this.veniceWriter.get()).close(z);
        }
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void closeVeniceViewWriters() {
        if (this.viewWriters.isEmpty()) {
            return;
        }
        this.viewWriters.forEach((str, veniceViewWriter) -> {
            veniceViewWriter.close();
        });
    }

    private void endSegment(int i) {
        this.veniceWriter.ifPresent(veniceWriter -> {
            veniceWriter.endSegment(i, true);
        });
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public synchronized void promoteToLeader(PubSubTopicPartition pubSubTopicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker) {
        throwIfNotRunning();
        this.amplificationFactorAdapter.execute(pubSubTopicPartition.getPartitionNumber(), num -> {
            this.consumerActionsQueue.add(new ConsumerAction(ConsumerActionType.STANDBY_TO_LEADER, (PubSubTopicPartition) new PubSubTopicPartitionImpl(pubSubTopicPartition.getPubSubTopic(), num.intValue()), nextSeqNum(), leaderSessionIdChecker));
        });
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public synchronized void demoteToStandby(PubSubTopicPartition pubSubTopicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker) {
        throwIfNotRunning();
        this.amplificationFactorAdapter.execute(pubSubTopicPartition.getPartitionNumber(), num -> {
            this.consumerActionsQueue.add(new ConsumerAction(ConsumerActionType.LEADER_TO_STANDBY, (PubSubTopicPartition) new PubSubTopicPartitionImpl(pubSubTopicPartition.getPubSubTopic(), num.intValue()), nextSeqNum(), leaderSessionIdChecker));
        });
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void processConsumerAction(ConsumerAction consumerAction, Store store) throws InterruptedException {
        ConsumerActionType type = consumerAction.getType();
        String topic = consumerAction.getTopic();
        int partition = consumerAction.getPartition();
        switch (type) {
            case STANDBY_TO_LEADER:
                if (!consumerAction.getLeaderSessionIdChecker().isSessionIdValid()) {
                    LOGGER.info("State transition from STANDBY to LEADER is skipped for topic {} partition {}, because Helix has assigned another role to this replica.", topic, Integer.valueOf(partition));
                    return;
                }
                PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(partition));
                if (partitionConsumptionState == null) {
                    LOGGER.info("State transition from STANDBY to LEADER is skipped for topic {} partition {}, because partition consumption state is null and the partition may have been unsubscribed.", topic, Integer.valueOf(partition));
                    return;
                }
                if (partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER)) {
                    LOGGER.info("State transition from STANDBY to LEADER is skipped for topic {} partition {}, because this replica is the leader already.", topic, Integer.valueOf(partition));
                    return;
                } else if (store.isMigrationDuplicateStore()) {
                    partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.PAUSE_TRANSITION_FROM_STANDBY_TO_LEADER);
                    LOGGER.info("{} for partition {} is paused transition from STANDBY to LEADER", this.consumerTaskId, Integer.valueOf(partition));
                    return;
                } else {
                    partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.IN_TRANSITION_FROM_STANDBY_TO_LEADER);
                    LOGGER.info("{} for partition {} is in transition from STANDBY to LEADER", this.consumerTaskId, Integer.valueOf(partition));
                    return;
                }
            case LEADER_TO_STANDBY:
                if (!consumerAction.getLeaderSessionIdChecker().isSessionIdValid()) {
                    LOGGER.info("State transition from LEADER to STANDBY is skipped for topic {} partition {}, because Helix has assigned another role to this replica.", topic, Integer.valueOf(partition));
                    return;
                }
                PartitionConsumptionState partitionConsumptionState2 = this.partitionConsumptionStateMap.get(Integer.valueOf(partition));
                if (partitionConsumptionState2 == null) {
                    LOGGER.info("State transition from LEADER to STANDBY is skipped for topic {} partition {}, because partition consumption state is null and the partition may have been unsubscribed.", topic, Integer.valueOf(partition));
                    return;
                }
                if (partitionConsumptionState2.getLeaderFollowerState().equals(LeaderFollowerStateType.STANDBY)) {
                    LOGGER.info("State transition from LEADER to STANDBY is skipped for topic {} partition {}, because this replica is a follower already.", topic, Integer.valueOf(partition));
                    return;
                }
                OffsetRecord offsetRecord = partitionConsumptionState2.getOffsetRecord();
                PubSubTopic pubSubTopic = consumerAction.getTopicPartition().getPubSubTopic();
                PubSubTopic leaderTopic = offsetRecord.getLeaderTopic(this.pubSubTopicRepository);
                if (leaderTopic == null || (pubSubTopic.equals(leaderTopic) && !partitionConsumptionState2.consumeRemotely())) {
                    partitionConsumptionState2.setLeaderFollowerState(LeaderFollowerStateType.STANDBY);
                    updateLeaderTopicOnFollower(partitionConsumptionState2);
                } else {
                    consumerUnSubscribe(leaderTopic, partitionConsumptionState2);
                    waitForAllMessageToBeProcessedFromTopicPartition(new PubSubTopicPartitionImpl(leaderTopic, partition), partitionConsumptionState2);
                    partitionConsumptionState2.setConsumeRemotely(false);
                    LOGGER.info("{} disabled remote consumption from topic {} partition {}", this.consumerTaskId, leaderTopic, Integer.valueOf(partition));
                    partitionConsumptionState2.setSkipKafkaMessage(false);
                    partitionConsumptionState2.setLeaderFollowerState(LeaderFollowerStateType.STANDBY);
                    updateLeaderTopicOnFollower(partitionConsumptionState2);
                    consumerSubscribe(partitionConsumptionState2.getSourceTopicPartition(pubSubTopic), partitionConsumptionState2.getLatestProcessedLocalVersionTopicOffset(), this.localKafkaServer);
                    LOGGER.info("{} demoted to standby for partition {}", this.consumerTaskId, Integer.valueOf(partition));
                }
                endSegment(partition);
                return;
            default:
                processCommonConsumerAction(type, consumerAction.getTopicPartition(), consumerAction.getLeaderState());
                return;
        }
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void checkLongRunningTaskState() throws InterruptedException {
        boolean z = false;
        HashSet hashSet = null;
        long nanoTime = System.nanoTime();
        for (PartitionConsumptionState partitionConsumptionState : this.partitionConsumptionStateMap.values()) {
            int partition = partitionConsumptionState.getPartition();
            if (!partitionConsumptionState.isComplete() && LatencyUtils.getElapsedTimeInMs(partitionConsumptionState.getConsumptionStartTimeInMs()) > this.bootstrapTimeoutInMs) {
                if (!z) {
                    z = true;
                    hashSet = new HashSet();
                }
                hashSet.add(Integer.valueOf(partition));
            }
            switch (AnonymousClass1.$SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType[partitionConsumptionState.getLeaderFollowerState().ordinal()]) {
                case 1:
                    if (this.storeRepository.getStoreOrThrow(this.storeName).isMigrationDuplicateStore()) {
                        break;
                    } else {
                        partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.IN_TRANSITION_FROM_STANDBY_TO_LEADER);
                        LOGGER.info("{} became in transition to leader for partition {}", this.consumerTaskId, Integer.valueOf(partitionConsumptionState.getPartition()));
                        break;
                    }
                case 2:
                    if (canSwitchToLeaderTopic(partitionConsumptionState)) {
                        LOGGER.info("{} start promoting to leader for partition {} unsubscribing from current topic: {}", this.consumerTaskId, Integer.valueOf(partition), this.kafkaVersionTopic);
                        consumerUnSubscribe(this.versionTopic, partitionConsumptionState);
                        LOGGER.info("{} start promoting to leader for partition {}, unsubscribed from current topic: {}", this.consumerTaskId, Integer.valueOf(partition), this.kafkaVersionTopic);
                        OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
                        if (offsetRecord.getLeaderTopic(this.pubSubTopicRepository) == null) {
                            offsetRecord.setLeaderTopic(this.versionTopic);
                        }
                        restoreProducerStatesForLeaderConsumption(partition);
                        if (this.amplificationFactorAdapter.isLeaderSubPartition(partition) || !partitionConsumptionState.isEndOfPushReceived()) {
                            startConsumingAsLeaderInTransitionFromStandby(partitionConsumptionState);
                        } else {
                            LOGGER.info("Stop promoting non-leaderSubPartition: {} of store: {}} to leader.", Integer.valueOf(partition), this.storeName);
                            partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.STANDBY);
                            consumerSubscribe(partitionConsumptionState.getSourceTopicPartition(this.versionTopic), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), this.localKafkaServer);
                        }
                        this.defaultReadyToServeChecker.apply(partitionConsumptionState);
                        break;
                    } else {
                        break;
                    }
                case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
                    PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
                    if (leaderTopic == null) {
                        String str = this.consumerTaskId + " Missing leader topic for actual leader. OffsetRecord: " + partitionConsumptionState.getOffsetRecord().toSimplifiedString();
                        LOGGER.error(str);
                        throw new VeniceException(str);
                    }
                    if (shouldLeaderSwitchToLocalConsumption(partitionConsumptionState)) {
                        consumerUnSubscribe(leaderTopic, partitionConsumptionState);
                        waitForAllMessageToBeProcessedFromTopicPartition(new PubSubTopicPartitionImpl(leaderTopic, partitionConsumptionState.getPartition()), partitionConsumptionState);
                        partitionConsumptionState.setConsumeRemotely(false);
                        LOGGER.info("{} disabled remote consumption from topic {} partition {}", this.consumerTaskId, leaderTopic, Integer.valueOf(partition));
                        partitionConsumptionState.setSkipKafkaMessage(false);
                        consumerSubscribe(partitionConsumptionState.getSourceTopicPartition(leaderTopic), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), this.localKafkaServer);
                    }
                    if (this.amplificationFactorAdapter.isLeaderSubPartition(partition) || !partitionConsumptionState.isEndOfPushReceived()) {
                        TopicSwitchWrapper topicSwitch = partitionConsumptionState.getTopicSwitch();
                        if (topicSwitch == null) {
                            break;
                        } else {
                            PubSubTopic newSourceTopic = topicSwitch.getNewSourceTopic();
                            if (!leaderTopic.equals(newSourceTopic) && (LatencyUtils.getElapsedTimeInMs(getLastConsumedMessageTimestamp(partition)) > this.newLeaderInactiveTime || switchAwayFromStreamReprocessingTopic(leaderTopic, newSourceTopic))) {
                                leaderExecuteTopicSwitch(partitionConsumptionState, topicSwitch.getTopicSwitch(), newSourceTopic);
                                break;
                            }
                        }
                    } else {
                        consumerUnSubscribe(leaderTopic, partitionConsumptionState);
                        partitionConsumptionState.setConsumeRemotely(false);
                        partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.STANDBY);
                        consumerSubscribe(partitionConsumptionState.getSourceTopicPartition(this.versionTopic), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), this.localKafkaServer);
                        break;
                    }
                    break;
            }
        }
        if (this.emitMetrics.get()) {
            this.hostLevelIngestionStats.recordCheckLongRunningTasksLatency(LatencyUtils.getLatencyInMS(nanoTime));
        }
        if (z) {
            String str2 = "After waiting " + TimeUnit.MILLISECONDS.toHours(this.bootstrapTimeoutInMs) + " hours, resource:" + this.storeName + " partitions:" + hashSet + " still can not complete ingestion.";
            LOGGER.error(str2);
            throw new VeniceTimeoutException(str2);
        }
    }

    private boolean canSwitchToLeaderTopic(PartitionConsumptionState partitionConsumptionState) {
        if (LatencyUtils.getElapsedTimeInMs(getLastConsumedMessageTimestamp(partitionConsumptionState.getPartition())) <= this.newLeaderInactiveTime) {
            return false;
        }
        return !isUserSystemStore() || isLocalVersionTopicPartitionFullyConsumed(partitionConsumptionState);
    }

    private boolean isLocalVersionTopicPartitionFullyConsumed(PartitionConsumptionState partitionConsumptionState) {
        long latestProcessedLocalVersionTopicOffset = partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset();
        long kafkaTopicPartitionEndOffSet = getKafkaTopicPartitionEndOffSet(this.localKafkaServer, this.versionTopic, partitionConsumptionState.getPartition());
        if (kafkaTopicPartitionEndOffSet == StatsErrorCode.LAG_MEASUREMENT_FAILURE.code) {
            return false;
        }
        return (kafkaTopicPartitionEndOffSet == 0 && latestProcessedLocalVersionTopicOffset == -1) || latestProcessedLocalVersionTopicOffset + 1 >= kafkaTopicPartitionEndOffSet;
    }

    protected void startConsumingAsLeaderInTransitionFromStandby(PartitionConsumptionState partitionConsumptionState) {
        if (partitionConsumptionState.getLeaderFollowerState() != LeaderFollowerStateType.IN_TRANSITION_FROM_STANDBY_TO_LEADER) {
            throw new VeniceException(String.format("Expect state %s but got %s", LeaderFollowerStateType.IN_TRANSITION_FROM_STANDBY_TO_LEADER, partitionConsumptionState));
        }
        startConsumingAsLeader(partitionConsumptionState);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState) {
        int partition = partitionConsumptionState.getPartition();
        OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
        if (this.isNativeReplicationEnabled) {
            if (this.nativeReplicationSourceVersionTopicKafkaURL == null || this.nativeReplicationSourceVersionTopicKafkaURL.isEmpty()) {
                throw new VeniceException("Native replication is enabled but remote source address is not found");
            }
            if (shouldNewLeaderSwitchToRemoteConsumption(partitionConsumptionState)) {
                partitionConsumptionState.setConsumeRemotely(true);
                LOGGER.info("{} enabled remote consumption from topic {} partition {}", this.consumerTaskId, offsetRecord.getLeaderTopic(this.pubSubTopicRepository), Integer.valueOf(partition));
            }
        }
        Set<String> consumptionSourceKafkaAddress = getConsumptionSourceKafkaAddress(partitionConsumptionState);
        if (consumptionSourceKafkaAddress.size() != 1) {
            throw new VeniceException("In L/F mode, expect only one leader source Kafka URL. Got: " + consumptionSourceKafkaAddress);
        }
        partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.LEADER);
        PubSubTopicPartition sourceTopicPartition = partitionConsumptionState.getSourceTopicPartition(offsetRecord.getLeaderTopic(this.pubSubTopicRepository));
        long leaderOffset = partitionConsumptionState.getLeaderOffset("", this.pubSubTopicRepository);
        String next = consumptionSourceKafkaAddress.iterator().next();
        LOGGER.info("{} is promoted to leader for partition {} and it is going to start consuming from {} at offset {}; source Kafka url: {}; remote consumption flag: {}", this.consumerTaskId, Integer.valueOf(partition), sourceTopicPartition, Long.valueOf(leaderOffset), next, Boolean.valueOf(partitionConsumptionState.consumeRemotely()));
        consumerSubscribe(sourceTopicPartition, leaderOffset, next);
        syncConsumedUpstreamRTOffsetMapIfNeeded(partitionConsumptionState, Collections.singletonMap(next, Long.valueOf(leaderOffset)));
        LOGGER.info("{}, as a leader, started consuming from {} at offset {}", this.consumerTaskId, sourceTopicPartition, Long.valueOf(leaderOffset));
    }

    private boolean switchAwayFromStreamReprocessingTopic(PubSubTopic pubSubTopic, PubSubTopic pubSubTopic2) {
        return pubSubTopic.isStreamReprocessingTopic() && !pubSubTopic2.isStreamReprocessingTopic();
    }

    protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic pubSubTopic) {
        if (partitionConsumptionState.getLeaderFollowerState() != LeaderFollowerStateType.LEADER) {
            throw new VeniceException(String.format("Expect state %s but got %s", LeaderFollowerStateType.LEADER, partitionConsumptionState));
        }
        if (topicSwitch.sourceKafkaServers.size() != 1) {
            throw new VeniceException("In the L/F mode, expect only one source Kafka URL in Topic Switch control message. But got: " + topicSwitch.sourceKafkaServers);
        }
        int partition = partitionConsumptionState.getPartition();
        PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
        String charSequence = ((CharSequence) topicSwitch.sourceKafkaServers.get(0)).toString();
        PubSubTopicPartition sourceTopicPartition = partitionConsumptionState.getSourceTopicPartition(pubSubTopic);
        long latestProcessedUpstreamRTOffset = partitionConsumptionState.getLatestProcessedUpstreamRTOffset("");
        if (latestProcessedUpstreamRTOffset < 0) {
            latestProcessedUpstreamRTOffset = topicSwitch.rewindStartTimestamp > 0 ? getTopicPartitionOffsetByKafkaURL(charSequence, sourceTopicPartition, topicSwitch.rewindStartTimestamp) : -1L;
        }
        consumerUnSubscribe(leaderTopic, partitionConsumptionState);
        waitForLastLeaderPersistFuture(partitionConsumptionState, String.format("Leader failed to produce the last message to version topic before switching feed topic from %s to %s on partition %s", leaderTopic, pubSubTopic, Integer.valueOf(partition)));
        if (this.isNativeReplicationEnabled && !charSequence.equals(this.localKafkaServer)) {
            partitionConsumptionState.setConsumeRemotely(true);
            LOGGER.info("{} enabled remote consumption from {}", this.consumerTaskId, sourceTopicPartition);
        }
        partitionConsumptionState.getOffsetRecord().setLeaderTopic(pubSubTopic);
        partitionConsumptionState.getOffsetRecord().setLeaderUpstreamOffset("", latestProcessedUpstreamRTOffset);
        Set<String> consumptionSourceKafkaAddress = getConsumptionSourceKafkaAddress(partitionConsumptionState);
        if (consumptionSourceKafkaAddress.size() != 1) {
            throw new VeniceException("In L/F mode, expect only one leader source Kafka URL. Got: " + consumptionSourceKafkaAddress);
        }
        String next = consumptionSourceKafkaAddress.iterator().next();
        consumerSubscribe(sourceTopicPartition, latestProcessedUpstreamRTOffset, next);
        syncConsumedUpstreamRTOffsetMapIfNeeded(partitionConsumptionState, Collections.singletonMap(next, Long.valueOf(latestProcessedUpstreamRTOffset)));
        LOGGER.info("{} leader successfully switch feed topic from {} to {} offset {} partition {}", this.consumerTaskId, leaderTopic, pubSubTopic, Long.valueOf(latestProcessedUpstreamRTOffset), Integer.valueOf(partition));
        this.defaultReadyToServeChecker.apply(partitionConsumptionState);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void syncConsumedUpstreamRTOffsetMapIfNeeded(PartitionConsumptionState partitionConsumptionState, Map<String, Long> map) {
        PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
        if (leaderTopic == null || !leaderTopic.isRealTime()) {
            return;
        }
        map.forEach((str, l) -> {
            if (l.longValue() > getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(partitionConsumptionState, str)) {
                updateLatestInMemoryLeaderConsumedRTOffset(partitionConsumptionState, str, l.longValue());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForLastLeaderPersistFuture(PartitionConsumptionState partitionConsumptionState, String str) {
        try {
            Future<Void> lastLeaderPersistFuture = partitionConsumptionState.getLastLeaderPersistFuture();
            if (lastLeaderPersistFuture != null) {
                lastLeaderPersistFuture.get();
            }
        } catch (Exception e) {
            LOGGER.error(str, e);
            this.versionedDIVStats.recordLeaderProducerFailure(this.storeName, this.versionNumber);
            this.statusReportAdapter.reportError(Collections.singletonList(partitionConsumptionState), str, e);
            throw new VeniceException(str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getTopicPartitionOffsetByKafkaURL(CharSequence charSequence, PubSubTopicPartition pubSubTopicPartition, long j) {
        return getTopicManager(charSequence.toString()).getPartitionOffsetByTime(pubSubTopicPartition, j) - 1;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) {
        if (!partitionConsumptionState.consumeRemotely()) {
            return this.localKafkaServerSingletonSet;
        }
        if (!partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository).isRealTime()) {
            return this.nativeReplicationSourceVersionTopicKafkaURLSingletonSet;
        }
        Set<String> realTimeDataSourceKafkaAddress = getRealTimeDataSourceKafkaAddress(partitionConsumptionState);
        if (realTimeDataSourceKafkaAddress.isEmpty()) {
            throw new VeniceException("Expect RT Kafka URL when leader topic is a real-time topic. Got: " + partitionConsumptionState);
        }
        return realTimeDataSourceKafkaAddress;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) {
        if (!this.isNativeReplicationEnabled) {
            return this.localKafkaServerSingletonSet;
        }
        TopicSwitchWrapper topicSwitch = partitionConsumptionState.getTopicSwitch();
        return topicSwitch == null ? Collections.emptySet() : topicSwitch.getSourceServers();
    }

    private long getLastConsumedMessageTimestamp(int i) {
        return this.partitionConsumptionStateMap.get(Integer.valueOf(i)).getLatestMessageConsumptionTimestampInMs();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionState partitionConsumptionState) {
        return isConsumingFromRemoteVersionTopic(partitionConsumptionState) || isLeaderConsumingRemoteRealTimeTopic(partitionConsumptionState);
    }

    private boolean isConsumingFromRemoteVersionTopic(PartitionConsumptionState partitionConsumptionState) {
        return (partitionConsumptionState.isEndOfPushReceived() || this.isCurrentVersion.getAsBoolean() || Objects.equals(this.nativeReplicationSourceVersionTopicKafkaURL, this.localKafkaServer)) ? false : true;
    }

    private boolean isLeaderConsumingRemoteRealTimeTopic(PartitionConsumptionState partitionConsumptionState) {
        if (!partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository).isRealTime()) {
            return false;
        }
        Set<String> realTimeDataSourceKafkaAddress = getRealTimeDataSourceKafkaAddress(partitionConsumptionState);
        if (realTimeDataSourceKafkaAddress.isEmpty()) {
            throw new VeniceException("Expect at least one RT Kafka URL for " + partitionConsumptionState);
        }
        return (realTimeDataSourceKafkaAddress.size() == 1 && Objects.equals(realTimeDataSourceKafkaAddress.iterator().next(), this.localKafkaServer)) ? false : true;
    }

    private boolean shouldLeaderSwitchToLocalConsumption(PartitionConsumptionState partitionConsumptionState) {
        if (this.isDataRecovery && !partitionConsumptionState.isBatchOnly() && partitionConsumptionState.isEndOfPushReceived()) {
            checkAndUpdateDataRecoveryStatusOfHybridStore(partitionConsumptionState);
        }
        return partitionConsumptionState.consumeRemotely() && partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository).isVersionTopicOrStreamReprocessingTopic() && (!this.isDataRecovery || partitionConsumptionState.isDataRecoveryCompleted());
    }

    private void checkAndUpdateDataRecoveryStatusOfHybridStore(PartitionConsumptionState partitionConsumptionState) {
        boolean isDataRecoveryCompleted = partitionConsumptionState.isDataRecoveryCompleted();
        if (isDataRecoveryCompleted) {
            return;
        }
        if (partitionConsumptionState.getTopicSwitch() != null) {
            isDataRecoveryCompleted = true;
        }
        if (!isDataRecoveryCompleted) {
            long latestProducerProcessingTimeInMs = partitionConsumptionState.getOffsetRecord().getLatestProducerProcessingTimeInMs();
            if (this.amplificationFactorAdapter != null) {
                latestProducerProcessingTimeInMs = getLatestConsumedProducerTimestampWithSubPartition(latestProducerProcessingTimeInMs, partitionConsumptionState);
            }
            if (LatencyUtils.getElapsedTimeInMs(latestProducerProcessingTimeInMs) < this.dataRecoveryCompletionTimeLagThresholdInMs) {
                LOGGER.info("Data recovery completed for topic: {} partition: {} upon consuming records with producer timestamp of {} which is within the data recovery completion lag threshold of {} ms", this.kafkaVersionTopic, Integer.valueOf(partitionConsumptionState.getPartition()), Long.valueOf(latestProducerProcessingTimeInMs), Long.valueOf(this.dataRecoveryCompletionTimeLagThresholdInMs));
                isDataRecoveryCompleted = true;
            }
        }
        if (!isDataRecoveryCompleted) {
            boolean z = this.cachedKafkaMetadataGetter.getOffset(this.topicManagerRepository.getTopicManager(this.nativeReplicationSourceVersionTopicKafkaURL), this.versionTopic, partitionConsumptionState.getPartition()) - 2 <= partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset();
            long lastConsumedMessageTimestamp = getLastConsumedMessageTimestamp(partitionConsumptionState.getPartition());
            if (z && LatencyUtils.getElapsedTimeInMs(lastConsumedMessageTimestamp) > this.newLeaderInactiveTime) {
                LOGGER.info("Data recovery completed for topic: {} partition: {} upon exceeding leader inactive time of {} ms", this.kafkaVersionTopic, Integer.valueOf(partitionConsumptionState.getPartition()), Long.valueOf(this.newLeaderInactiveTime));
                isDataRecoveryCompleted = true;
            }
        }
        if (isDataRecoveryCompleted) {
            partitionConsumptionState.setDataRecoveryCompleted(true);
            this.statusReportAdapter.reportDataRecoveryCompleted(partitionConsumptionState);
        }
    }

    protected boolean shouldProduceToVersionTopic(PartitionConsumptionState partitionConsumptionState) {
        if (isLeader(partitionConsumptionState)) {
            return !this.versionTopic.equals(partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository)) || partitionConsumptionState.consumeRemotely();
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isLeader(PartitionConsumptionState partitionConsumptionState) {
        return Objects.equals(partitionConsumptionState.getLeaderFollowerState(), LeaderFollowerStateType.LEADER);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected boolean processTopicSwitch(ControlMessage controlMessage, int i, long j, PartitionConsumptionState partitionConsumptionState) {
        if (isLeader(partitionConsumptionState) && !this.amplificationFactorAdapter.isLeaderSubPartition(i)) {
            LOGGER.info("SubPartition: {} is demoted from LEADER to STANDBY.", Integer.valueOf(partitionConsumptionState.getPartition()));
            PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
            consumerUnSubscribe(leaderTopic, partitionConsumptionState);
            waitForLastLeaderPersistFuture(partitionConsumptionState, String.format("Leader failed to produce the last message to version topic before switching feed topic from %s to %s on partition %s", leaderTopic, this.kafkaVersionTopic, Integer.valueOf(i)));
            partitionConsumptionState.setConsumeRemotely(false);
            partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.STANDBY);
            consumerSubscribe(partitionConsumptionState.getSourceTopicPartition(this.versionTopic), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), this.localKafkaServer);
        }
        TopicSwitch topicSwitch = (TopicSwitch) controlMessage.controlMessageUnion;
        List list = topicSwitch.sourceKafkaServers;
        if (list.size() != 1) {
            throw new VeniceException("More than one Kafka server urls in TopicSwitch control message, TopicSwitch.sourceKafkaServers: " + list);
        }
        this.statusReportAdapter.reportTopicSwitchReceived(partitionConsumptionState);
        String charSequence = ((CharSequence) list.get(0)).toString();
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(topicSwitch.sourceTopicName.toString());
        long j2 = -1;
        if (!this.isDaVinciClient && topicSwitch.rewindStartTimestamp > 0) {
            j2 = getTopicManager(charSequence).getPartitionOffsetByTime(new PubSubTopicPartitionImpl(topic, partitionConsumptionState.getSourceTopicPartitionNumber(topic)), topicSwitch.rewindStartTimestamp);
            if (j2 != -1) {
                j2--;
            }
        }
        syncTopicSwitchToIngestionMetadataService(topicSwitch, partitionConsumptionState, Collections.singletonMap(charSequence, Long.valueOf(j2)));
        if (isLeader(partitionConsumptionState)) {
            partitionConsumptionState.getOffsetRecord().setLeaderUpstreamOffset("", j2);
            return false;
        }
        partitionConsumptionState.getOffsetRecord().setLeaderTopic(topic);
        partitionConsumptionState.getOffsetRecord().setLeaderUpstreamOffset("", j2);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void syncTopicSwitchToIngestionMetadataService(TopicSwitch topicSwitch, PartitionConsumptionState partitionConsumptionState, Map<String, Long> map) {
        this.storageMetadataService.computeStoreVersionState(this.kafkaVersionTopic, storeVersionState -> {
            if (storeVersionState == null) {
                throw new VeniceException("Unexpected: received some " + ControlMessageType.TOPIC_SWITCH.name() + " control message in a topic where we have not yet received a " + ControlMessageType.START_OF_PUSH.name() + " control message, for partition " + partitionConsumptionState + " and upstreamStartOffsetByKafkaURL: " + map);
            }
            if (storeVersionState.topicSwitch == null) {
                LOGGER.info("First time receiving a TopicSwitch message (new source topic: {}; rewind start time: {}; upstream start offset by source Kafka URL: {})", topicSwitch.sourceTopicName, Long.valueOf(topicSwitch.rewindStartTimestamp), map);
            } else {
                LOGGER.info("Previous TopicSwitch message in metadata store (source topic: {}; rewind start time: {}; source kafka servers {}) will be replaced by the new TopicSwitch message (new source topic: {}; rewind start time: {}; upstream start offset by source Kafka URL: {})", storeVersionState.topicSwitch.sourceTopicName, Long.valueOf(storeVersionState.topicSwitch.rewindStartTimestamp), topicSwitch.sourceKafkaServers, topicSwitch.sourceTopicName, Long.valueOf(topicSwitch.rewindStartTimestamp), map);
            }
            storeVersionState.topicSwitch = topicSwitch;
            TopicSwitch resolveSourceKafkaServersWithinTopicSwitch = resolveSourceKafkaServersWithinTopicSwitch(topicSwitch);
            partitionConsumptionState.setTopicSwitch(new TopicSwitchWrapper(resolveSourceKafkaServersWithinTopicSwitch, this.pubSubTopicRepository.getTopic(resolveSourceKafkaServersWithinTopicSwitch.sourceTopicName.toString())));
            return storeVersionState;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void updateOffsetsFromConsumerRecord(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, LeaderProducedRecordContext leaderProducedRecordContext, UpdateVersionTopicOffset updateVersionTopicOffset, UpdateUpstreamTopicOffset updateUpstreamTopicOffset, GetLastKnownUpstreamTopicOffset getLastKnownUpstreamTopicOffset, Supplier<String> supplier) {
        if (shouldProduceToVersionTopic(partitionConsumptionState)) {
            updateOffsetsAsRemoteConsumeLeader(partitionConsumptionState, leaderProducedRecordContext, supplier.get(), pubSubMessage, updateVersionTopicOffset, updateUpstreamTopicOffset);
            return;
        }
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
        updateVersionTopicOffset.apply(((Long) pubSubMessage.getOffset()).longValue());
        OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
        if (this.isDaVinciClient) {
            return;
        }
        if (shouldUpdateUpstreamOffset(pubSubMessage)) {
            String str = supplier.get();
            long j = kafkaMessageEnvelope.leaderMetadataFooter.upstreamOffset;
            PubSubTopic leaderTopic = offsetRecord.getLeaderTopic(this.pubSubTopicRepository);
            if (leaderTopic == null) {
                leaderTopic = this.versionTopic;
            }
            checkAndHandleUpstreamOffsetRewind(partitionConsumptionState, partitionConsumptionState.getOffsetRecord(), pubSubMessage, j, getLastKnownUpstreamTopicOffset.apply(str, leaderTopic));
            updateUpstreamTopicOffset.apply(str, leaderTopic, j);
        }
        partitionConsumptionState.setLeaderGUID(kafkaMessageEnvelope.producerMetadata.producerGUID);
        if (kafkaMessageEnvelope.leaderMetadataFooter != null) {
            partitionConsumptionState.setLeaderHostId(kafkaMessageEnvelope.leaderMetadataFooter.hostName.toString());
        }
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState) {
        OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
        offsetRecord.setCheckpointLocalVersionTopicOffset(partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
        if (this.isDaVinciClient) {
            return;
        }
        PubSubTopic leaderTopic = offsetRecord.getLeaderTopic(this.pubSubTopicRepository);
        if (leaderTopic == null) {
            leaderTopic = this.versionTopic;
        }
        if (leaderTopic.isRealTime()) {
            offsetRecord.resetUpstreamOffsetMap(partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap());
        } else {
            offsetRecord.setCheckpointUpstreamVersionTopicOffset(partitionConsumptionState.getLatestProcessedUpstreamVersionTopicOffset());
        }
        offsetRecord.setLeaderGUID(partitionConsumptionState.getLeaderGUID());
        offsetRecord.setLeaderHostId(partitionConsumptionState.getLeaderHostId());
    }

    private void updateOffsetsAsRemoteConsumeLeader(PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, String str, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, UpdateVersionTopicOffset updateVersionTopicOffset, UpdateUpstreamTopicOffset updateUpstreamTopicOffset) {
        if (leaderProducedRecordContext == null) {
            String str2 = this.consumerTaskId + " UpdateOffset: Produced record should not be null in LEADER for: " + pubSubMessage.getTopicPartition();
            if (REDUNDANT_LOGGING_FILTER.isRedundantException(str2)) {
                return;
            }
            LOGGER.warn(str2);
            return;
        }
        if (leaderProducedRecordContext.hasCorrespondingUpstreamMessage()) {
            updateVersionTopicOffset.apply(leaderProducedRecordContext.getProducedOffset());
            PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
            if (leaderTopic == null) {
                leaderTopic = this.versionTopic;
            }
            updateUpstreamTopicOffset.apply(str, leaderTopic, leaderProducedRecordContext.getConsumedOffset());
        }
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, LeaderProducedRecordContext leaderProducedRecordContext, String str) {
        Objects.requireNonNull(partitionConsumptionState);
        updateOffsetsFromConsumerRecord(partitionConsumptionState, pubSubMessage, leaderProducedRecordContext, partitionConsumptionState::updateLatestProcessedLocalVersionTopicOffset, (str2, pubSubTopic, j) -> {
            if (pubSubTopic.isRealTime()) {
                partitionConsumptionState.updateLatestProcessedUpstreamRTOffset(str2, j);
            } else {
                partitionConsumptionState.updateLatestProcessedUpstreamVersionTopicOffset(j);
            }
        }, (str3, pubSubTopic2) -> {
            return pubSubTopic2.isRealTime() ? partitionConsumptionState.getLatestProcessedUpstreamRTOffset(str3) : partitionConsumptionState.getLatestProcessedUpstreamVersionTopicOffset();
        }, () -> {
            return "";
        });
    }

    protected void checkAndHandleUpstreamOffsetRewind(PartitionConsumptionState partitionConsumptionState, OffsetRecord offsetRecord, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, long j, long j2) {
        if (j >= j2) {
            return;
        }
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
        if (kafkaMessageEnvelope.leaderMetadataFooter == null || partitionConsumptionState.getLeaderHostId() == null || kafkaMessageEnvelope.leaderMetadataFooter.hostName.toString().equals(partitionConsumptionState.getLeaderHostId())) {
            return;
        }
        String str = this.consumerTaskId + " partition %d received message with upstreamOffset: %d; but recorded upstreamOffset is: %d. Received message producer GUID: %s; Recorded producer GUID: %s; Received message producer host: %s; Recorded producer host: %s. Multiple leaders are producing. ";
        Object[] objArr = new Object[7];
        objArr[0] = Integer.valueOf(pubSubMessage.getTopicPartition().getPartitionNumber());
        objArr[1] = Long.valueOf(j);
        objArr[2] = Long.valueOf(j2);
        objArr[3] = kafkaMessageEnvelope.producerMetadata.producerGUID == null ? "unknown" : GuidUtils.getHexFromGuid(kafkaMessageEnvelope.producerMetadata.producerGUID);
        objArr[4] = partitionConsumptionState.getLeaderGUID() == null ? "unknown" : GuidUtils.getHexFromGuid(partitionConsumptionState.getLeaderGUID());
        objArr[5] = kafkaMessageEnvelope.leaderMetadataFooter.hostName.toString();
        objArr[6] = partitionConsumptionState.getLeaderHostId();
        String format = String.format(str, objArr);
        boolean z = true;
        try {
            KafkaKey kafkaKey = (KafkaKey) pubSubMessage.getKey();
            KafkaMessageEnvelope kafkaMessageEnvelope2 = (KafkaMessageEnvelope) pubSubMessage.getValue();
            AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(this.kafkaVersionTopic);
            switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.valueOf(kafkaMessageEnvelope2).ordinal()]) {
                case 1:
                    byte[] bArr = localStorageEngine.get(pubSubMessage.getTopicPartition().getPartitionNumber(), kafkaKey.getKey());
                    if (bArr != null) {
                        int readInt = ByteUtils.readInt(bArr, 0);
                        Put put = (Put) kafkaMessageEnvelope2.payloadUnion;
                        if (readInt == put.schemaId && ByteUtils.equals(put.putValue.array(), put.putValue.position(), bArr, 4)) {
                            z = false;
                            format = (format + Utils.NEW_LINE_CHAR) + "But this rewound PUT is not lossy because the data in the rewind message is the same as the data inside Venice";
                        }
                        break;
                    }
                    break;
                case 2:
                    if (localStorageEngine.get(pubSubMessage.getTopicPartition().getPartitionNumber(), kafkaKey.getKey()) == null) {
                        z = false;
                        format = (format + Utils.NEW_LINE_CHAR) + "But this rewound DELETE is not lossy because the data in the rewind message is deleted already";
                        break;
                    }
                    break;
            }
        } catch (Exception e) {
            LOGGER.warn("{} failed comparing the rewind message with the actual value in Venice", this.consumerTaskId, e);
        }
        if (!z) {
            LOGGER.info(format);
            this.versionedDIVStats.recordBenignLeaderOffsetRewind(this.storeName, this.versionNumber);
        } else {
            if (partitionConsumptionState.isEndOfPushReceived()) {
                LOGGER.error((format + Utils.NEW_LINE_CHAR) + "Don't fail the job during streaming ingestion");
                this.versionedDIVStats.recordPotentiallyLossyLeaderOffsetRewind(this.storeName, this.versionNumber);
                return;
            }
            String str2 = (format + Utils.NEW_LINE_CHAR) + "Failing the job because lossy rewind happens before receiving EndOfPush";
            LOGGER.error(str2);
            this.versionedDIVStats.recordPotentiallyLossyLeaderOffsetRewind(this.storeName, this.versionNumber);
            Exception veniceException = new VeniceException(str2);
            this.statusReportAdapter.reportError(Collections.singletonList(partitionConsumptionState), str2, veniceException);
            throw veniceException;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void produceToLocalKafka(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> biConsumer, int i, String str, int i2, long j) {
        LeaderProducerCallback createProducerCallback = createProducerCallback(pubSubMessage, partitionConsumptionState, leaderProducedRecordContext, i, str, j);
        LeaderMetadataWrapper leaderMetadataWrapper = new LeaderMetadataWrapper(((Long) pubSubMessage.getOffset()).longValue(), i2);
        partitionConsumptionState.setLastLeaderPersistFuture(leaderProducedRecordContext.getPersistedToDBFuture());
        biConsumer.accept(createProducerCallback, leaderMetadataWrapper);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState) {
        TopicSwitchWrapper topicSwitch = partitionConsumptionState.getTopicSwitch();
        if (topicSwitch == null) {
            return false;
        }
        if (topicSwitch.getSourceServers().size() != 1) {
            throw new VeniceException("Expect only one source Kafka URLs in Topic Switch. Got: " + topicSwitch.getSourceServers());
        }
        return topicSwitch.getNewSourceTopic().isRealTime();
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean z) {
        int partition = partitionConsumptionState.getPartition();
        PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
        if (leaderTopic == null || !leaderTopic.isRealTime()) {
            return Long.MAX_VALUE;
        }
        if (this.isDaVinciClient || partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.STANDBY)) {
            return this.cachedKafkaMetadataGetter.getOffset(getTopicManager(this.localKafkaServer), this.versionTopic, partition) - partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset();
        }
        Set<String> realTimeDataSourceKafkaAddress = getRealTimeDataSourceKafkaAddress(partitionConsumptionState);
        if (realTimeDataSourceKafkaAddress.isEmpty()) {
            throw new VeniceException("Expect a real-time source Kafka URL for " + partitionConsumptionState);
        }
        return realTimeDataSourceKafkaAddress.size() == 1 ? measureRTOffsetLagForSingleRegion(realTimeDataSourceKafkaAddress.iterator().next(), partitionConsumptionState, z) : measureRTOffsetLagForMultiRegions(realTimeDataSourceKafkaAddress, partitionConsumptionState, z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long measureRTOffsetLagForSingleRegion(String str, PartitionConsumptionState partitionConsumptionState, boolean z) {
        return getLatestLeaderPersistedOffsetAndHybridTopicOffset(str, partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository), partitionConsumptionState, z);
    }

    protected long measureRTOffsetLagForMultiRegions(Set<String> set, PartitionConsumptionState partitionConsumptionState, boolean z) {
        throw new VeniceException(String.format("%s Multi colo RT offset lag calculation is not supported for non Active-Active stores", this.consumerTaskId));
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public boolean isReadyToServeAnnouncedWithRTLag() {
        if (!this.hybridStoreConfig.isPresent() || this.partitionConsumptionStateMap.isEmpty()) {
            return false;
        }
        long offsetLagThresholdToGoOnline = this.hybridStoreConfig.get().getOffsetLagThresholdToGoOnline();
        for (PartitionConsumptionState partitionConsumptionState : this.partitionConsumptionStateMap.values()) {
            if (partitionConsumptionState.hasLagCaughtUp() && offsetLagThresholdToGoOnline >= 0) {
                Set<String> realTimeDataSourceKafkaAddress = getRealTimeDataSourceKafkaAddress(partitionConsumptionState);
                if (realTimeDataSourceKafkaAddress.isEmpty()) {
                    return true;
                }
                try {
                    if (measureRTOffsetLagForSingleRegion(realTimeDataSourceKafkaAddress.iterator().next(), partitionConsumptionState, false) > offsetLagThresholdToGoOnline) {
                        return true;
                    }
                } catch (Exception e) {
                    return true;
                }
            }
        }
        return false;
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState) {
        int partition = partitionConsumptionState.getPartition();
        if (!partitionConsumptionState.isEndOfPushReceived() || partitionConsumptionState.isLatchReleased() || this.cachedKafkaMetadataGetter.getOffset(getTopicManager(this.localKafkaServer), this.versionTopic, partition) - 1 > partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset()) {
            return;
        }
        this.statusReportAdapter.reportCatchUpVersionTopicOffsetLag(partitionConsumptionState);
        if (this.isCurrentVersion.getAsBoolean()) {
            this.amplificationFactorAdapter.executePartitionConsumptionState(partitionConsumptionState.getUserPartition(), (v0) -> {
                v0.lagHasCaughtUp();
            });
            this.statusReportAdapter.reportCompleted(partitionConsumptionState, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, int i) {
        PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(i));
        if (partitionConsumptionState == null) {
            LOGGER.info("Skipping message as partition is no longer actively subscribed. Topic: {} Partition Id: {}", this.kafkaVersionTopic, Integer.valueOf(i));
            return false;
        }
        switch (AnonymousClass1.$SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType[partitionConsumptionState.getLeaderFollowerState().ordinal()]) {
            case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
                PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
                if (partitionConsumptionState.consumeRemotely() && leaderTopic.isVersionTopicOrStreamReprocessingTopic()) {
                    if (partitionConsumptionState.skipKafkaMessage()) {
                        String str = "Skipping messages after EOP in remote version topic. Topic: " + this.kafkaVersionTopic + " Partition Id: " + i;
                        if (REDUNDANT_LOGGING_FILTER.isRedundantException(str)) {
                            return false;
                        }
                        LOGGER.info(str);
                        return false;
                    }
                    if (((KafkaKey) pubSubMessage.getKey()).isControlMessage() && ControlMessageType.valueOf((ControlMessage) ((KafkaMessageEnvelope) pubSubMessage.getValue()).payloadUnion) == ControlMessageType.END_OF_PUSH && (!this.isDataRecovery || !isHybridMode())) {
                        partitionConsumptionState.setSkipKafkaMessage(true);
                    }
                }
                if (!pubSubMessage.getTopicPartition().getPubSubTopic().equals(leaderTopic)) {
                    String str2 = "Leader receives a Kafka record that doesn't belong to leader topic. Store version: " + this.kafkaVersionTopic + ", partition: " + partitionConsumptionState.getPartition() + ", leader topic: " + leaderTopic + ", topic of incoming message: " + pubSubMessage.getTopicPartition().getPubSubTopic().getName();
                    if (REDUNDANT_LOGGING_FILTER.isRedundantException(str2)) {
                        return false;
                    }
                    LOGGER.error(str2);
                    return false;
                }
                break;
            default:
                PubSubTopic pubSubTopic = pubSubMessage.getTopicPartition().getPubSubTopic();
                String name = pubSubTopic.getName();
                if (!this.versionTopic.equals(pubSubTopic)) {
                    String str3 = this.consumerTaskId + " Current L/F state:" + partitionConsumptionState.getLeaderFollowerState() + "; partition: " + i + "; Message retrieved from non version topic " + name;
                    if (consumerHasSubscription(pubSubTopic, partitionConsumptionState)) {
                        throw new VeniceMessageException(str3 + ". Throwing exception as the node still subscribes to " + name);
                    }
                    if (REDUNDANT_LOGGING_FILTER.isRedundantException(str3)) {
                        return false;
                    }
                    LOGGER.error("{}. Skipping the message as the node does not subscribe to {}", str3, name);
                    return false;
                }
                long latestProcessedLocalVersionTopicOffset = partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset();
                if (latestProcessedLocalVersionTopicOffset >= ((Long) pubSubMessage.getOffset()).longValue()) {
                    String str4 = this.consumerTaskId + " Current L/F state:" + partitionConsumptionState.getLeaderFollowerState() + "; The record was already processed partition " + i;
                    if (REDUNDANT_LOGGING_FILTER.isRedundantException(str4)) {
                        return false;
                    }
                    LOGGER.info("{}; LastKnown {}; Current {}", str4, Long.valueOf(latestProcessedLocalVersionTopicOffset), pubSubMessage.getOffset());
                    return false;
                }
                break;
        }
        return super.shouldProcessRecord(pubSubMessage, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public boolean shouldPersistRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState) {
        if (!super.shouldPersistRecord(pubSubMessage, partitionConsumptionState)) {
            return false;
        }
        switch (AnonymousClass1.$SwitchMap$com$linkedin$davinci$kafka$consumer$LeaderFollowerStateType[partitionConsumptionState.getLeaderFollowerState().ordinal()]) {
            case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
                PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
                if (pubSubMessage.getTopicPartition().getPubSubTopic().equals(leaderTopic)) {
                    return true;
                }
                String str = "Leader receives a Kafka record that doesn't belong to leader topic. Store version: " + this.kafkaVersionTopic + ", partition: " + partitionConsumptionState.getPartition() + ", leader topic: " + leaderTopic + ", topic of incoming message: " + leaderTopic.getName();
                if (REDUNDANT_LOGGING_FILTER.isRedundantException(str)) {
                    return false;
                }
                LOGGER.error(str);
                return false;
            default:
                if (pubSubMessage.getTopicPartition().getPubSubTopic().equals(this.versionTopic)) {
                    return true;
                }
                String str2 = partitionConsumptionState.getLeaderFollowerState().toString() + " replica receives a Kafka record that doesn't belong to version topic. Store version: " + this.kafkaVersionTopic + ", partition: " + partitionConsumptionState.getPartition() + ", topic of incoming message: " + pubSubMessage.getTopicPartition().getPubSubTopic().getName();
                if (REDUNDANT_LOGGING_FILTER.isRedundantException(str2)) {
                    return false;
                }
                LOGGER.error(str2);
                return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public void recordWriterStats(long j, long j2, long j3, long j4, PartitionConsumptionState partitionConsumptionState) {
        if (isUserSystemStore()) {
            return;
        }
        if (!this.isNativeReplicationEnabled) {
            super.recordWriterStats(j, j2, j3, j4, partitionConsumptionState);
        } else if (partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER)) {
            this.versionedDIVStats.recordLeaderLatencies(this.storeName, this.versionNumber, j, j2, j3, j4);
        } else {
            this.versionedDIVStats.recordFollowerLatencies(this.storeName, this.versionNumber, j, j2, j3, j4);
        }
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int i) {
        if (partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER)) {
            this.versionedIngestionStats.recordLeaderConsumed(this.storeName, this.versionNumber, i);
            this.hostLevelIngestionStats.recordTotalLeaderBytesConsumed(i);
            this.hostLevelIngestionStats.recordTotalLeaderRecordsConsumed();
        } else {
            this.versionedIngestionStats.recordFollowerConsumed(this.storeName, this.versionNumber, i);
            this.hostLevelIngestionStats.recordTotalFollowerBytesConsumed(i);
            this.hostLevelIngestionStats.recordTotalFollowerRecordsConsumed();
        }
    }

    private void recordRegionHybridConsumptionStats(int i, int i2, long j, long j2) {
        if (i >= 0) {
            this.versionedIngestionStats.recordRegionHybridConsumption(this.storeName, this.versionNumber, i, i2, j, j2);
            this.hostLevelIngestionStats.recordTotalRegionHybridBytesConsumed(i, i2, j2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
    /* JADX WARN: Failed to find 'out' block for switch in B:27:0x0172. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:49:0x0281 A[Catch: Exception -> 0x02f8, TryCatch #0 {Exception -> 0x02f8, blocks: (B:3:0x0003, B:5:0x0030, B:8:0x0034, B:12:0x0050, B:13:0x0086, B:16:0x008c, B:18:0x0090, B:20:0x00ad, B:21:0x00da, B:23:0x00e1, B:24:0x0133, B:26:0x013b, B:27:0x0172, B:28:0x01a0, B:29:0x01b9, B:31:0x01cc, B:33:0x01eb, B:35:0x01fe, B:36:0x021c, B:37:0x0211, B:38:0x023a, B:40:0x0241, B:42:0x0249, B:44:0x024d, B:45:0x0274, B:47:0x0278, B:49:0x0281, B:51:0x02f4, B:55:0x02ae, B:56:0x02e4, B:57:0x02e5, B:66:0x0109, B:60:0x010f), top: B:2:0x0003, inners: #1, #2 }] */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public com.linkedin.davinci.kafka.consumer.StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(com.linkedin.venice.pubsub.api.PubSubMessage<com.linkedin.venice.message.KafkaKey, com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope, java.lang.Long> r12, int r13, java.lang.String r14, int r15, long r16, long r18) {
        /*
            Method dump skipped, instructions count: 825
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.delegateConsumerRecord(com.linkedin.venice.pubsub.api.PubSubMessage, int, java.lang.String, int, long, long):com.linkedin.davinci.kafka.consumer.StoreIngestionTask$DelegateConsumerRecordResult");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition pubSubTopicPartition, PartitionConsumptionState partitionConsumptionState) throws InterruptedException {
        super.waitForAllMessageToBeProcessedFromTopicPartition(pubSubTopicPartition, partitionConsumptionState);
        long millis = TimeUnit.MINUTES.toMillis(1L);
        if (partitionConsumptionState != null) {
            try {
                CompletableFuture<Void> lastQueuedRecordPersistedFuture = partitionConsumptionState.getLastQueuedRecordPersistedFuture();
                if (lastQueuedRecordPersistedFuture != null) {
                    lastQueuedRecordPersistedFuture.get(millis, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                LOGGER.warn("Got interrupted while waiting for the last queued record to be persisted for: {}. Will throw the interrupt exception.", pubSubTopicPartition, e);
                throw e;
            } catch (Exception e2) {
                LOGGER.error("Got exception while waiting for the last queued record to be persisted for: {}. Will swallow.", pubSubTopicPartition, e2);
            }
            Future<Void> future = null;
            try {
                future = partitionConsumptionState.getLastLeaderPersistFuture();
                if (future != null) {
                    long nanoTime = System.nanoTime();
                    future.get(millis, TimeUnit.MILLISECONDS);
                    this.hostLevelIngestionStats.recordLeaderProducerSynchronizeLatency(LatencyUtils.getLatencyInMS(nanoTime));
                }
            } catch (InterruptedException e3) {
                LOGGER.warn("Got interrupted while waiting for the last leader producer future for: {}. No data loss. Will throw the interrupt exception.", pubSubTopicPartition, e3);
                this.versionedDIVStats.recordBenignLeaderProducerFailure(this.storeName, this.versionNumber);
                throw e3;
            } catch (TimeoutException e4) {
                LOGGER.error("Timeout on waiting for the last leader producer future for: {}. No data loss. Will swallow.", pubSubTopicPartition, e4);
                future.cancel(true);
                partitionConsumptionState.setLastLeaderPersistFuture(null);
                this.versionedDIVStats.recordBenignLeaderProducerFailure(this.storeName, this.versionNumber);
            } catch (Exception e5) {
                LOGGER.error("Got exception while waiting for the latest producer future to be completed for: {}. Will swallow.", pubSubTopicPartition, e5);
                partitionConsumptionState.setLastLeaderPersistFuture(null);
                this.versionedDIVStats.recordBenignLeaderProducerFailure(this.storeName, this.versionNumber);
            }
        }
    }

    private void validateRecordBeforeProducingToLocalKafka(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, String str, int i) {
        if (i == this.localKafkaClusterId && pubSubMessage.getTopicPartition().getPubSubTopic().equals(this.versionTopic) && str.equals(this.localKafkaServer)) {
            try {
                int partition = partitionConsumptionState.getPartition();
                setIngestionException(partition, new VeniceException("Store version " + this.kafkaVersionTopic + " partition " + partition + " is consuming from local version topic and producing back to local version topic, kafkaClusterId = " + i + ", kafkaUrl = " + str + ", this.localKafkaServer = " + this.localKafkaServer));
            } catch (VeniceException e) {
                setLastStoreIngestionException(e);
            }
        }
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getBatchReplicationLag() {
        if (this.storageEngine.getStoreVersionState() == null || this.partitionConsumptionStateMap.isEmpty()) {
            return 0L;
        }
        return minZeroLag(this.partitionConsumptionStateMap.values().stream().filter(BATCH_REPLICATION_LAG_FILTER).mapToLong(partitionConsumptionState -> {
            PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
            if (leaderTopic == null) {
                leaderTopic = this.versionTopic;
            }
            long partitionOffsetLag = getPartitionOffsetLag(getSourceKafkaUrlForOffsetLagMeasurement(partitionConsumptionState), leaderTopic, partitionConsumptionState.getUserPartition());
            return partitionOffsetLag >= 0 ? partitionOffsetLag : (this.cachedKafkaMetadataGetter.getOffset(getTopicManager(this.nativeReplicationSourceVersionTopicKafkaURL), leaderTopic, partitionConsumptionState.getPartition()) - 1) - (this.cachedKafkaMetadataGetter.getOffset(getTopicManager(this.localKafkaServer), leaderTopic, partitionConsumptionState.getPartition()) - 1);
        }).sum());
    }

    private long getLeaderOffsetLag(Predicate<? super PartitionConsumptionState> predicate) {
        if (this.storageEngine.getStoreVersionState() == null || this.partitionConsumptionStateMap.isEmpty()) {
            return 0L;
        }
        return minZeroLag(this.partitionConsumptionStateMap.values().stream().filter(predicate).mapToLong(partitionConsumptionState -> {
            PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
            if (leaderTopic == null) {
                leaderTopic = this.versionTopic;
            }
            String sourceKafkaUrlForOffsetLagMeasurement = getSourceKafkaUrlForOffsetLagMeasurement(partitionConsumptionState);
            long partitionOffsetLag = getPartitionOffsetLag(sourceKafkaUrlForOffsetLagMeasurement, leaderTopic, partitionConsumptionState.getPartition());
            return partitionOffsetLag >= 0 ? partitionOffsetLag : leaderTopic.isRealTime() ? getLatestLeaderConsumedOffsetAndHybridTopicOffset(sourceKafkaUrlForOffsetLagMeasurement, leaderTopic, partitionConsumptionState) - 1 : (this.cachedKafkaMetadataGetter.getOffset(getTopicManager(sourceKafkaUrlForOffsetLagMeasurement), leaderTopic, partitionConsumptionState.getPartition()) - 1) - partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset();
        }).sum());
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getLeaderOffsetLag() {
        return getLeaderOffsetLag(LEADER_OFFSET_LAG_FILTER);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getBatchLeaderOffsetLag() {
        return getLeaderOffsetLag(BATCH_LEADER_OFFSET_LAG_FILTER);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getHybridLeaderOffsetLag() {
        return getLeaderOffsetLag(HYBRID_LEADER_OFFSET_LAG_FILTER);
    }

    private long getFollowerOffsetLag(Predicate<? super PartitionConsumptionState> predicate) {
        if (this.storageEngine.getStoreVersionState() == null || this.partitionConsumptionStateMap.isEmpty()) {
            return 0L;
        }
        return minZeroLag(this.partitionConsumptionStateMap.values().stream().filter(predicate).mapToLong(partitionConsumptionState -> {
            long partitionOffsetLag = getPartitionOffsetLag(this.localKafkaServer, this.versionTopic, partitionConsumptionState.getPartition());
            return partitionOffsetLag >= 0 ? partitionOffsetLag : (this.cachedKafkaMetadataGetter.getOffset(getTopicManager(this.localKafkaServer), this.versionTopic, partitionConsumptionState.getPartition()) - 1) - partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset();
        }).sum());
    }

    private String getSourceKafkaUrlForOffsetLagMeasurement(PartitionConsumptionState partitionConsumptionState) {
        String str;
        Set<String> consumptionSourceKafkaAddress = getConsumptionSourceKafkaAddress(partitionConsumptionState);
        if (consumptionSourceKafkaAddress.size() == 1) {
            str = consumptionSourceKafkaAddress.iterator().next();
        } else {
            if (!consumptionSourceKafkaAddress.contains(this.localKafkaServer)) {
                throw new VeniceException(String.format("Expect source Kafka URLs contains local Kafka URL. Got local Kafka URL %s and source Kafka URLs %s", this.localKafkaServer, consumptionSourceKafkaAddress));
            }
            str = this.localKafkaServer;
        }
        return str;
    }

    protected long getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState partitionConsumptionState, String str) {
        return partitionConsumptionState.getLatestProcessedUpstreamRTOffset("");
    }

    protected long getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState partitionConsumptionState, String str) {
        return partitionConsumptionState.getLeaderConsumedUpstreamRTOffset("");
    }

    protected void updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState partitionConsumptionState, String str, long j) {
        partitionConsumptionState.updateLeaderConsumedUpstreamRTOffset("", j);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getFollowerOffsetLag() {
        return getFollowerOffsetLag(this.FOLLOWER_OFFSET_LAG_FILTER);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getBatchFollowerOffsetLag() {
        return getFollowerOffsetLag(this.BATCH_FOLLOWER_OFFSET_LAG_FILTER);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getHybridFollowerOffsetLag() {
        return getFollowerOffsetLag(this.HYBRID_FOLLOWER_OFFSET_LAG_FILTER);
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public long getRegionHybridOffsetLag(int i) {
        return StatsErrorCode.ACTIVE_ACTIVE_NOT_ENABLED.code;
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState) {
        PubSubTopic leaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository);
        int partition = partitionConsumptionState.getPartition();
        if (!partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER) || leaderTopic == null) {
            this.aggKafkaConsumerService.unsubscribeConsumerFor(this.versionTopic, new PubSubTopicPartitionImpl(this.versionTopic, partition));
        } else {
            this.aggKafkaConsumerService.unsubscribeConsumerFor(this.versionTopic, new PubSubTopicPartitionImpl(leaderTopic, partition));
        }
        this.veniceWriter.ifPresent(veniceWriter -> {
            veniceWriter.closePartition(partition);
        });
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public int getWriteComputeErrorCode() {
        return this.writeComputeFailureCode;
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    public void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState) {
        if (isLeader(partitionConsumptionState)) {
            return;
        }
        TopicSwitchWrapper topicSwitch = partitionConsumptionState.getTopicSwitch();
        OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
        if (topicSwitch == null || topicSwitch.getNewSourceTopic().equals(offsetRecord.getLeaderTopic(this.pubSubTopicRepository))) {
            return;
        }
        offsetRecord.setLeaderTopic(topicSwitch.getNewSourceTopic());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getSubPartitionId(byte[] bArr, PubSubTopicPartition pubSubTopicPartition) {
        return (this.amplificationFactor == 1 || !pubSubTopicPartition.getPubSubTopic().isRealTime()) ? pubSubTopicPartition.getPartitionNumber() : this.venicePartitioner.getPartitionId(bArr, this.subPartitionCount);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ByteBuffer maybeCompressData(int i, ByteBuffer byteBuffer, PartitionConsumptionState partitionConsumptionState) {
        if (byteBuffer == null) {
            return null;
        }
        if (!shouldCompressData(partitionConsumptionState)) {
            return byteBuffer;
        }
        try {
            return ((VeniceCompressor) this.compressor.get()).compress(byteBuffer, 4);
        } catch (IOException e) {
            throw new RuntimeException(String.format("Failed to compress value in venice writer! Aborting write! partition: %d, leader topic: %s, compressor: %s", Integer.valueOf(i), partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository), this.compressor.getClass().getName()), e);
        }
    }

    protected boolean shouldCompressData(PartitionConsumptionState partitionConsumptionState) {
        if (isLeader(partitionConsumptionState)) {
            return this.realTimeTopic.equals(partitionConsumptionState.getOffsetRecord().getLeaderTopic(this.pubSubTopicRepository)) && !this.compressionStrategy.equals(CompressionStrategy.NO_OP);
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processMessageAndMaybeProduceToKafka(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, int i, String str, int i2, long j, long j2) {
        KafkaKey kafkaKey = (KafkaKey) pubSubMessage.getKey();
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
        byte[] key = kafkaKey.getKey();
        switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.valueOf(kafkaMessageEnvelope.messageType).ordinal()]) {
            case 1:
                Put put = (Put) kafkaMessageEnvelope.payloadUnion;
                put.putValue = maybeCompressData(pubSubMessage.getTopicPartition().getPartitionNumber(), put.putValue, partitionConsumptionState);
                ByteBuffer byteBuffer = put.putValue;
                if (this.isWriteComputationEnabled && partitionConsumptionState.isEndOfPushReceived()) {
                    partitionConsumptionState.setTransientRecord(i2, ((Long) pubSubMessage.getOffset()).longValue(), key, byteBuffer.array(), byteBuffer.position(), byteBuffer.remaining(), put.schemaId, null);
                }
                produceToLocalKafka(pubSubMessage, partitionConsumptionState, LeaderProducedRecordContext.newPutRecord(i2, ((Long) pubSubMessage.getOffset()).longValue(), key, put), (chunkAwareCallback, leaderMetadataWrapper) -> {
                    if (partitionConsumptionState.isEndOfPushReceived()) {
                        ((VeniceWriter) this.veniceWriter.get()).put(key, ByteUtils.extractByteArray(byteBuffer), put.schemaId, chunkAwareCallback, leaderMetadataWrapper);
                    } else {
                        ((VeniceWriter) this.veniceWriter.get()).put(kafkaKey, kafkaMessageEnvelope, chunkAwareCallback, pubSubMessage.getTopicPartition().getPartitionNumber(), leaderMetadataWrapper);
                    }
                }, i, str, i2, j);
                return;
            case 2:
                if (this.isWriteComputationEnabled && partitionConsumptionState.isEndOfPushReceived()) {
                    partitionConsumptionState.setTransientRecord(i2, ((Long) pubSubMessage.getOffset()).longValue(), key, -1, null);
                }
                produceToLocalKafka(pubSubMessage, partitionConsumptionState, LeaderProducedRecordContext.newDeleteRecord(i2, ((Long) pubSubMessage.getOffset()).longValue(), key, null), (chunkAwareCallback2, leaderMetadataWrapper2) -> {
                    if (partitionConsumptionState.isEndOfPushReceived()) {
                        ((VeniceWriter) this.veniceWriter.get()).delete(key, chunkAwareCallback2, leaderMetadataWrapper2);
                    } else {
                        ((VeniceWriter) this.veniceWriter.get()).delete(kafkaKey, kafkaMessageEnvelope, chunkAwareCallback2, pubSubMessage.getTopicPartition().getPartitionNumber(), leaderMetadataWrapper2);
                    }
                }, i, str, i2, j);
                return;
            case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
                handleUpdateRequest((Update) kafkaMessageEnvelope.payloadUnion, key, pubSubMessage, str, i2, partitionConsumptionState, j);
                return;
            default:
                throw new VeniceMessageException(this.consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaMessageEnvelope.messageType);
        }
    }

    private void handleUpdateRequest(Update update, byte[] bArr, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, String str, int i, PartitionConsumptionState partitionConsumptionState, long j) {
        int id;
        int i2;
        int partition = partitionConsumptionState.getPartition();
        if (isIngestingSystemStore()) {
            id = this.schemaRepository.getSupersetOrLatestValueSchema(this.storeName).getId();
            i2 = this.schemaRepository.getLatestDerivedSchema(this.storeName, id).getId();
        } else {
            SchemaEntry supersetSchema = this.schemaRepository.getSupersetSchema(this.storeName);
            if (supersetSchema == null) {
                throw new IllegalStateException("Cannot find superset schema for store: " + this.storeName);
            }
            id = supersetSchema.getId();
            i2 = update.updateSchemaId;
        }
        GenericRecord readStoredValueRecord = readStoredValueRecord(partitionConsumptionState, bArr, id, pubSubMessage.getTopicPartition());
        try {
            long nanoTime = System.nanoTime();
            byte[] compress = ((VeniceCompressor) this.compressor.get()).compress(this.storeWriteComputeHandler.applyWriteCompute(readStoredValueRecord, update.schemaId, id, update.updateValue, update.updateSchemaId, i2));
            this.hostLevelIngestionStats.recordWriteComputeUpdateLatency(LatencyUtils.getLatencyInMS(nanoTime));
            if (compress == null) {
                if (readStoredValueRecord != null) {
                    throw new IllegalStateException("Detect a situation where the current value exists and the Write Compute requestdeletes the current value. It is unexpected because Write Compute only supports partial update and does not support record value deletion.");
                }
                return;
            }
            partitionConsumptionState.setTransientRecord(i, ((Long) pubSubMessage.getOffset()).longValue(), bArr, compress, 0, compress.length, id, null);
            ByteBuffer prependIntHeaderToByteBuffer = ByteUtils.prependIntHeaderToByteBuffer(ByteBuffer.wrap(compress), id, false);
            Put put = new Put();
            put.putValue = prependIntHeaderToByteBuffer;
            put.schemaId = id;
            byte[] bArr2 = bArr;
            if (this.isChunked) {
                bArr2 = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(bArr);
            }
            int i3 = id;
            produceToLocalKafka(pubSubMessage, partitionConsumptionState, LeaderProducedRecordContext.newPutRecord(i, ((Long) pubSubMessage.getOffset()).longValue(), bArr2, put), (chunkAwareCallback, leaderMetadataWrapper) -> {
                ((VeniceWriter) this.veniceWriter.get()).put(bArr, compress, i3, chunkAwareCallback, leaderMetadataWrapper);
            }, partition, str, i, j);
        } catch (Exception e) {
            this.writeComputeFailureCode = StatsErrorCode.WRITE_COMPUTE_UPDATE_FAILURE.code;
            throw new RuntimeException(e);
        }
    }

    private GenericRecord readStoredValueRecord(PartitionConsumptionState partitionConsumptionState, byte[] bArr, int i, PubSubTopicPartition pubSubTopicPartition) {
        GenericRecord genericRecord;
        PartitionConsumptionState.TransientRecord transientRecord = partitionConsumptionState.getTransientRecord(bArr);
        if (transientRecord == null) {
            try {
                long nanoTime = System.nanoTime();
                genericRecord = (GenericRecord) GenericRecordChunkingAdapter.INSTANCE.get(this.storageEngine, i, getSubPartitionId(bArr, pubSubTopicPartition), ByteBuffer.wrap(bArr), this.isChunked, (boolean) null, (BinaryDecoder) null, (ReadResponse) null, this.compressionStrategy, this.serverConfig.isComputeFastAvroEnabled(), this.schemaRepository, this.storeName, (VeniceCompressor) this.compressor.get());
                this.hostLevelIngestionStats.recordWriteComputeLookUpLatency(LatencyUtils.getLatencyInMS(nanoTime));
            } catch (Exception e) {
                this.writeComputeFailureCode = StatsErrorCode.WRITE_COMPUTE_DESERIALIZATION_FAILURE.code;
                throw e;
            }
        } else {
            this.hostLevelIngestionStats.recordWriteComputeCacheHitCount();
            if (transientRecord.getValue() != null) {
                try {
                    genericRecord = (GenericRecord) GenericRecordChunkingAdapter.INSTANCE.constructValue(transientRecord.getValueSchemaId(), i, transientRecord.getValue(), transientRecord.getValueOffset(), transientRecord.getValueLen(), this.serverConfig.isComputeFastAvroEnabled(), this.schemaRepository, this.storeName, (VeniceCompressor) this.compressor.get());
                } catch (Exception e2) {
                    this.writeComputeFailureCode = StatsErrorCode.WRITE_COMPUTE_DESERIALIZATION_FAILURE.code;
                    throw e2;
                }
            } else {
                genericRecord = null;
            }
        }
        return genericRecord;
    }

    private void restoreProducerStatesForLeaderConsumption(int i) {
        this.kafkaDataIntegrityValidatorForLeaders.clearPartition(i);
        cloneProducerStates(i, this.kafkaDataIntegrityValidatorForLeaders);
    }

    protected long getLatestLeaderPersistedOffsetAndHybridTopicOffset(String str, PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, boolean z) {
        return getLatestLeaderOffsetAndHybridTopicOffset(str, pubSubTopic, partitionConsumptionState, this::getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement, z);
    }

    private boolean isIngestingSystemStore() {
        return VeniceSystemStoreUtils.isSystemStore(this.storeName);
    }

    protected long getLatestLeaderConsumedOffsetAndHybridTopicOffset(String str, PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState) {
        return getLatestLeaderOffsetAndHybridTopicOffset(str, pubSubTopic, partitionConsumptionState, this::getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement, false);
    }

    private long getLatestLeaderOffsetAndHybridTopicOffset(String str, PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, BiFunction<PartitionConsumptionState, String, Long> biFunction, boolean z) {
        int i;
        long longValue;
        int partition = partitionConsumptionState.getPartition();
        if (this.amplificationFactor > 1) {
            i = PartitionUtils.getUserPartition(partition, this.amplificationFactor);
            IntList subPartitions = PartitionUtils.getSubPartitions(i, this.amplificationFactor);
            longValue = -1;
            for (int i2 = 0; i2 < subPartitions.size(); i2++) {
                PartitionConsumptionState partitionConsumptionState2 = this.partitionConsumptionStateMap.get(Integer.valueOf(subPartitions.getInt(i2)));
                if (partitionConsumptionState2 != null && partitionConsumptionState2.getOffsetRecord() != null) {
                    long longValue2 = biFunction.apply(partitionConsumptionState2, str).longValue();
                    if (longValue2 > longValue) {
                        longValue = longValue2;
                    }
                }
            }
        } else {
            i = partition;
            longValue = biFunction.apply(partitionConsumptionState, str).longValue();
        }
        long partitionLatestOffset = getPartitionLatestOffset(str, pubSubTopic, i);
        long offset = partitionLatestOffset >= 0 ? partitionLatestOffset : this.cachedKafkaMetadataGetter.getOffset(getTopicManager(str), pubSubTopic, i);
        if (longValue == -1) {
            longValue = 0;
        }
        long j = offset - longValue;
        if (getPartitionOffsetLag(str, pubSubTopic, i) <= 0 && this.cachedKafkaMetadataGetter.getEarliestOffset(getTopicManager(str), new PubSubTopicPartitionImpl(pubSubTopic, i)) == offset - 1) {
            j = 0;
        }
        if (z) {
            LOGGER.info("{} partition {} RT lag offset for {} is: Latest RT offset [{}] - persisted offset [{}] = Lag [{}]", this.consumerTaskId, Integer.valueOf(partitionConsumptionState.getPartition()), str, Long.valueOf(offset), Long.valueOf(longValue), Long.valueOf(j));
        }
        return j;
    }

    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionTask
    protected void processVersionSwapMessage(ControlMessage controlMessage, int i, PartitionConsumptionState partitionConsumptionState) {
        Iterator<VeniceViewWriter> it = this.viewWriters.values().iterator();
        while (it.hasNext()) {
            it.next().processControlMessage(controlMessage, i, partitionConsumptionState, this.versionNumber);
        }
    }

    protected LeaderProducerCallback createProducerCallback(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) {
        return new LeaderProducerCallback(this, pubSubMessage, partitionConsumptionState, leaderProducedRecordContext, i, str, j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Lazy<VeniceWriter<byte[], byte[], byte[]>> getVeniceWriter() {
        return this.veniceWriter;
    }
}
