package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.stats.KafkaConsumerServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/TopicWiseKafkaConsumerService.class */
public class TopicWiseKafkaConsumerService extends KafkaConsumerService {
    private final Map<PubSubTopic, SharedKafkaConsumer> versionTopicToConsumerMap;
    private final Map<SharedKafkaConsumer, Set<String>> consumerToStoresMap;
    private final Logger LOGGER;

    /* JADX INFO: Access modifiers changed from: package-private */
    public TopicWiseKafkaConsumerService(PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties properties, long j, int i, EventThrottler eventThrottler, EventThrottler eventThrottler2, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, MetricsRepository metricsRepository, String str, long j2, TopicExistenceChecker topicExistenceChecker, boolean z, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer, Time time, KafkaConsumerServiceStats kafkaConsumerServiceStats, boolean z2) {
        super(pubSubConsumerAdapterFactory, properties, j, i, eventThrottler, eventThrottler2, kafkaClusterBasedRecordThrottler, metricsRepository, str, j2, topicExistenceChecker, z, kafkaPubSubMessageDeserializer, time, kafkaConsumerServiceStats, z2);
        this.versionTopicToConsumerMap = new VeniceConcurrentHashMap();
        this.consumerToStoresMap = new VeniceConcurrentHashMap();
        this.LOGGER = LogManager.getLogger(TopicWiseKafkaConsumerService.class + " [" + this.kafkaUrl + "]");
    }

    @Override // com.linkedin.davinci.kafka.consumer.KafkaConsumerService
    protected synchronized SharedKafkaConsumer pickConsumerForPartition(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        SharedKafkaConsumer sharedKafkaConsumer = this.versionTopicToConsumerMap.get(pubSubTopic);
        if (sharedKafkaConsumer != null) {
            this.LOGGER.info("The version topic: {} has been subscribed previously, so this function will return the previously assigned shared consumer directly", pubSubTopic);
        } else {
            boolean z = false;
            int i = Integer.MAX_VALUE;
            Iterator<SharedKafkaConsumer> it2 = this.consumerToConsumptionTask.keySet().iterator();
            while (true) {
                if (!it2.hasNext()) {
                    break;
                }
                SharedKafkaConsumer next = it2.next();
                if (checkWhetherConsumerHasSubscribedSameStore(next, pubSubTopic)) {
                    this.LOGGER.info("Current consumer has already subscribed the same store as the new topic: {}, will skip it and try next consumer in consumer pool", pubSubTopic);
                } else {
                    if (!isConsumerAssignedTopic(next)) {
                        sharedKafkaConsumer = next;
                        z = true;
                        break;
                    }
                    int assignmentSize = next.getAssignmentSize();
                    if (assignmentSize < i) {
                        i = assignmentSize;
                        sharedKafkaConsumer = next;
                    }
                }
            }
            if (sharedKafkaConsumer == null) {
                this.stats.recordConsumerSelectionForTopicError();
                throw new VeniceException("Failed to find consumer for topic: " + pubSubTopic + ", and it might be caused by that all the existing consumers have subscribed the same store, and that might be caused by a bug or resource leaking");
            }
            if (z) {
                this.LOGGER.info("Assigned a shared consumer with index of {} for topic: {} with least # of partitions assigned ({}) and subscribed it to {}", Integer.valueOf(this.consumerToConsumptionTask.indexOf(sharedKafkaConsumer)), pubSubTopic, Integer.valueOf(i), pubSubTopicPartition);
            } else {
                this.LOGGER.info("Assigned a shared consumer with index of {} for topic: {} without any partitions assigned and subscribed it to {}", Integer.valueOf(this.consumerToConsumptionTask.indexOf(sharedKafkaConsumer)), pubSubTopic, pubSubTopicPartition);
            }
        }
        assignVersionTopicToConsumer(pubSubTopic, sharedKafkaConsumer);
        return sharedKafkaConsumer;
    }

    private boolean checkWhetherConsumerHasSubscribedSameStore(SharedKafkaConsumer sharedKafkaConsumer, PubSubTopic pubSubTopic) {
        String storeName = pubSubTopic.getStoreName();
        Set<String> set = this.consumerToStoresMap.get(sharedKafkaConsumer);
        return set != null && set.contains(storeName);
    }

    @Override // com.linkedin.davinci.kafka.consumer.KafkaConsumerService
    public synchronized void unsubscribeAll(PubSubTopic pubSubTopic) {
        SharedKafkaConsumer sharedKafkaConsumer = this.versionTopicToConsumerMap.get(pubSubTopic);
        if (sharedKafkaConsumer == null) {
            this.LOGGER.warn("No assigned shared consumer found for this version topic: {}", pubSubTopic);
        } else {
            removeTopicFromConsumer(pubSubTopic, sharedKafkaConsumer);
            super.unsubscribeAll(pubSubTopic);
        }
    }

    private boolean isConsumerAssignedTopic(SharedKafkaConsumer sharedKafkaConsumer) {
        return this.consumerToStoresMap.containsKey(sharedKafkaConsumer);
    }

    private void assignVersionTopicToConsumer(PubSubTopic pubSubTopic, SharedKafkaConsumer sharedKafkaConsumer) {
        this.versionTopicToConsumerMap.put(pubSubTopic, sharedKafkaConsumer);
        this.consumerToStoresMap.computeIfAbsent(sharedKafkaConsumer, sharedKafkaConsumer2 -> {
            return new HashSet();
        }).add(pubSubTopic.getStoreName());
    }

    private void removeTopicFromConsumer(PubSubTopic pubSubTopic, SharedKafkaConsumer sharedKafkaConsumer) {
        this.versionTopicToConsumerMap.remove(pubSubTopic);
        String storeName = pubSubTopic.getStoreName();
        this.consumerToStoresMap.compute(sharedKafkaConsumer, (sharedKafkaConsumer2, set) -> {
            if (set == null) {
                return null;
            }
            set.remove(storeName);
            if (set.isEmpty()) {
                return null;
            }
            return set;
        });
    }
}
