package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.exceptions.QuotaExceededException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/KafkaClusterBasedRecordThrottler.class */
public class KafkaClusterBasedRecordThrottler {
    private static final Logger LOGGER = LogManager.getLogger(KafkaClusterBasedRecordThrottler.class);
    private final Map<String, EventThrottler> kafkaUrlToRecordsThrottler;
    protected Map<String, Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>>> kafkaUrlToThrottledRecords = new VeniceConcurrentHashMap();

    public KafkaClusterBasedRecordThrottler(Map<String, EventThrottler> map) {
        this.kafkaUrlToRecordsThrottler = map;
    }

    public Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> poll(PubSubConsumerAdapter pubSubConsumerAdapter, String str, long j) {
        Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> map = this.kafkaUrlToThrottledRecords.get(str);
        if (map == null) {
            map = pubSubConsumerAdapter.poll(j);
            if (map == null) {
                map = Collections.emptyMap();
            }
        }
        if (this.kafkaUrlToRecordsThrottler != null) {
            try {
                EventThrottler eventThrottler = this.kafkaUrlToRecordsThrottler.get(str);
                if (eventThrottler != null) {
                    eventThrottler.maybeThrottle(map.values().stream().mapToInt((v0) -> {
                        return v0.size();
                    }).sum());
                    this.kafkaUrlToThrottledRecords.remove(str);
                }
            } catch (QuotaExceededException e) {
                if (!StoreIngestionTask.REDUNDANT_LOGGING_FILTER.isRedundantException(str + "_records_quota_exceeded")) {
                    LOGGER.info("Ingestion quota exceeded for Kafka URL {}", str);
                }
                this.kafkaUrlToThrottledRecords.put(str, map);
                try {
                    Thread.sleep(j);
                    return Collections.emptyMap();
                } catch (InterruptedException e2) {
                    throw new VeniceException("Kafka Cluster based throttled records poll sleep got interrupted", e2);
                }
            }
        }
        return map;
    }
}
