package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.davinci.stats.KafkaConsumerServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.IndexedHashMap;
import com.linkedin.venice.utils.IndexedMap;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/KafkaConsumerService.class */
public abstract class KafkaConsumerService extends AbstractVeniceService {
    private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter();
    private final ExecutorService consumerExecutor;
    protected final String kafkaUrl;
    private final Logger LOGGER;
    protected KafkaConsumerServiceStats stats;
    protected final IndexedMap<SharedKafkaConsumer, ConsumptionTask> consumerToConsumptionTask;
    protected final Map<PubSubTopic, Map<PubSubTopicPartition, SharedKafkaConsumer>> versionTopicToTopicPartitionToConsumer = new VeniceConcurrentHashMap();

    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/KafkaConsumerService$ConsumerAssignmentStrategy.class */
    public enum ConsumerAssignmentStrategy {
        TOPIC_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(TopicWiseKafkaConsumerService::new),
        PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(PartitionWiseKafkaConsumerService::new);

        final KCSConstructor constructor;

        ConsumerAssignmentStrategy(KCSConstructor kCSConstructor) {
            this.constructor = kCSConstructor;
        }
    }

    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/KafkaConsumerService$KCSConstructor.class */
    interface KCSConstructor {
        KafkaConsumerService construct(PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties properties, long j, int i, EventThrottler eventThrottler, EventThrottler eventThrottler2, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, MetricsRepository metricsRepository, String str, long j2, TopicExistenceChecker topicExistenceChecker, boolean z, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer, Time time, KafkaConsumerServiceStats kafkaConsumerServiceStats, boolean z2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/KafkaConsumerService$OffsetGetter.class */
    public interface OffsetGetter {
        long apply(PubSubConsumerAdapter pubSubConsumerAdapter, PubSubTopicPartition pubSubTopicPartition);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaConsumerService(PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties properties, long j, int i, EventThrottler eventThrottler, EventThrottler eventThrottler2, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, MetricsRepository metricsRepository, String str, long j2, TopicExistenceChecker topicExistenceChecker, boolean z, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer, Time time, KafkaConsumerServiceStats kafkaConsumerServiceStats, boolean z2) {
        this.kafkaUrl = properties.getProperty("kafka.bootstrap.servers");
        this.LOGGER = LogManager.getLogger(KafkaConsumerService.class.getSimpleName() + " [" + this.kafkaUrl + "]");
        this.consumerExecutor = Executors.newFixedThreadPool(i, new DaemonThreadFactory("venice-shared-consumer-for-" + this.kafkaUrl));
        this.consumerToConsumptionTask = new IndexedHashMap(i);
        this.stats = kafkaConsumerServiceStats != null ? kafkaConsumerServiceStats : createKafkaConsumerServiceStats(metricsRepository, str, this::getMaxElapsedTimeSinceLastPollInConsumerPool);
        for (int i2 = 0; i2 < i; i2++) {
            properties.setProperty("kafka.client.id", getUniqueClientId(this.kafkaUrl, i2));
            SharedKafkaConsumer sharedKafkaConsumer = new SharedKafkaConsumer(pubSubConsumerAdapterFactory.create(new VeniceProperties(properties), z2, kafkaPubSubMessageDeserializer, (String) null), this.stats, this::recordPartitionsPerConsumerSensor, this::handleUnsubscription);
            Supplier supplier = z ? () -> {
                return kafkaClusterBasedRecordThrottler.poll(sharedKafkaConsumer, this.kafkaUrl, j);
            } : () -> {
                return sharedKafkaConsumer.poll(j);
            };
            IntConsumer intConsumer = i3 -> {
                eventThrottler.maybeThrottle(i3);
            };
            IntConsumer intConsumer2 = i4 -> {
                eventThrottler2.maybeThrottle(i4);
            };
            Objects.requireNonNull(sharedKafkaConsumer);
            Supplier supplier2 = sharedKafkaConsumer::getAssignment;
            KafkaConsumerServiceStats kafkaConsumerServiceStats2 = this.stats;
            Objects.requireNonNull(kafkaConsumerServiceStats2);
            IntConsumer intConsumer3 = kafkaConsumerServiceStats2::recordDetectedDeletedTopicNum;
            Objects.requireNonNull(sharedKafkaConsumer);
            this.consumerToConsumptionTask.putByIndex(sharedKafkaConsumer, new ConsumptionTask(this.kafkaUrl, i2, j, supplier, intConsumer, intConsumer2, this.stats, new ConsumerSubscriptionCleaner(j2, 1000, topicExistenceChecker, supplier2, intConsumer3, sharedKafkaConsumer::batchUnsubscribe, time)), i2);
        }
        this.LOGGER.info("KafkaConsumerService was initialized with {} consumers.", Integer.valueOf(i));
    }

    void handleUnsubscription(SharedKafkaConsumer sharedKafkaConsumer, PubSubTopicPartition pubSubTopicPartition) {
    }

    private String getUniqueClientId(String str, int i) {
        return Utils.getHostName() + "_" + str + "_" + i;
    }

    public SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        Map<PubSubTopicPartition, SharedKafkaConsumer> map = this.versionTopicToTopicPartitionToConsumer.get(pubSubTopic);
        if (map == null) {
            return null;
        }
        return map.get(pubSubTopicPartition);
    }

    public SharedKafkaConsumer assignConsumerFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        return this.versionTopicToTopicPartitionToConsumer.computeIfAbsent(pubSubTopic, pubSubTopic2 -> {
            return new VeniceConcurrentHashMap();
        }).computeIfAbsent(pubSubTopicPartition, pubSubTopicPartition2 -> {
            return pickConsumerForPartition(pubSubTopic, pubSubTopicPartition);
        });
    }

    protected abstract SharedKafkaConsumer pickConsumerForPartition(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition);

    protected void removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter pubSubConsumerAdapter, PubSubTopicPartition pubSubTopicPartition) {
        ((ConsumptionTask) this.consumerToConsumptionTask.get(pubSubConsumerAdapter)).removeDataReceiver(pubSubTopicPartition);
    }

    public void unsubscribeAll(PubSubTopic pubSubTopic) {
        this.versionTopicToTopicPartitionToConsumer.compute(pubSubTopic, (pubSubTopic2, map) -> {
            if (map == null) {
                return null;
            }
            map.forEach((pubSubTopicPartition, sharedKafkaConsumer) -> {
                sharedKafkaConsumer.unSubscribe(pubSubTopicPartition);
                removeTopicPartitionFromConsumptionTask(sharedKafkaConsumer, pubSubTopicPartition);
            });
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unSubscribe(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        SharedKafkaConsumer consumerAssignedToVersionTopicPartition = getConsumerAssignedToVersionTopicPartition(pubSubTopic, pubSubTopicPartition);
        if (consumerAssignedToVersionTopicPartition != null) {
            consumerAssignedToVersionTopicPartition.unSubscribe(pubSubTopicPartition);
            ((ConsumptionTask) this.consumerToConsumptionTask.get(consumerAssignedToVersionTopicPartition)).removeDataReceiver(pubSubTopicPartition);
            this.versionTopicToTopicPartitionToConsumer.compute(pubSubTopic, (pubSubTopic2, map) -> {
                if (map == null) {
                    return null;
                }
                map.remove(pubSubTopicPartition);
                if (map.isEmpty()) {
                    return null;
                }
                return map;
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void batchUnsubscribe(PubSubTopic pubSubTopic, Set<PubSubTopicPartition> set) {
        HashMap hashMap = new HashMap();
        for (PubSubTopicPartition pubSubTopicPartition : set) {
            SharedKafkaConsumer consumerAssignedToVersionTopicPartition = getConsumerAssignedToVersionTopicPartition(pubSubTopic, pubSubTopicPartition);
            if (consumerAssignedToVersionTopicPartition != null) {
                ((Set) hashMap.computeIfAbsent(consumerAssignedToVersionTopicPartition, pubSubConsumerAdapter -> {
                    return new HashSet();
                })).add(pubSubTopicPartition);
            }
        }
        hashMap.forEach((pubSubConsumerAdapter2, set2) -> {
            pubSubConsumerAdapter2.batchUnsubscribe(set2);
            ConsumptionTask consumptionTask = (ConsumptionTask) this.consumerToConsumptionTask.get(pubSubConsumerAdapter2);
            set2.forEach(pubSubTopicPartition2 -> {
                consumptionTask.removeDataReceiver(pubSubTopicPartition2);
                this.versionTopicToTopicPartitionToConsumer.compute(pubSubTopic, (pubSubTopic2, map) -> {
                    if (map == null) {
                        return null;
                    }
                    map.remove(pubSubTopicPartition2);
                    if (map.isEmpty()) {
                        return null;
                    }
                    return map;
                });
            });
        });
    }

    public boolean startInner() {
        Collection values = this.consumerToConsumptionTask.values();
        ExecutorService executorService = this.consumerExecutor;
        Objects.requireNonNull(executorService);
        values.forEach((v1) -> {
            r1.submit(v1);
        });
        this.consumerExecutor.shutdown();
        this.LOGGER.info("KafkaConsumerService started for {}", this.kafkaUrl);
        return true;
    }

    public void stopInner() throws Exception {
        this.consumerToConsumptionTask.values().forEach((v0) -> {
            v0.stop();
        });
        long currentTimeMillis = System.currentTimeMillis();
        boolean awaitTermination = this.consumerExecutor.awaitTermination(1, TimeUnit.SECONDS);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        if (awaitTermination) {
            this.LOGGER.info("consumerExecutor terminated gracefully in {} ms.", Long.valueOf(currentTimeMillis2));
        } else {
            this.LOGGER.warn("consumerExecutor timed out after {} ms while awaiting graceful termination. Will force shutdown.", Long.valueOf(currentTimeMillis2));
            long currentTimeMillis3 = System.currentTimeMillis();
            this.consumerExecutor.shutdownNow();
            boolean awaitTermination2 = this.consumerExecutor.awaitTermination(1, TimeUnit.SECONDS);
            long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
            if (awaitTermination2) {
                this.LOGGER.info("consumerExecutor terminated forcefully in {} ms.", Long.valueOf(currentTimeMillis4));
            } else {
                this.LOGGER.warn("consumerExecutor timed out after {} ms while awaiting forceful termination.", Long.valueOf(currentTimeMillis4));
            }
        }
        this.consumerToConsumptionTask.keySet().forEach((v0) -> {
            v0.close();
        });
    }

    public boolean hasAnySubscriptionFor(PubSubTopic pubSubTopic) {
        Map<PubSubTopicPartition, SharedKafkaConsumer> map = this.versionTopicToTopicPartitionToConsumer.get(pubSubTopic);
        return (map == null || map.isEmpty()) ? false : true;
    }

    private KafkaConsumerServiceStats createKafkaConsumerServiceStats(MetricsRepository metricsRepository, String str, LongSupplier longSupplier) {
        return new KafkaConsumerServiceStats(metricsRepository, "kafka_consumer_service_for_" + str, longSupplier);
    }

    private long getMaxElapsedTimeSinceLastPollInConsumerPool() {
        long j = -1;
        int i = -1;
        for (ConsumptionTask consumptionTask : this.consumerToConsumptionTask.values()) {
            long elapsedTimeInMs = LatencyUtils.getElapsedTimeInMs(consumptionTask.getLastSuccessfulPollTimestamp());
            if (elapsedTimeInMs > j) {
                j = elapsedTimeInMs;
                i = consumptionTask.getTaskId();
            }
        }
        if (j > 60000) {
            if (!REDUNDANT_LOGGING_FILTER.isRedundantException(this.kafkaUrl + i)) {
                this.LOGGER.warn("Shared consumer ({} - task {}) couldn't make any progress for over {} ms!", this.kafkaUrl, Integer.valueOf(i), Long.valueOf(j));
            }
        }
        return j;
    }

    public void startConsumptionIntoDataReceiver(PubSubTopicPartition pubSubTopicPartition, long j, ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver) {
        PubSubTopic destinationIdentifier = consumedDataReceiver.destinationIdentifier();
        SharedKafkaConsumer assignConsumerFor = assignConsumerFor(destinationIdentifier, pubSubTopicPartition);
        if (assignConsumerFor == null) {
            throw new VeniceException("Shared consumer must exist for version topic: " + destinationIdentifier + " in Kafka cluster: " + this.kafkaUrl);
        }
        ConsumptionTask consumptionTask = (ConsumptionTask) this.consumerToConsumptionTask.get(assignConsumerFor);
        if (consumptionTask == null) {
            throw new IllegalStateException("There should be a " + ConsumptionTask.class.getSimpleName() + " assigned for this " + SharedKafkaConsumer.class.getSimpleName());
        }
        consumptionTask.setDataReceiver(pubSubTopicPartition, consumedDataReceiver);
        assignConsumerFor.subscribe(consumedDataReceiver.destinationIdentifier(), pubSubTopicPartition, j);
    }

    final void recordPartitionsPerConsumerSensor() {
        int i = 0;
        int i2 = Integer.MAX_VALUE;
        int i3 = Integer.MIN_VALUE;
        Iterator it = this.consumerToConsumptionTask.keySet().iterator();
        while (it.hasNext()) {
            int assignmentSize = ((SharedKafkaConsumer) it.next()).getAssignmentSize();
            i += assignmentSize;
            i2 = Math.min(i2, assignmentSize);
            i3 = Math.max(i3, assignmentSize);
        }
        this.stats.recordAvgPartitionsPerConsumer(i / this.consumerToConsumptionTask.size());
        this.stats.recordMaxPartitionsPerConsumer(i3);
        this.stats.recordMinPartitionsPerConsumer(i2);
    }

    public long getOffsetLagFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        OffsetGetter offsetGetter = (v0, v1) -> {
            return v0.getOffsetLag(v1);
        };
        KafkaConsumerServiceStats kafkaConsumerServiceStats = this.stats;
        Objects.requireNonNull(kafkaConsumerServiceStats);
        Runnable runnable = kafkaConsumerServiceStats::recordOffsetLagIsAbsent;
        KafkaConsumerServiceStats kafkaConsumerServiceStats2 = this.stats;
        Objects.requireNonNull(kafkaConsumerServiceStats2);
        return getSomeOffsetFor(pubSubTopic, pubSubTopicPartition, offsetGetter, runnable, kafkaConsumerServiceStats2::recordOffsetLagIsPresent);
    }

    public long getLatestOffsetFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        OffsetGetter offsetGetter = (v0, v1) -> {
            return v0.getLatestOffset(v1);
        };
        KafkaConsumerServiceStats kafkaConsumerServiceStats = this.stats;
        Objects.requireNonNull(kafkaConsumerServiceStats);
        Runnable runnable = kafkaConsumerServiceStats::recordLatestOffsetIsAbsent;
        KafkaConsumerServiceStats kafkaConsumerServiceStats2 = this.stats;
        Objects.requireNonNull(kafkaConsumerServiceStats2);
        return getSomeOffsetFor(pubSubTopic, pubSubTopicPartition, offsetGetter, runnable, kafkaConsumerServiceStats2::recordLatestOffsetIsPresent);
    }

    private long getSomeOffsetFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition, OffsetGetter offsetGetter, Runnable runnable, Runnable runnable2) {
        SharedKafkaConsumer consumerAssignedToVersionTopicPartition = getConsumerAssignedToVersionTopicPartition(pubSubTopic, pubSubTopicPartition);
        if (consumerAssignedToVersionTopicPartition == null) {
            runnable.run();
            return -1L;
        }
        long apply = offsetGetter.apply(consumerAssignedToVersionTopicPartition, pubSubTopicPartition);
        if (apply < 0) {
            runnable.run();
        } else {
            runnable2.run();
        }
        return apply;
    }
}
