package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.class */
/**
 * Aggregates {@link KafkaConsumerService} instances, keeping at most one per Kafka bootstrap URL
 * (the value of the {@code kafka.bootstrap.servers} consumer property).
 *
 * <p>Per-cluster services are created lazily through {@link #createKafkaConsumerService(Properties)}.
 * Most other methods either look up the service registered for a given Kafka URL, or fan the
 * operation out to every registered service.
 */
public class AggKafkaConsumerService extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger(AggKafkaConsumerService.class);
    private final PubSubConsumerAdapterFactory consumerFactory;
    private final int numOfConsumersPerKafkaCluster;
    private final long readCycleDelayMs;
    private final long sharedConsumerNonExistingTopicCleanupDelayMS;
    private final EventThrottler bandwidthThrottler;
    private final EventThrottler recordsThrottler;
    private final KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler;
    private final MetricsRepository metricsRepository;
    private final TopicExistenceChecker topicExistenceChecker;
    private final boolean liveConfigBasedKafkaThrottlingEnabled;
    private final boolean isKafkaConsumerOffsetCollectionEnabled;
    private final KafkaConsumerService.ConsumerAssignmentStrategy sharedConsumerAssignmentStrategy;
    /** Registered consumer services, keyed by Kafka bootstrap URL. */
    private final Map<String, KafkaConsumerService> kafkaServerToConsumerServiceMap = new VeniceConcurrentHashMap<>();
    private final Map<String, String> kafkaClusterUrlToAliasMap;
    private final Object2IntMap<String> kafkaClusterUrlToIdMap;
    private final KafkaPubSubMessageDeserializer pubSubDeserializer;
    private final TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier;

    /**
     * Captures the collaborators and the server-config values that are later handed to every
     * per-cluster {@link KafkaConsumerService} built by this aggregator.
     *
     * @param pubSubConsumerAdapterFactory factory passed to each created consumer service
     * @param sSLPropertiesSupplier supplies extra properties for a given Kafka URL; they are merged
     *        into the consumer properties in {@link #createKafkaConsumerService(Properties)}
     * @param veniceServerConfig source of the read-cycle delay, pool size, cleanup delay, throttling
     *        and offset-collection flags, assignment strategy, and the URL-to-alias / URL-to-id maps
     * @param eventThrottler stored as the bandwidth throttler
     * @param eventThrottler2 stored as the records throttler
     * @param kafkaClusterBasedRecordThrottler passed to each created consumer service
     * @param metricsRepository passed to each created consumer service
     * @param topicExistenceChecker passed to each created consumer service
     * @param kafkaPubSubMessageDeserializer passed to each created consumer service
     */
    public AggKafkaConsumerService(PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, TopicManagerRepository.SSLPropertiesSupplier sSLPropertiesSupplier, VeniceServerConfig veniceServerConfig, EventThrottler eventThrottler, EventThrottler eventThrottler2, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, MetricsRepository metricsRepository, TopicExistenceChecker topicExistenceChecker, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer) {
        this.consumerFactory = pubSubConsumerAdapterFactory;
        this.readCycleDelayMs = veniceServerConfig.getKafkaReadCycleDelayMs();
        this.numOfConsumersPerKafkaCluster = veniceServerConfig.getConsumerPoolSizePerKafkaCluster();
        this.sharedConsumerNonExistingTopicCleanupDelayMS = veniceServerConfig.getSharedConsumerNonExistingTopicCleanupDelayMS();
        this.bandwidthThrottler = eventThrottler;
        this.recordsThrottler = eventThrottler2;
        this.kafkaClusterBasedRecordThrottler = kafkaClusterBasedRecordThrottler;
        this.metricsRepository = metricsRepository;
        this.topicExistenceChecker = topicExistenceChecker;
        this.liveConfigBasedKafkaThrottlingEnabled = veniceServerConfig.isLiveConfigBasedKafkaThrottlingEnabled();
        this.sharedConsumerAssignmentStrategy = veniceServerConfig.getSharedConsumerAssignmentStrategy();
        this.kafkaClusterUrlToAliasMap = veniceServerConfig.getKafkaClusterUrlToAliasMap();
        this.kafkaClusterUrlToIdMap = veniceServerConfig.getKafkaClusterUrlToIdMap();
        this.isKafkaConsumerOffsetCollectionEnabled = veniceServerConfig.isKafkaConsumerOffsetCollectionEnabled();
        this.pubSubDeserializer = kafkaPubSubMessageDeserializer;
        this.sslPropertiesSupplier = sSLPropertiesSupplier;
        LOGGER.info("Successfully initialized AggKafkaConsumerService");
    }

    /**
     * Nothing to start eagerly: per-cluster services are started when they are created.
     *
     * @return always {@code true}
     */
    @Override // com.linkedin.venice.service.AbstractVeniceService
    public boolean startInner() {
        return true;
    }

    /**
     * Stops every registered consumer service.
     *
     * <p>A failure while stopping one service does not prevent the remaining services from being
     * stopped. The first failure is rethrown once all services have been attempted, with any later
     * failures attached to it as suppressed exceptions.
     *
     * @throws Exception the first exception raised by any {@code stop()} call
     */
    @Override // com.linkedin.venice.service.AbstractVeniceService
    public void stopInner() throws Exception {
        Exception firstFailure = null;
        for (KafkaConsumerService consumerService : this.kafkaServerToConsumerServiceMap.values()) {
            try {
                consumerService.stop();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Returns the consumer service registered for the given Kafka URL, or {@code null} if none. */
    private KafkaConsumerService getKafkaConsumerService(String str) {
        return this.kafkaServerToConsumerServiceMap.get(str);
    }

    /**
     * Returns the consumer service for the Kafka cluster named by the {@code kafka.bootstrap.servers}
     * entry of {@code properties}, creating and starting it if it does not exist yet.
     *
     * <p>The properties returned by the SSL properties supplier for that URL are merged into
     * {@code properties} (the caller's object is mutated). If a service already exists for the URL,
     * a warning is logged and the existing instance is returned.
     *
     * @param properties consumer properties; must contain a non-empty {@code kafka.bootstrap.servers}
     * @return the existing or newly created consumer service for that URL
     * @throws IllegalArgumentException if the bootstrap URL is missing or empty
     */
    public synchronized KafkaConsumerService createKafkaConsumerService(Properties properties) {
        String kafkaUrl = properties.getProperty("kafka.bootstrap.servers");
        // Validate before touching the SSL supplier or mutating the caller's properties, so a
        // missing URL always surfaces as the IllegalArgumentException below.
        if (kafkaUrl == null || kafkaUrl.isEmpty()) {
            throw new IllegalArgumentException("Kafka URL must be set in the consumer properties config. Got: " + kafkaUrl);
        }
        properties.putAll(this.sslPropertiesSupplier.get(kafkaUrl).toProperties());
        KafkaConsumerService existingService = this.kafkaServerToConsumerServiceMap.get(kafkaUrl);
        if (existingService != null) {
            LOGGER.warn("KafkaConsumerService has already been created for Kafka cluster with URL: {}", kafkaUrl);
            return existingService;
        }
        KafkaConsumerService consumerService = this.kafkaServerToConsumerServiceMap.computeIfAbsent(kafkaUrl, url -> {
            // The cluster alias falls back to the URL itself when no alias is configured.
            return this.sharedConsumerAssignmentStrategy.constructor.construct(this.consumerFactory, properties, this.readCycleDelayMs, this.numOfConsumersPerKafkaCluster, this.bandwidthThrottler, this.recordsThrottler, this.kafkaClusterBasedRecordThrottler, this.metricsRepository, this.kafkaClusterUrlToAliasMap.getOrDefault(url, url), this.sharedConsumerNonExistingTopicCleanupDelayMS, this.topicExistenceChecker, this.liveConfigBasedKafkaThrottlingEnabled, this.pubSubDeserializer, SystemTime.INSTANCE, null, this.isKafkaConsumerOffsetCollectionEnabled);
        });
        if (!consumerService.isRunning()) {
            consumerService.start();
        }
        return consumerService;
    }

    /**
     * Checks whether the consumer service registered for the given Kafka URL has a consumer that is
     * assigned to the version-topic/partition pair and holds a subscription for that partition.
     *
     * @param str Kafka URL identifying the consumer service
     * @param pubSubTopic version topic used to look up the assigned consumer
     * @param pubSubTopicPartition partition to check
     * @return {@code false} if there is no service for the URL, no assigned consumer, or the assigned
     *         consumer has no subscription for the partition; {@code true} otherwise
     */
    public boolean hasConsumerAssignedFor(String str, PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        KafkaConsumerService consumerService = getKafkaConsumerService(str);
        if (consumerService == null) {
            return false;
        }
        SharedKafkaConsumer assignedConsumer = consumerService.getConsumerAssignedToVersionTopicPartition(pubSubTopic, pubSubTopicPartition);
        return assignedConsumer != null && assignedConsumer.hasSubscription(pubSubTopicPartition);
    }

    /**
     * Checks whether any registered consumer service has a subscribed consumer assigned to the
     * version-topic/partition pair.
     *
     * @return {@code true} as soon as one Kafka URL reports an assignment, {@code false} otherwise
     */
    public boolean hasConsumerAssignedFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        for (String kafkaUrl : this.kafkaServerToConsumerServiceMap.keySet()) {
            if (hasConsumerAssignedFor(kafkaUrl, pubSubTopic, pubSubTopicPartition)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether any registered consumer service reports a subscription for the given topic.
     *
     * @return {@code true} if at least one service's {@code hasAnySubscriptionFor} returns true
     */
    public boolean hasAnyConsumerAssignedForVersionTopic(PubSubTopic pubSubTopic) {
        for (KafkaConsumerService consumerService : this.kafkaServerToConsumerServiceMap.values()) {
            if (consumerService.hasAnySubscriptionFor(pubSubTopic)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Calls {@code resetOffset} for the partition on every consumer (across all registered services)
     * that is assigned to the version-topic/partition pair. Services without such a consumer are skipped.
     */
    public void resetOffsetFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        for (KafkaConsumerService consumerService : this.kafkaServerToConsumerServiceMap.values()) {
            SharedKafkaConsumer assignedConsumer = consumerService.getConsumerAssignedToVersionTopicPartition(pubSubTopic, pubSubTopicPartition);
            if (assignedConsumer != null) {
                assignedConsumer.resetOffset(pubSubTopicPartition);
            }
        }
    }

    /** Asks every registered consumer service to unsubscribe the given version-topic/partition pair. */
    public void unsubscribeConsumerFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        for (KafkaConsumerService consumerService : this.kafkaServerToConsumerServiceMap.values()) {
            consumerService.unSubscribe(pubSubTopic, pubSubTopicPartition);
        }
    }

    /** Asks every registered consumer service to batch-unsubscribe the given partitions of the version topic. */
    public void batchUnsubscribeConsumerFor(PubSubTopic pubSubTopic, Set<PubSubTopicPartition> set) {
        for (KafkaConsumerService consumerService : this.kafkaServerToConsumerServiceMap.values()) {
            consumerService.batchUnsubscribe(pubSubTopic, set);
        }
    }

    /**
     * Starts consumption of a partition from the given Kafka cluster into a new
     * {@link StorePartitionDataReceiver} bound to the ingestion task.
     *
     * @param str Kafka URL of the cluster to consume from; a consumer service must already exist for it
     * @param storeIngestionTask ingestion task that the receiver is created for
     * @param pubSubTopicPartition partition to consume
     * @param j forwarded to {@code startConsumptionIntoDataReceiver} (presumably the starting
     *        offset; confirm against {@code KafkaConsumerService})
     * @return the receiver that was registered with the consumer service
     * @throws VeniceException if no consumer service is registered for the Kafka URL
     */
    public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> subscribeConsumerFor(String str, StoreIngestionTask storeIngestionTask, PubSubTopicPartition pubSubTopicPartition, long j) {
        PubSubTopic versionTopic = storeIngestionTask.getVersionTopic();
        KafkaConsumerService consumerService = getKafkaConsumerService(str);
        if (consumerService == null) {
            throw new VeniceException("Kafka consumer service must exist for version topic: " + versionTopic + " in Kafka cluster: " + str);
        }
        // Cluster id falls back to -1 when the URL has no configured id.
        StorePartitionDataReceiver dataReceiver = new StorePartitionDataReceiver(storeIngestionTask, pubSubTopicPartition, str, this.kafkaClusterUrlToIdMap.getOrDefault(str, -1));
        consumerService.startConsumptionIntoDataReceiver(pubSubTopicPartition, j, dataReceiver);
        return dataReceiver;
    }

    /**
     * Delegates to the consumer service registered for the Kafka URL.
     *
     * @return the service's {@code getOffsetLagFor} result, or {@code -1} if no service is registered
     *         for the URL
     */
    public long getOffsetLagFor(String str, PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        KafkaConsumerService consumerService = getKafkaConsumerService(str);
        if (consumerService == null) {
            return -1L;
        }
        return consumerService.getOffsetLagFor(pubSubTopic, pubSubTopicPartition);
    }

    /**
     * Delegates to the consumer service registered for the Kafka URL.
     *
     * @return the service's {@code getLatestOffsetFor} result, or {@code -1} if no service is
     *         registered for the URL
     */
    public long getLatestOffsetFor(String str, PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        KafkaConsumerService consumerService = getKafkaConsumerService(str);
        if (consumerService == null) {
            return -1L;
        }
        return consumerService.getLatestOffsetFor(pubSubTopic, pubSubTopicPartition);
    }

    /** Asks every registered consumer service to unsubscribe everything for the given topic. */
    public void unsubscribeAll(PubSubTopic pubSubTopic) {
        for (KafkaConsumerService consumerService : this.kafkaServerToConsumerServiceMap.values()) {
            consumerService.unsubscribeAll(pubSubTopic);
        }
    }

    /**
     * Calls {@code pause} for the partition on every consumer (across all registered services) that is
     * assigned to the version-topic/partition pair. Services without such a consumer are skipped.
     */
    public void pauseConsumerFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        for (KafkaConsumerService consumerService : this.kafkaServerToConsumerServiceMap.values()) {
            SharedKafkaConsumer assignedConsumer = consumerService.getConsumerAssignedToVersionTopicPartition(pubSubTopic, pubSubTopicPartition);
            if (assignedConsumer != null) {
                assignedConsumer.pause(pubSubTopicPartition);
            }
        }
    }

    /**
     * Calls {@code resume} for the partition on every consumer (across all registered services) that
     * is assigned to the version-topic/partition pair. Services without such a consumer are skipped.
     */
    public void resumeConsumerFor(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        for (KafkaConsumerService consumerService : this.kafkaServerToConsumerServiceMap.values()) {
            SharedKafkaConsumer assignedConsumer = consumerService.getConsumerAssignedToVersionTopicPartition(pubSubTopic, pubSubTopicPartition);
            if (assignedConsumer != null) {
                assignedConsumer.resume(pubSubTopicPartition);
            }
        }
    }

    /**
     * Collects the Kafka URLs whose consumer service reports a subscription for the given topic.
     *
     * @return a new mutable set of matching URLs; empty if none match
     */
    Set<String> getKafkaUrlsFor(PubSubTopic pubSubTopic) {
        Set<String> kafkaUrls = new HashSet<>(this.kafkaServerToConsumerServiceMap.size());
        for (Map.Entry<String, KafkaConsumerService> entry : this.kafkaServerToConsumerServiceMap.entrySet()) {
            if (entry.getValue().hasAnySubscriptionFor(pubSubTopic)) {
                kafkaUrls.add(entry.getKey());
            }
        }
        return kafkaUrls;
    }
}
