package com.linkedin.davinci.kafka.consumer;

import com.linkedin.avroutil1.compatibility.shaded.org.apache.commons.lang3.Validate;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskFactory;
import com.linkedin.davinci.listener.response.AdminResponse;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.StoragePartitionConfig;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.PersistenceFailureException;
import com.linkedin.venice.exceptions.UnsubscribedTopicPartitionException;
import com.linkedin.venice.exceptions.VeniceChecksumException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceInconsistentStoreMetadataException;
import com.linkedin.venice.exceptions.VeniceIngestionTaskKilledException;
import com.linkedin.venice.exceptions.VeniceMessageException;
import com.linkedin.venice.exceptions.VeniceTimeoutException;
import com.linkedin.venice.exceptions.validation.FatalDataValidationException;
import com.linkedin.venice.exceptions.validation.ImproperlyStartedSegmentException;
import com.linkedin.venice.exceptions.validation.UnsupportedMessageTypeException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.Delete;
import com.linkedin.venice.kafka.protocol.EndOfIncrementalPush;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.StartOfIncrementalPush;
import com.linkedin.venice.kafka.protocol.StartOfPush;
import com.linkedin.venice.kafka.protocol.TopicSwitch;
import com.linkedin.venice.kafka.protocol.Update;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.kafka.validation.ProducerTracker;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.stats.StatsErrorCode;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.DiskUsage;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.SparseConcurrentList;
import com.linkedin.venice.utils.Timer;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreIngestionTask.class */
public abstract class StoreIngestionTask implements Runnable, Closeable {
    private static final int MAX_CONSUMER_ACTION_ATTEMPTS = 5;
    private static final int MAX_IDLE_COUNTER = 100;
    private static final int CONSUMER_ACTION_QUEUE_INIT_CAPACITY = 11;
    protected static final long KILL_WAIT_TIME_MS = 5000;
    private static final int MAX_KILL_CHECKING_ATTEMPTS = 10;
    private static final int SLOPPY_OFFSET_CATCHUP_THRESHOLD = 100;
    protected final StorageEngineRepository storageEngineRepository;
    protected final AbstractStorageEngine storageEngine;
    protected final String kafkaVersionTopic;
    protected final PubSubTopic versionTopic;
    protected final PubSubTopic realTimeTopic;
    protected final String storeName;
    private final boolean isUserSystemStore;
    protected final int versionNumber;
    protected final ReadOnlySchemaRepository schemaRepository;
    protected final ReadOnlyStoreRepository storeRepository;
    protected final String consumerTaskId;
    protected final Properties kafkaProps;
    protected final StorageMetadataService storageMetadataService;
    protected final TopicManagerRepository topicManagerRepository;
    protected final CachedKafkaMetadataGetter cachedKafkaMetadataGetter;
    protected final AbstractStoreBufferService storeBufferService;
    private final KafkaDataIntegrityValidator kafkaDataIntegrityValidator;
    protected final HostLevelIngestionStats hostLevelIngestionStats;
    protected final AggVersionedDIVStats versionedDIVStats;
    protected final AggVersionedIngestionStats versionedIngestionStats;
    protected final BooleanSupplier isCurrentVersion;
    protected final Optional<HybridStoreConfig> hybridStoreConfig;
    protected final ProducerTracker.DIVErrorMetricCallback divErrorMetricCallback;
    protected final long readCycleDelayMs;
    protected final long emptyPollSleepMs;
    protected final DiskUsage diskUsage;
    protected final long databaseSyncBytesIntervalForTransactionalMode;
    protected final long databaseSyncBytesIntervalForDeferredWriteMode;
    protected final VeniceServerConfig serverConfig;
    protected final int errorPartitionId;
    protected final ReadyToServeCheck defaultReadyToServeChecker;
    private final StorageUtilizationManager storageUtilizationManager;
    protected final AggKafkaConsumerService aggKafkaConsumerService;
    private final long startReportingReadyToServeTimestamp;
    private final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer;
    protected final boolean isWriteComputationEnabled;
    private final boolean suppressLiveUpdates;
    private final boolean isActiveActiveReplicationEnabled;
    protected final int subPartitionCount;
    protected final int amplificationFactor;
    protected final VenicePartitioner venicePartitioner;
    protected final int storeVersionPartitionCount;
    protected final long bootstrapTimeoutInMs;
    protected final boolean isIsolatedIngestion;
    protected final AmplificationFactorAdapter amplificationFactorAdapter;
    protected final StatusReportAdapter statusReportAdapter;
    private final Optional<ObjectCacheBackend> cacheBackend;
    protected final String localKafkaServer;
    protected final int localKafkaClusterId;
    protected final Set<String> localKafkaServerSingletonSet;
    protected final boolean isDaVinciClient;
    private final boolean offsetLagDeltaRelaxEnabled;
    private final boolean ingestionCheckpointDuringGracefulShutdownEnabled;
    protected boolean isDataRecovery;
    protected final MetaStoreWriter metaStoreWriter;
    protected final Function<String, String> kafkaClusterUrlResolver;
    private final Object2IntMap<String> kafkaClusterUrlToIdMap;
    protected final boolean readOnlyForBatchOnlyStoreEnabled;
    protected final CompressionStrategy compressionStrategy;
    protected final StorageEngineBackedCompressorFactory compressorFactory;
    protected final Lazy<VeniceCompressor> compressor;
    protected final boolean isChunked;
    protected final PubSubTopicRepository pubSubTopicRepository;
    private final String[] msgForLagMeasurement;
    private static final Logger LOGGER = LogManager.getLogger(StoreIngestionTask.class);
    private static final String CONSUMER_TASK_ID_FORMAT = StoreIngestionTask.class.getSimpleName() + " for [ Topic: %s ]";
    public static long SCHEMA_POLLING_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
    private static final long SCHEMA_POLLING_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(5);
    protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter();
    protected final AtomicInteger consumerActionSequenceNumber = new AtomicInteger(0);
    private final List<PartitionExceptionInfo> partitionIngestionExceptionList = new SparseConcurrentList();
    private Exception lastConsumerException = null;
    private final AtomicReference<Exception> lastStoreIngestionException = new AtomicReference<>();
    protected final SparseConcurrentList<Object> availableSchemaIds = new SparseConcurrentList<>();
    protected final SparseConcurrentList<Object> deserializedSchemaIds = new SparseConcurrentList<>();
    protected int idleCounter = 0;
    protected int writeComputeFailureCode = 0;
    private boolean purgeTransientRecordBuffer = true;
    private int subscribedCount = 0;
    private int forceUnSubscribedCount = 0;
    private int valueSchemaId = -1;
    protected final PriorityBlockingQueue<ConsumerAction> consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY);
    protected final ConcurrentMap<Integer, PartitionConsumptionState> partitionConsumptionStateMap = new VeniceConcurrentHashMap();
    protected final AtomicBoolean isRunning = new AtomicBoolean(true);
    protected final AtomicBoolean emitMetrics = new AtomicBoolean(true);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.davinci.kafka.consumer.StoreIngestionTask$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreIngestionTask$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$davinci$kafka$consumer$StoreIngestionTask$DelegateConsumerRecordResult;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType = new int[MessageType.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.PUT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType = new int[ControlMessageType.values().length];
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.START_OF_PUSH.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.END_OF_PUSH.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.START_OF_SEGMENT.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.END_OF_SEGMENT.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.START_OF_INCREMENTAL_PUSH.ordinal()] = StoreIngestionTask.MAX_CONSUMER_ACTION_ATTEMPTS;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.END_OF_INCREMENTAL_PUSH.ordinal()] = 6;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.TOPIC_SWITCH.ordinal()] = 7;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[ControlMessageType.VERSION_SWAP.ordinal()] = 8;
            } catch (NoSuchFieldError e11) {
            }
            $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType = new int[ConsumerActionType.values().length];
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType[ConsumerActionType.SUBSCRIBE.ordinal()] = 1;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType[ConsumerActionType.UNSUBSCRIBE.ordinal()] = 2;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType[ConsumerActionType.RESET_OFFSET.ordinal()] = 3;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType[ConsumerActionType.KILL.ordinal()] = 4;
            } catch (NoSuchFieldError e15) {
            }
            $SwitchMap$com$linkedin$davinci$kafka$consumer$StoreIngestionTask$DelegateConsumerRecordResult = new int[DelegateConsumerRecordResult.values().length];
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$StoreIngestionTask$DelegateConsumerRecordResult[DelegateConsumerRecordResult.QUEUED_TO_DRAINER.ordinal()] = 1;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$StoreIngestionTask$DelegateConsumerRecordResult[DelegateConsumerRecordResult.PRODUCED_TO_KAFKA.ordinal()] = 2;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$StoreIngestionTask$DelegateConsumerRecordResult[DelegateConsumerRecordResult.SKIPPED_MESSAGE.ordinal()] = 3;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$com$linkedin$davinci$kafka$consumer$StoreIngestionTask$DelegateConsumerRecordResult[DelegateConsumerRecordResult.DUPLICATE_MESSAGE.ordinal()] = 4;
            } catch (NoSuchFieldError e19) {
            }
        }
    }

    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreIngestionTask$DelegateConsumerRecordResult.class */
    protected enum DelegateConsumerRecordResult {
        PRODUCED_TO_KAFKA,
        QUEUED_TO_DRAINER,
        DUPLICATE_MESSAGE,
        SKIPPED_MESSAGE
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreIngestionTask$ReadyToServeCheck.class */
    public interface ReadyToServeCheck {
        default void apply(PartitionConsumptionState partitionConsumptionState) {
            apply(partitionConsumptionState, false);
        }

        void apply(PartitionConsumptionState partitionConsumptionState, boolean z);
    }

    public StoreIngestionTask(StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties properties, BooleanSupplier booleanSupplier, VeniceStoreVersionConfig veniceStoreVersionConfig, int i, boolean z, Optional<ObjectCacheBackend> optional, Queue<VeniceNotifier> queue) {
        long millis;
        this.readCycleDelayMs = veniceStoreVersionConfig.getKafkaReadCycleDelayMs();
        this.emptyPollSleepMs = veniceStoreVersionConfig.getKafkaEmptyPollSleepMs();
        this.databaseSyncBytesIntervalForTransactionalMode = veniceStoreVersionConfig.getDatabaseSyncBytesIntervalForTransactionalMode();
        this.databaseSyncBytesIntervalForDeferredWriteMode = veniceStoreVersionConfig.getDatabaseSyncBytesIntervalForDeferredWriteMode();
        this.kafkaProps = properties;
        this.storageEngineRepository = builder.getStorageEngineRepository();
        this.storageMetadataService = builder.getStorageMetadataService();
        this.storeRepository = builder.getMetadataRepo();
        this.schemaRepository = builder.getSchemaRepo();
        this.kafkaVersionTopic = veniceStoreVersionConfig.getStoreVersionName();
        this.pubSubTopicRepository = builder.getPubSubTopicRepository();
        this.versionTopic = this.pubSubTopicRepository.getTopic(this.kafkaVersionTopic);
        this.storeName = this.versionTopic.getStoreName();
        this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(this.storeName);
        this.realTimeTopic = this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(this.storeName));
        this.versionNumber = Version.parseVersionFromKafkaTopicName(this.kafkaVersionTopic);
        this.kafkaDataIntegrityValidator = new KafkaDataIntegrityValidator(this.kafkaVersionTopic);
        this.consumerTaskId = String.format(CONSUMER_TASK_ID_FORMAT, this.kafkaVersionTopic);
        this.topicManagerRepository = builder.getTopicManagerRepository();
        this.cachedKafkaMetadataGetter = new CachedKafkaMetadataGetter(veniceStoreVersionConfig.getTopicOffsetCheckIntervalMs());
        this.hostLevelIngestionStats = (HostLevelIngestionStats) builder.getIngestionStats().getStoreStats(this.storeName);
        this.versionedDIVStats = builder.getVersionedDIVStats();
        this.versionedIngestionStats = builder.getVersionedStorageIngestionStats();
        this.readOnlyForBatchOnlyStoreEnabled = veniceStoreVersionConfig.isReadOnlyForBatchOnlyStoreEnabled();
        this.storeBufferService = builder.getStoreBufferService();
        this.isCurrentVersion = booleanSupplier;
        this.hybridStoreConfig = Optional.ofNullable(version.isUseVersionLevelHybridConfig() ? version.getHybridStoreConfig() : store.getHybridStoreConfig());
        this.divErrorMetricCallback = dataValidationException -> {
            this.versionedDIVStats.recordException(this.storeName, this.versionNumber, dataValidationException);
        };
        this.diskUsage = builder.getDiskUsage();
        this.storageEngine = (AbstractStorageEngine) Validate.notNull(this.storageEngineRepository.getLocalStorageEngine(this.kafkaVersionTopic));
        this.serverConfig = builder.getServerConfig();
        this.defaultReadyToServeChecker = getDefaultReadyToServeChecker();
        this.aggKafkaConsumerService = (AggKafkaConsumerService) Validate.notNull(builder.getAggKafkaConsumerService());
        this.errorPartitionId = i;
        this.startReportingReadyToServeTimestamp = builder.getStartReportingReadyToServeTimestamp();
        this.isWriteComputationEnabled = store.isWriteComputationEnabled();
        this.partitionStateSerializer = builder.getPartitionStateSerializer();
        this.suppressLiveUpdates = this.serverConfig.freezeIngestionIfReadyToServeOrLocalDataExists();
        this.storeVersionPartitionCount = version.getPartitionCount();
        try {
            millis = TimeUnit.HOURS.toMillis(store.getBootstrapToOnlineTimeoutInHours());
        } catch (Exception e) {
            LOGGER.warn("Error when getting bootstrap to online timeout config for store {}. Will use default timeout threshold which is 24 hours", this.storeName, e);
            millis = TimeUnit.HOURS.toMillis(24L);
        }
        this.bootstrapTimeoutInMs = millis;
        this.isIsolatedIngestion = z;
        PartitionerConfig partitionerConfig = version.getPartitionerConfig();
        if (partitionerConfig == null) {
            this.venicePartitioner = new DefaultVenicePartitioner();
            this.amplificationFactor = 1;
        } else {
            this.venicePartitioner = PartitionUtils.getVenicePartitioner(partitionerConfig);
            this.amplificationFactor = partitionerConfig.getAmplificationFactor();
        }
        this.subPartitionCount = this.storeVersionPartitionCount * this.amplificationFactor;
        this.amplificationFactorAdapter = new AmplificationFactorAdapter(this.amplificationFactor, this.partitionConsumptionStateMap);
        this.statusReportAdapter = new StatusReportAdapter(new IngestionNotificationDispatcher(queue, this.kafkaVersionTopic, booleanSupplier), this.amplificationFactorAdapter);
        this.cacheBackend = optional;
        this.localKafkaServer = this.kafkaProps.getProperty("kafka.bootstrap.servers");
        this.localKafkaServerSingletonSet = Collections.singleton(this.localKafkaServer);
        this.isDaVinciClient = builder.isDaVinciClient();
        this.isActiveActiveReplicationEnabled = version.isActiveActiveReplicationEnabled();
        this.offsetLagDeltaRelaxEnabled = this.serverConfig.getOffsetLagDeltaRelaxFactorForFastOnlineTransitionInRestart() > 0;
        this.ingestionCheckpointDuringGracefulShutdownEnabled = this.serverConfig.isServerIngestionCheckpointDuringGracefulShutdownEnabled();
        this.metaStoreWriter = builder.getMetaStoreWriter();
        this.storageUtilizationManager = new StorageUtilizationManager(this.storageEngine, store, this.kafkaVersionTopic, this.subPartitionCount, Collections.unmodifiableMap(this.partitionConsumptionStateMap), this.serverConfig.isHybridQuotaEnabled(), this.serverConfig.isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled(), this.statusReportAdapter, this::pauseConsumption, this::resumeConsumption);
        this.storeRepository.registerStoreDataChangedListener(this.storageUtilizationManager);
        this.kafkaClusterUrlResolver = this.serverConfig.getKafkaClusterUrlResolver();
        this.kafkaClusterUrlToIdMap = this.serverConfig.getKafkaClusterUrlToIdMap();
        this.localKafkaClusterId = this.kafkaClusterUrlToIdMap.getOrDefault(this.localKafkaServer, Integer.MIN_VALUE);
        this.compressionStrategy = version.getCompressionStrategy();
        this.compressorFactory = builder.getCompressorFactory();
        this.compressor = Lazy.of(() -> {
            return this.compressorFactory.getCompressor(this.compressionStrategy, this.kafkaVersionTopic);
        });
        this.isChunked = version.isChunkingEnabled();
        this.msgForLagMeasurement = new String[this.subPartitionCount];
        for (int i2 = 0; i2 < this.msgForLagMeasurement.length; i2++) {
            this.msgForLagMeasurement[i2] = this.kafkaVersionTopic + "_" + i2;
        }
    }

    void setPurgeTransientRecordBuffer(boolean z) {
        this.purgeTransientRecordBuffer = z;
    }

    public AbstractStorageEngine getStorageEngine() {
        return this.storageEngine;
    }

    public boolean isFutureVersion() {
        return this.versionedIngestionStats.isFutureVersion(this.storeName, this.versionNumber);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void throwIfNotRunning() {
        if (!isRunning()) {
            throw new VeniceException(" Topic " + this.kafkaVersionTopic + " is shutting down, no more messages accepted");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int nextSeqNum() {
        return this.consumerActionSequenceNumber.incrementAndGet();
    }

    public synchronized void subscribePartition(PubSubTopicPartition pubSubTopicPartition, Optional<LeaderFollowerStateType> optional) {
        throwIfNotRunning();
        this.statusReportAdapter.initializePartitionReportStatus(pubSubTopicPartition.getPartitionNumber());
        this.amplificationFactorAdapter.execute(pubSubTopicPartition.getPartitionNumber(), num -> {
            this.consumerActionsQueue.add(new ConsumerAction(ConsumerActionType.SUBSCRIBE, (PubSubTopicPartition) new PubSubTopicPartitionImpl(pubSubTopicPartition.getPubSubTopic(), num.intValue()), nextSeqNum(), (Optional<LeaderFollowerStateType>) (this.amplificationFactorAdapter.isLeaderSubPartition(num.intValue()) ? optional : Optional.empty())));
        });
    }

    public synchronized void unSubscribePartition(PubSubTopicPartition pubSubTopicPartition) {
        throwIfNotRunning();
        this.amplificationFactorAdapter.execute(pubSubTopicPartition.getPartitionNumber(), num -> {
            this.consumerActionsQueue.add(new ConsumerAction(ConsumerActionType.UNSUBSCRIBE, new PubSubTopicPartitionImpl(pubSubTopicPartition.getPubSubTopic(), num.intValue()), nextSeqNum()));
        });
    }

    public boolean hasAnySubscription() {
        return !this.partitionConsumptionStateMap.isEmpty();
    }

    public synchronized void resetPartitionConsumptionOffset(PubSubTopicPartition pubSubTopicPartition) {
        throwIfNotRunning();
        this.amplificationFactorAdapter.execute(pubSubTopicPartition.getPartitionNumber(), num -> {
            this.consumerActionsQueue.add(new ConsumerAction(ConsumerActionType.RESET_OFFSET, new PubSubTopicPartitionImpl(pubSubTopicPartition.getPubSubTopic(), num.intValue()), nextSeqNum()));
        });
    }

    public String getStoreName() {
        return this.storeName;
    }

    public boolean isUserSystemStore() {
        return this.isUserSystemStore;
    }

    public abstract void promoteToLeader(PubSubTopicPartition pubSubTopicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker);

    public abstract void demoteToStandby(PubSubTopicPartition pubSubTopicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker);

    public void kill() {
        synchronized (this) {
            throwIfNotRunning();
            this.consumerActionsQueue.add(ConsumerAction.createKillAction(this.versionTopic, nextSeqNum()));
        }
        try {
            Timer run = Timer.run(d -> {
                LOGGER.info("Completed waiting for kill action to take effect. Total elapsed time: {} ms", d);
            });
            for (int i = 0; isRunning() && i < MAX_KILL_CHECKING_ATTEMPTS; i++) {
                try {
                    TimeUnit.MILLISECONDS.sleep(500L);
                } finally {
                }
            }
            if (run != null) {
                run.close();
            }
        } catch (InterruptedException e) {
            LOGGER.warn("StoreIngestionTask::kill was interrupted.", e);
        }
        synchronized (this) {
            if (isRunning()) {
                this.statusReportAdapter.reportError(this.partitionConsumptionStateMap.values(), "Received the signal to kill this consumer. Topic " + this.kafkaVersionTopic, (Exception) new VeniceException("Kill the consumer"));
                close();
            }
        }
    }

    private void beginBatchWrite(int i, boolean z, PartitionConsumptionState partitionConsumptionState) {
        Map<String, String> databaseInfo = partitionConsumptionState.getOffsetRecord().getDatabaseInfo();
        StoragePartitionConfig storagePartitionConfig = getStoragePartitionConfig(i, z, partitionConsumptionState);
        partitionConsumptionState.setDeferredWrite(storagePartitionConfig.isDeferredWrite());
        Optional<Supplier<byte[]>> empty = Optional.empty();
        if (this.serverConfig.isDatabaseChecksumVerificationEnabled() && partitionConsumptionState.isDeferredWrite() && !this.serverConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled()) {
            partitionConsumptionState.initializeExpectedChecksum();
            empty = Optional.of(() -> {
                byte[] expectedChecksum = partitionConsumptionState.getExpectedChecksum();
                partitionConsumptionState.resetExpectedChecksum();
                return expectedChecksum;
            });
        }
        this.storageEngine.beginBatchWrite(storagePartitionConfig, databaseInfo, empty);
        if (!this.cacheBackend.isPresent() || this.cacheBackend.get().getStorageEngine(this.kafkaVersionTopic) == null) {
            return;
        }
        this.cacheBackend.get().getStorageEngine(this.kafkaVersionTopic).beginBatchWrite(storagePartitionConfig, databaseInfo, empty);
    }

    private StoragePartitionConfig getStoragePartitionConfig(int i, boolean z, PartitionConsumptionState partitionConsumptionState) {
        boolean z2;
        StoragePartitionConfig storagePartitionConfig = new StoragePartitionConfig(this.kafkaVersionTopic, i);
        boolean z3 = false;
        if (partitionConsumptionState.isEndOfPushReceived()) {
            z2 = false;
            if (partitionConsumptionState.isBatchOnly() && this.readOnlyForBatchOnlyStoreEnabled) {
                z3 = true;
            }
        } else {
            z2 = z;
        }
        storagePartitionConfig.setDeferredWrite(z2);
        storagePartitionConfig.setReadOnly(z3);
        return storagePartitionConfig;
    }

    protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionState) {
        String str;
        if (!partitionConsumptionState.isEndOfPushReceived()) {
            return false;
        }
        if (partitionConsumptionState.isComplete() || !partitionConsumptionState.isWaitingForReplicationLag()) {
            return true;
        }
        if (partitionConsumptionState.isHybrid() && !isRealTimeBufferReplayStarted(partitionConsumptionState)) {
            return false;
        }
        int partition = partitionConsumptionState.getPartition();
        boolean z = false;
        if (this.hybridStoreConfig.isPresent()) {
            try {
                long offsetLagThresholdToGoOnline = this.hybridStoreConfig.get().getOffsetLagThresholdToGoOnline();
                long producerTimestampLagThresholdToGoOnlineInSeconds = this.hybridStoreConfig.get().getProducerTimestampLagThresholdToGoOnlineInSeconds();
                String str2 = this.msgForLagMeasurement[partition];
                boolean z2 = !REDUNDANT_LOGGING_FILTER.isRedundantException(str2);
                if (offsetLagThresholdToGoOnline >= 0) {
                    long measureHybridOffsetLag = measureHybridOffsetLag(partitionConsumptionState, z2);
                    boolean z3 = measureHybridOffsetLag > offsetLagThresholdToGoOnline;
                    z = !z3;
                    if (z2) {
                        LOGGER.info("{} [Offset lag] partition {} is {}lagging. Lag: [{}] {} Threshold [{}]", this.consumerTaskId, Integer.valueOf(partition), z3 ? "" : "not ", Long.valueOf(measureHybridOffsetLag), z3 ? ">" : "<", Long.valueOf(offsetLagThresholdToGoOnline));
                    }
                }
                if (producerTimestampLagThresholdToGoOnlineInSeconds > 0) {
                    long millis = TimeUnit.SECONDS.toMillis(producerTimestampLagThresholdToGoOnlineInSeconds);
                    long latestProducerProcessingTimeInMs = partitionConsumptionState.getOffsetRecord().getLatestProducerProcessingTimeInMs();
                    if (this.amplificationFactor != 1) {
                        latestProducerProcessingTimeInMs = getLatestConsumedProducerTimestampWithSubPartition(latestProducerProcessingTimeInMs, partitionConsumptionState);
                    }
                    long elapsedTimeInMs = LatencyUtils.getElapsedTimeInMs(latestProducerProcessingTimeInMs);
                    boolean z4 = elapsedTimeInMs < millis;
                    if (z2) {
                        LOGGER.info("{} [Time lag] partition {} is {}lagging. The latest producer timestamp is {}. Timestamp Lag: [{}] {} Threshold [{}]", this.consumerTaskId, Integer.valueOf(partition), !z4 ? "" : "not ", Long.valueOf(latestProducerProcessingTimeInMs), Long.valueOf(elapsedTimeInMs), z4 ? "<" : ">", Long.valueOf(millis));
                    }
                    if (!z4) {
                        String str3 = str2 + "_ignore_time_lag";
                        Set<String> realTimeDataSourceKafkaAddress = getRealTimeDataSourceKafkaAddress(partitionConsumptionState);
                        if (realTimeDataSourceKafkaAddress.isEmpty()) {
                            throw new VeniceException("Expect a real-time topic Kafka URL for store " + this.storeName);
                        }
                        if (realTimeDataSourceKafkaAddress.size() == 1) {
                            str = realTimeDataSourceKafkaAddress.iterator().next();
                        } else {
                            if (!realTimeDataSourceKafkaAddress.contains(this.localKafkaServer)) {
                                throw new VeniceException(String.format("Expect source RT Kafka URLs contains local Kafka URL. Got local Kafka URL %s and RT source Kafka URLs %s", this.localKafkaServer, realTimeDataSourceKafkaAddress));
                            }
                            str = this.localKafkaServer;
                        }
                        PubSubTopic topic = this.pubSubTopicRepository.getTopic(this.realTimeTopic.getName());
                        PubSubTopicPartition pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, partition);
                        String str4 = this.isDaVinciClient ? this.localKafkaServer : str;
                        if (this.cachedKafkaMetadataGetter.containsTopic(getTopicManager(str4), this.realTimeTopic)) {
                            long producerTimestampOfLastDataMessage = this.cachedKafkaMetadataGetter.getProducerTimestampOfLastDataMessage(getTopicManager(str4), pubSubTopicPartitionImpl);
                            if (producerTimestampOfLastDataMessage < 0 || producerTimestampOfLastDataMessage <= latestProducerProcessingTimeInMs) {
                                z4 = true;
                                if (!REDUNDANT_LOGGING_FILTER.isRedundantException(str3)) {
                                    if (producerTimestampOfLastDataMessage < 0) {
                                        LOGGER.info("{} [Time lag] Topic {} is empty or all messages have been truncated; ignoring time lag.", this.consumerTaskId, topic);
                                    } else {
                                        LOGGER.info("{} [Time lag] Producer timestamp of last message in topic {} partition {}: {}, which is smaller or equal than the known latest producer time: {}. Consumption lag is caught up already.", this.consumerTaskId, topic, Integer.valueOf(partition), Long.valueOf(producerTimestampOfLastDataMessage), Long.valueOf(latestProducerProcessingTimeInMs));
                                    }
                                }
                            }
                        } else {
                            z4 = true;
                            if (!REDUNDANT_LOGGING_FILTER.isRedundantException(str3)) {
                                LOGGER.info("{} [Time lag] Topic {} doesn't exist; ignoring time lag.", this.consumerTaskId, topic);
                            }
                        }
                    }
                    z = offsetLagThresholdToGoOnline >= 0 ? z & z4 : z4;
                }
            } catch (Exception e) {
                if (!REDUNDANT_LOGGING_FILTER.isRedundantException(this.kafkaVersionTopic + "_isReadyToServe")) {
                    LOGGER.info("Exception when trying to determine if hybrid store is ready to serve: {}", this.storeName, e);
                }
                z = false;
            }
        } else {
            z = this.cachedKafkaMetadataGetter.getOffset(getTopicManager(this.localKafkaServer), this.versionTopic, partition) <= partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset() + 100;
        }
        if (z) {
            this.amplificationFactorAdapter.executePartitionConsumptionState(partitionConsumptionState.getUserPartition(), (v0) -> {
                v0.lagHasCaughtUp();
            });
        }
        return z;
    }

    public boolean isReadyToServeAnnouncedWithRTLag() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getLatestConsumedProducerTimestampWithSubPartition(long j, PartitionConsumptionState partitionConsumptionState) {
        long j2 = j;
        IntList subPartitions = PartitionUtils.getSubPartitions(partitionConsumptionState.getUserPartition(), this.amplificationFactor);
        for (int i = 0; i < subPartitions.size(); i++) {
            PartitionConsumptionState partitionConsumptionState2 = this.partitionConsumptionStateMap.get(Integer.valueOf(subPartitions.getInt(i)));
            if (partitionConsumptionState2 != null && partitionConsumptionState2.isEndOfPushReceived()) {
                j2 = Math.max(j2, partitionConsumptionState2.getOffsetRecord().getLatestProducerProcessingTimeInMs());
            }
        }
        return j2;
    }

    protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState);

    protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean z);

    protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState);

    /* JADX INFO: Access modifiers changed from: protected */
    public void produceToStoreBufferService(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j, long j2) throws InterruptedException {
        boolean z = this.emitMetrics.get();
        long nanoTime = z ? System.nanoTime() : 0L;
        this.storeBufferService.putConsumerRecord(pubSubMessage, this, leaderProducedRecordContext, i, str, j);
        if (z) {
            this.hostLevelIngestionStats.recordConsumerRecordsQueuePutLatency(LatencyUtils.getLatencyInMS(nanoTime), j2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Failed to find 'out' block for switch in B:13:0x00f7. Please report as an issue. */
    public void produceToStoreBufferServiceOrKafka(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> iterable, PubSubTopicPartition pubSubTopicPartition, String str, int i) throws InterruptedException {
        long j = 0;
        double d = 0.0d;
        int subPartition = PartitionUtils.getSubPartition(pubSubTopicPartition, this.amplificationFactor);
        boolean z = this.emitMetrics.get();
        long currentTimeMillis = System.currentTimeMillis();
        for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage : iterable) {
            long nanoTime = System.nanoTime();
            if (shouldProcessRecord(pubSubMessage, subPartition)) {
                if (((KafkaKey) pubSubMessage.getKey()).isControlMessage()) {
                    ControlMessage controlMessage = (ControlMessage) ((KafkaMessageEnvelope) pubSubMessage.getValue()).payloadUnion;
                    if (ControlMessageType.valueOf(controlMessage.controlMessageType) == ControlMessageType.START_OF_PUSH) {
                        processStartOfPush((KafkaMessageEnvelope) pubSubMessage.getValue(), controlMessage, pubSubMessage.getTopicPartition().getPartitionNumber(), this.partitionConsumptionStateMap.get(Integer.valueOf(subPartition)));
                    }
                }
                waitReadyToProcessRecord(pubSubMessage);
                switch (AnonymousClass1.$SwitchMap$com$linkedin$davinci$kafka$consumer$StoreIngestionTask$DelegateConsumerRecordResult[delegateConsumerRecord(pubSubMessage, subPartition, str, i, nanoTime, currentTimeMillis).ordinal()]) {
                    case 1:
                        long nanoTime2 = z ? System.nanoTime() : 0L;
                        this.storeBufferService.putConsumerRecord(pubSubMessage, this, null, subPartition, str, nanoTime);
                        if (z) {
                            d += LatencyUtils.getLatencyInMS(nanoTime2);
                        }
                    case 2:
                    case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
                    case ValueRecord.SCHEMA_HEADER_LENGTH /* 4 */:
                        j += pubSubMessage.getPayloadSize();
                        PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(subPartition));
                        if (partitionConsumptionState != null) {
                            partitionConsumptionState.setLatestMessageConsumptionTimestampInMs(currentTimeMillis);
                            break;
                        } else {
                            break;
                        }
                    default:
                        throw new VeniceException(this.consumerTaskId + " received unknown DelegateConsumerRecordResult enum for " + pubSubMessage.getTopicPartition());
                }
            } else {
                PartitionConsumptionState partitionConsumptionState2 = this.partitionConsumptionStateMap.get(Integer.valueOf(subPartition));
                if (partitionConsumptionState2 != null) {
                    partitionConsumptionState2.updateLatestIgnoredUpstreamRTOffset(str, ((Long) pubSubMessage.getOffset()).longValue());
                }
            }
        }
        this.storageUtilizationManager.enforcePartitionQuota(pubSubTopicPartition.getPartitionNumber(), j);
        if (z) {
            if (j > 0) {
                this.hostLevelIngestionStats.recordTotalBytesReadFromKafkaAsUncompressedSize(j);
            }
            if (d > 0.0d) {
                this.hostLevelIngestionStats.recordConsumerRecordsQueuePutLatency(d, currentTimeMillis);
            }
            this.hostLevelIngestionStats.recordStorageQuotaUsed(this.storageUtilizationManager.getDiskQuotaUsage(), currentTimeMillis);
        }
    }

    List<PartitionExceptionInfo> getPartitionIngestionExceptionList() {
        return this.partitionIngestionExceptionList;
    }

    private void processIngestionException() {
        this.partitionIngestionExceptionList.forEach(partitionExceptionInfo -> {
            int partitionId = partitionExceptionInfo.getPartitionId();
            Exception exception = partitionExceptionInfo.getException();
            PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(partitionId));
            if (partitionConsumptionState == null || !partitionConsumptionState.isSubscribed()) {
                LOGGER.warn("Ignoring exception for partition {} for store version {} since this partition has been unsubscribed already.", Integer.valueOf(partitionId), this.kafkaVersionTopic, exception);
                this.partitionIngestionExceptionList.set(partitionId, null);
                return;
            }
            if (partitionConsumptionState.isCompletionReported()) {
                LOGGER.error("Ignoring exception for partition {} for store version {} since this partition is already online. Please engage Venice DEV team immediately.", Integer.valueOf(partitionId), this.kafkaVersionTopic, exception);
            } else {
                reportError(exception.getMessage(), partitionId, exception);
            }
            if (this.partitionConsumptionStateMap.containsKey(Integer.valueOf(partitionId))) {
                unSubscribePartition(new PubSubTopicPartitionImpl(this.versionTopic, partitionId));
            }
        });
    }

    protected void checkIngestionProgress(Store store) throws InterruptedException {
        Exception exc = this.lastStoreIngestionException.get();
        if (exc != null) {
            throw new VeniceException("Unexpected store ingestion task level exception, will error out the entire ingestion task and all its partitions", exc);
        }
        if (this.lastConsumerException != null) {
            throw new VeniceException("Exception thrown by shared consumer", this.lastConsumerException);
        }
        processIngestionException();
        if (consumerHasAnySubscription()) {
            this.idleCounter = 0;
            maybeUnsubscribeCompletedPartitions(store);
            recordQuotaMetrics(store);
            if (this.storageUtilizationManager.hasPausedPartitionIngestion()) {
                this.storageUtilizationManager.checkAllPartitionsQuota();
            }
            Thread.sleep(this.readCycleDelayMs);
            return;
        }
        int i = this.idleCounter + 1;
        this.idleCounter = i;
        if (i <= 100) {
            String str = this.consumerTaskId + " Not subscribed to any partitions ";
            if (!REDUNDANT_LOGGING_FILTER.isRedundantException(str)) {
                LOGGER.info(str);
            }
            Thread.sleep(this.readCycleDelayMs);
            return;
        }
        if (this.hybridStoreConfig.isPresent() || !this.serverConfig.isUnsubscribeAfterBatchpushEnabled() || this.subscribedCount == 0 || this.subscribedCount != this.forceUnSubscribedCount) {
            LOGGER.warn("{} Has expired due to not being subscribed to any partitions for too long.", this.consumerTaskId);
            complete();
            return;
        }
        String str2 = this.consumerTaskId + " Going back to sleep as consumption has finished and topics are unsubscribed";
        if (!REDUNDANT_LOGGING_FILTER.isRedundantException(str2)) {
            LOGGER.info(str2);
        }
        Thread.sleep(this.readCycleDelayMs * 20);
        this.idleCounter = 0;
    }

    private void maybeUnsubscribeCompletedPartitions(Store store) {
        if (this.hybridStoreConfig.isPresent() || !this.serverConfig.isUnsubscribeAfterBatchpushEnabled() || this.versionNumber > store.getCurrentVersion()) {
            return;
        }
        HashSet hashSet = new HashSet();
        for (PartitionConsumptionState partitionConsumptionState : this.partitionConsumptionStateMap.values()) {
            if (partitionConsumptionState.isCompletionReported() && consumerHasSubscription(this.versionTopic, partitionConsumptionState)) {
                LOGGER.info("Unsubscribing completed partitions {} of store : {} version : {} current version: {}", Integer.valueOf(partitionConsumptionState.getPartition()), store.getName(), Integer.valueOf(this.versionNumber), Integer.valueOf(store.getCurrentVersion()));
                hashSet.add(new PubSubTopicPartitionImpl(this.versionTopic, partitionConsumptionState.getPartition()));
                this.forceUnSubscribedCount++;
            }
        }
        if (hashSet.size() != 0) {
            consumerBatchUnsubscribe(hashSet);
        }
    }

    private void recordQuotaMetrics(Store store) {
        if (this.emitMetrics.get()) {
            long storageQuotaInByte = store.getStorageQuotaInByte();
            long currentTimeMillis = System.currentTimeMillis();
            this.hostLevelIngestionStats.recordDiskQuotaAllowed(storageQuotaInByte, currentTimeMillis);
            this.hostLevelIngestionStats.recordStorageQuotaUsed(this.storageUtilizationManager.getDiskQuotaUsage(), currentTimeMillis);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                try {
                    Thread.currentThread().setName("venice-SIT-" + this.kafkaVersionTopic);
                    LOGGER.info("Running {}", this.consumerTaskId);
                    this.versionedIngestionStats.resetIngestionTaskPushTimeoutGauge(this.storeName, this.versionNumber);
                    while (isRunning()) {
                        Store storeOrThrow = this.storeRepository.getStoreOrThrow(this.storeName);
                        processConsumerActions(storeOrThrow);
                        checkLongRunningTaskState();
                        checkIngestionProgress(storeOrThrow);
                    }
                    for (Map.Entry<Integer, PartitionConsumptionState> entry : this.partitionConsumptionStateMap.entrySet()) {
                        int intValue = entry.getKey().intValue();
                        PartitionConsumptionState value = entry.getValue();
                        consumerUnSubscribeAllTopics(value);
                        if (this.ingestionCheckpointDuringGracefulShutdownEnabled) {
                            waitForAllMessageToBeProcessedFromTopicPartition(new PubSubTopicPartitionImpl(this.versionTopic, value.getPartition()), value);
                            this.kafkaDataIntegrityValidator.updateOffsetRecordForPartition(intValue, value.getOffsetRecord());
                            updateOffsetMetadataInOffsetRecord(value);
                            syncOffset(this.kafkaVersionTopic, value);
                        }
                    }
                    internalClose(true);
                } catch (VeniceChecksumException e) {
                    recordChecksumVerificationFailure();
                    if (isRunning()) {
                        handleIngestionException(e);
                    } else {
                        LOGGER.info("{} encountered checksum verification failure, skipping error reporting because server is shutting down", this.consumerTaskId, e);
                    }
                    internalClose(true);
                } catch (Throwable th) {
                    handleIngestionThrowable(th);
                    internalClose(true);
                }
            } catch (VeniceIngestionTaskKilledException e2) {
                LOGGER.info("{} has been killed.", this.consumerTaskId);
                this.statusReportAdapter.reportKilled(this.partitionConsumptionStateMap.values(), e2);
                internalClose(false);
            } catch (VeniceTimeoutException e3) {
                this.versionedIngestionStats.setIngestionTaskPushTimeoutGauge(this.storeName, this.versionNumber);
                handleIngestionException(e3);
                internalClose(true);
            } catch (Exception e4) {
                if (isRunning() || !ExceptionUtils.recursiveClassEquals(e4, new Class[]{InterruptedException.class, InterruptException.class})) {
                    handleIngestionException(e4);
                    internalClose(true);
                } else {
                    LOGGER.info("{} interrupted, skipping error reporting because server is shutting down", this.consumerTaskId, e4);
                    internalClose(true);
                }
            }
        } catch (Throwable th2) {
            internalClose(true);
            throw th2;
        }
    }

    private void recordStalePartitionsWithoutIngestionTask() {
        this.partitionIngestionExceptionList.forEach(partitionExceptionInfo -> {
            if (partitionExceptionInfo == null || !partitionExceptionInfo.isReplicaCompleted()) {
                return;
            }
            this.versionedIngestionStats.recordStalePartitionsWithoutIngestionTask(this.storeName, this.versionNumber);
        });
    }

    private void handleIngestionException(Exception exc) {
        LOGGER.error("{} has failed.", this.consumerTaskId, exc);
        recordStalePartitionsWithoutIngestionTask();
        reportError(this.partitionConsumptionStateMap.values(), this.errorPartitionId, "Caught Exception during ingestion.", exc);
        this.hostLevelIngestionStats.recordIngestionFailure();
    }

    private void handleIngestionThrowable(Throwable th) {
        LOGGER.error("{} has failed.", this.consumerTaskId, th);
        recordStalePartitionsWithoutIngestionTask();
        reportError(this.partitionConsumptionStateMap.values(), this.errorPartitionId, "Caught non-exception Throwable during ingestion.", new VeniceException(th));
        this.hostLevelIngestionStats.recordIngestionFailure();
    }

    private void reportError(Collection<PartitionConsumptionState> collection, int i, String str, Exception exc) {
        if (collection.isEmpty()) {
            this.statusReportAdapter.reportError(i, str, exc);
        } else {
            this.statusReportAdapter.reportError(collection, str, exc);
        }
    }

    private void internalClose(boolean z) {
        try {
            this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager);
            Iterator<ConsumerAction> it = this.consumerActionsQueue.iterator();
            while (it.hasNext()) {
                ConsumerAction next = it.next();
                if (next.getType() == ConsumerActionType.RESET_OFFSET) {
                    String topic = next.getTopic();
                    int partition = next.getPartition();
                    LOGGER.info("{} Cleanup Reset OffSet : Topic {} Partition Id {}", this.consumerTaskId, topic, Integer.valueOf(partition));
                    this.storageMetadataService.clearOffset(topic, partition);
                } else {
                    LOGGER.info("{} Cleanup ignoring the Message {}", this.consumerTaskId, next);
                }
            }
        } catch (Exception e) {
            LOGGER.error("{} Error while resetting offset.", this.consumerTaskId, e);
        }
        this.aggKafkaConsumerService.unsubscribeAll(this.versionTopic);
        LOGGER.info("Detached Kafka consumer(s) for version topic: {}", this.kafkaVersionTopic);
        try {
            this.partitionConsumptionStateMap.values().parallelStream().forEach((v0) -> {
                v0.unsubscribe();
            });
            this.partitionConsumptionStateMap.clear();
        } catch (Exception e2) {
            LOGGER.error("{} Error while unsubscribing topic.", this.consumerTaskId, e2);
        }
        try {
            closeVeniceWriters(z);
        } catch (Exception e3) {
            LOGGER.error("Error while closing venice writers", e3);
        }
        try {
            closeVeniceViewWriters();
        } catch (Exception e4) {
            LOGGER.error("Error while closing venice view writer", e4);
        }
        close();
        synchronized (this) {
            notifyAll();
        }
        LOGGER.info("Store ingestion task for store: {} is closed", this.kafkaVersionTopic);
    }

    protected void closeVeniceWriters(boolean z) {
    }

    protected void closeVeniceViewWriters() {
    }

    private void processConsumerActions(Store store) throws InterruptedException {
        Instant now = Instant.now();
        while (true) {
            ConsumerAction peek = this.consumerActionsQueue.peek();
            if (peek == null) {
                if (this.emitMetrics.get()) {
                    this.hostLevelIngestionStats.recordProcessConsumerActionLatency(Duration.between(now, Instant.now()).toMillis());
                    return;
                }
                return;
            }
            try {
                LOGGER.info("Starting consumer action {}", peek);
                peek.incrementAttempt();
                processConsumerAction(peek, store);
                this.consumerActionsQueue.remove(peek);
                LOGGER.info("Finished consumer action {}", peek);
            } catch (VeniceIngestionTaskKilledException | InterruptedException e) {
                throw e;
            } catch (Throwable th) {
                if (peek.getAttemptsCount() <= MAX_CONSUMER_ACTION_ATTEMPTS) {
                    LOGGER.warn("Failed to process consumer action {}, will retry later.", peek, th);
                    return;
                }
                LOGGER.error("Failed to execute consumer action {} after {} attempts.", peek, Integer.valueOf(peek.getAttemptsCount()), th);
                PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(peek.getPartition()));
                this.consumerActionsQueue.remove(peek);
                if (partitionConsumptionState != null && !partitionConsumptionState.isCompletionReported()) {
                    reportError("Error when processing consumer action: " + peek, peek.getPartition(), new VeniceException(th));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TopicSwitch resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch topicSwitch) {
        if (topicSwitch == null || topicSwitch.sourceKafkaServers == null) {
            return topicSwitch;
        }
        ArrayList arrayList = new ArrayList(topicSwitch.sourceKafkaServers.size());
        Iterator it = topicSwitch.sourceKafkaServers.iterator();
        while (it.hasNext()) {
            arrayList.add(this.kafkaClusterUrlResolver.apply(((CharSequence) it.next()).toString()));
        }
        topicSwitch.sourceKafkaServers = arrayList;
        return topicSwitch;
    }

    private void checkConsumptionStateWhenStart(OffsetRecord offsetRecord, PartitionConsumptionState partitionConsumptionState) {
        StoreVersionState storeVersionState;
        int partition = partitionConsumptionState.getPartition();
        if (offsetRecord.getLocalVersionTopicOffset() > 0 && (storeVersionState = this.storageEngine.getStoreVersionState()) != null) {
            boolean z = storeVersionState.sorted;
            TopicSwitch resolveSourceKafkaServersWithinTopicSwitch = resolveSourceKafkaServersWithinTopicSwitch(storeVersionState.topicSwitch);
            partitionConsumptionState.setTopicSwitch(resolveSourceKafkaServersWithinTopicSwitch == null ? null : new TopicSwitchWrapper(resolveSourceKafkaServersWithinTopicSwitch, this.pubSubTopicRepository.getTopic(resolveSourceKafkaServersWithinTopicSwitch.sourceTopicName.toString())));
            beginBatchWrite(partition, z, partitionConsumptionState);
            partitionConsumptionState.setStartOfPushTimestamp(storeVersionState.startOfPushTimestamp);
            partitionConsumptionState.setEndOfPushTimestamp(storeVersionState.endOfPushTimestamp);
            this.statusReportAdapter.reportRestarted(partitionConsumptionState);
        }
        try {
            boolean z2 = false;
            long offsetLagDeltaRelaxFactorForFastOnlineTransitionInRestart = this.serverConfig.getOffsetLagDeltaRelaxFactorForFastOnlineTransitionInRestart();
            long offsetLag = partitionConsumptionState.getOffsetRecord().getOffsetLag();
            if (this.hybridStoreConfig.isPresent() && partitionConsumptionState.isEndOfPushReceived()) {
                long offsetLagThresholdToGoOnline = this.hybridStoreConfig.get().getOffsetLagThresholdToGoOnline();
                if (this.offsetLagDeltaRelaxEnabled && offsetLagThresholdToGoOnline > 0) {
                    long measureHybridOffsetLag = measureHybridOffsetLag(partitionConsumptionState, true);
                    if (offsetLag != -1) {
                        LOGGER.info("Checking offset Lag behavior: current offset lag: {}, previous offset lag: {}, offset lag threshold: {}", Long.valueOf(measureHybridOffsetLag), Long.valueOf(offsetLag), Long.valueOf(offsetLagThresholdToGoOnline));
                        if (measureHybridOffsetLag < offsetLag + (offsetLagDeltaRelaxFactorForFastOnlineTransitionInRestart * offsetLagThresholdToGoOnline)) {
                            this.amplificationFactorAdapter.executePartitionConsumptionState(partitionConsumptionState.getUserPartition(), (v0) -> {
                                v0.lagHasCaughtUp();
                            });
                            this.statusReportAdapter.reportCompleted(partitionConsumptionState, true);
                            z2 = true;
                        }
                        partitionConsumptionState.getOffsetRecord().setOffsetLag(-1L);
                    }
                }
            }
            if (!z2) {
                this.defaultReadyToServeChecker.apply(partitionConsumptionState);
            }
        } catch (VeniceInconsistentStoreMetadataException e) {
            this.hostLevelIngestionStats.recordInconsistentStoreMetadata();
            this.storageMetadataService.clearOffset(this.kafkaVersionTopic, partition);
            this.storageMetadataService.clearStoreVersionState(this.kafkaVersionTopic);
            this.kafkaDataIntegrityValidator.clearPartition(partition);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processCommonConsumerAction(ConsumerActionType consumerActionType, PubSubTopicPartition pubSubTopicPartition, LeaderFollowerStateType leaderFollowerStateType) throws InterruptedException {
        int partitionNumber = pubSubTopicPartition.getPartitionNumber();
        switch (AnonymousClass1.$SwitchMap$com$linkedin$davinci$kafka$consumer$ConsumerActionType[consumerActionType.ordinal()]) {
            case 1:
                this.partitionIngestionExceptionList.set(partitionNumber, null);
                this.storeBufferService.drainBufferedRecordsFromTopicPartition(pubSubTopicPartition);
                this.subscribedCount++;
                OffsetRecord lastOffset = this.storageMetadataService.getLastOffset(pubSubTopicPartition.getPubSubTopic().getName(), partitionNumber);
                PartitionConsumptionState partitionConsumptionState = new PartitionConsumptionState(partitionNumber, this.amplificationFactor, lastOffset, this.hybridStoreConfig.isPresent());
                partitionConsumptionState.setLeaderFollowerState(leaderFollowerStateType);
                this.partitionConsumptionStateMap.put(Integer.valueOf(partitionNumber), partitionConsumptionState);
                lastOffset.getProducerPartitionStateMap().entrySet().forEach(entry -> {
                    this.kafkaDataIntegrityValidator.registerProducer(GuidUtils.getGuidFromCharSequence((CharSequence) entry.getKey())).setPartitionState(partitionNumber, (ProducerPartitionState) entry.getValue());
                });
                long currentTimeMillis = System.currentTimeMillis();
                checkConsumptionStateWhenStart(lastOffset, partitionConsumptionState);
                reportIfCatchUpVersionTopicOffset(partitionConsumptionState);
                this.versionedIngestionStats.recordSubscribePrepLatency(this.storeName, this.versionNumber, LatencyUtils.getElapsedTimeInMs(currentTimeMillis));
                if (leaderFollowerStateType.equals(LeaderFollowerStateType.LEADER)) {
                    startConsumingAsLeader(partitionConsumptionState);
                } else {
                    updateLeaderTopicOnFollower(partitionConsumptionState);
                    reportStoreVersionTopicOffsetRewindMetrics(partitionConsumptionState);
                    consumerSubscribe(partitionConsumptionState.getSourceTopicPartition(pubSubTopicPartition.getPubSubTopic()), lastOffset.getLocalVersionTopicOffset(), this.localKafkaServer);
                    LOGGER.info("{} subscribed to: {} Offset {}", this.consumerTaskId, pubSubTopicPartition, Long.valueOf(lastOffset.getLocalVersionTopicOffset()));
                }
                this.storageUtilizationManager.initPartition(partitionNumber);
                return;
            case 2:
                LOGGER.info("{} Unsubscribing to: {}", this.consumerTaskId, pubSubTopicPartition);
                PartitionConsumptionState partitionConsumptionState2 = this.partitionConsumptionStateMap.get(Integer.valueOf(partitionNumber));
                this.forceUnSubscribedCount--;
                this.subscribedCount--;
                if (partitionConsumptionState2 != null) {
                    consumerUnSubscribeAllTopics(partitionConsumptionState2);
                }
                if (partitionConsumptionState2 != null) {
                    partitionConsumptionState2.unsubscribe();
                }
                waitForAllMessageToBeProcessedFromTopicPartition(pubSubTopicPartition, partitionConsumptionState2);
                if (partitionConsumptionState2 != null) {
                    this.statusReportAdapter.reportStopped(partitionConsumptionState2);
                }
                this.partitionConsumptionStateMap.remove(Integer.valueOf(partitionNumber));
                this.storageUtilizationManager.removePartition(partitionNumber);
                this.kafkaDataIntegrityValidator.clearPartition(partitionNumber);
                PartitionExceptionInfo partitionExceptionInfo = this.partitionIngestionExceptionList.get(partitionNumber);
                if (partitionExceptionInfo == null) {
                    LOGGER.info("{} Unsubscribed to: {}", this.consumerTaskId, pubSubTopicPartition);
                    return;
                } else {
                    this.partitionIngestionExceptionList.set(partitionNumber, null);
                    LOGGER.info("{} Unsubscribed to: {}, which has errored with exception", this.consumerTaskId, pubSubTopicPartition, partitionExceptionInfo.getException());
                    return;
                }
            case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
                PartitionConsumptionState partitionConsumptionState3 = this.partitionConsumptionStateMap.get(Integer.valueOf(partitionNumber));
                if (partitionConsumptionState3 == null || !consumerHasSubscription(pubSubTopicPartition.getPubSubTopic(), partitionConsumptionState3)) {
                    LOGGER.info("{} No need to reset offset by Kafka consumer, since the consumer is not subscribing: {}", this.consumerTaskId, pubSubTopicPartition);
                } else {
                    LOGGER.error("This shouldn't happen since unsubscription should happen before reset offset for: {}", pubSubTopicPartition);
                    try {
                        consumerResetOffset(pubSubTopicPartition.getPubSubTopic(), partitionConsumptionState3);
                        LOGGER.info("{} Reset OffSet : {}", this.consumerTaskId, pubSubTopicPartition);
                    } catch (UnsubscribedTopicPartitionException e) {
                        LOGGER.error("{} Kafka consumer should have subscribed to the partition already but it fails on resetting offset for: {}", this.consumerTaskId, pubSubTopicPartition);
                    }
                    this.partitionConsumptionStateMap.put(Integer.valueOf(partitionNumber), new PartitionConsumptionState(partitionNumber, this.amplificationFactor, new OffsetRecord(this.partitionStateSerializer), this.hybridStoreConfig.isPresent()));
                    this.storageUtilizationManager.initPartition(partitionNumber);
                    this.partitionIngestionExceptionList.set(partitionNumber, null);
                }
                this.kafkaDataIntegrityValidator.clearPartition(partitionNumber);
                this.storageMetadataService.clearOffset(pubSubTopicPartition.getPubSubTopic().getName(), partitionNumber);
                return;
            case ValueRecord.SCHEMA_HEADER_LENGTH /* 4 */:
                LOGGER.info("Kill this consumer task for Topic: {}", pubSubTopicPartition.getPubSubTopic().getName());
                throw new VeniceIngestionTaskKilledException("Received the signal to kill this consumer. Topic " + pubSubTopicPartition.getPubSubTopic().getName());
            default:
                throw new UnsupportedOperationException(consumerActionType.name() + " is not supported in " + getClass().getName());
        }
    }

    private void reportStoreVersionTopicOffsetRewindMetrics(PartitionConsumptionState partitionConsumptionState) {
        long latestProcessedLocalVersionTopicOffset = partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset();
        if (latestProcessedLocalVersionTopicOffset == -1) {
            return;
        }
        long kafkaTopicPartitionEndOffSet = getKafkaTopicPartitionEndOffSet(this.localKafkaServer, this.versionTopic, partitionConsumptionState.getPartition());
        if (kafkaTopicPartitionEndOffSet == StatsErrorCode.LAG_MEASUREMENT_FAILURE.code || latestProcessedLocalVersionTopicOffset <= kafkaTopicPartitionEndOffSet) {
            return;
        }
        LOGGER.warn("Offset rewind for version topic: {}, partition: {}, persisted record offset: {}, Kafka topic partition end-offset: {}", this.kafkaVersionTopic, Integer.valueOf(partitionConsumptionState.getPartition()), Long.valueOf(latestProcessedLocalVersionTopicOffset), Long.valueOf(kafkaTopicPartitionEndOffSet));
        this.versionedIngestionStats.recordVersionTopicEndOffsetRewind(this.storeName, this.versionNumber);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getKafkaTopicPartitionEndOffSet(String str, PubSubTopic pubSubTopic, int i) {
        long partitionLatestOffset = getPartitionLatestOffset(str, pubSubTopic, i);
        return partitionLatestOffset >= 0 ? partitionLatestOffset : this.cachedKafkaMetadataGetter.getOffset(getTopicManager(str), this.versionTopic, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getPartitionOffsetLag(String str, PubSubTopic pubSubTopic, int i) {
        return this.aggKafkaConsumerService.getOffsetLagFor(str, this.versionTopic, new PubSubTopicPartitionImpl(pubSubTopic, i));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getPartitionLatestOffset(String str, PubSubTopic pubSubTopic, int i) {
        return this.aggKafkaConsumerService.getLatestOffsetFor(str, this.versionTopic, new PubSubTopicPartitionImpl(pubSubTopic, i));
    }

    protected abstract void checkLongRunningTaskState() throws InterruptedException;

    protected abstract void processConsumerAction(ConsumerAction consumerAction, Store store) throws InterruptedException;

    protected abstract Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState);

    protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState) {
        throw new UnsupportedOperationException("Leader consumption should only happen in L/F mode!");
    }

    protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) {
        return this.localKafkaServerSingletonSet;
    }

    public PartitionConsumptionState getPartitionConsumptionState(int i) {
        return this.partitionConsumptionStateMap.get(Integer.valueOf(i));
    }

    public boolean hasAnyPartitionConsumptionState(Predicate<PartitionConsumptionState> predicate) {
        Iterator<Map.Entry<Integer, PartitionConsumptionState>> it = this.partitionConsumptionStateMap.entrySet().iterator();
        while (it.hasNext()) {
            if (predicate.test(it.next().getValue())) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, int i) {
        PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(i));
        if (partitionConsumptionState == null) {
            LOGGER.info("Topic {} Partition {} has been unsubscribed, skip this record that has offset {}", this.kafkaVersionTopic, Integer.valueOf(i), pubSubMessage.getOffset());
            return false;
        }
        if (partitionConsumptionState.isErrorReported()) {
            LOGGER.info("Topic {} Partition {} is already errored, skip this record that has offset {}", this.kafkaVersionTopic, Integer.valueOf(i), pubSubMessage.getOffset());
            return false;
        }
        if (!partitionConsumptionState.isEndOfPushReceived() || !partitionConsumptionState.isBatchOnly()) {
            return true;
        }
        KafkaKey kafkaKey = (KafkaKey) pubSubMessage.getKey();
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
        if (kafkaKey.isControlMessage() && ControlMessageType.valueOf((ControlMessage) kafkaMessageEnvelope.payloadUnion) == ControlMessageType.END_OF_SEGMENT) {
            return true;
        }
        if (this.emitMetrics.get()) {
            this.hostLevelIngestionStats.recordUnexpectedMessage();
        }
        String str = "The record was received after 'EOP', but the store: " + this.kafkaVersionTopic + " is neither hybrid nor incremental push enabled, so will skip it.";
        if (REDUNDANT_LOGGING_FILTER.isRedundantException(str)) {
            return false;
        }
        LOGGER.warn(str);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean shouldPersistRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState) {
        int partitionNumber = pubSubMessage.getTopicPartition().getPartitionNumber();
        if (this.partitionIngestionExceptionList.get(partitionNumber) != null) {
            String str = "Errors already exist in partition: " + partitionNumber + " for resource: " + this.kafkaVersionTopic + ", skipping this record";
            if (REDUNDANT_LOGGING_FILTER.isRedundantException(str)) {
                return false;
            }
            LOGGER.info("{} that has offset {}", str, pubSubMessage.getOffset());
            return false;
        }
        if (partitionConsumptionState == null || !partitionConsumptionState.isSubscribed()) {
            String str2 = "Topic " + this.kafkaVersionTopic + " Partition " + partitionNumber + " has been unsubscribed, skip this record";
            if (REDUNDANT_LOGGING_FILTER.isRedundantException(str2)) {
                return false;
            }
            LOGGER.info("{} that has offset {}", str2, pubSubMessage.getOffset());
            return false;
        }
        if (partitionConsumptionState.isErrorReported()) {
            String str3 = "Topic " + this.kafkaVersionTopic + " Partition " + partitionNumber + " is already errored, skip this record";
            if (REDUNDANT_LOGGING_FILTER.isRedundantException(str3)) {
                return false;
            }
            LOGGER.info("{} that has offset {}", str3, pubSubMessage.getOffset());
            return false;
        }
        if (this.suppressLiveUpdates && partitionConsumptionState.isCompletionReported()) {
            String str4 = "Skipping message as live update suppression is enabled and store: " + this.kafkaVersionTopic + " partition " + partitionNumber + " is already ready to serve, these are buffered records in the queue.";
            if (REDUNDANT_LOGGING_FILTER.isRedundantException(str4)) {
                return false;
            }
            LOGGER.info(str4);
            return false;
        }
        if (!this.isIsolatedIngestion || !partitionConsumptionState.isCompletionReported()) {
            return true;
        }
        String str5 = "Skipping message as it is using ingestion isolation and store: " + this.kafkaVersionTopic + " partition " + partitionNumber + " is already ready to serve, these are buffered records in the queue.";
        if (REDUNDANT_LOGGING_FILTER.isRedundantException(str5)) {
            return false;
        }
        LOGGER.info(str5);
        return false;
    }

    public void processConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) throws InterruptedException {
        PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(i));
        if (shouldPersistRecord(pubSubMessage, partitionConsumptionState)) {
            int i2 = 0;
            try {
                i2 = internalProcessConsumerRecord(pubSubMessage, partitionConsumptionState, leaderProducedRecordContext, str, j);
            } catch (VeniceMessageException | UnsupportedOperationException e) {
                throw new VeniceException(this.consumerTaskId + " : Received an exception for message at partition: " + pubSubMessage.getTopicPartition().getPartitionNumber() + ", offset: " + pubSubMessage.getOffset() + ". Bubbling up.", e);
            } catch (FatalDataValidationException e2) {
                int partitionNumber = pubSubMessage.getTopicPartition().getPartitionNumber();
                String str2 = (this.amplificationFactor == 1 || !pubSubMessage.getTopicPartition().getPubSubTopic().isRealTime()) ? "Fatal data validation problem with partition " + partitionNumber + ", offset " + pubSubMessage.getOffset() : "Fatal data validation problem with in RT topic partition " + partitionNumber + ", offset " + pubSubMessage.getOffset() + ", leaderSubPartition: " + i;
                if ((this.isCurrentVersion.getAsBoolean() || partitionConsumptionState.isEndOfPushReceived()) ? false : true) {
                    this.statusReportAdapter.reportError(Collections.singletonList(partitionConsumptionState), str2 + ". Consumption will be halted.", e2);
                    unSubscribePartition(new PubSubTopicPartitionImpl(this.versionTopic, partitionNumber));
                } else {
                    LOGGER.warn("{}. However, {} is the current version or EOP is already received so consumption will continue. {}", str2, this.kafkaVersionTopic, e2.getMessage());
                }
            }
            if (this.diskUsage.isDiskFull(i2)) {
                throw new VeniceException("Disk is full: throwing exception to error push: " + this.storeName + " version " + this.versionNumber + ". " + this.diskUsage.getDiskStatus());
            }
            if (!((KafkaKey) pubSubMessage.getKey()).isControlMessage()) {
                this.versionedIngestionStats.recordBytesConsumed(this.storeName, this.versionNumber, i2);
                this.versionedIngestionStats.recordRecordsConsumed(this.storeName, this.versionNumber);
                this.hostLevelIngestionStats.recordTotalBytesConsumed(i2);
                this.hostLevelIngestionStats.recordTotalRecordsConsumed();
                recordProcessedRecordStats(partitionConsumptionState, i2);
                partitionConsumptionState.incrementProcessedRecordSizeSinceLastSync(i2);
            }
            reportIfCatchUpVersionTopicOffset(partitionConsumptionState);
            long j2 = partitionConsumptionState.isDeferredWrite() ? this.databaseSyncBytesIntervalForDeferredWriteMode : this.databaseSyncBytesIntervalForTransactionalMode;
            this.defaultReadyToServeChecker.apply(partitionConsumptionState, j2 > 0 && partitionConsumptionState.getProcessedRecordSizeSinceLastSync() >= j2);
            if (shouldSyncOffset(partitionConsumptionState, j2, pubSubMessage, leaderProducedRecordContext)) {
                this.kafkaDataIntegrityValidator.updateOffsetRecordForPartition(i, partitionConsumptionState.getOffsetRecord());
                updateOffsetMetadataInOffsetRecord(partitionConsumptionState);
                syncOffset(this.kafkaVersionTopic, partitionConsumptionState);
            }
        }
    }

    public LeaderFollowerStateType getLeaderState(int i) {
        return this.amplificationFactorAdapter.getLeaderState(i);
    }

    private boolean shouldSyncOffset(PartitionConsumptionState partitionConsumptionState, long j, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, LeaderProducedRecordContext leaderProducedRecordContext) {
        boolean z = false;
        if (((KafkaKey) pubSubMessage.getKey()).isControlMessage()) {
            ControlMessageType valueOf = ControlMessageType.valueOf(leaderProducedRecordContext == null ? (ControlMessage) ((KafkaMessageEnvelope) pubSubMessage.getValue()).payloadUnion : (ControlMessage) leaderProducedRecordContext.getValueUnion());
            if (valueOf != ControlMessageType.START_OF_SEGMENT && valueOf != ControlMessageType.END_OF_SEGMENT) {
                z = true;
            }
        } else {
            z = j > 0 && partitionConsumptionState.getProcessedRecordSizeSinceLastSync() >= j;
        }
        return z;
    }

    private void syncOffset(String str, PartitionConsumptionState partitionConsumptionState) {
        int partition = partitionConsumptionState.getPartition();
        AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(str);
        if (localStorageEngine == null) {
            LOGGER.warn("Storage engine has been removed. Could not execute sync offset for topic: {} and partition: {}", str, Integer.valueOf(partition));
            return;
        }
        Map<String, String> sync = localStorageEngine.sync(partition);
        this.storageUtilizationManager.notifyFlushToDisk(partitionConsumptionState);
        if (this.offsetLagDeltaRelaxEnabled) {
            updateOffsetLagInMetadata(partitionConsumptionState);
        }
        OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
        offsetRecord.setDatabaseInfo(sync);
        this.storageMetadataService.put(this.kafkaVersionTopic, partition, offsetRecord);
        partitionConsumptionState.resetProcessedRecordSizeSinceLastSync();
        String str2 = "Offset synced for partition " + partition + " of topic " + str + ": ";
        if (REDUNDANT_LOGGING_FILTER.isRedundantException(str2)) {
            return;
        }
        LOGGER.info(str2 + offsetRecord.getLocalVersionTopicOffset());
    }

    private void updateOffsetLagInMetadata(PartitionConsumptionState partitionConsumptionState) {
        partitionConsumptionState.getOffsetRecord().setOffsetLag(measureHybridOffsetLag(partitionConsumptionState, true));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setIngestionException(int i, Exception exc) {
        boolean z = false;
        PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(i));
        if (partitionConsumptionState != null && partitionConsumptionState.isCompletionReported()) {
            z = true;
        }
        this.partitionIngestionExceptionList.set(i, new PartitionExceptionInfo(exc, i, z));
    }

    public void setLastConsumerException(Exception exc) {
        this.lastConsumerException = exc;
    }

    public void setLastStoreIngestionException(Exception exc) {
        this.lastStoreIngestionException.set(exc);
    }

    public void recordChecksumVerificationFailure() {
        this.hostLevelIngestionStats.recordChecksumVerificationFailure();
    }

    public abstract long getBatchReplicationLag();

    public abstract long getLeaderOffsetLag();

    public abstract long getBatchLeaderOffsetLag();

    public abstract long getHybridLeaderOffsetLag();

    public abstract long getFollowerOffsetLag();

    public abstract long getBatchFollowerOffsetLag();

    public abstract long getHybridFollowerOffsetLag();

    public abstract long getRegionHybridOffsetLag(int i);

    public abstract int getWriteComputeErrorCode();

    public abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState);

    /* JADX INFO: Access modifiers changed from: protected */
    public long minZeroLag(long j) {
        if (j >= 0) {
            return j;
        }
        LOGGER.debug("Got a negative value for a lag metric. Will report zero.");
        return 0L;
    }

    public boolean isHybridMode() {
        return this.hybridStoreConfig.isPresent();
    }

    private void syncEndOfPushTimestampToMetadataService(long j) {
        this.storageMetadataService.computeStoreVersionState(this.kafkaVersionTopic, storeVersionState -> {
            if (storeVersionState == null) {
                throw new VeniceException("Unexpected: received some " + ControlMessageType.END_OF_PUSH.name() + " control message in a topic where we have not yet received a " + ControlMessageType.START_OF_PUSH.name() + " control message.");
            }
            storeVersionState.endOfPushTimestamp = j;
            return storeVersionState;
        });
    }

    private void processStartOfPush(KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int i, PartitionConsumptionState partitionConsumptionState) {
        StartOfPush startOfPush = (StartOfPush) controlMessage.controlMessageUnion;
        beginBatchWrite(i, startOfPush.sorted, partitionConsumptionState);
        partitionConsumptionState.setStartOfPushTimestamp(kafkaMessageEnvelope.producerMetadata.messageTimestamp);
        this.statusReportAdapter.reportStarted(partitionConsumptionState);
        this.storageMetadataService.computeStoreVersionState(this.kafkaVersionTopic, storeVersionState -> {
            if (storeVersionState != null) {
                if (storeVersionState.sorted != startOfPush.sorted) {
                    throw new VeniceException("Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name() + " control messages with inconsistent 'sorted' fields within the same topic!");
                }
                if (storeVersionState.chunked != startOfPush.chunked) {
                    throw new VeniceException("Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name() + " control messages with inconsistent 'chunked' fields within the same topic!");
                }
                return storeVersionState;
            }
            StoreVersionState storeVersionState = new StoreVersionState();
            storeVersionState.sorted = startOfPush.sorted;
            storeVersionState.chunked = startOfPush.chunked;
            storeVersionState.compressionStrategy = startOfPush.compressionStrategy;
            storeVersionState.compressionDictionary = startOfPush.compressionDictionary;
            storeVersionState.batchConflictResolutionPolicy = startOfPush.timestampPolicy;
            storeVersionState.startOfPushTimestamp = kafkaMessageEnvelope.producerMetadata.messageTimestamp;
            LOGGER.info("Persisted {} for the first time following a SOP for topic {}.", StoreVersionState.class.getSimpleName(), this.kafkaVersionTopic);
            return storeVersionState;
        });
    }

    protected void processEndOfPush(KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int i, long j, PartitionConsumptionState partitionConsumptionState) {
        if (partitionConsumptionState.getOffsetRecord().isEndOfPushReceived()) {
            LOGGER.warn("{} Received duplicate EOP control message, ignoring it. Partition: {}, Offset: {}", this.consumerTaskId, Integer.valueOf(i), Long.valueOf(j));
            return;
        }
        partitionConsumptionState.getOffsetRecord().endOfPushReceived(j);
        StoragePartitionConfig storagePartitionConfig = getStoragePartitionConfig(i, false, partitionConsumptionState);
        partitionConsumptionState.setDeferredWrite(storagePartitionConfig.isDeferredWrite());
        this.storageEngine.endBatchWrite(storagePartitionConfig);
        if (this.cacheBackend.isPresent() && this.cacheBackend.get().getStorageEngine(this.kafkaVersionTopic) != null) {
            this.cacheBackend.get().getStorageEngine(this.kafkaVersionTopic).endBatchWrite(storagePartitionConfig);
        }
        partitionConsumptionState.finalizeExpectedChecksum();
        partitionConsumptionState.setEndOfPushTimestamp(kafkaMessageEnvelope.producerMetadata.messageTimestamp);
        syncEndOfPushTimestampToMetadataService(kafkaMessageEnvelope.producerMetadata.messageTimestamp);
        this.statusReportAdapter.reportEndOfPushReceived(partitionConsumptionState);
        if (this.isDataRecovery && partitionConsumptionState.isBatchOnly()) {
            partitionConsumptionState.setDataRecoveryCompleted(true);
            this.statusReportAdapter.reportDataRecoveryCompleted(partitionConsumptionState);
        }
    }

    protected void processStartOfIncrementalPush(ControlMessage controlMessage, PartitionConsumptionState partitionConsumptionState) {
        this.statusReportAdapter.reportStartOfIncrementalPushReceived(partitionConsumptionState, ((StartOfIncrementalPush) controlMessage.controlMessageUnion).version.toString());
    }

    protected void processEndOfIncrementalPush(ControlMessage controlMessage, PartitionConsumptionState partitionConsumptionState) {
        this.statusReportAdapter.reportEndOfIncrementalPushReceived(partitionConsumptionState, ((EndOfIncrementalPush) controlMessage.controlMessageUnion).version.toString());
    }

    protected void processVersionSwapMessage(ControlMessage controlMessage, int i, PartitionConsumptionState partitionConsumptionState) {
    }

    protected boolean processTopicSwitch(ControlMessage controlMessage, int i, long j, PartitionConsumptionState partitionConsumptionState) {
        throw new VeniceException(ControlMessageType.TOPIC_SWITCH.name() + " control message should not be received inOnline/Offline state model. Topic " + this.kafkaVersionTopic + " partition " + i);
    }

    private boolean processControlMessage(KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int i, long j, PartitionConsumptionState partitionConsumptionState) {
        boolean z = false;
        ControlMessageType valueOf = ControlMessageType.valueOf(controlMessage);
        if (!isSegmentControlMsg(valueOf)) {
            LOGGER.info("{} : Received {} control message. Partition: {}, Offset: {}", this.consumerTaskId, valueOf.name(), Integer.valueOf(i), Long.valueOf(j));
        }
        switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$kafka$protocol$enums$ControlMessageType[valueOf.ordinal()]) {
            case 1:
            case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
            case ValueRecord.SCHEMA_HEADER_LENGTH /* 4 */:
                break;
            case 2:
                processEndOfPush(kafkaMessageEnvelope, controlMessage, i, j, partitionConsumptionState);
                break;
            case MAX_CONSUMER_ACTION_ATTEMPTS /* 5 */:
                processStartOfIncrementalPush(controlMessage, partitionConsumptionState);
                break;
            case 6:
                processEndOfIncrementalPush(controlMessage, partitionConsumptionState);
                break;
            case 7:
                z = processTopicSwitch(controlMessage, i, j, partitionConsumptionState);
                break;
            case 8:
                processVersionSwapMessage(controlMessage, i, partitionConsumptionState);
                break;
            default:
                throw new UnsupportedMessageTypeException("Unrecognized Control message type " + controlMessage.controlMessageType);
        }
        return z;
    }

    protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState);

    protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, LeaderProducedRecordContext leaderProducedRecordContext, String str);

    /* JADX WARN: Code restructure failed: missing block: B:40:0x0071, code lost:
    
        if (r14.hasCorrespondingUpstreamMessage() != false) goto L8;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private int internalProcessConsumerRecord(com.linkedin.venice.pubsub.api.PubSubMessage<com.linkedin.venice.message.KafkaKey, com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope, java.lang.Long> r12, com.linkedin.davinci.kafka.consumer.PartitionConsumptionState r13, com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext r14, java.lang.String r15, long r16) {
        /*
            Method dump skipped, instructions count: 488
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.linkedin.davinci.kafka.consumer.StoreIngestionTask.internalProcessConsumerRecord(com.linkedin.venice.pubsub.api.PubSubMessage, com.linkedin.davinci.kafka.consumer.PartitionConsumptionState, com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext, java.lang.String, long):int");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void recordWriterStats(long j, long j2, long j3, long j4, PartitionConsumptionState partitionConsumptionState) {
        if (this.isUserSystemStore) {
            return;
        }
        this.versionedDIVStats.recordLatencies(this.storeName, this.versionNumber, j, j2, j3, j4);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void validateMessage(KafkaDataIntegrityValidator kafkaDataIntegrityValidator, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, boolean z, PartitionConsumptionState partitionConsumptionState) {
        try {
            kafkaDataIntegrityValidator.validateMessage(pubSubMessage, z, Lazy.of(() -> {
                TopicManager topicManager = this.topicManagerRepository.getTopicManager();
                PubSubTopic pubSubTopic = pubSubMessage.getTopicPartition().getPubSubTopic();
                return Boolean.valueOf((this.isDataRecovery && isHybridMode() && partitionConsumptionState.getTopicSwitch() == null) || (topicManager.isTopicCompactionEnabled(pubSubTopic) && LatencyUtils.getElapsedTimeInMs(pubSubMessage.getPubSubMessageTime()) >= topicManager.getTopicMinLogCompactionLagMs(pubSubTopic)));
            }));
        } catch (FatalDataValidationException e) {
            this.divErrorMetricCallback.execute(e);
            if (!z) {
                throw e;
            }
            if (((KafkaMessageEnvelope) pubSubMessage.getValue()).producerMetadata.messageSequenceNumber != 1) {
                LOGGER.warn("Data integrity validation problem with: {} offset: {}, but consumption will continue since EOP is already received. Msg: {}", pubSubMessage.getTopicPartition(), pubSubMessage.getOffset(), e.getMessage());
            }
            if (e instanceof ImproperlyStartedSegmentException) {
                return;
            }
            kafkaDataIntegrityValidator.validateMessage(pubSubMessage, z, Lazy.TRUE);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cloneProducerStates(int i, KafkaDataIntegrityValidator kafkaDataIntegrityValidator) {
        this.kafkaDataIntegrityValidator.cloneProducerStates(i, kafkaDataIntegrityValidator);
    }

    private void prependHeaderAndWriteToStorageEngine(int i, byte[] bArr, Put put, long j) {
        ByteBuffer byteBuffer = put.putValue;
        if (byteBuffer.remaining() == 0 && put.replicationMetadataPayload.remaining() > 0) {
            writeToStorageEngine(i, bArr, put, j);
            return;
        }
        if (byteBuffer.position() < 4) {
            throw new VeniceException("Start position of 'putValue' ByteBuffer shouldn't be less than 4");
        }
        byteBuffer.position(byteBuffer.position() - 4);
        int i2 = byteBuffer.getInt();
        byteBuffer.position(byteBuffer.position() - 4);
        ByteUtils.writeInt(byteBuffer.array(), put.schemaId, byteBuffer.position());
        writeToStorageEngine(i, bArr, put, j);
        byteBuffer.putInt(i2);
    }

    private void writeToStorageEngine(int i, byte[] bArr, Put put, long j) {
        boolean z = this.emitMetrics.get();
        boolean isTraceEnabled = LOGGER.isTraceEnabled();
        long nanoTime = (z || isTraceEnabled) ? System.nanoTime() : 0L;
        putInStorageEngine(i, bArr, put);
        if (this.cacheBackend.isPresent() && this.cacheBackend.get().getStorageEngine(this.kafkaVersionTopic) != null) {
            this.cacheBackend.get().getStorageEngine(this.kafkaVersionTopic).put(i, bArr, put.putValue);
        }
        if (isTraceEnabled) {
            LOGGER.trace("{} : Completed PUT to Store: {} in {} ns at {}", this.consumerTaskId, this.kafkaVersionTopic, Long.valueOf(System.nanoTime() - nanoTime), Long.valueOf(System.currentTimeMillis()));
        }
        if (z) {
            this.hostLevelIngestionStats.recordStorageEnginePutLatency(LatencyUtils.getLatencyInMS(nanoTime), j);
        }
    }

    protected void putInStorageEngine(int i, byte[] bArr, Put put) {
        try {
            this.storageEngine.put(i, bArr, put.putValue);
        } catch (PersistenceFailureException e) {
            throwOrLogStorageFailureDependingIfStillSubscribed(i, e);
        }
    }

    protected void removeFromStorageEngine(int i, byte[] bArr, Delete delete) {
        try {
            this.storageEngine.delete(i, bArr);
        } catch (PersistenceFailureException e) {
            throwOrLogStorageFailureDependingIfStillSubscribed(i, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void throwOrLogStorageFailureDependingIfStillSubscribed(int i, PersistenceFailureException persistenceFailureException) {
        if (this.partitionConsumptionStateMap.containsKey(Integer.valueOf(i))) {
            throw new VeniceException("Caught an exception while trying to interact with the storage engine for partition " + i + " while it still appears to be subscribed.", persistenceFailureException);
        }
        logStorageOperationWhileUnsubscribed(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void logStorageOperationWhileUnsubscribed(int i) {
        LOGGER.info("Attempted to interact with the storage engine for partition {} while the partitionConsumptionStateMap does not contain this partition. Will ignore the operation as it probably indicates the partition was unsubscribed.", Integer.valueOf(i));
    }

    public boolean consumerHasAnySubscription() {
        return this.aggKafkaConsumerService.hasAnyConsumerAssignedForVersionTopic(this.versionTopic);
    }

    public boolean consumerHasSubscription(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState) {
        return this.aggKafkaConsumerService.hasConsumerAssignedFor(this.versionTopic, new PubSubTopicPartitionImpl(pubSubTopic, partitionConsumptionState.getSourceTopicPartitionNumber(pubSubTopic)));
    }

    public void consumerUnSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState) {
        Instant now = Instant.now();
        int partition = partitionConsumptionState.getPartition();
        this.aggKafkaConsumerService.unsubscribeConsumerFor(this.versionTopic, new PubSubTopicPartitionImpl(pubSubTopic, partition));
        LOGGER.info("Consumer unsubscribed topic {} partition {}. Took {} ms", pubSubTopic, Integer.valueOf(partition), Long.valueOf(Instant.now().toEpochMilli() - now.toEpochMilli()));
    }

    public void consumerBatchUnsubscribe(Set<PubSubTopicPartition> set) {
        Instant now = Instant.now();
        this.aggKafkaConsumerService.batchUnsubscribeConsumerFor(this.versionTopic, set);
        LOGGER.info("Consumer unsubscribed {} partitions. Took {} ms", Integer.valueOf(set.size()), Long.valueOf(Instant.now().toEpochMilli() - now.toEpochMilli()));
    }

    public abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState);

    public void consumerSubscribe(PubSubTopicPartition pubSubTopicPartition, long j, String str) {
        this.aggKafkaConsumerService.createKafkaConsumerService(createKafkaConsumerProperties(this.kafkaProps, str, !Objects.equals(str, this.localKafkaServer)));
        this.aggKafkaConsumerService.subscribeConsumerFor(str, this, pubSubTopicPartition, j);
    }

    public void consumerResetOffset(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState) {
        this.aggKafkaConsumerService.resetOffsetFor(this.versionTopic, new PubSubTopicPartitionImpl(pubSubTopic, partitionConsumptionState.getSourceTopicPartitionNumber(pubSubTopic)));
    }

    private void pauseConsumption(String str, int i) {
        this.aggKafkaConsumerService.pauseConsumerFor(this.versionTopic, new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(str), i));
    }

    private void resumeConsumption(String str, int i) {
        this.aggKafkaConsumerService.resumeConsumerFor(this.versionTopic, new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(str), i));
    }

    private int processKafkaDataMessage(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, long j) {
        byte[] keyBytes;
        Delete delete;
        int length;
        byte[] keyBytes2;
        Put put;
        int i = 0;
        KafkaKey kafkaKey = (KafkaKey) pubSubMessage.getKey();
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
        int partition = partitionConsumptionState.getPartition();
        switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[(leaderProducedRecordContext == null ? MessageType.valueOf(kafkaMessageEnvelope) : leaderProducedRecordContext.getMessageType()).ordinal()]) {
            case 1:
                if (leaderProducedRecordContext == null) {
                    keyBytes2 = kafkaKey.getKey();
                    put = (Put) kafkaMessageEnvelope.payloadUnion;
                } else {
                    keyBytes2 = leaderProducedRecordContext.getKeyBytes();
                    put = (Put) leaderProducedRecordContext.getValueUnion();
                }
                i = put.putValue.remaining();
                length = keyBytes2.length;
                partitionConsumptionState.maybeUpdateExpectedChecksum(keyBytes2, put);
                prependHeaderAndWriteToStorageEngine(partition, keyBytes2, put, j);
                if (put.schemaId > 0) {
                    this.valueSchemaId = put.schemaId;
                    break;
                }
                break;
            case 2:
                if (leaderProducedRecordContext == null) {
                    keyBytes = kafkaKey.getKey();
                    delete = (Delete) kafkaMessageEnvelope.payloadUnion;
                } else {
                    keyBytes = leaderProducedRecordContext.getKeyBytes();
                    delete = (Delete) leaderProducedRecordContext.getValueUnion();
                }
                length = keyBytes.length;
                removeFromStorageEngine(partition, keyBytes, delete);
                if (this.cacheBackend.isPresent() && this.cacheBackend.get().getStorageEngine(this.kafkaVersionTopic) != null) {
                    this.cacheBackend.get().getStorageEngine(this.kafkaVersionTopic).delete(partition, keyBytes);
                    break;
                }
                break;
            case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
                throw new VeniceMessageException(this.consumerTaskId + ": Not expecting UPDATE message from: " + pubSubMessage.getTopicPartition() + ", Offset: " + pubSubMessage.getOffset());
            default:
                throw new VeniceMessageException(this.consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaMessageEnvelope.messageType);
        }
        if (this.purgeTransientRecordBuffer && isTransientRecordBufferUsed() && partitionConsumptionState.isEndOfPushReceived() && leaderProducedRecordContext != null && leaderProducedRecordContext.getConsumedOffset() != -1) {
            partitionConsumptionState.mayRemoveTransientRecord(leaderProducedRecordContext.getConsumedKafkaClusterId(), leaderProducedRecordContext.getConsumedOffset(), kafkaKey.getKey());
        }
        if (this.emitMetrics.get()) {
            this.hostLevelIngestionStats.recordKeySize(length, j);
            this.hostLevelIngestionStats.recordValueSize(i, j);
        }
        return length + i;
    }

    private void waitReadyToProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) throws InterruptedException {
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
        if (((KafkaKey) pubSubMessage.getKey()).isControlMessage() || kafkaMessageEnvelope == null) {
            return;
        }
        switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.valueOf(kafkaMessageEnvelope).ordinal()]) {
            case 1:
                Put put = (Put) kafkaMessageEnvelope.payloadUnion;
                waitReadyToProcessDataRecord(put.schemaId);
                try {
                    deserializeValue(put.schemaId, put.putValue, pubSubMessage);
                    return;
                } catch (Exception e) {
                    PartitionConsumptionState partitionConsumptionState = this.partitionConsumptionStateMap.get(Integer.valueOf(pubSubMessage.getTopicPartition().getPartitionNumber()));
                    throw new VeniceException("Failed to deserialize PUT for: " + pubSubMessage.getTopicPartition() + ", offset: " + pubSubMessage.getOffset() + ", schema id: " + put.schemaId + ", LF state: " + (partitionConsumptionState == null ? null : partitionConsumptionState.getLeaderFollowerState()), e);
                }
            case 2:
                return;
            case VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER /* 3 */:
                waitReadyToProcessDataRecord(((Update) kafkaMessageEnvelope.payloadUnion).schemaId);
                return;
            default:
                throw new VeniceMessageException(this.consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaMessageEnvelope.messageType);
        }
    }

    private void waitReadyToProcessDataRecord(int i) throws InterruptedException {
        if (i == -1) {
            return;
        }
        if (i != AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion() && i != AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
            waitUntilValueSchemaAvailable(i);
        } else if (!waitVersionStateAvailable(this.kafkaVersionTopic).chunked) {
            throw new VeniceException("Detected chunking in a store-version where chunking is NOT enabled. Will abort ingestion.");
        }
    }

    protected StoreVersionState waitVersionStateAvailable(String str) throws InterruptedException {
        long currentTimeMillis;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            StoreVersionState storeVersionState = this.storageEngine.getStoreVersionState();
            currentTimeMillis = System.currentTimeMillis() - currentTimeMillis2;
            if (storeVersionState != null) {
                LOGGER.info("Version state is available for {} after {} ms", str, Long.valueOf(currentTimeMillis));
                return storeVersionState;
            }
            if (currentTimeMillis > SCHEMA_POLLING_TIMEOUT_MS || !isRunning()) {
                break;
            }
            Thread.sleep(SCHEMA_POLLING_DELAY_MS);
        }
        LOGGER.warn("Version state is not available for {} after {}", str, Long.valueOf(currentTimeMillis));
        throw new VeniceException("Store version state is not available for " + str);
    }

    /* JADX WARN: Code restructure failed: missing block: B:14:0x00b3, code lost:
    
        com.linkedin.davinci.kafka.consumer.StoreIngestionTask.LOGGER.warn("Value schema [{}] is not available for {} after {} ms", java.lang.Integer.valueOf(r8), r7.storeName, java.lang.Long.valueOf(r0));
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x00f3, code lost:
    
        throw new com.linkedin.venice.exceptions.VeniceException("Value schema [" + r8 + "] is not available for " + r7.storeName);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void waitUntilValueSchemaAvailable(int r8) throws java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 253
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.linkedin.davinci.kafka.consumer.StoreIngestionTask.waitUntilValueSchemaAvailable(int):void");
    }

    private void deserializeValue(int i, ByteBuffer byteBuffer, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) throws IOException {
        if (i < 0 || this.deserializedSchemaIds.get(i) != null || this.availableSchemaIds.get(i) == null) {
            return;
        }
        if (!pubSubMessage.getTopicPartition().getPubSubTopic().isRealTime()) {
            byteBuffer = ((VeniceCompressor) this.compressor.get()).decompress(byteBuffer);
        }
        SchemaEntry valueSchema = this.schemaRepository.getValueSchema(this.storeName, i);
        if (valueSchema != null) {
            Schema schema = valueSchema.getSchema();
            new AvroGenericDeserializer(schema, schema).deserialize(byteBuffer);
            LOGGER.info("Value deserialization succeeded with schema id {} for: {}", Integer.valueOf(i), pubSubMessage.getTopicPartition());
            this.deserializedSchemaIds.set(i, new Object());
        }
    }

    private synchronized void complete() {
        if (this.consumerActionsQueue.isEmpty()) {
            close();
        } else {
            LOGGER.info("{} consumerActionsQueue is not empty, ignoring complete() call.", this.consumerTaskId);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        this.isRunning.set(false);
    }

    public synchronized void shutdown(int i) {
        long currentTimeMillis = System.currentTimeMillis();
        close();
        try {
            wait(i);
        } catch (Exception e) {
            LOGGER.error("Caught exception while waiting for ingestion task of topic: {} shutdown.", this.kafkaVersionTopic);
        }
        LOGGER.info("Ingestion task of topic: {} is shutdown in {}ms", this.kafkaVersionTopic, Long.valueOf(LatencyUtils.getElapsedTimeInMs(currentTimeMillis)));
    }

    public boolean isRunning() {
        return this.isRunning.get();
    }

    public PubSubTopic getVersionTopic() {
        return this.versionTopic;
    }

    public boolean isMetricsEmissionEnabled() {
        return this.emitMetrics.get();
    }

    public void enableMetricsEmission() {
        this.emitMetrics.set(true);
    }

    public void disableMetricsEmission() {
        this.emitMetrics.set(false);
    }

    public boolean isPartitionConsuming(int i) {
        AmplificationFactorAdapter amplificationFactorAdapter = this.amplificationFactorAdapter;
        ConcurrentMap<Integer, PartitionConsumptionState> concurrentMap = this.partitionConsumptionStateMap;
        Objects.requireNonNull(concurrentMap);
        return amplificationFactorAdapter.meetsAny(i, (v1) -> {
            return r2.containsKey(v1);
        });
    }

    protected Properties createKafkaConsumerProperties(Properties properties, String str, boolean z) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.setProperty("kafka.bootstrap.servers", str);
        VeniceProperties kafkaConsumerConfigsForRemoteConsumption = z ? this.serverConfig.getKafkaConsumerConfigsForRemoteConsumption() : this.serverConfig.getKafkaConsumerConfigsForLocalConsumption();
        if (!kafkaConsumerConfigsForRemoteConsumption.isEmpty()) {
            properties2.putAll(kafkaConsumerConfigsForRemoteConsumption.toProperties());
        }
        return properties2;
    }

    private ReadyToServeCheck getDefaultReadyToServeChecker() {
        return (partitionConsumptionState, z) -> {
            if (z || (partitionConsumptionState.isEndOfPushReceived() && !partitionConsumptionState.isCompletionReported())) {
                if (!isReadyToServe(partitionConsumptionState)) {
                    this.statusReportAdapter.reportProgress(partitionConsumptionState);
                    return;
                }
                int partition = partitionConsumptionState.getPartition();
                Store storeOrThrow = this.storeRepository.getStoreOrThrow(this.storeName);
                AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(this.kafkaVersionTopic);
                if (storeOrThrow.isHybrid()) {
                    if (localStorageEngine == null) {
                        LOGGER.warn("Storage engine {} was removed before reopening", this.kafkaVersionTopic);
                    } else {
                        LOGGER.info("Reopen partition {}_{} for reading after ready-to-serve.", this.kafkaVersionTopic, Integer.valueOf(partition));
                        localStorageEngine.preparePartitionForReading(partition);
                    }
                }
                if (partitionConsumptionState.isCompletionReported()) {
                    LOGGER.info("{} Partition {} synced offset: {}", this.consumerTaskId, Integer.valueOf(partition), Long.valueOf(partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset()));
                } else {
                    this.statusReportAdapter.reportCompleted(partitionConsumptionState);
                    LOGGER.info("{} Partition {} is ready to serve", this.consumerTaskId, Integer.valueOf(partition));
                    warmupSchemaCache(storeOrThrow);
                }
                if (this.suppressLiveUpdates) {
                    String str = this.consumerTaskId + " Live update suppression is enabled. Stop consumption for partition " + partition;
                    if (!REDUNDANT_LOGGING_FILTER.isRedundantException(str)) {
                        LOGGER.info(str);
                    }
                    unSubscribePartition(new PubSubTopicPartitionImpl(this.versionTopic, partition));
                }
            }
        };
    }

    private void warmupSchemaCache(Store store) {
        if (store.isReadComputationEnabled() && this.valueSchemaId >= 1) {
            try {
                waitUntilValueSchemaAvailable(this.valueSchemaId);
                int numSchemaFastClassWarmup = this.serverConfig.getNumSchemaFastClassWarmup();
                long fastClassSchemaWarmupTimeout = this.serverConfig.getFastClassSchemaWarmupTimeout();
                int i = numSchemaFastClassWarmup >= this.valueSchemaId ? 1 : this.valueSchemaId - numSchemaFastClassWarmup;
                Schema schema = this.schemaRepository.getValueSchema(this.storeName, this.valueSchemaId).getSchema();
                HashSet hashSet = new HashSet();
                for (int i2 = this.valueSchemaId; i2 >= i; i2--) {
                    hashSet.add(this.schemaRepository.getValueSchema(this.storeName, i2).getSchema());
                }
                if (store.getLatestSuperSetValueSchemaId() > 0) {
                    hashSet.add(this.schemaRepository.getValueSchema(this.storeName, store.getLatestSuperSetValueSchemaId()).getSchema());
                }
                Iterator it = hashSet.iterator();
                while (it.hasNext()) {
                    FastSerializerDeserializerFactory.cacheFastAvroGenericDeserializer(schema, (Schema) it.next(), fastClassSchemaWarmupTimeout);
                }
            } catch (InterruptedException e) {
                LOGGER.error("Got interrupted while trying to fetch value schema");
            }
        }
    }

    public void reportError(String str, int i, Exception exc) {
        ArrayList arrayList = new ArrayList();
        IntListIterator it = PartitionUtils.getSubPartitions(i, this.amplificationFactor).iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            if (this.partitionConsumptionStateMap.containsKey(Integer.valueOf(intValue))) {
                arrayList.add(this.partitionConsumptionStateMap.get(Integer.valueOf(intValue)));
            }
        }
        this.statusReportAdapter.reportError(arrayList, str, exc);
    }

    public int getAmplificationFactor() {
        return this.amplificationFactor;
    }

    protected StatusReportAdapter getStatusReportAdapter() {
        return this.statusReportAdapter;
    }

    public boolean isActiveActiveReplicationEnabled() {
        return this.isActiveActiveReplicationEnabled;
    }

    public void dumpPartitionConsumptionStates(AdminResponse adminResponse, ComplementSet<Integer> complementSet) {
        for (Map.Entry<Integer, PartitionConsumptionState> entry : this.partitionConsumptionStateMap.entrySet()) {
            try {
                if (complementSet.contains(entry.getKey())) {
                    adminResponse.addPartitionConsumptionState(entry.getValue());
                }
            } catch (Throwable th) {
                LOGGER.error("Error when dumping consumption state for store {} partition {}", this.storeName, entry.getKey(), th);
            }
        }
    }

    public void dumpStoreVersionState(AdminResponse adminResponse) {
        StoreVersionState storeVersionState = this.storageEngine.getStoreVersionState();
        if (storeVersionState != null) {
            adminResponse.addStoreVersionState(storeVersionState);
        }
    }

    public VeniceServerConfig getServerConfig() {
        return this.serverConfig;
    }

    public void updateOffsetMetadataAndSync(String str, int i) {
        PartitionConsumptionState partitionConsumptionState = getPartitionConsumptionState(i);
        updateOffsetMetadataInOffsetRecord(partitionConsumptionState);
        syncOffset(str, partitionConsumptionState);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TopicManager getTopicManager(String str) {
        return str.equals(this.localKafkaServer) ? this.topicManagerRepository.getTopicManager() : this.topicManagerRepository.getTopicManager(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition pubSubTopicPartition, PartitionConsumptionState partitionConsumptionState) throws InterruptedException {
        this.storeBufferService.drainBufferedRecordsFromTopicPartition(pubSubTopicPartition);
    }

    protected abstract DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, int i, String str, int i2, long j, long j2);

    protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int i) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isSegmentControlMsg(ControlMessageType controlMessageType) {
        return ControlMessageType.START_OF_SEGMENT.equals(controlMessageType) || ControlMessageType.END_OF_SEGMENT.equals(controlMessageType);
    }

    protected boolean isTransientRecordBufferUsed() {
        return this.isWriteComputationEnabled;
    }

    protected void setPartitionConsumptionState(int i, PartitionConsumptionState partitionConsumptionState) {
        this.partitionConsumptionStateMap.put(Integer.valueOf(i), partitionConsumptionState);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AggVersionedDIVStats getVersionedDIVStats() {
        return this.versionedDIVStats;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AggVersionedIngestionStats getVersionIngestionStats() {
        return this.versionedIngestionStats;
    }

    protected CompressionStrategy getCompressionStrategy() {
        return this.compressionStrategy;
    }

    protected Lazy<VeniceCompressor> getCompressor() {
        return this.compressor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isChunked() {
        return this.isChunked;
    }

    protected ReadOnlySchemaRepository getSchemaRepo() {
        return this.schemaRepository;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HostLevelIngestionStats getHostLevelIngestionStats() {
        return this.hostLevelIngestionStats;
    }

    protected String getKafkaVersionTopic() {
        return this.kafkaVersionTopic;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean shouldUpdateUpstreamOffset(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) {
        if (pubSubMessage == null) {
            return false;
        }
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) pubSubMessage.getValue();
        return kafkaMessageEnvelope.leaderMetadataFooter != null && kafkaMessageEnvelope.leaderMetadataFooter.upstreamOffset >= 0;
    }
}
