package com.linkedin.venice.kafka.partitionoffset;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicDoesNotExistException;
import com.linkedin.venice.kafka.VeniceOperationAgainstKafkaTimedOut;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import it.unimi.dsi.fastutil.ints.Int2LongMaps;
import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.Validate;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherImpl.class */
public class PartitionOffsetFetcherImpl implements PartitionOffsetFetcher {
    private static final List<Class<? extends Throwable>> KAFKA_RETRIABLE_FAILURES = Collections.singletonList(RetriableException.class);
    public static final Duration DEFAULT_KAFKA_OFFSET_API_TIMEOUT = Duration.ofMinutes(1);
    public static final long NO_PRODUCER_TIME_IN_EMPTY_TOPIC_PARTITION = -1;
    private static final int KAFKA_POLLING_RETRY_MAX_ATTEMPT = 3;
    private final Logger logger;
    private final Lock adminConsumerLock;
    private final Lazy<PubSubAdminAdapter> kafkaAdminWrapper;
    private final Lazy<PubSubConsumerAdapter> pubSubConsumer;
    private final Duration kafkaOperationTimeout;

    public PartitionOffsetFetcherImpl(@Nonnull Lazy<PubSubAdminAdapter> lazy, @Nonnull Lazy<PubSubConsumerAdapter> lazy2, long j, String str) {
        Validate.notNull(lazy);
        this.kafkaAdminWrapper = lazy;
        this.pubSubConsumer = lazy2;
        this.adminConsumerLock = new ReentrantLock();
        this.kafkaOperationTimeout = Duration.ofMillis(j);
        this.logger = LogManager.getLogger(PartitionOffsetFetcherImpl.class.getSimpleName() + " [" + str + "]");
    }

    @Override // com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher
    public Int2LongMap getTopicLatestOffsets(PubSubTopic pubSubTopic) {
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            List<PubSubTopicPartitionInfo> partitionsFor = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).partitionsFor(pubSubTopic);
            if (partitionsFor == null || partitionsFor.isEmpty()) {
                this.logger.warn("Unexpected! Topic: {} has a null partition set, returning empty map for latest offsets", pubSubTopic);
                Int2LongMaps.EmptyMap emptyMap = Int2LongMaps.EMPTY_MAP;
                if (of != null) {
                    of.close();
                }
                return emptyMap;
            }
            Map<PubSubTopicPartition, Long> endOffsets = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).endOffsets((List) partitionsFor.stream().map(pubSubTopicPartitionInfo -> {
                return new PubSubTopicPartitionImpl(pubSubTopic, pubSubTopicPartitionInfo.partition());
            }).collect(Collectors.toList()), DEFAULT_KAFKA_OFFSET_API_TIMEOUT);
            Int2LongOpenHashMap int2LongOpenHashMap = new Int2LongOpenHashMap(endOffsets.size());
            for (Map.Entry<PubSubTopicPartition, Long> entry : endOffsets.entrySet()) {
                int2LongOpenHashMap.put(entry.getKey().getPartitionNumber(), entry.getValue().longValue());
            }
            if (of != null) {
                of.close();
            }
            return int2LongOpenHashMap;
        } catch (Throwable th) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) throws TopicDoesNotExistException {
        if (pubSubTopicPartition.getPartitionNumber() < 0) {
            throw new IllegalArgumentException("Cannot retrieve latest offsets for invalid partition " + pubSubTopicPartition.getPartitionNumber());
        }
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            if (!((PubSubAdminAdapter) this.kafkaAdminWrapper.get()).containsTopicWithPartitionCheckExpectationAndRetry(pubSubTopicPartition, 3, true)) {
                throw new TopicDoesNotExistException("Topic " + pubSubTopicPartition.getPubSubTopic() + " does not exist or partition requested is less topic partition count!");
            }
            try {
                Long l = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).endOffsets(Collections.singletonList(pubSubTopicPartition), DEFAULT_KAFKA_OFFSET_API_TIMEOUT).get(pubSubTopicPartition);
                if (l == null) {
                    throw new VeniceException("offset result returned from endOffsets does not contain entry: " + pubSubTopicPartition);
                }
                long longValue = l.longValue();
                if (of != null) {
                    of.close();
                }
                return longValue;
            } catch (Exception e) {
                if (e instanceof TimeoutException) {
                    throw new VeniceOperationAgainstKafkaTimedOut("Timeout exception when seeking to end to get latest offset for topic partition: " + pubSubTopicPartition, e);
                }
                throw e;
            }
        } catch (Throwable th) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher
    public long getPartitionLatestOffsetAndRetry(PubSubTopicPartition pubSubTopicPartition, int i) {
        return getEndOffset(pubSubTopicPartition, i, this::getLatestOffset);
    }

    @Override // com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher
    public long getPartitionEarliestOffsetAndRetry(PubSubTopicPartition pubSubTopicPartition, int i) {
        return getEndOffset(pubSubTopicPartition, i, this::getEarliestOffset);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.lang.Throwable] */
    private long getEndOffset(PubSubTopicPartition pubSubTopicPartition, int i, Function<PubSubTopicPartition, Long> function) {
        if (i < 1) {
            throw new IllegalArgumentException("Invalid retries. Got: " + i);
        }
        VeniceOperationAgainstKafkaTimedOut veniceOperationAgainstKafkaTimedOut = new VeniceOperationAgainstKafkaTimedOut("This exception should not be thrown");
        for (int i2 = 0; i2 < i; i2++) {
            try {
                return function.apply(pubSubTopicPartition).longValue();
            } catch (VeniceOperationAgainstKafkaTimedOut e) {
                this.logger.warn("Failed to get offset. Retries remaining: {}", Integer.valueOf(i - i2), e);
                veniceOperationAgainstKafkaTimedOut = e;
            }
        }
        throw veniceOperationAgainstKafkaTimedOut;
    }

    @Override // com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher
    public long getPartitionOffsetByTime(PubSubTopicPartition pubSubTopicPartition, long j) {
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            Long offsetsForTimesWithRetry = offsetsForTimesWithRetry(pubSubTopicPartition, j);
            if (offsetsForTimesWithRetry == null) {
                offsetsForTimesWithRetry = Long.valueOf(getOffsetByTimeIfOutOfRange(pubSubTopicPartition, j));
            } else if (offsetsForTimesWithRetry.longValue() == -1) {
                this.logger.warn("Offsets result is empty. Will complement with the last offsets.");
                offsetsForTimesWithRetry = Long.valueOf(endOffsetsWithRetry(pubSubTopicPartition).longValue() + 1);
            }
            long longValue = offsetsForTimesWithRetry.longValue();
            if (of != null) {
                of.close();
            }
            return longValue;
        } catch (Throwable th) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Long offsetsForTimesWithRetry(PubSubTopicPartition pubSubTopicPartition, long j) {
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            Long l = (Long) RetryUtils.executeWithMaxAttemptAndExponentialBackoff(() -> {
                return ((PubSubConsumerAdapter) this.pubSubConsumer.get()).offsetForTime(pubSubTopicPartition, j, this.kafkaOperationTimeout);
            }, 25, Duration.ofMillis(100L), Duration.ofSeconds(5L), Duration.ofMinutes(1L), KAFKA_RETRIABLE_FAILURES);
            if (of != null) {
                of.close();
            }
            return l;
        } catch (Throwable th) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Long endOffsetsWithRetry(PubSubTopicPartition pubSubTopicPartition) {
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            Long l = (Long) RetryUtils.executeWithMaxAttemptAndExponentialBackoff(() -> {
                return ((PubSubConsumerAdapter) this.pubSubConsumer.get()).endOffset(pubSubTopicPartition);
            }, 25, Duration.ofMillis(100L), Duration.ofSeconds(5L), Duration.ofMinutes(1L), KAFKA_RETRIABLE_FAILURES);
            if (of != null) {
                of.close();
            }
            return l;
        } catch (Throwable th) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.lang.Throwable] */
    @Override // com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher
    public long getProducerTimestampOfLastDataRecord(PubSubTopicPartition pubSubTopicPartition, int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Invalid retries. Got: " + i);
        }
        VeniceOperationAgainstKafkaTimedOut veniceOperationAgainstKafkaTimedOut = new VeniceOperationAgainstKafkaTimedOut("This exception should not be thrown");
        for (int i2 = 0; i2 < i; i2++) {
            try {
                return getProducerTimestampOfLastDataRecord(pubSubTopicPartition);
            } catch (VeniceOperationAgainstKafkaTimedOut e) {
                this.logger.warn("Failed to get producer timestamp on the latest data record. Retries remaining: {}", Integer.valueOf(i - i2), e);
                veniceOperationAgainstKafkaTimedOut = e;
            }
        }
        throw veniceOperationAgainstKafkaTimedOut;
    }

    private long getProducerTimestampOfLastDataRecord(PubSubTopicPartition pubSubTopicPartition) throws TopicDoesNotExistException {
        List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> consumeLatestRecords = consumeLatestRecords(pubSubTopicPartition, 1);
        if (consumeLatestRecords.isEmpty()) {
            return -1L;
        }
        PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> next = consumeLatestRecords.iterator().next();
        if (!next.getKey().isControlMessage()) {
            return next.getValue().producerMetadata.messageTimestamp;
        }
        this.logger.info("The last record in topic partition {} is a control message. Hence, try to find the last data record among the last {} records from the end of that partition", pubSubTopicPartition, 60);
        List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> consumeLatestRecords2 = consumeLatestRecords(pubSubTopicPartition, 60);
        if (consumeLatestRecords2.isEmpty()) {
            this.logger.warn("Second attempt to find producer timestamp from topic partition {} by consuming the last {} record(s) consumed no record. Assume the topic partition is empty.", pubSubTopicPartition, 60);
            return -1L;
        }
        Long l = null;
        Long l2 = null;
        int i = 0;
        for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage : consumeLatestRecords2) {
            l2 = l2 == null ? pubSubMessage.getOffset() : l2;
            i++;
            if (!pubSubMessage.getKey().isControlMessage()) {
                l = Long.valueOf(pubSubMessage.getValue().producerMetadata.messageTimestamp);
            }
        }
        if (l == null) {
            throw new VeniceException(String.format("Failed to find latest data record producer timestamp in topic partition %s since no data record is found in the last %d records starting from offset %d", pubSubTopicPartition, Integer.valueOf(i), l2));
        }
        return l.longValue();
    }

    private List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> consumeLatestRecords(PubSubTopicPartition pubSubTopicPartition, int i) {
        if (pubSubTopicPartition.getPartitionNumber() < 0) {
            throw new IllegalArgumentException("Cannot retrieve latest producer timestamp for invalid topic partition " + pubSubTopicPartition);
        }
        if (i < 1) {
            throw new IllegalArgumentException("Last record count must be greater than or equal to 1. Got: " + i);
        }
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            try {
                if (!((PubSubAdminAdapter) this.kafkaAdminWrapper.get()).containsTopicWithExpectationAndRetry(pubSubTopicPartition.getPubSubTopic(), 3, true)) {
                    throw new TopicDoesNotExistException("Topic " + pubSubTopicPartition.getPubSubTopic() + " does not exist!");
                }
                try {
                    Map<PubSubTopicPartition, Long> endOffsets = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).endOffsets(Collections.singletonList(pubSubTopicPartition), DEFAULT_KAFKA_OFFSET_API_TIMEOUT);
                    if (endOffsets == null || !endOffsets.containsKey(pubSubTopicPartition)) {
                        throw new VeniceException("Got no results of finding end offsets for topic partition: " + pubSubTopicPartition);
                    }
                    long longValue = endOffsets.get(pubSubTopicPartition).longValue();
                    if (longValue <= 0) {
                        List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> emptyList = Collections.emptyList();
                        ((PubSubConsumerAdapter) this.pubSubConsumer.get()).unSubscribe(pubSubTopicPartition);
                        if (of != null) {
                            of.close();
                        }
                        return emptyList;
                    }
                    Long beginningOffset = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).beginningOffset(pubSubTopicPartition, DEFAULT_KAFKA_OFFSET_API_TIMEOUT);
                    if (beginningOffset == null) {
                        throw new VeniceException("Got no results of finding the earliest offset for topic partition: " + pubSubTopicPartition);
                    }
                    if (beginningOffset.longValue() == longValue) {
                        List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> emptyList2 = Collections.emptyList();
                        ((PubSubConsumerAdapter) this.pubSubConsumer.get()).unSubscribe(pubSubTopicPartition);
                        if (of != null) {
                            of.close();
                        }
                        return emptyList2;
                    }
                    long max = Math.max(longValue - i, beginningOffset.longValue());
                    ((PubSubConsumerAdapter) this.pubSubConsumer.get()).subscribe(pubSubTopicPartition, max - 1);
                    ArrayList arrayList = new ArrayList(i);
                    do {
                        List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> emptyList3 = Collections.emptyList();
                        int i2 = 0;
                        while (true) {
                            int i3 = i2;
                            i2++;
                            if (i3 >= 3 || !emptyList3.isEmpty()) {
                                break;
                            }
                            this.logger.info("Trying to get records from topic partition {} from offset {} to its log end offset. Attempt# {} / {}", pubSubTopicPartition, Long.valueOf(max), Integer.valueOf(i2), 3);
                            emptyList3 = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).poll(this.kafkaOperationTimeout.toMillis()).get(pubSubTopicPartition);
                        }
                        if (emptyList3.isEmpty()) {
                            String str = "Failed to get the last record from topic-partition: " + pubSubTopicPartition + " after 3 attempts";
                            this.logger.error(str);
                            throw new VeniceException(str);
                        }
                        this.logger.info("Consumed {} record(s) from topic partition {}", Integer.valueOf(emptyList3.size()), pubSubTopicPartition);
                        arrayList.addAll(emptyList3);
                    } while (((Long) ((PubSubMessage) arrayList.get(arrayList.size() - 1)).getOffset()).longValue() + 1 < longValue);
                    if (of != null) {
                        of.close();
                    }
                    return arrayList;
                } catch (TimeoutException e) {
                    throw new VeniceOperationAgainstKafkaTimedOut("Timeout exception when seeking to end to get latest offset for topic and partition: " + pubSubTopicPartition, e);
                }
            } finally {
                ((PubSubConsumerAdapter) this.pubSubConsumer.get()).unSubscribe(pubSubTopicPartition);
            }
        } catch (Throwable th) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher
    public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic pubSubTopic) {
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            List<PubSubTopicPartitionInfo> partitionsFor = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).partitionsFor(pubSubTopic);
            if (of != null) {
                of.close();
            }
            return partitionsFor;
        } catch (Throwable th) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher
    public long getOffsetByTimeIfOutOfRange(PubSubTopicPartition pubSubTopicPartition, long j) throws TopicDoesNotExistException {
        Long offsetForTime;
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            long latestOffset = getLatestOffset(pubSubTopicPartition);
            if (latestOffset <= 0) {
                this.logger.info("End offset for topic {} is {}; return offset {}", pubSubTopicPartition, Long.valueOf(latestOffset), 0L);
                if (of != null) {
                    of.close();
                }
                return 0L;
            }
            long earliestOffset = getEarliestOffset(pubSubTopicPartition);
            if (earliestOffset == latestOffset) {
                this.logger.info("Both beginning offset and end offset is {} for topic {}; it's empty; return offset {}", Long.valueOf(latestOffset), pubSubTopicPartition, Long.valueOf(latestOffset));
                if (of != null) {
                    of.close();
                }
                return latestOffset;
            }
            try {
                ((PubSubConsumerAdapter) this.pubSubConsumer.get()).subscribe(pubSubTopicPartition, latestOffset - 2);
                Map hashMap = new HashMap();
                int i = 0;
                while (true) {
                    int i2 = i;
                    i++;
                    if (i2 >= 3 || !hashMap.isEmpty()) {
                        break;
                    }
                    this.logger.info("Trying to get the last record from topic: {} at offset: {}. Attempt#{}/{}", pubSubTopicPartition, Long.valueOf(latestOffset - 1), Integer.valueOf(i), 3);
                    hashMap = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).poll(this.kafkaOperationTimeout.toMillis());
                }
                if (hashMap.isEmpty()) {
                    String str = "Failed to get the last record from topic: " + pubSubTopicPartition + " after 3 attempts";
                    this.logger.error(str);
                    throw new VeniceException(str);
                }
                PubSubMessage pubSubMessage = (PubSubMessage) Utils.iterateOnMapOfLists(hashMap).next();
                if (j > pubSubMessage.getPubSubMessageTime() || (offsetForTime = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).offsetForTime(pubSubTopicPartition, j)) == null || offsetForTime.longValue() == -1) {
                    long j2 = j > pubSubMessage.getPubSubMessageTime() ? latestOffset : earliestOffset;
                    this.logger.info("Successfully return offset: {} for topic: {} for timestamp: {}", Long.valueOf(j2), pubSubTopicPartition, Long.valueOf(j));
                    ((PubSubConsumerAdapter) this.pubSubConsumer.get()).unSubscribe(pubSubTopicPartition);
                    if (of != null) {
                        of.close();
                    }
                    return j2;
                }
                this.logger.info("Successfully return offset: {} for topic: {} for timestamp: {}", offsetForTime, pubSubTopicPartition, Long.valueOf(j));
                long longValue = offsetForTime.longValue();
                ((PubSubConsumerAdapter) this.pubSubConsumer.get()).unSubscribe(pubSubTopicPartition);
                if (of != null) {
                    of.close();
                }
                return longValue;
            } catch (Throwable th) {
                ((PubSubConsumerAdapter) this.pubSubConsumer.get()).unSubscribe(pubSubTopicPartition);
                throw th;
            }
        } catch (Throwable th2) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    private long getEarliestOffset(PubSubTopicPartition pubSubTopicPartition) throws TopicDoesNotExistException {
        AutoCloseableLock of = AutoCloseableLock.of(this.adminConsumerLock);
        try {
            if (!((PubSubAdminAdapter) this.kafkaAdminWrapper.get()).containsTopicWithExpectationAndRetry(pubSubTopicPartition.getPubSubTopic(), 3, true)) {
                throw new TopicDoesNotExistException("Topic " + pubSubTopicPartition.getPubSubTopic() + " does not exist!");
            }
            if (pubSubTopicPartition.getPartitionNumber() < 0) {
                throw new IllegalArgumentException("Cannot retrieve latest offsets for invalid partition " + pubSubTopicPartition.getPartitionNumber());
            }
            try {
                Long beginningOffset = ((PubSubConsumerAdapter) this.pubSubConsumer.get()).beginningOffset(pubSubTopicPartition, DEFAULT_KAFKA_OFFSET_API_TIMEOUT);
                if (beginningOffset == null) {
                    throw new VeniceException("offset result returned from beginningOffsets does not contain entry: " + pubSubTopicPartition);
                }
                long longValue = beginningOffset.longValue();
                if (of != null) {
                    of.close();
                }
                return longValue;
            } catch (Exception e) {
                if (e instanceof TimeoutException) {
                    throw new VeniceOperationAgainstKafkaTimedOut("Timeout exception when seeking to beginning to get earliest offset for topic partition: " + pubSubTopicPartition, e);
                }
                throw e;
            }
        } catch (Throwable th) {
            if (of != null) {
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.kafkaAdminWrapper.isPresent()) {
            Closeable closeable = (Closeable) this.kafkaAdminWrapper.get();
            Logger logger = this.logger;
            Objects.requireNonNull(logger);
            IOUtils.closeQuietly(closeable, (v1) -> {
                r1.error(v1);
            });
        }
        if (this.pubSubConsumer.isPresent()) {
            Closeable closeable2 = (Closeable) this.pubSubConsumer.get();
            Logger logger2 = this.logger;
            Objects.requireNonNull(logger2);
            IOUtils.closeQuietly(closeable2, (v1) -> {
                r1.error(v1);
            });
        }
    }
}
