package com.linkedin.venice.pubsub.adapter.kafka.consumer;

import com.linkedin.venice.annotation.NotThreadsafe;
import com.linkedin.venice.exceptions.UnsubscribedTopicPartitionException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
import com.linkedin.venice.pubsub.adapter.kafka.TopicPartitionsOffsetsTracker;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.utils.VeniceProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@NotThreadsafe
/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.class */
public class ApacheKafkaConsumerAdapter implements PubSubConsumerAdapter {
    private static final Logger LOGGER = LogManager.getLogger(ApacheKafkaConsumerAdapter.class);
    public static final String CONSUMER_POLL_RETRY_TIMES_CONFIG = "consumer.poll.retry.times";
    public static final String CONSUMER_POLL_RETRY_BACKOFF_MS_CONFIG = "consumer.poll.retry.backoff.ms";
    private static final boolean DEFAULT_PARTITIONS_OFFSETS_COLLECTION_ENABLE = false;
    private static final int CONSUMER_POLL_RETRY_TIMES_DEFAULT = 3;
    private static final int CONSUMER_POLL_RETRY_BACKOFF_MS_DEFAULT = 0;
    private final Consumer<byte[], byte[]> kafkaConsumer;
    private final int consumerPollRetryTimes;
    private final int consumerPollRetryBackoffMs;
    private final TopicPartitionsOffsetsTracker topicPartitionsOffsetsTracker;
    private final Map<TopicPartition, PubSubTopicPartition> assignments;
    private final KafkaPubSubMessageDeserializer pubSubMessageDeserializer;

    public ApacheKafkaConsumerAdapter(Properties properties, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer) {
        this(properties, false, kafkaPubSubMessageDeserializer);
    }

    public ApacheKafkaConsumerAdapter(Properties properties, boolean z, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer) {
        this(new KafkaConsumer(properties), new VeniceProperties(properties), z, kafkaPubSubMessageDeserializer);
    }

    public ApacheKafkaConsumerAdapter(Consumer<byte[], byte[]> consumer, VeniceProperties veniceProperties, boolean z, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer) {
        this.kafkaConsumer = consumer;
        this.consumerPollRetryTimes = veniceProperties.getInt(CONSUMER_POLL_RETRY_TIMES_CONFIG, 3);
        this.consumerPollRetryBackoffMs = veniceProperties.getInt(CONSUMER_POLL_RETRY_BACKOFF_MS_CONFIG, 0);
        this.topicPartitionsOffsetsTracker = z ? new TopicPartitionsOffsetsTracker() : null;
        this.assignments = new HashMap();
        this.pubSubMessageDeserializer = kafkaPubSubMessageDeserializer;
        LOGGER.info("Consumer poll retry times: {}", Integer.valueOf(this.consumerPollRetryTimes));
        LOGGER.info("Consumer poll retry back off in ms: {}", Integer.valueOf(this.consumerPollRetryBackoffMs));
        LOGGER.info("Consumer offset collection enabled: {}", Boolean.valueOf(z));
    }

    private void seekNextOffset(TopicPartition topicPartition, long j) {
        TopicPartition mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(topicPartition);
        if (j == -1) {
            this.kafkaConsumer.seekToBeginning(Collections.singletonList(mapToPulsar));
        } else {
            this.kafkaConsumer.seek(mapToPulsar, j + 1);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public void subscribe(PubSubTopicPartition pubSubTopicPartition, long j) {
        String mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopicPartition.getPubSubTopic().getName());
        int partitionNumber = pubSubTopicPartition.getPartitionNumber();
        TopicPartition topicPartition = new TopicPartition(mapToPulsar, partitionNumber);
        Set assignment = this.kafkaConsumer.assignment();
        if (assignment.contains(topicPartition)) {
            LOGGER.warn("Already subscribed on Topic: {} Partition: {}, ignore the request of subscription.", mapToPulsar, Integer.valueOf(partitionNumber));
            return;
        }
        ArrayList arrayList = new ArrayList(assignment);
        arrayList.add(topicPartition);
        this.kafkaConsumer.assign(arrayList);
        seekNextOffset(topicPartition, j);
        this.assignments.put(topicPartition, pubSubTopicPartition);
        LOGGER.info("Subscribed to Topic: {} Partition: {} Offset: {}", mapToPulsar, Integer.valueOf(partitionNumber), Long.valueOf(j));
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public void unSubscribe(PubSubTopicPartition pubSubTopicPartition) {
        TopicPartition topicPartition = new TopicPartition(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopicPartition.getPubSubTopic().getName()), pubSubTopicPartition.getPartitionNumber());
        Set assignment = this.kafkaConsumer.assignment();
        if (assignment.contains(topicPartition)) {
            ArrayList arrayList = new ArrayList(assignment);
            if (arrayList.remove(topicPartition)) {
                this.kafkaConsumer.assign(arrayList);
            }
            this.assignments.remove(topicPartition);
        }
        if (this.topicPartitionsOffsetsTracker != null) {
            this.topicPartitionsOffsetsTracker.removeTrackedOffsets(topicPartition);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public void batchUnsubscribe(Set<PubSubTopicPartition> set) {
        set.forEach(pubSubTopicPartition -> {
            this.assignments.remove(new TopicPartition(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopicPartition.getPubSubTopic().getName()), pubSubTopicPartition.getPartitionNumber()));
        });
        this.kafkaConsumer.assign(this.assignments.keySet());
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public void resetOffset(PubSubTopicPartition pubSubTopicPartition) {
        String mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopicPartition.getPubSubTopic().getName());
        int partitionNumber = pubSubTopicPartition.getPartitionNumber();
        if (!hasSubscription(pubSubTopicPartition)) {
            throw new UnsubscribedTopicPartitionException(pubSubTopicPartition);
        }
        this.kafkaConsumer.seekToBeginning(Collections.singletonList(new TopicPartition(mapToPulsar, partitionNumber)));
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> poll(long j) {
        ConsumerRecords<byte[], byte[]> empty = ConsumerRecords.empty();
        HashMap hashMap = new HashMap();
        for (int i = 1; i <= this.consumerPollRetryTimes && !Thread.currentThread().isInterrupted(); i++) {
            try {
                empty = this.kafkaConsumer.poll(Duration.ofMillis(j));
                for (TopicPartition topicPartition : empty.partitions()) {
                    PubSubTopicPartition pubSubTopicPartition = this.assignments.get(topicPartition);
                    List records = empty.records(topicPartition);
                    ArrayList arrayList = new ArrayList(records.size());
                    Iterator it = records.iterator();
                    while (it.hasNext()) {
                        arrayList.add(this.pubSubMessageDeserializer.deserialize((ConsumerRecord<byte[], byte[]>) it.next(), pubSubTopicPartition));
                    }
                    hashMap.put(pubSubTopicPartition, arrayList);
                }
                int i2 = i + 1;
                break;
            } catch (RetriableException e) {
                try {
                    LOGGER.warn("Retriable exception thrown when attempting to consume records from kafka, attempt {}/{}", Integer.valueOf(i), Integer.valueOf(this.consumerPollRetryTimes), e);
                    if (i == this.consumerPollRetryTimes) {
                        throw e;
                    }
                    try {
                        if (this.consumerPollRetryBackoffMs > 0) {
                            Thread.sleep(this.consumerPollRetryBackoffMs);
                        }
                    } catch (InterruptedException e2) {
                        throw new VeniceException("Consumer poll retry back off sleep got interrupted", e);
                    }
                } catch (Throwable th) {
                    int i3 = i + 1;
                    throw th;
                }
            }
        }
        if (this.topicPartitionsOffsetsTracker != null) {
            this.topicPartitionsOffsetsTracker.updateEndAndCurrentOffsets(empty, this.kafkaConsumer.metrics());
        }
        return hashMap;
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public boolean hasAnySubscription() {
        return !this.kafkaConsumer.assignment().isEmpty();
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public boolean hasSubscription(PubSubTopicPartition pubSubTopicPartition) {
        return this.kafkaConsumer.assignment().contains(new TopicPartition(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopicPartition.getPubSubTopic().getName()), pubSubTopicPartition.getPartitionNumber()));
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public void pause(PubSubTopicPartition pubSubTopicPartition) {
        TopicPartition topicPartition = new TopicPartition(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopicPartition.getPubSubTopic().getName()), pubSubTopicPartition.getPartitionNumber());
        if (this.kafkaConsumer.assignment().contains(topicPartition)) {
            this.kafkaConsumer.pause(Collections.singletonList(topicPartition));
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public void resume(PubSubTopicPartition pubSubTopicPartition) {
        TopicPartition mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(new TopicPartition(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber()));
        if (this.kafkaConsumer.assignment().contains(mapToPulsar)) {
            this.kafkaConsumer.resume(Collections.singletonList(mapToPulsar));
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Set<PubSubTopicPartition> getAssignment() {
        return new HashSet(this.assignments.values());
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter, java.lang.AutoCloseable, java.io.Closeable
    public void close() {
        if (this.topicPartitionsOffsetsTracker != null) {
            this.topicPartitionsOffsetsTracker.clearAllOffsetState();
        }
        if (this.kafkaConsumer != null) {
            try {
                this.kafkaConsumer.close(Duration.ZERO);
            } catch (Exception e) {
                LOGGER.warn("{} threw an exception while closing.", this.kafkaConsumer.getClass().getSimpleName(), e);
            }
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public long getOffsetLag(PubSubTopicPartition pubSubTopicPartition) {
        String mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopicPartition.getPubSubTopic().getName());
        int partitionNumber = pubSubTopicPartition.getPartitionNumber();
        if (this.topicPartitionsOffsetsTracker != null) {
            return this.topicPartitionsOffsetsTracker.getOffsetLag(mapToPulsar, partitionNumber);
        }
        return -1L;
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) {
        String mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopicPartition.getPubSubTopic().getName());
        int partitionNumber = pubSubTopicPartition.getPartitionNumber();
        if (this.topicPartitionsOffsetsTracker != null) {
            return this.topicPartitionsOffsetsTracker.getEndOffset(mapToPulsar, partitionNumber);
        }
        return -1L;
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long j, Duration duration) {
        TopicPartition mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(new TopicPartition(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber()));
        Map offsetsForTimes = this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(mapToPulsar, Long.valueOf(j)), duration);
        if (offsetsForTimes.isEmpty()) {
            return -1L;
        }
        OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) offsetsForTimes.get(mapToPulsar);
        if (offsetAndTimestamp == null) {
            return null;
        }
        return Long.valueOf(offsetAndTimestamp.offset());
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long j) {
        TopicPartition mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(new TopicPartition(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber()));
        Map offsetsForTimes = this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(mapToPulsar, Long.valueOf(j)));
        if (offsetsForTimes.isEmpty()) {
            return -1L;
        }
        OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) offsetsForTimes.get(mapToPulsar);
        if (offsetAndTimestamp == null) {
            return null;
        }
        return Long.valueOf(offsetAndTimestamp.offset());
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Long beginningOffset(PubSubTopicPartition pubSubTopicPartition, Duration duration) {
        TopicPartition mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(new TopicPartition(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber()));
        return (Long) this.kafkaConsumer.beginningOffsets(Collections.singleton(mapToPulsar), duration).get(mapToPulsar);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Map<PubSubTopicPartition, Long> endOffsets(Collection<PubSubTopicPartition> collection, Duration duration) {
        HashMap hashMap = new HashMap(collection.size());
        for (PubSubTopicPartition pubSubTopicPartition : collection) {
            hashMap.put(ApacheKafkaProducerAdapter.mapToPulsar(new TopicPartition(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber())), pubSubTopicPartition);
        }
        HashMap hashMap2 = new HashMap(collection.size());
        for (Map.Entry entry : this.kafkaConsumer.endOffsets(hashMap.keySet(), duration).entrySet()) {
            hashMap2.put((PubSubTopicPartition) hashMap.get(entry.getKey()), (Long) entry.getValue());
        }
        return hashMap2;
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Long endOffset(PubSubTopicPartition pubSubTopicPartition) {
        TopicPartition mapToPulsar = ApacheKafkaProducerAdapter.mapToPulsar(new TopicPartition(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber()));
        return (Long) this.kafkaConsumer.endOffsets(Collections.singleton(mapToPulsar)).get(mapToPulsar);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic pubSubTopic) {
        List<PartitionInfo> partitionsFor = this.kafkaConsumer.partitionsFor(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName()));
        if (partitionsFor == null) {
            return null;
        }
        ArrayList arrayList = new ArrayList(partitionsFor.size());
        for (PartitionInfo partitionInfo : partitionsFor) {
            if (partitionInfo.topic().equals(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName()))) {
                arrayList.add(new PubSubTopicPartitionInfo(pubSubTopic, partitionInfo.partition(), partitionInfo.replicas().length, Boolean.valueOf(partitionInfo.inSyncReplicas().length > 0)));
            }
        }
        return arrayList;
    }
}
