package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.kafka.TopicDoesNotExistException;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.stats.StatsErrorCode;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/CachedKafkaMetadataGetter.class */
/**
 * Caches per-topic and per-topic-partition metadata obtained from a {@link TopicManager}.
 *
 * <p>Each cached value carries an expiry time. The first lookup of a key computes the value
 * synchronously; once an entry has expired, the stale value keeps being returned while a single
 * asynchronous refresh is scheduled to replace it (see
 * {@link #fetchMetadata(KafkaMetadataCacheKey, Map, Supplier)}).
 */
public class CachedKafkaMetadataGetter {
    private static final Logger LOGGER = LogManager.getLogger(CachedKafkaMetadataGetter.class);

    /** Retry count handed to the retrying {@link TopicManager} calls. */
    private static final int DEFAULT_MAX_RETRY = 10;

    /** Time-to-live of a cache entry, in nanoseconds. */
    private final long ttlNs;

    private final Map<KafkaMetadataCacheKey, ValueAndExpiryTime<Boolean>> topicExistenceCache = new VeniceConcurrentHashMap<>();

    /** Latest (end) offsets. */
    private final Map<KafkaMetadataCacheKey, ValueAndExpiryTime<Long>> offsetCache = new VeniceConcurrentHashMap<>();

    /**
     * Earliest (beginning) offsets. Kept apart from {@link #offsetCache}: both lookups build the
     * same key for a given partition, so sharing one map would let an earliest offset be served
     * as the latest offset (or the other way round), depending on which was requested first.
     */
    private final Map<KafkaMetadataCacheKey, ValueAndExpiryTime<Long>> earliestOffsetCache = new VeniceConcurrentHashMap<>();

    private final Map<KafkaMetadataCacheKey, ValueAndExpiryTime<Long>> lastProducerTimestampCache = new VeniceConcurrentHashMap<>();

    /** Cache key made of the bootstrap-server string and a topic partition. */
    public static class KafkaMetadataCacheKey {
        private final String kafkaServer;
        private final PubSubTopicPartition pubSubTopicPartition;

        KafkaMetadataCacheKey(String str, PubSubTopicPartition pubSubTopicPartition) {
            this.kafkaServer = str;
            this.pubSubTopicPartition = pubSubTopicPartition;
        }

        @Override
        public int hashCode() {
            // Same value as the hand-rolled 31-based combination: null fields contribute 0.
            return Objects.hash(this.kafkaServer, this.pubSubTopicPartition);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof KafkaMetadataCacheKey)) {
                return false;
            }
            KafkaMetadataCacheKey other = (KafkaMetadataCacheKey) obj;
            // Null-safe on both fields, so equals() accepts every key hashCode() accepts.
            return Objects.equals(this.pubSubTopicPartition, other.pubSubTopicPartition)
                    && Objects.equals(this.kafkaServer, other.kafkaServer);
        }

        @Override
        public String toString() {
            return "KafkaMetadataCacheKey{kafkaServer=" + this.kafkaServer
                    + ", pubSubTopicPartition=" + this.pubSubTopicPartition + "}";
        }
    }

    /**
     * A cached value together with its expiry time (a {@link System#nanoTime()} reading) and a
     * flag marking that a refresh of this entry has already been scheduled.
     */
    public static class ValueAndExpiryTime<T> {
        private final T value;
        private final long expiryTimeNs;
        private final AtomicBoolean valueUpdateInProgress = new AtomicBoolean(false);

        ValueAndExpiryTime(T t, long j) {
            this.value = t;
            this.expiryTimeNs = j;
        }

        T getValue() {
            return this.value;
        }

        long getExpiryTimeNs() {
            return this.expiryTimeNs;
        }
    }

    /**
     * @param j time-to-live of cache entries, in milliseconds
     */
    public CachedKafkaMetadataGetter(long j) {
        this.ttlNs = TimeUnit.MILLISECONDS.toNanos(j);
    }

    /**
     * Returns the cached latest offset of partition {@code i} of {@code pubSubTopic}.
     *
     * @param topicManager source of the offset; its bootstrap servers are part of the cache key
     * @param pubSubTopic  topic to look up
     * @param i            partition number
     * @return the offset, or {@code StatsErrorCode.LAG_MEASUREMENT_FAILURE.code} when a
     *         {@link TopicDoesNotExistException} is thrown
     */
    public long getOffset(TopicManager topicManager, PubSubTopic pubSubTopic, int i) {
        String kafkaServer = topicManager.getKafkaBootstrapServers();
        PubSubTopicPartitionImpl topicPartition = new PubSubTopicPartitionImpl(pubSubTopic, i);
        try {
            return fetchMetadata(
                    new KafkaMetadataCacheKey(kafkaServer, topicPartition),
                    this.offsetCache,
                    () -> topicManager.getPartitionLatestOffsetAndRetry(topicPartition, DEFAULT_MAX_RETRY));
        } catch (TopicDoesNotExistException e) {
            LOGGER.error("Failed to get offset for topic partition {}", topicPartition, e);
            return StatsErrorCode.LAG_MEASUREMENT_FAILURE.code;
        }
    }

    /**
     * Returns the cached earliest offset of {@code pubSubTopicPartition}.
     *
     * @param topicManager         source of the offset; its bootstrap servers are part of the cache key
     * @param pubSubTopicPartition partition to look up
     * @return the offset, or {@code StatsErrorCode.LAG_MEASUREMENT_FAILURE.code} when a
     *         {@link TopicDoesNotExistException} is thrown
     */
    public long getEarliestOffset(TopicManager topicManager, PubSubTopicPartition pubSubTopicPartition) {
        try {
            return fetchMetadata(
                    new KafkaMetadataCacheKey(topicManager.getKafkaBootstrapServers(), pubSubTopicPartition),
                    this.earliestOffsetCache,
                    () -> topicManager.getPartitionEarliestOffsetAndRetry(pubSubTopicPartition, DEFAULT_MAX_RETRY));
        } catch (TopicDoesNotExistException e) {
            LOGGER.error("Failed to get offset for topic partition {}", pubSubTopicPartition, e);
            return StatsErrorCode.LAG_MEASUREMENT_FAILURE.code;
        }
    }

    /**
     * Returns the cached result of {@code TopicManager.getProducerTimestampOfLastDataRecord} for
     * {@code pubSubTopicPartition}.
     *
     * @param topicManager         source of the timestamp; its bootstrap servers are part of the cache key
     * @param pubSubTopicPartition partition to look up
     * @return the timestamp, or {@code StatsErrorCode.LAG_MEASUREMENT_FAILURE.code} when a
     *         {@link TopicDoesNotExistException} is thrown
     */
    public long getProducerTimestampOfLastDataMessage(TopicManager topicManager, PubSubTopicPartition pubSubTopicPartition) {
        try {
            return fetchMetadata(
                    new KafkaMetadataCacheKey(topicManager.getKafkaBootstrapServers(), pubSubTopicPartition),
                    this.lastProducerTimestampCache,
                    () -> topicManager.getProducerTimestampOfLastDataRecord(pubSubTopicPartition, DEFAULT_MAX_RETRY));
        } catch (TopicDoesNotExistException e) {
            // Unlike the offset lookups, this path reports the failure through the return value only.
            return StatsErrorCode.LAG_MEASUREMENT_FAILURE.code;
        }
    }

    /**
     * Returns the cached answer of {@code topicManager.containsTopic(pubSubTopic)}.
     *
     * @param topicManager source of the answer; its bootstrap servers are part of the cache key
     * @param pubSubTopic  topic to look up
     */
    public boolean containsTopic(TopicManager topicManager, PubSubTopic pubSubTopic) {
        // Topic existence is not tied to a partition; -1 fills the partition slot of the key.
        KafkaMetadataCacheKey key = new KafkaMetadataCacheKey(
                topicManager.getKafkaBootstrapServers(),
                new PubSubTopicPartitionImpl(pubSubTopic, -1));
        return fetchMetadata(key, this.topicExistenceCache, () -> topicManager.containsTopic(pubSubTopic));
    }

    /**
     * Looks {@code kafkaMetadataCacheKey} up in {@code map}, computing the value with
     * {@code supplier} when the key is absent.
     *
     * <p>When the entry found has expired, the stale value is still returned, and at most one
     * asynchronous refresh per entry is submitted via {@link CompletableFuture#runAsync(Runnable)}.
     * A refresh that throws evicts the entry, so the next lookup recomputes it in the calling
     * thread.
     *
     * @param kafkaMetadataCacheKey key to look up
     * @param map                   cache to read from and write to
     * @param supplier              computes a fresh value; exceptions it throws during the initial
     *                              computation reach the caller
     * @return the cached (possibly stale) value
     */
    <T> T fetchMetadata(KafkaMetadataCacheKey kafkaMetadataCacheKey, Map<KafkaMetadataCacheKey, ValueAndExpiryTime<T>> map, Supplier<T> supplier) {
        final long now = System.nanoTime();
        final ValueAndExpiryTime<T> cached = map.computeIfAbsent(
                kafkaMetadataCacheKey,
                absentKey -> new ValueAndExpiryTime<>(supplier.get(), now + this.ttlNs));

        // nanoTime readings may wrap around, so compare them by difference rather than directly.
        final boolean expired = cached.getExpiryTimeNs() - now <= 0;

        // The flag is never reset: the entry is either replaced or evicted by the refresh below,
        // which is what limits each entry to a single refresh.
        if (expired && cached.valueUpdateInProgress.compareAndSet(false, true)) {
            CompletableFuture.runAsync(() -> {
                try {
                    T refreshed = supplier.get();
                    map.put(kafkaMetadataCacheKey, new ValueAndExpiryTime<>(refreshed, System.nanoTime() + this.ttlNs));
                } catch (Exception e) {
                    LOGGER.warn("Failed to refresh cached metadata for {}; evicting the entry", kafkaMetadataCacheKey, e);
                    map.remove(kafkaMetadataCacheKey);
                }
            });
        }
        return cached.getValue();
    }
}
