package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.davinci.stats.KafkaConsumerServiceStats;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/ConsumptionTask.class */
/**
 * A {@link Runnable} that repeatedly invokes a poll function and hands the polled messages, grouped by
 * topic-partition, to the {@link ConsumedDataReceiver} registered for each partition.
 *
 * <p>Each loop iteration:
 * <ol>
 *   <li>optionally waits up to {@code readCycleDelayMs} (after an empty poll or a failed iteration);</li>
 *   <li>asks the {@link ConsumerSubscriptionCleaner} which partitions to drop, removes their receivers and
 *       notifies them of the topic deletion;</li>
 *   <li>polls, records stats, writes each partition's batch to its receiver and feeds the throttlers;</li>
 *   <li>unsubscribes partitions for which records arrived but no receiver was registered.</li>
 * </ol>
 *
 * <p>{@link #stop()}, {@link #setDataReceiver} and {@link #removeDataReceiver} are invoked from outside the
 * polling loop; shared state is limited to the {@code volatile} fields and {@code dataReceiverMap}
 * (presumably a concurrent map, given its type name -- confirm against {@code VeniceConcurrentHashMap}).
 */
public class ConsumptionTask implements Runnable {
    private final Logger logger;
    private final int taskId;
    private final long readCycleDelayMs;
    private final Supplier<Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>>> pollFunction;
    private final IntConsumer bandwidthThrottler;
    private final IntConsumer recordsThrottler;
    private final KafkaConsumerServiceStats stats;
    private final ConsumerSubscriptionCleaner cleaner;
    private final Map<PubSubTopicPartition, ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>>> dataReceiverMap = new VeniceConcurrentHashMap<>();
    private volatile boolean running = true;
    private volatile long lastSuccessfulPollTimestamp = System.currentTimeMillis();

    /**
     * @param identifier free-form label embedded in this task's logger name
     * @param taskId numeric id of this task, also embedded in the logger name
     * @param readCycleDelayMs maximum time, in milliseconds, to wait before the next poll after an empty
     *     poll or a failed iteration
     * @param pollFunction supplies the next batch of messages, keyed by topic-partition
     * @param bandwidthThrottler receives the total payload size consumed in one poll
     * @param recordsThrottler receives the number of records consumed in one poll
     * @param stats sink for poll latency, result count and error metrics
     * @param cleaner decides which topic-partitions to unsubscribe and performs the unsubscription
     */
    public ConsumptionTask(String identifier, int taskId, long readCycleDelayMs, Supplier<Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>>> pollFunction, IntConsumer bandwidthThrottler, IntConsumer recordsThrottler, KafkaConsumerServiceStats stats, ConsumerSubscriptionCleaner cleaner) {
        this.taskId = taskId;
        this.readCycleDelayMs = readCycleDelayMs;
        this.pollFunction = pollFunction;
        this.bandwidthThrottler = bandwidthThrottler;
        this.recordsThrottler = recordsThrottler;
        this.stats = stats;
        this.cleaner = cleaner;
        this.logger = LogManager.getLogger(getClass().getSimpleName() + "[ " + identifier + " - " + taskId + " ]");
    }

    /**
     * Runs the poll/dispatch loop until {@link #stop()} is called or an {@link InterruptedException}
     * (possibly wrapped in another exception) is observed.
     *
     * <p>Any other exception raised during an iteration -- while waiting, polling, dispatching, throttling
     * or unsubscribing -- is logged, counted through {@code stats.recordPollError()}, and the loop retries
     * after a delay instead of letting the thread die.
     */
    @Override // java.lang.Runnable
    public void run() {
        boolean addSomeDelay = false;
        Set<PubSubTopicPartition> topicPartitionsToUnsub = new HashSet<>();
        int polledMessageCount = 0;
        while (this.running) {
            try {
                if (addSomeDelay) {
                    synchronized (this) {
                        // wait() rather than sleep() so that the notifyAll() in stop() and setDataReceiver()
                        // can cut the delay short.
                        wait(this.readCycleDelayMs);
                    }
                    addSomeDelay = false;
                }
                long beforePollingTimestamp = System.currentTimeMillis();

                // Drop receivers of partitions the cleaner wants gone, telling each one its topic was deleted.
                topicPartitionsToUnsub = this.cleaner.getTopicPartitionsToUnsubscribe(topicPartitionsToUnsub);
                for (PubSubTopicPartition topicPartitionToUnsub : topicPartitionsToUnsub) {
                    ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> removedReceiver = this.dataReceiverMap.remove(topicPartitionToUnsub);
                    if (removedReceiver != null) {
                        removedReceiver.notifyOfTopicDeletion(topicPartitionToUnsub.getPubSubTopic().getName());
                    }
                }
                // The same set is reused below to collect partitions that have no registered receiver.
                topicPartitionsToUnsub.clear();

                Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> polledMessages = this.pollFunction.get();
                this.lastSuccessfulPollTimestamp = System.currentTimeMillis();
                this.stats.recordPollRequestLatency(this.lastSuccessfulPollTimestamp - beforePollingTimestamp);
                // NOTE(review): the counter is reset only after this call, so the value recorded here is the
                // count accumulated during the PREVIOUS iteration. Kept as-is; confirm whether it is intended.
                this.stats.recordPollResultNum(polledMessageCount);
                int payloadBytesConsumed = 0;
                polledMessageCount = 0;

                if (polledMessages.isEmpty()) {
                    // Nothing to consume: back off before the next poll.
                    addSomeDelay = true;
                    continue;
                }

                long beforeProducingTimestamp = System.currentTimeMillis();
                for (Map.Entry<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> entry : polledMessages.entrySet()) {
                    PubSubTopicPartition topicPartition = entry.getKey();
                    List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> messages = entry.getValue();
                    ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver = this.dataReceiverMap.get(topicPartition);
                    if (consumedDataReceiver == null) {
                        // Records arrived for a partition nobody is listening to; schedule it for unsubscription.
                        this.logger.error("Couldn't find consumed data receiver for topic partition : {} after receiving records from `poll` request", topicPartition);
                        topicPartitionsToUnsub.add(topicPartition);
                        continue;
                    }
                    polledMessageCount += messages.size();
                    for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message : messages) {
                        payloadBytesConsumed += message.getPayloadSize();
                    }
                    consumedDataReceiver.write(messages);
                }
                this.stats.recordConsumerRecordsProducingToWriterBufferLatency(LatencyUtils.getElapsedTimeInMs(beforeProducingTimestamp));
                this.bandwidthThrottler.accept(payloadBytesConsumed);
                this.recordsThrottler.accept(polledMessageCount);
                this.cleaner.unsubscribe(topicPartitionsToUnsub);
                this.stats.recordDetectedNoRunningIngestionTopicPartitionNum(topicPartitionsToUnsub.size());
            } catch (Exception e) {
                // The InterruptedException may be wrapped, hence the recursive check.
                if (ExceptionUtils.recursiveClassEquals(e, InterruptedException.class)) {
                    this.logger.error("Received InterruptedException, will exit");
                    break;
                }
                this.logger.error("Received exception while polling, will retry", e);
                addSomeDelay = true;
                this.stats.recordPollError();
            }
        }
        this.logger.info("Shared consumer thread: {} exited", Thread.currentThread().getName());
    }

    /**
     * Asks the loop in {@link #run()} to exit: clears the running flag and wakes the loop if it is
     * currently waiting out its read-cycle delay.
     */
    public void stop() {
        this.running = false;
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * @return the wall-clock time, in milliseconds, captured right after the most recent poll returned
     *     (initialised to the construction time of this task)
     */
    public long getLastSuccessfulPollTimestamp() {
        return this.lastSuccessfulPollTimestamp;
    }

    /** @return the id given to this task at construction time */
    public int getTaskId() {
        return this.taskId;
    }

    /**
     * Registers the receiver that will get the records polled for the given topic-partition, then wakes
     * the polling loop if it is waiting.
     *
     * <p>NOTE(review): the new receiver is stored in the map before the conflict check, so it remains
     * registered even when this method throws -- confirm callers rely on / tolerate that.
     *
     * @param pubSubTopicPartition the topic-partition to route
     * @param consumedDataReceiver the receiver for that topic-partition
     * @throws IllegalStateException if a receiver with a different destination identifier was already
     *     registered for the same topic-partition
     */
    public void setDataReceiver(PubSubTopicPartition pubSubTopicPartition, ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver) {
        ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> previousReceiver = this.dataReceiverMap.put(pubSubTopicPartition, consumedDataReceiver);
        if (previousReceiver != null && !previousReceiver.destinationIdentifier().equals(consumedDataReceiver.destinationIdentifier())) {
            throw new IllegalStateException("It is not allowed to set multiple " + ConsumedDataReceiver.class.getSimpleName() + " instances for the same " + TopicPartition.class.getSimpleName() + " of a given consumer. Previous: " + previousReceiver + ", New: " + consumedDataReceiver);
        }
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Unregisters the receiver of the given topic-partition, if any. The receiver is not notified.
     *
     * @param pubSubTopicPartition the topic-partition whose receiver should be dropped
     */
    public void removeDataReceiver(PubSubTopicPartition pubSubTopicPartition) {
        this.dataReceiverMap.remove(pubSubTopicPartition);
    }
}
