package com.linkedin.davinci.kafka.consumer;

import com.linkedin.avroutil1.compatibility.shaded.org.apache.commons.lang3.Validate;
import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.ExceptionUtils;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StorePartitionDataReceiver.class */
/**
 * {@code ConsumedDataReceiver} that hands each consumed batch for one topic-partition to a
 * {@code StoreIngestionTask}.
 *
 * <p>Every batch passed to {@link #write(List)} is forwarded to
 * {@code StoreIngestionTask#produceToStoreBufferServiceOrKafka} together with the topic-partition,
 * Kafka URL and Kafka cluster id this receiver was constructed with. A running total of the
 * number of records received is exposed through {@link #receivedRecordsCount()}.
 */
public class StorePartitionDataReceiver implements ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> {
    private final StoreIngestionTask storeIngestionTask;
    private final PubSubTopicPartition topicPartition;
    private final String kafkaUrl;
    private final int kafkaClusterId;
    /** Per-instance logger; its name carries the Kafka URL given to the constructor. */
    private final Logger LOGGER;
    /** Sum of the sizes of all batches passed to {@link #write(List)} so far. */
    private long receivedRecordsCount = 0;

    /**
     * Creates a receiver bound to a single ingestion task and topic-partition.
     *
     * @param storeIngestionTask   task that receives the consumed batches; must not be null
     * @param pubSubTopicPartition topic-partition this receiver serves; must not be null
     * @param str                  Kafka URL passed along with every batch and embedded in the
     *                             logger name; must not be null
     * @param i                    Kafka cluster id passed along with every batch
     */
    public StorePartitionDataReceiver(StoreIngestionTask storeIngestionTask, PubSubTopicPartition pubSubTopicPartition, String str, int i) {
        // Validate.notNull returns its argument, so each check doubles as the assignment.
        this.storeIngestionTask = Validate.notNull(storeIngestionTask);
        this.topicPartition = Validate.notNull(pubSubTopicPartition);
        this.kafkaUrl = Validate.notNull(str);
        this.kafkaClusterId = i;
        this.LOGGER = LogManager.getLogger(getClass().getSimpleName() + " [" + kafkaUrl + "]");
    }

    /**
     * Counts the records in the batch and forwards the batch to the ingestion task.
     *
     * <p>The counter is updated before forwarding, so it also includes batches whose forwarding
     * subsequently fails.
     *
     * @param list batch of consumed messages
     * @throws Exception if forwarding failed with an exception that is, or wraps, an
     *                   {@code InterruptedException}; any other failure is logged and recorded on
     *                   the ingestion task instead of being propagated
     */
    @Override
    public void write(List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> list) throws Exception {
        receivedRecordsCount += list.size();
        try {
            storeIngestionTask.produceToStoreBufferServiceOrKafka(list, topicPartition, kafkaUrl, kafkaClusterId);
        } catch (Exception failure) {
            handleDataReceiverException(failure);
        }
    }

    /**
     * @return the version topic reported by the underlying ingestion task
     */
    @Override
    public PubSubTopic destinationIdentifier() {
        return storeIngestionTask.getVersionTopic();
    }

    /**
     * Records a topic deletion on the ingestion task as its last consumer exception.
     *
     * @param str name of the deleted topic, used in the exception message
     */
    @Override
    public void notifyOfTopicDeletion(String str) {
        VeniceException topicDeleted = new VeniceException("Topic " + str + " got deleted.");
        storeIngestionTask.setLastConsumerException(topicDeleted);
    }

    /**
     * @return the topic-partition this receiver was constructed with
     */
    @Override
    public PubSubTopicPartition getPubSubTopicPartition() {
        return topicPartition;
    }

    /**
     * Decides what to do with an exception raised while forwarding a batch.
     *
     * <p>Exceptions that are not interruption-related are logged at error level and stored on the
     * ingestion task. Interruption-related exceptions are rethrown unchanged, with a warning first
     * if the ingestion task still reports itself as running.
     *
     * @param exc the exception caught in {@link #write(List)}
     * @throws Exception {@code exc} itself when it is, or wraps, an {@code InterruptedException}
     */
    private void handleDataReceiverException(Exception exc) throws Exception {
        boolean interrupted = ExceptionUtils.recursiveClassEquals(exc, InterruptedException.class);
        if (!interrupted) {
            LOGGER.error("Received exception when StoreIngestionTask is processing the polled consumer record for topic: {}", topicPartition, exc);
            storeIngestionTask.setLastConsumerException(exc);
            return;
        }

        if (storeIngestionTask.isRunning()) {
            LOGGER.warn("Unexpected: got interrupted prior to the {} getting closed.", storeIngestionTask.getClass().getSimpleName());
        }
        // NOTE(review): the exception is rethrown as-is and the thread's interrupt flag is not
        // touched here; presumably intentional, confirm before changing.
        throw exc;
    }

    /**
     * @return total number of records passed to {@link #write(List)} so far
     */
    public long receivedRecordsCount() {
        return receivedRecordsCount;
    }

    /**
     * @return a description containing the simple class name, the ingestion task's version topic
     *         and the topic-partition
     */
    @Override
    public String toString() {
        StringBuilder description = new StringBuilder(getClass().getSimpleName());
        description.append("{VT=").append(storeIngestionTask.getVersionTopic());
        description.append(", topicPartition=").append(topicPartition);
        description.append('}');
        return description.toString();
    }
}
