package com.linkedin.venice.kafka;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcher;
import com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcherFactory;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.pubsub.PubSubTopicConfiguration;
import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.InstrumentedApacheKafkaAdminAdapter;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapter;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/kafka/TopicManager.class */
/**
 * Topic management operations against a single pub-sub cluster, identified by its bootstrap servers.
 *
 * <p>Reads go through a lazily created "read-only" {@link PubSubAdminAdapter}, mutations go through a separate
 * lazily created "write-only" one, and partition/offset lookups are delegated to a {@link PartitionOffsetFetcher}.
 * Topic configurations fetched through this class are also stored in {@link #topicConfigCache}.
 *
 * <p>Only some methods are {@code synchronized}; the class as a whole makes no stronger thread-safety promise
 * than what the individual method modifiers provide.
 */
public class TopicManager implements Closeable {
    /** Lower bound on the number of status polls performed while waiting for a topic deletion. */
    private static final int MINIMUM_TOPIC_DELETION_STATUS_POLL_TIMES = 10;
    /** Timeout used by {@code createTopic} when the caller asks for the "fast" operation timeout. */
    private static final int FAST_KAFKA_OPERATION_TIMEOUT_MS = 1000;
    protected static final long ETERNAL_TOPIC_RETENTION_POLICY_MS = Long.MAX_VALUE;
    /** 5 days, in milliseconds. */
    public static final long DEFAULT_TOPIC_RETENTION_POLICY_MS = 432000000;
    /** 2 days, in milliseconds. */
    public static final long BUFFER_REPLAY_MINIMAL_SAFETY_MARGIN = 172800000;
    public static final int DEFAULT_KAFKA_OPERATION_TIMEOUT_MS = 30000;
    /** Sentinel returned by {@link #getTopicRetention(PubSubTopicConfiguration)} when no retention is configured. */
    public static final long UNKNOWN_TOPIC_RETENTION = Long.MIN_VALUE;
    public static final int MAX_TOPIC_DELETE_RETRIES = 3;
    public static final int DEFAULT_KAFKA_REPLICATION_FACTOR = 3;
    /** 1 day, in milliseconds. */
    public static final long DEFAULT_KAFKA_MIN_LOG_COMPACTION_LAG_MS = 86400000;
    public static final boolean CONCURRENT_TOPIC_DELETION_REQUEST_POLICY = false;
    /** Exception types for which a failed topic-creation attempt is retried. */
    private static final List<Class<? extends Throwable>> CREATE_TOPIC_RETRIABLE_EXCEPTIONS = Collections.unmodifiableList(Arrays.asList(InvalidReplicationFactorException.class, TimeoutException.class));
    private final Logger logger;
    private final String pubSubBootstrapServers;
    private final long kafkaOperationTimeoutMs;
    private final long topicDeletionStatusPollIntervalMs;
    private final long topicMinLogCompactionLagMs;
    private final PubSubAdminAdapterFactory<PubSubAdminAdapter> pubSubAdminAdapterFactory;
    private final Lazy<PubSubAdminAdapter> kafkaWriteOnlyAdmin;
    private final Lazy<PubSubAdminAdapter> kafkaReadOnlyAdmin;
    private final PartitionOffsetFetcher partitionOffsetFetcher;
    /** Most recently fetched configuration per topic; entries expire 5 minutes after being written. */
    Cache<PubSubTopic, PubSubTopicConfiguration> topicConfigCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();

    /**
     * Builds a manager for the cluster reachable at {@code pubSubBootstrapServers}.
     *
     * <p>Neither admin client is created here; each is created on first use through its {@link Lazy} holder.
     *
     * @param builder source of timeouts, factories, client properties and the (possibly null) metrics repository
     * @param pubSubBootstrapServers bootstrap servers of the cluster; also used in the logger name
     */
    public TopicManager(TopicManagerRepository.Builder builder, String pubSubBootstrapServers) {
        this.logger = LogManager.getLogger(getClass().getSimpleName() + " [" + pubSubBootstrapServers + "]");
        this.kafkaOperationTimeoutMs = builder.getKafkaOperationTimeoutMs();
        this.topicDeletionStatusPollIntervalMs = builder.getTopicDeletionStatusPollIntervalMs();
        this.topicMinLogCompactionLagMs = builder.getTopicMinLogCompactionLagMs();
        this.pubSubAdminAdapterFactory = builder.getPubSubAdminAdapterFactory();
        this.pubSubBootstrapServers = pubSubBootstrapServers;
        TopicManagerRepository.SSLPropertiesSupplier pubSubProperties = builder.getPubSubProperties();
        PubSubTopicRepository pubSubTopicRepository = builder.getPubSubTopicRepository();
        // A null metrics repository means the admin clients are left un-instrumented.
        Optional<MetricsRepository> optionalMetricsRepository = Optional.ofNullable(builder.getMetricsRepository());
        this.kafkaReadOnlyAdmin = Lazy.of(() -> {
            PubSubAdminAdapter admin = createInstrumentedPubSubAdmin(optionalMetricsRepository, "ReadOnlyKafkaAdminStats", this.pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository), pubSubBootstrapServers);
            this.logger.info("{} is using kafka read-only admin client of class: {}", getClass().getSimpleName(), admin.getClassName());
            return admin;
        });
        this.kafkaWriteOnlyAdmin = Lazy.of(() -> {
            PubSubAdminAdapter admin = createInstrumentedPubSubAdmin(optionalMetricsRepository, "WriteOnlyKafkaAdminStats", this.pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository), pubSubBootstrapServers);
            this.logger.info("{} is using kafka write-only admin client of class: {}", getClass().getSimpleName(), admin.getClassName());
            return admin;
        });
        this.partitionOffsetFetcher = PartitionOffsetFetcherFactory.createDefaultPartitionOffsetFetcher(builder.getPubSubConsumerAdapterFactory(), pubSubProperties.get(pubSubBootstrapServers), pubSubBootstrapServers, this.kafkaReadOnlyAdmin, this.kafkaOperationTimeoutMs, optionalMetricsRepository);
    }

    /**
     * Wraps {@code pubSubAdmin} in an {@link InstrumentedApacheKafkaAdminAdapter} when a metrics repository is
     * available; otherwise returns it unchanged.
     *
     * @param optionalMetricsRepository metrics repository, if any
     * @param statsNamePrefix first component of the stats name
     * @param pubSubAdmin the admin client to (optionally) instrument
     * @param bootstrapServers bootstrap servers, used in the stats name and in log output
     * @return the instrumented wrapper, or {@code pubSubAdmin} itself when no metrics repository is present
     */
    private PubSubAdminAdapter createInstrumentedPubSubAdmin(Optional<MetricsRepository> optionalMetricsRepository, String statsNamePrefix, PubSubAdminAdapter pubSubAdmin, String bootstrapServers) {
        if (!optionalMetricsRepository.isPresent()) {
            logger.info("Created non-instrumented Kafka admin client for Kafka cluster with bootstrap server {}", bootstrapServers);
            return pubSubAdmin;
        }
        String statsName = String.format("%s_%s_%s", statsNamePrefix, pubSubAdmin.getClassName(), bootstrapServers);
        PubSubAdminAdapter instrumentedAdmin = new InstrumentedApacheKafkaAdminAdapter(pubSubAdmin, optionalMetricsRepository.get(), statsName);
        logger.info("Created instrumented Kafka admin client for Kafka cluster with bootstrap server {} and has stats name prefix {}", bootstrapServers, statsNamePrefix);
        return instrumentedAdmin;
    }

    /**
     * Creates a topic without log compaction and without a min-ISR override, using the regular operation timeout.
     *
     * @param eternal {@code true} for {@link #ETERNAL_TOPIC_RETENTION_POLICY_MS}, {@code false} for
     *        {@link #DEFAULT_TOPIC_RETENTION_POLICY_MS}
     */
    public void createTopic(PubSubTopic pubSubTopic, int numPartitions, int replication, boolean eternal) {
        createTopic(pubSubTopic, numPartitions, replication, eternal, false, Optional.empty(), false);
    }

    /**
     * Creates a topic using the fast operation timeout ({@link #FAST_KAFKA_OPERATION_TIMEOUT_MS}).
     *
     * @param eternal {@code true} for eternal retention, {@code false} for the default retention
     * @param logCompaction forwarded to the topic configuration
     * @param minIsr forwarded to the topic configuration
     */
    public void createTopic(PubSubTopic pubSubTopic, int numPartitions, int replication, boolean eternal, boolean logCompaction, Optional<Integer> minIsr) {
        createTopic(pubSubTopic, numPartitions, replication, eternal, logCompaction, minIsr, true);
    }

    /**
     * Translates {@code eternal} into a retention time and delegates to the retention-based overload.
     */
    public void createTopic(PubSubTopic pubSubTopic, int numPartitions, int replication, boolean eternal, boolean logCompaction, Optional<Integer> minIsr, boolean useFastKafkaOperationTimeout) {
        long retentionTimeMs = eternal ? ETERNAL_TOPIC_RETENTION_POLICY_MS : DEFAULT_TOPIC_RETENTION_POLICY_MS;
        createTopic(pubSubTopic, numPartitions, replication, retentionTimeMs, logCompaction, minIsr, useFastKafkaOperationTimeout);
    }

    /**
     * Creates the topic (retrying on {@link #CREATE_TOPIC_RETRIABLE_EXCEPTIONS}) and blocks until it is visible
     * with all partitions online. If the topic already exists, waits for it to be online and then updates its
     * retention to {@code retentionTimeMs} instead.
     *
     * @param pubSubTopic topic to create
     * @param numPartitions expected partition count
     * @param replication replication factor passed to the admin client
     * @param retentionTimeMs retention to configure, in milliseconds
     * @param logCompaction forwarded to the topic configuration
     * @param minIsr forwarded to the topic configuration
     * @param useFastKafkaOperationTimeout when true the overall deadline is
     *        {@link #FAST_KAFKA_OPERATION_TIMEOUT_MS}; otherwise the configured operation timeout
     * @throws VeniceOperationAgainstKafkaTimedOut on any failure other than "topic already exists"
     *         (the original exception is attached as the cause)
     */
    public void createTopic(PubSubTopic pubSubTopic, int numPartitions, int replication, long retentionTimeMs, boolean logCompaction, Optional<Integer> minIsr, boolean useFastKafkaOperationTimeout) {
        long startTimeMs = System.currentTimeMillis();
        long operationTimeoutMs = useFastKafkaOperationTimeout ? FAST_KAFKA_OPERATION_TIMEOUT_MS : kafkaOperationTimeoutMs;
        long deadlineMs = startTimeMs + operationTimeoutMs;
        PubSubTopicConfiguration topicConfiguration = new PubSubTopicConfiguration(Optional.of(retentionTimeMs), logCompaction, minIsr, topicMinLogCompactionLagMs);
        logger.info("Creating topic: {} partitions: {} replication: {}, configuration: {}", pubSubTopic, numPartitions, replication, topicConfiguration);
        try {
            RetryUtils.executeWithMaxAttemptAndExponentialBackoff(() -> {
                kafkaWriteOnlyAdmin.get().createTopic(pubSubTopic, numPartitions, replication, topicConfiguration);
            }, 10, Duration.ofMillis(200L), Duration.ofSeconds(1L), Duration.ofMillis(operationTimeoutMs), CREATE_TOPIC_RETRIABLE_EXCEPTIONS);
            waitUntilTopicCreated(pubSubTopic, numPartitions, deadlineMs);
            boolean eternal = retentionTimeMs == ETERNAL_TOPIC_RETENTION_POLICY_MS;
            logger.info("Successfully created {}topic: {}", eternal ? "eternal " : "", pubSubTopic);
        } catch (Exception e) {
            if (!ExceptionUtils.recursiveClassEquals(e, TopicExistsException.class)) {
                throw new VeniceOperationAgainstKafkaTimedOut("Timeout while creating topic: " + pubSubTopic + ". Topic still does not exist after " + (deadlineMs - startTimeMs) + "ms.", e);
            }
            // The topic was already there: make sure it is usable and bring its retention in line.
            logger.info("Topic: {} already exists, will update retention policy.", pubSubTopic);
            waitUntilTopicCreated(pubSubTopic, numPartitions, deadlineMs);
            updateTopicRetention(pubSubTopic, retentionTimeMs);
            logger.info("Updated retention policy to be {}ms for topic: {}", retentionTimeMs, pubSubTopic);
        }
    }

    /**
     * Polls every 200 ms until the topic exists with {@code numPartitions} partitions that all have in-sync
     * replicas.
     *
     * @param deadlineMs wall-clock deadline (epoch millis) after which polling gives up
     * @throws VeniceOperationAgainstKafkaTimedOut if the deadline passes first
     */
    protected void waitUntilTopicCreated(PubSubTopic pubSubTopic, int numPartitions, long deadlineMs) {
        long startTimeMs = System.currentTimeMillis();
        while (!containsTopicAndAllPartitionsAreOnline(pubSubTopic, numPartitions)) {
            if (System.currentTimeMillis() > deadlineMs) {
                throw new VeniceOperationAgainstKafkaTimedOut("Timeout while creating topic: " + pubSubTopic + ".  Topic still did not pass all the checks after " + (deadlineMs - startTimeMs) + "ms.");
            }
            Utils.sleep(200L);
        }
    }

    /**
     * Issues the delete request through the write-only admin client.
     *
     * @return whatever the admin client returns; callers in this class treat {@code null} as "no future available"
     */
    private Future<Void> ensureTopicIsDeletedAsync(PubSubTopic pubSubTopic) {
        logger.info("Deleting topic: {}", pubSubTopic);
        return kafkaWriteOnlyAdmin.get().deleteTopic(pubSubTopic);
    }

    /**
     * Returns the replica count reported for the first partition of the topic.
     *
     * @throws VeniceException if no partition information is available for the topic
     */
    public int getReplicationFactor(PubSubTopic pubSubTopic) {
        List<PubSubTopicPartitionInfo> partitionInfos = partitionsFor(pubSubTopic);
        // partitionsFor may yield null (see containsTopicAndAllPartitionsAreOnline); fail with context instead of
        // a bare NullPointerException / NoSuchElementException.
        if (partitionInfos == null || partitionInfos.isEmpty()) {
            throw new VeniceException("Cannot determine replication factor, no partition info available for topic: " + pubSubTopic);
        }
        return partitionInfos.get(0).replicasNum();
    }

    /**
     * Fetches the topic's current configuration and updates its retention if it differs.
     *
     * @return {@code true} if the retention was changed, {@code false} if it already had the requested value
     */
    public boolean updateTopicRetention(PubSubTopic pubSubTopic, long retentionInMs) throws TopicDoesNotExistException {
        return updateTopicRetention(pubSubTopic, retentionInMs, getTopicConfig(pubSubTopic));
    }

    /**
     * Updates the topic's retention using an already fetched configuration. The passed configuration object is
     * mutated when an update is performed.
     *
     * @return {@code true} if the retention was changed, {@code false} if it already had the requested value
     */
    public boolean updateTopicRetention(PubSubTopic pubSubTopic, long retentionInMs, PubSubTopicConfiguration pubSubTopicConfiguration) throws TopicDoesNotExistException {
        Optional<Long> currentRetentionInMs = pubSubTopicConfiguration.retentionInMs();
        if (currentRetentionInMs.isPresent() && currentRetentionInMs.get() == retentionInMs) {
            return false;
        }
        pubSubTopicConfiguration.setRetentionInMs(Optional.of(retentionInMs));
        kafkaWriteOnlyAdmin.get().setTopicConfig(pubSubTopic, pubSubTopicConfiguration);
        logger.info("Updated topic: {} with retention.ms: {} in cluster [{}]", pubSubTopic, retentionInMs, pubSubBootstrapServers);
        return true;
    }

    /**
     * Turns log compaction on or off for the topic; does nothing if it is already in the requested state.
     * When enabling, the min compaction lag is set to the configured value; when disabling, it is set to 0.
     */
    public synchronized void updateTopicCompactionPolicy(PubSubTopic pubSubTopic, boolean logCompaction) throws TopicDoesNotExistException {
        PubSubTopicConfiguration topicConfig = getTopicConfig(pubSubTopic);
        boolean currentLogCompaction = topicConfig.isLogCompacted();
        if (logCompaction == currentLogCompaction) {
            return;
        }
        topicConfig.setLogCompacted(logCompaction);
        Long currentMinLogCompactionLagMs = topicConfig.minLogCompactionLagMs();
        Long expectedMinLogCompactionLagMs = logCompaction ? topicMinLogCompactionLagMs : 0L;
        topicConfig.setMinLogCompactionLagMs(expectedMinLogCompactionLagMs);
        kafkaWriteOnlyAdmin.get().setTopicConfig(pubSubTopic, topicConfig);
        logger.info("Kafka compaction policy for topic: {} has been updated from {} to {}, min compaction lag updated from {} to {}", pubSubTopic, currentLogCompaction, logCompaction, currentMinLogCompactionLagMs, expectedMinLogCompactionLagMs);
    }

    /** Reports whether log compaction is enabled, based on the cached configuration (fetched on a cache miss). */
    public boolean isTopicCompactionEnabled(PubSubTopic pubSubTopic) {
        return getCachedTopicConfig(pubSubTopic).isLogCompacted();
    }

    /** Returns the min log compaction lag from the cached configuration (fetched on a cache miss). */
    public long getTopicMinLogCompactionLagMs(PubSubTopic pubSubTopic) {
        return getCachedTopicConfig(pubSubTopic).minLogCompactionLagMs();
    }

    /**
     * Sets the topic's min in-sync replica count if it differs from the current value.
     *
     * @return {@code true} if the configuration was changed, {@code false} if it already had the requested value
     */
    public boolean updateTopicMinInSyncReplica(PubSubTopic pubSubTopic, int minISR) throws TopicDoesNotExistException {
        PubSubTopicConfiguration topicConfig = getTopicConfig(pubSubTopic);
        Optional<Integer> currentMinISR = topicConfig.minInSyncReplicas();
        if (currentMinISR.isPresent() && currentMinISR.get().equals(minISR)) {
            return false;
        }
        topicConfig.setMinInSyncReplicas(Optional.of(minISR));
        kafkaWriteOnlyAdmin.get().setTopicConfig(pubSubTopic, topicConfig);
        logger.info("Updated topic: {} with min.insync.replicas: {}", pubSubTopic, minISR);
        return true;
    }

    /** Returns the retention of every topic, as reported by the read-only admin client. */
    public Map<PubSubTopic, Long> getAllTopicRetentions() {
        return kafkaReadOnlyAdmin.get().getAllTopicRetentions();
    }

    /** Fetches the topic's configuration and returns its retention; see the overload for the sentinel value. */
    public long getTopicRetention(PubSubTopic pubSubTopic) throws TopicDoesNotExistException {
        return getTopicRetention(getTopicConfig(pubSubTopic));
    }

    /**
     * @return the configured retention in milliseconds, or {@link #UNKNOWN_TOPIC_RETENTION} when none is set
     */
    public long getTopicRetention(PubSubTopicConfiguration pubSubTopicConfiguration) {
        Optional<Long> retentionInMs = pubSubTopicConfiguration.retentionInMs();
        return retentionInMs.isPresent() ? retentionInMs.get() : UNKNOWN_TOPIC_RETENTION;
    }

    /**
     * Reports whether the topic counts as truncated: its retention is known and at most
     * {@code truncatedTopicMaxRetentionMs}. A topic that does not exist is reported as truncated.
     */
    public boolean isTopicTruncated(PubSubTopic pubSubTopic, long truncatedTopicMaxRetentionMs) {
        try {
            return isRetentionBelowTruncatedThreshold(getTopicRetention(pubSubTopic), truncatedTopicMaxRetentionMs);
        } catch (TopicDoesNotExistException e) {
            return true;
        }
    }

    /**
     * @return {@code true} when {@code retention} is not {@link #UNKNOWN_TOPIC_RETENTION} and does not exceed
     *         {@code truncatedTopicMaxRetentionMs}
     */
    public boolean isRetentionBelowTruncatedThreshold(long retention, long truncatedTopicMaxRetentionMs) {
        return retention != UNKNOWN_TOPIC_RETENTION && retention <= truncatedTopicMaxRetentionMs;
    }

    /** Fetches the topic's configuration from the read-only admin client and refreshes the cache entry. */
    public PubSubTopicConfiguration getTopicConfig(PubSubTopic pubSubTopic) throws TopicDoesNotExistException {
        PubSubTopicConfiguration topicConfig = kafkaReadOnlyAdmin.get().getTopicConfig(pubSubTopic);
        topicConfigCache.put(pubSubTopic, topicConfig);
        return topicConfig;
    }

    /** Same as {@link #getTopicConfig(PubSubTopic)} but uses the admin client's retrying variant. */
    public PubSubTopicConfiguration getTopicConfigWithRetry(PubSubTopic pubSubTopic) {
        PubSubTopicConfiguration topicConfig = kafkaReadOnlyAdmin.get().getTopicConfigWithRetry(pubSubTopic);
        topicConfigCache.put(pubSubTopic, topicConfig);
        return topicConfig;
    }

    /** Returns the cached configuration for the topic, fetching (with retry) and caching it on a miss. */
    public PubSubTopicConfiguration getCachedTopicConfig(PubSubTopic pubSubTopic) {
        PubSubTopicConfiguration cachedConfig = topicConfigCache.getIfPresent(pubSubTopic);
        return cachedConfig != null ? cachedConfig : getTopicConfigWithRetry(pubSubTopic);
    }

    /** Fetches the configurations of the given topics and refreshes the cache entry of each returned topic. */
    public Map<PubSubTopic, PubSubTopicConfiguration> getSomeTopicConfigs(Set<PubSubTopic> topicNames) {
        Map<PubSubTopic, PubSubTopicConfiguration> topicConfigs = kafkaReadOnlyAdmin.get().getSomeTopicConfigs(topicNames);
        for (Map.Entry<PubSubTopic, PubSubTopicConfiguration> entry : topicConfigs.entrySet()) {
            topicConfigCache.put(entry.getKey(), entry.getValue());
        }
        return topicConfigs;
    }

    /**
     * Deletes the topic and blocks until the deletion is confirmed.
     *
     * <p>Returns immediately when {@link #containsTopicAndAllPartitionsAreOnline(PubSubTopic)} is false. When the
     * admin client hands back a future, its completion (bounded by the operation timeout) is taken as
     * confirmation. When it hands back {@code null}, the topic's existence is polled until it disappears or the
     * poll budget is exhausted.
     *
     * @throws VeniceException if another deletion is underway, or the waiting thread is interrupted
     * @throws VeniceOperationAgainstKafkaTimedOut if the deletion is not confirmed in time
     * @throws ExecutionException if the delete future fails for a reason other than the topic being unknown
     */
    public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic) throws ExecutionException {
        if (!containsTopicAndAllPartitionsAreOnline(pubSubTopic)) {
            return;
        }
        if (kafkaReadOnlyAdmin.get().isTopicDeletionUnderway()) {
            throw new VeniceException("Delete operation already in progress! Try again later.");
        }
        Future<Void> deletionFuture = ensureTopicIsDeletedAsync(pubSubTopic);
        if (deletionFuture != null) {
            try {
                deletionFuture.get(kafkaOperationTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new VeniceException("Thread interrupted while waiting to delete topic: " + pubSubTopic, e);
            } catch (ExecutionException e) {
                // An unknown topic means it is already gone, which is the desired end state.
                if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                    throw e;
                }
            } catch (java.util.concurrent.TimeoutException e) {
                throw new VeniceOperationAgainstKafkaTimedOut("Failed to delete kafka topic: " + pubSubTopic + " after " + kafkaOperationTimeoutMs, e);
            }
            logger.info("Topic: {} has been deleted", pubSubTopic);
            return;
        }
        // No future to wait on: poll until the topic is gone. Never poll fewer than the minimum number of times,
        // even if the configured timeout / interval combination would suggest so.
        long configuredPolls = topicDeletionStatusPollIntervalMs == 0 ? kafkaOperationTimeoutMs : kafkaOperationTimeoutMs / topicDeletionStatusPollIntervalMs;
        long maxPolls = Math.max(configuredPolls, MINIMUM_TOPIC_DELETION_STATUS_POLL_TIMES);
        int pollCount = 0;
        while (++pollCount <= maxPolls) {
            Utils.sleep(topicDeletionStatusPollIntervalMs);
            if (!containsTopic(pubSubTopic)) {
                logger.info("Topic: {} has been deleted after polling {} times", pubSubTopic, pollCount);
                return;
            }
        }
        throw new VeniceOperationAgainstKafkaTimedOut("Failed to delete kafka topic: " + pubSubTopic + " after " + kafkaOperationTimeoutMs + " ms (" + pollCount + " attempts).");
    }

    /**
     * Calls {@link #ensureTopicIsDeletedAndBlock(PubSubTopic)} up to {@link #MAX_TOPIC_DELETE_RETRIES} times,
     * retrying on timeouts and {@link ExecutionException}s. The failure of the last attempt is rethrown; any
     * other exception type propagates immediately without retry.
     */
    public void ensureTopicIsDeletedAndBlockWithRetry(PubSubTopic pubSubTopic) throws ExecutionException {
        int attempts = 0;
        while (true) {
            try {
                ensureTopicIsDeletedAndBlock(pubSubTopic);
                return;
            } catch (VeniceOperationAgainstKafkaTimedOut e) {
                attempts++;
                logger.warn("Topic deletion for topic {} timed out!  Retry attempt {} / {}", pubSubTopic, attempts, MAX_TOPIC_DELETE_RETRIES);
                if (attempts == MAX_TOPIC_DELETE_RETRIES) {
                    logger.error("Topic deletion for topic {} timed out! Giving up!!", pubSubTopic, e);
                    throw e;
                }
            } catch (ExecutionException e) {
                attempts++;
                logger.warn("Topic deletion for topic {} errored out!  Retry attempt {} / {}", pubSubTopic, attempts, MAX_TOPIC_DELETE_RETRIES);
                if (attempts == MAX_TOPIC_DELETE_RETRIES) {
                    logger.error("Topic deletion for topic {} errored out! Giving up!!", pubSubTopic, e);
                    throw e;
                }
            }
        }
    }

    /** Lists all topics known to the read-only admin client. */
    public synchronized Set<PubSubTopic> listTopics() {
        return kafkaReadOnlyAdmin.get().listAllTopics();
    }

    /** Asks the read-only admin client whether the topic exists. */
    public boolean containsTopic(PubSubTopic pubSubTopic) {
        return kafkaReadOnlyAdmin.get().containsTopic(pubSubTopic);
    }

    /**
     * Delegates to the read-only admin client's retrying existence check; arguments are forwarded unchanged
     * (see {@link PubSubAdminAdapter} for their exact semantics).
     */
    public boolean containsTopicWithExpectationAndRetry(PubSubTopic pubSubTopic, int maxAttempts, boolean expectedResult) {
        return kafkaReadOnlyAdmin.get().containsTopicWithExpectationAndRetry(pubSubTopic, maxAttempts, expectedResult);
    }

    /**
     * Delegates to the read-only admin client's retrying existence check; arguments are forwarded unchanged
     * (see {@link PubSubAdminAdapter} for their exact semantics).
     */
    public boolean containsTopicWithExpectationAndRetry(PubSubTopic pubSubTopic, int maxAttempts, boolean expectedResult, Duration initialBackoff, Duration maxBackoff, Duration maxDuration) {
        return kafkaReadOnlyAdmin.get().containsTopicWithExpectationAndRetry(pubSubTopic, maxAttempts, expectedResult, initialBackoff, maxBackoff, maxDuration);
    }

    /** Same as the two-argument overload, without checking the partition count. */
    public boolean containsTopicAndAllPartitionsAreOnline(PubSubTopic pubSubTopic) {
        return containsTopicAndAllPartitionsAreOnline(pubSubTopic, null);
    }

    /**
     * Checks that the topic exists, that partition information is available, that the partition count matches
     * {@code expectedPartitionCount} (when non-null), and that every partition has in-sync replicas.
     *
     * @param expectedPartitionCount expected number of partitions, or {@code null} to skip that check
     * @return {@code true} only if all of the above checks pass
     */
    public synchronized boolean containsTopicAndAllPartitionsAreOnline(PubSubTopic pubSubTopic, Integer expectedPartitionCount) {
        if (!containsTopic(pubSubTopic)) {
            return false;
        }
        List<PubSubTopicPartitionInfo> partitionInfoList = partitionOffsetFetcher.partitionsFor(pubSubTopic);
        if (partitionInfoList == null) {
            logger.warn("getConsumer().partitionsFor() returned null for topic: {}", pubSubTopic);
            return false;
        }
        if (expectedPartitionCount != null && partitionInfoList.size() != expectedPartitionCount) {
            logger.error("getConsumer().partitionsFor() returned the wrong number of partitions for topic: {}, expectedPartitionCount: {}, actual size: {}, partitionInfoList: {}", pubSubTopic, expectedPartitionCount, partitionInfoList.size(), Arrays.toString(partitionInfoList.toArray()));
            return false;
        }
        boolean allPartitionsHaveAnInSyncReplica = partitionInfoList.stream().allMatch(PubSubTopicPartitionInfo::hasInSyncReplicas);
        if (allPartitionsHaveAnInSyncReplica) {
            logger.trace("The following topic has the at least one in-sync replica for each partition: {}", pubSubTopic);
        } else {
            logger.info("getConsumer().partitionsFor() returned some partitionInfo with no in-sync replica for topic: {}, partitionInfoList: {}", pubSubTopic, Arrays.toString(partitionInfoList.toArray()));
        }
        return allPartitionsHaveAnInSyncReplica;
    }

    /** Delegates to {@link PartitionOffsetFetcher#getTopicLatestOffsets}. */
    public Int2LongMap getTopicLatestOffsets(PubSubTopic pubSubTopic) {
        return partitionOffsetFetcher.getTopicLatestOffsets(pubSubTopic);
    }

    /** Delegates to {@link PartitionOffsetFetcher#getPartitionLatestOffsetAndRetry}. */
    public long getPartitionLatestOffsetAndRetry(PubSubTopicPartition pubSubTopicPartition, int retries) {
        return partitionOffsetFetcher.getPartitionLatestOffsetAndRetry(pubSubTopicPartition, retries);
    }

    /** Delegates to {@link PartitionOffsetFetcher#getProducerTimestampOfLastDataRecord}. */
    public long getProducerTimestampOfLastDataRecord(PubSubTopicPartition pubSubTopicPartition, int retries) {
        return partitionOffsetFetcher.getProducerTimestampOfLastDataRecord(pubSubTopicPartition, retries);
    }

    /** Delegates to {@link PartitionOffsetFetcher#getPartitionEarliestOffsetAndRetry}. */
    public long getPartitionEarliestOffsetAndRetry(PubSubTopicPartition pubSubTopicPartition, int retries) {
        return partitionOffsetFetcher.getPartitionEarliestOffsetAndRetry(pubSubTopicPartition, retries);
    }

    /** Delegates to {@link PartitionOffsetFetcher#getPartitionOffsetByTime}. */
    public long getPartitionOffsetByTime(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
        return partitionOffsetFetcher.getPartitionOffsetByTime(pubSubTopicPartition, timestamp);
    }

    /** Returns partition information for the topic; may be {@code null} (callers in this class guard for it). */
    public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic pubSubTopic) {
        return partitionOffsetFetcher.partitionsFor(pubSubTopic);
    }

    /** Returns the bootstrap servers this manager was created with. */
    public String getKafkaBootstrapServers() {
        return pubSubBootstrapServers;
    }

    /**
     * Closes the offset fetcher and whichever admin clients have actually been created. Close failures are
     * logged rather than propagated.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        Utils.closeQuietlyWithErrorLogged(partitionOffsetFetcher);
        kafkaReadOnlyAdmin.ifPresent(admin -> Utils.closeQuietlyWithErrorLogged(admin));
        kafkaWriteOnlyAdmin.ifPresent(admin -> Utils.closeQuietlyWithErrorLogged(admin));
    }

    /** Replaces the topic configuration cache (NOTE(review): looks intended for tests - confirm). */
    public void setTopicConfigCache(Cache<PubSubTopic, PubSubTopicConfiguration> cache) {
        this.topicConfigCache = cache;
    }

    /**
     * Computes the retention a hybrid store's topic is expected to have: rewind time plus bootstrap-to-online
     * timeout plus {@link #BUFFER_REPLAY_MINIMAL_SAFETY_MARGIN}, but never less than
     * {@link #DEFAULT_TOPIC_RETENTION_POLICY_MS}.
     *
     * @return the expected retention in milliseconds
     */
    public static long getExpectedRetentionTimeInMs(Store store, HybridStoreConfig hybridStoreConfig) {
        // Long literals force 64-bit arithmetic so that an int-typed getter cannot overflow before widening.
        long rewindTimeInMs = hybridStoreConfig.getRewindTimeInSeconds() * 1000L;
        long bootstrapToOnlineTimeInMs = store.getBootstrapToOnlineTimeoutInHours() * 3600000L;
        long minimumRetentionInMs = rewindTimeInMs + bootstrapToOnlineTimeInMs + BUFFER_REPLAY_MINIMAL_SAFETY_MARGIN;
        return Math.max(minimumRetentionInMs, DEFAULT_TOPIC_RETENTION_POLICY_MS);
    }
}
