package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.stats.KafkaConsumerServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/PartitionWiseKafkaConsumerService.class */
public class PartitionWiseKafkaConsumerService extends KafkaConsumerService {
    private final Map<PubSubTopicPartition, Set<PubSubConsumerAdapter>> rtTopicPartitionToConsumerMap;
    private final Logger logger;
    private int shareConsumerIndex;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionWiseKafkaConsumerService(PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties properties, long j, int i, EventThrottler eventThrottler, EventThrottler eventThrottler2, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, MetricsRepository metricsRepository, String str, long j2, TopicExistenceChecker topicExistenceChecker, boolean z, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer, Time time, KafkaConsumerServiceStats kafkaConsumerServiceStats, boolean z2) {
        super(pubSubConsumerAdapterFactory, properties, j, i, eventThrottler, eventThrottler2, kafkaClusterBasedRecordThrottler, metricsRepository, str, j2, topicExistenceChecker, z, kafkaPubSubMessageDeserializer, time, kafkaConsumerServiceStats, z2);
        this.rtTopicPartitionToConsumerMap = new VeniceConcurrentHashMap();
        this.shareConsumerIndex = 0;
        this.logger = LogManager.getLogger(PartitionWiseKafkaConsumerService.class + " [" + this.kafkaUrl + "]");
    }

    @Override // com.linkedin.davinci.kafka.consumer.KafkaConsumerService
    protected synchronized SharedKafkaConsumer pickConsumerForPartition(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition) {
        boolean z = true;
        int i = -1;
        int i2 = 0;
        SharedKafkaConsumer sharedKafkaConsumer = null;
        while (z) {
            if (i2 == this.consumerToConsumptionTask.size()) {
                throw new VeniceException("Can not find consumer for topic: " + pubSubTopicPartition.getPubSubTopic().getName() + " and partition: " + pubSubTopicPartition.getPartitionNumber() + " from the ingestion task belonging to version topic: " + pubSubTopic);
            }
            sharedKafkaConsumer = (SharedKafkaConsumer) this.consumerToConsumptionTask.getByIndex(this.shareConsumerIndex).getKey();
            i = this.shareConsumerIndex;
            this.shareConsumerIndex++;
            if (this.shareConsumerIndex == this.consumerToConsumptionTask.size()) {
                this.shareConsumerIndex = 0;
            }
            z = false;
            if (pubSubTopicPartition.getPubSubTopic().isRealTime()) {
                if (alreadySubscribedRealtimeTopicPartition(sharedKafkaConsumer, pubSubTopicPartition)) {
                    this.logger.info("Current consumer has already subscribed the same real time topic-partition: {} will skip it and try next consumer in consumer pool", pubSubTopicPartition);
                    z = true;
                } else {
                    this.rtTopicPartitionToConsumerMap.computeIfAbsent(pubSubTopicPartition, pubSubTopicPartition2 -> {
                        return new HashSet();
                    }).add(sharedKafkaConsumer);
                }
            }
            i2++;
        }
        if (sharedKafkaConsumer == null) {
            throw new IllegalStateException("Did not find a suitable consumer after checking " + i2 + " instances.");
        }
        this.logger.info("Get shared consumer for: {} from the ingestion task belonging to version topic: {} with index: {}", pubSubTopicPartition, pubSubTopic, Integer.valueOf(i));
        return sharedKafkaConsumer;
    }

    private boolean alreadySubscribedRealtimeTopicPartition(SharedKafkaConsumer sharedKafkaConsumer, PubSubTopicPartition pubSubTopicPartition) {
        Set<PubSubConsumerAdapter> set = this.rtTopicPartitionToConsumerMap.get(pubSubTopicPartition);
        return set != null && set.contains(sharedKafkaConsumer);
    }

    @Override // com.linkedin.davinci.kafka.consumer.KafkaConsumerService
    void handleUnsubscription(SharedKafkaConsumer sharedKafkaConsumer, PubSubTopicPartition pubSubTopicPartition) {
        Set<PubSubConsumerAdapter> set;
        if (!pubSubTopicPartition.getPubSubTopic().isRealTime() || (set = this.rtTopicPartitionToConsumerMap.get(pubSubTopicPartition)) == null) {
            return;
        }
        set.remove(sharedKafkaConsumer);
    }
}
