package com.linkedin.venice.pubsub.adapter.kafka;

import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import javax.annotation.Nonnull;
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/TopicPartitionsOffsetsTracker.class */
public class TopicPartitionsOffsetsTracker {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) TopicPartitionsOffsetsTracker.class);
    private static final Duration DEFAULT_OFFSETS_UPDATE_INTERVAL = Duration.ofSeconds(30);
    private static final Duration DEFAULT_MIN_LOG_INTERVAL = Duration.ofMinutes(3);
    private static final ResultType[] RESULT_TYPE_VALUES = ResultType.values();
    private final Map<TopicPartition, Double> topicPartitionCurrentOffset;
    private final Map<TopicPartition, Double> topicPartitionEndOffset;
    private Instant lastMetricsCollectedTime;
    private final Duration offsetsUpdateInterval;
    private final StatsAccumulator statsAccumulator;

    /* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/TopicPartitionsOffsetsTracker$ResultType.class */
    public enum ResultType {
        VALID_OFFSET_LAG,
        NO_OFFSET_LAG,
        INVALID_OFFSET_LAG
    }

    /* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/TopicPartitionsOffsetsTracker$StatsAccumulator.class */
    private static class StatsAccumulator {
        private final Map<ResultType, Integer> resultsStats;
        private final Duration minLogInterval;
        private Instant lastLoggedTime;

        private StatsAccumulator(Duration duration) {
            this.resultsStats = new VeniceConcurrentHashMap(TopicPartitionsOffsetsTracker.RESULT_TYPE_VALUES.length);
            this.minLogInterval = duration;
            this.lastLoggedTime = Instant.now();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void recordResult(ResultType resultType) {
            this.resultsStats.compute(resultType, (resultType2, num) -> {
                return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void maybeLogAccumulatedStats(Logger logger) {
            if (this.resultsStats.isEmpty()) {
                return;
            }
            Instant now = Instant.now();
            Duration between = Duration.between(this.lastLoggedTime, now);
            if (between.toMillis() >= this.minLogInterval.toMillis()) {
                logger.info(String.format("In the last %d second(s), results states are: %s", Long.valueOf(between.getSeconds()), resultsStatsToString()));
                this.lastLoggedTime = now;
                this.resultsStats.clear();
            }
        }

        private String resultsStatsToString() {
            StringJoiner stringJoiner = new StringJoiner(", ");
            for (ResultType resultType : TopicPartitionsOffsetsTracker.RESULT_TYPE_VALUES) {
                stringJoiner.add(resultType + " count: " + this.resultsStats.getOrDefault(resultType, 0));
            }
            return stringJoiner.toString();
        }

        Map<ResultType, Integer> getResultsStats() {
            return Collections.unmodifiableMap(this.resultsStats);
        }
    }

    public TopicPartitionsOffsetsTracker() {
        this(DEFAULT_OFFSETS_UPDATE_INTERVAL);
    }

    public TopicPartitionsOffsetsTracker(Duration duration) {
        this(duration, DEFAULT_MIN_LOG_INTERVAL);
    }

    public TopicPartitionsOffsetsTracker(@Nonnull Duration duration, Duration duration2) {
        Validate.notNull(duration);
        this.offsetsUpdateInterval = duration;
        this.lastMetricsCollectedTime = null;
        this.topicPartitionCurrentOffset = new VeniceConcurrentHashMap();
        this.topicPartitionEndOffset = new VeniceConcurrentHashMap();
        this.statsAccumulator = new StatsAccumulator(duration2);
    }

    public void updateEndAndCurrentOffsets(ConsumerRecords<byte[], byte[]> consumerRecords, Map<MetricName, ? extends Metric> map) {
        TopicPartition topicPartition;
        Double d;
        if (this.lastMetricsCollectedTime == null || LatencyUtils.getElapsedTimeInMs(this.lastMetricsCollectedTime.toEpochMilli()) >= this.offsetsUpdateInterval.toMillis()) {
            this.lastMetricsCollectedTime = Instant.now();
            for (TopicPartition topicPartition2 : consumerRecords.partitions()) {
                List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records(topicPartition2);
                this.topicPartitionCurrentOffset.put(topicPartition2, Double.valueOf(records.get(records.size() - 1).offset()));
            }
            for (Map.Entry<MetricName, ? extends Metric> entry : map.entrySet()) {
                MetricName key = entry.getKey();
                Metric value = entry.getValue();
                if (isMetricEntryRecordsLag(key, value) && (d = this.topicPartitionCurrentOffset.get((topicPartition = new TopicPartition(key.tags().get("topic"), Integer.parseInt(key.tags().get("partition")))))) != null) {
                    this.topicPartitionEndOffset.put(topicPartition, Double.valueOf(d.doubleValue() + ((Double) value.metricValue()).doubleValue()));
                }
            }
        }
    }

    private boolean isMetricEntryRecordsLag(MetricName metricName, Metric metric) {
        try {
            if (Objects.equals(metricName.name(), "records-lag")) {
                if (metric.metricValue() instanceof Double) {
                    return true;
                }
            }
            return false;
        } catch (Exception e) {
            LOGGER.warn("Caught exception: {} when attempting to get consumer metrics. Incomplete metrics might be returned.", e.getMessage());
            return false;
        }
    }

    public void removeTrackedOffsets(TopicPartition topicPartition) {
        this.topicPartitionCurrentOffset.remove(topicPartition);
        this.topicPartitionEndOffset.remove(topicPartition);
    }

    public void clearAllOffsetState() {
        this.topicPartitionCurrentOffset.clear();
        this.topicPartitionEndOffset.clear();
    }

    public long getEndOffset(String str, int i) {
        Double d = this.topicPartitionEndOffset.get(new TopicPartition(str, i));
        if (d == null) {
            return -1L;
        }
        return d.longValue();
    }

    public long getOffsetLag(String str, int i) {
        TopicPartition topicPartition = new TopicPartition(str, i);
        Double d = this.topicPartitionEndOffset.get(topicPartition);
        if (d == null) {
            this.statsAccumulator.recordResult(ResultType.NO_OFFSET_LAG);
            return -1L;
        }
        Double d2 = this.topicPartitionCurrentOffset.get(topicPartition);
        if (d2 == null) {
            this.statsAccumulator.recordResult(ResultType.NO_OFFSET_LAG);
            return -1L;
        }
        long longValue = d.longValue() - d2.longValue();
        if (longValue < 0) {
            this.statsAccumulator.recordResult(ResultType.INVALID_OFFSET_LAG);
            return -1L;
        }
        this.statsAccumulator.recordResult(ResultType.VALID_OFFSET_LAG);
        this.statsAccumulator.maybeLogAccumulatedStats(LOGGER);
        return longValue;
    }

    public Map<ResultType, Integer> getResultsStats() {
        return this.statsAccumulator.getResultsStats();
    }
}
