package com.linkedin.venice.controller.kafka.consumer;

import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controller.AdminTopicMetadataAccessor;
import com.linkedin.venice.controller.ExecutionIdAccessor;
import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.KillOfflinePushJob;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer;
import com.linkedin.venice.controller.stats.AdminConsumptionStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.validation.DataValidationException;
import com.linkedin.venice.exceptions.validation.DuplicateDataException;
import com.linkedin.venice.exceptions.validation.MissingDataException;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.class */
public class AdminConsumptionTask implements Runnable, Closeable {
    private static final int MAX_WORKER_QUEUE_SIZE = 10000;
    private static final long UNASSIGNED_VALUE = -1;
    private static final int READ_CYCLE_DELAY_MS = 1000;
    private static final int MAX_DUPLICATE_MESSAGE_LOGS = 20;
    public static final int IGNORED_CURRENT_VERSION = -1;
    private static final String STORAGE_PERSONA_MAP_KEY = "STORAGE_PERSONA";
    private final Logger LOGGER;
    private final String clusterName;
    private final String topic;
    private final PubSubTopic pubSubTopic;
    private final String consumerTaskId;
    private final AdminTopicMetadataAccessor adminTopicMetadataAccessor;
    private final VeniceHelixAdmin admin;
    private final boolean isParentController;
    private final AdminConsumptionStats stats;
    private final int adminTopicReplicationFactor;
    private final Optional<Integer> minInSyncReplicas;
    private final boolean remoteConsumptionEnabled;
    private final PubSubConsumerAdapter consumer;
    private final ExecutionIdAccessor executionIdAccessor;
    private final ExecutorService executorService;
    private TopicManager sourceKafkaClusterTopicManager;
    private final long processingCycleTimeoutInMs;
    private volatile ConcurrentHashMap<String, Long> lastSucceededExecutionIdMap;
    private final PubSubTopicRepository pubSubTopicRepository;
    private final PubSubMessageDeserializer pubSubMessageDeserializer;
    private final String regionName;
    private static final String CONSUMER_TASK_ID_FORMAT = AdminConsumptionTask.class.getSimpleName() + " [Topic: %s] ";
    private static final long CONSUMPTION_LAG_UPDATE_INTERVAL_IN_MS = TimeUnit.MINUTES.toMillis(5);
    private volatile long offsetToSkip = -1;
    private volatile long offsetToSkipDIV = -1;
    private volatile long failingOffset = -1;
    private long lastPersistedExecutionId = -1;
    private long lastPersistedOffset = -1;
    private long lastDelegatedExecutionId = -1;
    private long lastOffset = -1;
    private long lastConsumedOffset = -1;
    private long localOffsetCheckpointAtStartTime = -1;
    private long upstreamOffsetCheckpointAtStartTime = -1;
    private ProducerInfo producerInfo = null;
    private int consecutiveDuplicateMessageCount = 0;
    private long lastUpdateTimeForConsumptionOffsetLag = 0;
    private final AdminOperationSerializer deserializer = new AdminOperationSerializer();
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private boolean isSubscribed = false;
    private boolean topicExists = false;
    private final Map<String, Queue<AdminOperationWrapper>> storeAdminOperationsMapWithOffset = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, AdminErrorInfo> problematicStores = new ConcurrentHashMap<>();
    private final Queue<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> undelegatedRecords = new LinkedList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask$AdminErrorInfo.class */
    public static class AdminErrorInfo {
        long offset;
        Exception exception;

        private AdminErrorInfo() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask$ProducerInfo.class */
    public static class ProducerInfo {
        private GUID producerGUID;
        private int segmentNumber;
        private int sequenceNumber;

        ProducerInfo(ProducerMetadata producerMetadata) {
            updateProducerMetadata(producerMetadata);
        }

        boolean isIncomingMessageValid(ProducerMetadata producerMetadata) {
            if (!producerMetadata.producerGUID.equals(this.producerGUID)) {
                updateProducerMetadata(producerMetadata);
                return true;
            }
            if (producerMetadata.segmentNumber != this.segmentNumber) {
                return false;
            }
            if (producerMetadata.messageSequenceNumber == this.sequenceNumber + 1) {
                this.sequenceNumber = producerMetadata.messageSequenceNumber;
            }
            return producerMetadata.messageSequenceNumber <= this.sequenceNumber;
        }

        void updateProducerMetadata(ProducerMetadata producerMetadata) {
            this.producerGUID = producerMetadata.producerGUID;
            this.segmentNumber = producerMetadata.segmentNumber;
            this.sequenceNumber = producerMetadata.messageSequenceNumber;
        }

        public String toString() {
            return String.format("{producerGUID: %s, segmentNumber: %d, sequenceNumber: %d}", this.producerGUID, Integer.valueOf(this.segmentNumber), Integer.valueOf(this.sequenceNumber));
        }
    }

    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    public AdminConsumptionTask(String str, PubSubConsumerAdapter pubSubConsumerAdapter, boolean z, Optional<String> optional, VeniceHelixAdmin veniceHelixAdmin, AdminTopicMetadataAccessor adminTopicMetadataAccessor, ExecutionIdAccessor executionIdAccessor, boolean z2, AdminConsumptionStats adminConsumptionStats, int i, Optional<Integer> optional2, long j, int i2, PubSubTopicRepository pubSubTopicRepository, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer, String str2) {
        this.clusterName = str;
        this.topic = AdminTopicUtils.getTopicNameFromClusterName(str);
        this.consumerTaskId = String.format(CONSUMER_TASK_ID_FORMAT, this.topic);
        this.LOGGER = LogManager.getLogger(this.consumerTaskId);
        this.admin = veniceHelixAdmin;
        this.isParentController = z2;
        this.stats = adminConsumptionStats;
        this.adminTopicReplicationFactor = i;
        this.minInSyncReplicas = optional2;
        this.consumer = pubSubConsumerAdapter;
        this.remoteConsumptionEnabled = z;
        this.adminTopicMetadataAccessor = adminTopicMetadataAccessor;
        this.executionIdAccessor = executionIdAccessor;
        this.processingCycleTimeoutInMs = j;
        this.executorService = new ThreadPoolExecutor(i2, i2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(10000), new DaemonThreadFactory("Venice-Admin-Execution-Task"));
        this.stats.setAdminConsumptionFailedOffset(this.failingOffset);
        this.pubSubTopicRepository = pubSubTopicRepository;
        this.pubSubMessageDeserializer = kafkaPubSubMessageDeserializer;
        this.pubSubTopic = pubSubTopicRepository.getTopic(this.topic);
        this.regionName = str2;
        if (z) {
            if (!optional.isPresent()) {
                throw new VeniceException("Admin topic remote consumption is enabled but no config found for the source Kafka bootstrap server url");
            }
            this.sourceKafkaClusterTopicManager = veniceHelixAdmin.getTopicManager(optional.get());
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        this.isRunning.getAndSet(false);
    }

    @Override // java.lang.Runnable
    public void run() {
        this.LOGGER.info("Running {}", getClass().getSimpleName());
        long j = 0;
        while (this.isRunning.get()) {
            try {
                Utils.sleep(1000L);
                if (this.admin.isLeaderControllerFor(this.clusterName) && this.admin.isAdminTopicConsumptionEnabled(this.clusterName)) {
                    if (!this.isSubscribed) {
                        if (whetherTopicExists(this.pubSubTopic)) {
                            makeSureAdminTopicUsingInfiniteRetentionPolicy(this.pubSubTopic);
                        } else if (this.isParentController) {
                            this.LOGGER.info("Admin topic: {} hasn't been created yet. {}", this.topic, "Since this is the parent controller, it will be created now.");
                            this.admin.getTopicManager().createTopic(this.pubSubTopic, 1, this.adminTopicReplicationFactor, true, false, this.minInSyncReplicas);
                            this.LOGGER.info("Admin topic {} is created.", this.topic);
                        } else if (System.currentTimeMillis() - j > 60000) {
                            this.LOGGER.info("Admin topic: {} hasn't been created yet. {}", this.topic, "Since this is a child controller, it will not be created by this process.");
                            j = System.currentTimeMillis();
                        }
                        subscribe();
                    }
                    if (this.undelegatedRecords.isEmpty()) {
                        Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> poll = this.consumer.poll(1000L);
                        if (poll == null || poll.isEmpty()) {
                            this.LOGGER.debug("Received null or no messages");
                        } else {
                            this.LOGGER.info("Consumed {} admin messages from kafka. Will queue them up for processing", Integer.valueOf(poll.values().stream().mapToInt((v0) -> {
                                return v0.size();
                            }).sum()));
                            Iterator iterateOnMapOfLists = Utils.iterateOnMapOfLists(poll);
                            while (iterateOnMapOfLists.hasNext()) {
                                PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage = (PubSubMessage) iterateOnMapOfLists.next();
                                this.lastConsumedOffset = pubSubMessage.getOffset().longValue();
                                this.undelegatedRecords.add(pubSubMessage);
                            }
                        }
                    } else {
                        this.LOGGER.info("There are {} admin messages in the undelegated message queue. Will consume from the undelegated queue first before polling the admin topic.", Integer.valueOf(this.undelegatedRecords.size()));
                    }
                    while (!this.undelegatedRecords.isEmpty()) {
                        try {
                            try {
                                if (delegateMessage(this.undelegatedRecords.peek()) == this.lastDelegatedExecutionId) {
                                    updateLastOffset(this.undelegatedRecords.peek().getOffset().longValue());
                                }
                                this.undelegatedRecords.remove();
                            } catch (DataValidationException e) {
                                this.LOGGER.error("Admin consumption task is blocked due to DataValidationException with offset {}", this.undelegatedRecords.peek().getOffset(), e);
                                this.failingOffset = this.undelegatedRecords.peek().getOffset().longValue();
                                this.stats.recordFailedAdminConsumption();
                                this.stats.recordAdminTopicDIVErrorReportCount();
                            }
                        } catch (Exception e2) {
                            this.LOGGER.error("Admin consumption task is blocked due to Exception with offset {}", this.undelegatedRecords.peek().getOffset(), e2);
                            this.failingOffset = this.undelegatedRecords.peek().getOffset().longValue();
                            this.stats.recordFailedAdminConsumption();
                        }
                    }
                    if (this.remoteConsumptionEnabled && LatencyUtils.getElapsedTimeInMs(this.lastUpdateTimeForConsumptionOffsetLag) > CONSUMPTION_LAG_UPDATE_INTERVAL_IN_MS) {
                        recordConsumptionLag();
                        this.lastUpdateTimeForConsumptionOffsetLag = System.currentTimeMillis();
                    }
                    executeMessagesAndCollectResults();
                    this.stats.setAdminConsumptionFailedOffset(this.failingOffset);
                } else {
                    unSubscribe();
                }
            } catch (Exception e3) {
                this.LOGGER.error("Exception thrown while running admin consumption task", (Throwable) e3);
                unSubscribe();
            }
        }
        internalClose();
    }

    private void subscribe() {
        Map<String, Long> metadata = this.adminTopicMetadataAccessor.getMetadata(this.clusterName);
        if (metadata.isEmpty()) {
            this.LOGGER.info("Admin topic metadata is empty, will resume consumption from the starting offset");
            this.lastOffset = -1L;
            this.lastDelegatedExecutionId = -1L;
        } else {
            Pair<Long, Long> offsets = AdminTopicMetadataAccessor.getOffsets(metadata);
            this.localOffsetCheckpointAtStartTime = offsets.getFirst().longValue();
            this.upstreamOffsetCheckpointAtStartTime = offsets.getSecond().longValue();
            this.lastPersistedOffset = this.remoteConsumptionEnabled ? this.upstreamOffsetCheckpointAtStartTime : this.localOffsetCheckpointAtStartTime;
            this.lastPersistedExecutionId = AdminTopicMetadataAccessor.getExecutionId(metadata);
            this.lastOffset = this.lastPersistedOffset - 1;
            this.lastDelegatedExecutionId = this.lastPersistedExecutionId;
        }
        this.stats.setAdminConsumptionCheckpointOffset(this.lastPersistedOffset);
        this.stats.registerAdminConsumptionCheckpointOffset();
        this.consumer.subscribe(new PubSubTopicPartitionImpl(this.pubSubTopic, 0), this.lastOffset);
        this.isSubscribed = true;
        this.LOGGER.info("Subscribed to topic name: {}, with offset: {} and execution id: {}. Remote consumption flag: {}", this.topic, Long.valueOf(this.lastOffset), Long.valueOf(this.lastPersistedExecutionId), Boolean.valueOf(this.remoteConsumptionEnabled));
    }

    private void unSubscribe() {
        if (this.isSubscribed) {
            this.consumer.unSubscribe(new PubSubTopicPartitionImpl(this.pubSubTopic, 0));
            this.storeAdminOperationsMapWithOffset.clear();
            this.problematicStores.clear();
            this.undelegatedRecords.clear();
            this.failingOffset = -1L;
            this.offsetToSkip = -1L;
            this.offsetToSkipDIV = -1L;
            this.lastDelegatedExecutionId = -1L;
            this.lastPersistedExecutionId = -1L;
            this.lastOffset = -1L;
            this.lastPersistedOffset = -1L;
            this.producerInfo = null;
            this.stats.recordPendingAdminMessagesCount(-1.0d);
            this.stats.recordStoresWithPendingAdminMessagesCount(-1.0d);
            resetConsumptionLag();
            this.isSubscribed = false;
            this.LOGGER.info("Unsubscribed from topic name: {}. Remote consumption flag before unsubscription: {}", this.topic, Boolean.valueOf(this.remoteConsumptionEnabled));
        }
    }

    private void executeMessagesAndCollectResults() throws InterruptedException {
        this.lastSucceededExecutionIdMap = new ConcurrentHashMap<>(this.executionIdAccessor.getLastSucceededExecutionIdMap(this.clusterName));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        boolean z = false;
        for (Map.Entry<String, Queue<AdminOperationWrapper>> entry : this.storeAdminOperationsMapWithOffset.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                if (checkOffsetToSkip(entry.getValue().peek().getOffset(), false)) {
                    entry.getValue().remove();
                    z = true;
                }
                arrayList.add(new AdminExecutionTask(this.LOGGER, this.clusterName, entry.getKey(), this.lastSucceededExecutionIdMap, this.lastPersistedExecutionId, entry.getValue(), this.admin, this.executionIdAccessor, this.isParentController, this.stats, this.regionName));
                arrayList2.add(entry.getKey());
            }
        }
        if (z) {
            resetOffsetToSkip();
        }
        if (this.isRunning.get()) {
            if (arrayList.isEmpty()) {
                persistAdminTopicMetadata();
                return;
            }
            int i = 0;
            int i2 = 0;
            long currentTimeMillis = System.currentTimeMillis();
            List invokeAll = this.executorService.invokeAll(arrayList, this.processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
            this.stats.recordAdminConsumptionCycleDurationMs(System.currentTimeMillis() - currentTimeMillis);
            Map<String, Long> lastSucceededExecutionIdMap = this.executionIdAccessor.getLastSucceededExecutionIdMap(this.clusterName);
            boolean z2 = true;
            for (int i3 = 0; i3 < invokeAll.size(); i3++) {
                String str = (String) arrayList2.get(i3);
                try {
                    ((Future) invokeAll.get(i3)).get();
                    this.problematicStores.remove(str);
                    if (z2 && this.storeAdminOperationsMapWithOffset.containsKey(str) && !this.storeAdminOperationsMapWithOffset.get(str).isEmpty()) {
                        z2 = false;
                    }
                } catch (CancellationException | ExecutionException e) {
                    z2 = false;
                    AdminErrorInfo adminErrorInfo = new AdminErrorInfo();
                    int size = this.storeAdminOperationsMapWithOffset.get(str).size();
                    i += size;
                    i2 += size > 0 ? 1 : 0;
                    if (e instanceof CancellationException) {
                        long longValue = this.lastSucceededExecutionIdMap.getOrDefault(str, -1L).longValue();
                        long longValue2 = lastSucceededExecutionIdMap.getOrDefault(str, -1L).longValue();
                        if (longValue == -1) {
                            this.LOGGER.error("Could not find last successful execution ID for store {}", str);
                        }
                        if (longValue == longValue2 && size > 0) {
                            adminErrorInfo.exception = new VeniceException("Could not finish processing admin message for store " + str + " in time");
                            adminErrorInfo.offset = this.storeAdminOperationsMapWithOffset.get(str).peek().getOffset();
                            this.problematicStores.put(str, adminErrorInfo);
                            this.LOGGER.warn(adminErrorInfo.exception.getMessage());
                        }
                    } else {
                        adminErrorInfo.exception = e;
                        adminErrorInfo.offset = this.storeAdminOperationsMapWithOffset.get(str).peek().getOffset();
                        this.problematicStores.put(str, adminErrorInfo);
                    }
                }
            }
            if (this.problematicStores.isEmpty() && z2) {
                if (this.failingOffset <= this.lastOffset) {
                    this.failingOffset = -1L;
                }
                persistAdminTopicMetadata();
            } else {
                long j = -1;
                for (Map.Entry<String, AdminErrorInfo> entry2 : this.problematicStores.entrySet()) {
                    if (j == -1 || entry2.getValue().offset < j) {
                        j = entry2.getValue().offset;
                    }
                }
                if (this.failingOffset <= this.lastOffset) {
                    this.failingOffset = j;
                }
            }
            this.stats.recordPendingAdminMessagesCount(i);
            this.stats.recordStoresWithPendingAdminMessagesCount(i2);
        }
    }

    private void internalClose() {
        unSubscribe();
        this.executorService.shutdownNow();
        try {
            if (!this.executorService.awaitTermination(this.processingCycleTimeoutInMs, TimeUnit.MILLISECONDS)) {
                this.LOGGER.warn("consumer Task Id {}: Unable to shutdown worker executor thread pool in admin consumption task in time", this.consumerTaskId);
            }
        } catch (InterruptedException e) {
            this.LOGGER.warn("consumer Task Id {}: Interrupted while waiting for worker executor thread pool to shutdown", this.consumerTaskId);
        }
        this.LOGGER.info("Closed consumer for admin topic: {}", this.topic);
        this.consumer.close();
    }

    private boolean whetherTopicExists(PubSubTopic pubSubTopic) {
        if (this.topicExists) {
            return true;
        }
        if (this.remoteConsumptionEnabled) {
            this.topicExists = this.sourceKafkaClusterTopicManager.containsTopicAndAllPartitionsAreOnline(pubSubTopic);
        } else {
            this.topicExists = this.admin.getTopicManager().containsTopicAndAllPartitionsAreOnline(pubSubTopic);
        }
        return this.topicExists;
    }

    private void makeSureAdminTopicUsingInfiniteRetentionPolicy(PubSubTopic pubSubTopic) {
        if (this.remoteConsumptionEnabled) {
            this.sourceKafkaClusterTopicManager.updateTopicRetention(pubSubTopic, Long.MAX_VALUE);
        } else {
            this.admin.getTopicManager().updateTopicRetention(pubSubTopic, Long.MAX_VALUE);
        }
        this.LOGGER.info("Admin topic: {} has been updated to use infinite retention policy", this.topic);
    }

    private long delegateMessage(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) {
        if (checkOffsetToSkip(pubSubMessage.getOffset().longValue(), true) || !shouldProcessRecord(pubSubMessage)) {
            return this.lastDelegatedExecutionId;
        }
        KafkaKey key = pubSubMessage.getKey();
        KafkaMessageEnvelope value = pubSubMessage.getValue();
        if (key.isControlMessage()) {
            this.LOGGER.debug("Received control message: {}", value);
            return -1L;
        }
        MessageType valueOf = MessageType.valueOf(value);
        if (MessageType.PUT != valueOf) {
            throw new VeniceException("Received unexpected message type: " + valueOf);
        }
        Put put = (Put) value.payloadUnion;
        AdminOperation deserialize = this.deserializer.deserialize(put.putValue, put.schemaId);
        long j = deserialize.executionId;
        try {
            checkAndValidateMessage(deserialize, pubSubMessage);
            this.LOGGER.info("Received admin message: {} offset: {}", deserialize, pubSubMessage.getOffset());
            this.consecutiveDuplicateMessageCount = 0;
            if (AdminMessageType.valueOf(deserialize).isBatchUpdate()) {
                long j2 = value.producerMetadata.messageTimestamp;
                long pubSubMessageTime = pubSubMessage.getPubSubMessageTime();
                Iterator<Store> it2 = this.admin.getAllStores(this.clusterName).iterator();
                while (it2.hasNext()) {
                    this.storeAdminOperationsMapWithOffset.computeIfAbsent(it2.next().getName(), str -> {
                        return new LinkedList();
                    }).add(new AdminOperationWrapper(deserialize, pubSubMessage.getOffset().longValue(), j2, pubSubMessageTime, System.currentTimeMillis()));
                    this.stats.recordAdminMessageMMLatency(Math.max(0L, r0.getLocalBrokerTimestamp() - r0.getProducerTimestamp()));
                    this.stats.recordAdminMessageDelegateLatency(Math.max(0L, r0.getDelegateTimestamp() - r0.getLocalBrokerTimestamp()));
                }
            } else {
                AdminOperationWrapper adminOperationWrapper = new AdminOperationWrapper(deserialize, pubSubMessage.getOffset().longValue(), value.producerMetadata.messageTimestamp, pubSubMessage.getPubSubMessageTime(), System.currentTimeMillis());
                this.stats.recordAdminMessageMMLatency(Math.max(0L, adminOperationWrapper.getLocalBrokerTimestamp() - adminOperationWrapper.getProducerTimestamp()));
                this.stats.recordAdminMessageDelegateLatency(Math.max(0L, adminOperationWrapper.getDelegateTimestamp() - adminOperationWrapper.getLocalBrokerTimestamp()));
                String extractStoreName = extractStoreName(deserialize);
                this.storeAdminOperationsMapWithOffset.putIfAbsent(extractStoreName, new LinkedList());
                this.storeAdminOperationsMapWithOffset.get(extractStoreName).add(adminOperationWrapper);
            }
            return j;
        } catch (DuplicateDataException e) {
            if (this.consecutiveDuplicateMessageCount < 20) {
                this.consecutiveDuplicateMessageCount++;
                this.LOGGER.info(e.getMessage());
            } else if (this.consecutiveDuplicateMessageCount == 20) {
                this.LOGGER.info("It appears that controller is consuming from a low offset and encounters many admin messages that have already been processed. Will stop logging duplicate messages until a fresh admin message.");
            }
            return j;
        }
    }

    private void checkAndValidateMessage(AdminOperation adminOperation, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) {
        long j = adminOperation.executionId;
        if (checkOffsetToSkipDIV(pubSubMessage.getOffset().longValue()) || this.lastDelegatedExecutionId == -1) {
            this.lastDelegatedExecutionId = j;
            updateProducerInfo(pubSubMessage.getValue().producerMetadata);
            return;
        }
        if (j == this.lastDelegatedExecutionId + 1) {
            this.lastDelegatedExecutionId++;
            updateProducerInfo(pubSubMessage.getValue().producerMetadata);
            return;
        }
        if (j <= this.lastDelegatedExecutionId) {
            updateProducerInfo(pubSubMessage.getValue().producerMetadata);
            throw new DuplicateDataException("Skipping message with execution id: " + j + " because last delegated execution id was: " + this.lastDelegatedExecutionId);
        }
        boolean z = true;
        String str = "Last delegated execution id was: " + this.lastDelegatedExecutionId + " ,but incoming execution id is: " + j;
        String str2 = " Previous producer info: " + (this.producerInfo == null ? "null" : this.producerInfo.toString()) + " Incoming message producer info: " + pubSubMessage.getValue().producerMetadata;
        if (this.producerInfo != null) {
            z = !this.producerInfo.isIncomingMessageValid(pubSubMessage.getValue().producerMetadata);
        }
        if (z) {
            throw new MissingDataException(this.producerInfo != null ? str + str2 : str + " Cannot cross-reference with previous producer info because it's not available yet");
        }
        this.LOGGER.info("Ignoring {} Cross-reference with producerInfo passed. {}", str, str2);
        updateProducerInfo(pubSubMessage.getValue().producerMetadata);
        this.lastDelegatedExecutionId = j;
    }

    private void updateProducerInfo(ProducerMetadata producerMetadata) {
        if (this.producerInfo != null) {
            this.producerInfo.updateProducerMetadata(producerMetadata);
        } else {
            this.producerInfo = new ProducerInfo(producerMetadata);
        }
    }

    private String extractStoreName(AdminOperation adminOperation) {
        String obj;
        switch (AdminMessageType.valueOf(adminOperation)) {
            case CREATE_STORAGE_PERSONA:
            case DELETE_STORAGE_PERSONA:
            case UPDATE_STORAGE_PERSONA:
                return STORAGE_PERSONA_MAP_KEY;
            case KILL_OFFLINE_PUSH_JOB:
                obj = Version.parseStoreFromKafkaTopicName(((KillOfflinePushJob) adminOperation.payloadUnion).kafkaTopic.toString());
                break;
            case CONFIGURE_NATIVE_REPLICATION_FOR_CLUSTER:
                throw new VeniceException("Operation " + AdminMessageType.CONFIGURE_NATIVE_REPLICATION_FOR_CLUSTER + " is a batch update that affects all existing store in cluster " + this.clusterName + ". Cannot extract a specific store name.");
            case CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER:
                throw new VeniceException("Operation " + AdminMessageType.CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER + " is a batch update that affects all existing store in cluster " + this.clusterName + ". Cannot extract a specific store name.");
            case CONFIGURE_INCREMENTAL_PUSH_FOR_CLUSTER:
                throw new VeniceException("Operation " + AdminMessageType.CONFIGURE_INCREMENTAL_PUSH_FOR_CLUSTER + " is a batch update that affects all existing store in cluster " + this.clusterName + ". Cannot extract a specific store name.");
            default:
                try {
                    obj = ((GenericRecord) adminOperation.payloadUnion).get("storeName").toString();
                    break;
                } catch (Exception e) {
                    throw new VeniceException("Failed to handle operation type: " + adminOperation.operationType + " because it does not contain a storeName field");
                }
        }
        return VeniceSystemStoreType.extractUserStoreName(obj);
    }

    private void updateLastOffset(long j) {
        if (j > this.lastOffset) {
            this.lastOffset = j;
        }
    }

    private void persistAdminTopicMetadata() {
        if (this.lastDelegatedExecutionId == this.lastPersistedExecutionId && this.lastOffset == this.lastPersistedOffset) {
            return;
        }
        this.adminTopicMetadataAccessor.updateMetadata(this.clusterName, this.remoteConsumptionEnabled ? AdminTopicMetadataAccessor.generateMetadataMap(this.localOffsetCheckpointAtStartTime, this.lastOffset, this.lastDelegatedExecutionId) : AdminTopicMetadataAccessor.generateMetadataMap(this.lastOffset, this.upstreamOffsetCheckpointAtStartTime, this.lastDelegatedExecutionId));
        this.lastPersistedOffset = this.lastOffset;
        this.lastPersistedExecutionId = this.lastDelegatedExecutionId;
        this.stats.setAdminConsumptionCheckpointOffset(this.lastPersistedOffset);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void skipMessageWithOffset(long j) {
        if (j != this.failingOffset) {
            throw new VeniceException("Cannot skip an offset that isn't the first one failing.  Last failed offset is: " + this.failingOffset);
        }
        this.offsetToSkip = j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void skipMessageDIVWithOffset(long j) {
        if (j != this.failingOffset) {
            throw new VeniceException("Cannot skip an offset that isn't the first one failing.  Last failed offset is: " + this.failingOffset);
        }
        this.offsetToSkipDIV = j;
    }

    private void resetOffsetToSkip() {
        this.offsetToSkip = -1L;
    }

    private boolean checkOffsetToSkip(long j, boolean z) {
        boolean z2 = false;
        if (j == this.offsetToSkip) {
            this.LOGGER.warn("Skipping admin message with offset {} as instructed", Long.valueOf(j));
            if (z) {
                resetOffsetToSkip();
            }
            z2 = true;
        }
        return z2;
    }

    private boolean checkOffsetToSkipDIV(long j) {
        boolean z = false;
        if (j == this.offsetToSkipDIV) {
            this.LOGGER.warn("Skipping DIV for admin message with offset {} as instructed", Long.valueOf(j));
            this.offsetToSkipDIV = -1L;
            z = true;
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Long getLastSucceededExecutionId() {
        return Long.valueOf(this.lastPersistedExecutionId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Long getLastSucceededExecutionId(String str) {
        if (this.lastSucceededExecutionIdMap != null) {
            return this.lastSucceededExecutionIdMap.get(str);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Exception getLastExceptionForStore(String str) {
        if (this.problematicStores.containsKey(str)) {
            return this.problematicStores.get(str).exception;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getFailingOffset() {
        return this.failingOffset;
    }

    private boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) {
        PubSubTopic pubSubTopic = pubSubMessage.getTopicPartition().getPubSubTopic();
        if (!this.pubSubTopic.equals(pubSubTopic)) {
            throw new VeniceException(this.consumerTaskId + " received message from different topic: " + pubSubTopic + ", expected: " + this.topic);
        }
        int partitionNumber = pubSubMessage.getTopicPartition().getPartitionNumber();
        if (0 != partitionNumber) {
            throw new VeniceException(this.consumerTaskId + " received message from different partition: " + partitionNumber + ", expected: 0");
        }
        long longValue = pubSubMessage.getOffset().longValue();
        if (this.lastOffset < longValue) {
            return true;
        }
        this.LOGGER.error("Current record has been processed, last known offset: {}, current offset: {}", Long.valueOf(this.lastOffset), Long.valueOf(longValue));
        return false;
    }

    private void recordConsumptionLag() {
        try {
            long partitionLatestOffsetAndRetry = this.sourceKafkaClusterTopicManager.getPartitionLatestOffsetAndRetry(new PubSubTopicPartitionImpl(this.pubSubTopic, 0), 10) - 1;
            if (this.lastConsumedOffset != -1) {
                this.stats.setAdminConsumptionOffsetLag(partitionLatestOffsetAndRetry - this.lastConsumedOffset);
            }
            this.stats.setMaxAdminConsumptionOffsetLag(partitionLatestOffsetAndRetry - this.lastPersistedOffset);
        } catch (Exception e) {
            this.LOGGER.error("Error when emitting admin consumption lag metrics; only log for warning; admin channel will continue to work.");
        }
    }

    private void resetConsumptionLag() {
        this.stats.setAdminConsumptionOffsetLag(0L);
        this.stats.setMaxAdminConsumptionOffsetLag(0L);
    }

    TopicManager getSourceKafkaClusterTopicManager() {
        return this.sourceKafkaClusterTopicManager;
    }
}
