package com.linkedin.venice.controller.kafka;

import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.system.store.MetaStoreWriter;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/controller/kafka/TopicCleanupService.class */
public class TopicCleanupService extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) TopicCleanupService.class);
    private static final Map<String, Integer> storeToCountdownForDeletion = new HashMap();
    private final Admin admin;
    protected final long sleepIntervalBetweenTopicListFetchMs;
    protected final int delayFactor;
    private final int minNumberOfUnusedKafkaTopicsToPreserve;
    private PubSubTopicRepository pubSubTopicRepository;
    protected final VeniceControllerMultiClusterConfig multiClusterConfigs;
    private final AtomicBoolean stop = new AtomicBoolean(false);
    private boolean isLeaderControllerOfControllerCluster = false;
    private long refreshQueueCycle = 60000;
    private final Thread cleanupThread = new Thread(new TopicCleanupTask(), "TopicCleanupTask");

    /* loaded from: input_file:com/linkedin/venice/controller/kafka/TopicCleanupService$TopicCleanupTask.class */
    private class TopicCleanupTask implements Runnable {
        private TopicCleanupTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!TopicCleanupService.this.stop.get()) {
                try {
                    Thread.sleep(TopicCleanupService.this.sleepIntervalBetweenTopicListFetchMs);
                    if (TopicCleanupService.this.stop.get()) {
                        break;
                    }
                    try {
                    } catch (Exception e) {
                        TopicCleanupService.LOGGER.error("Received exception when cleaning up topics", (Throwable) e);
                    }
                    if (!TopicCleanupService.this.admin.isLeaderControllerOfControllerCluster()) {
                        TopicCleanupService.this.isLeaderControllerOfControllerCluster = false;
                    } else if (TopicCleanupService.this.isLeaderControllerOfControllerCluster) {
                        TopicCleanupService.this.cleanupVeniceTopics();
                    } else {
                        TopicCleanupService.this.isLeaderControllerOfControllerCluster = true;
                        TopicCleanupService.LOGGER.info("Current controller becomes the leader controller of controller cluster");
                    }
                } catch (InterruptedException e2) {
                    TopicCleanupService.LOGGER.error("Received InterruptedException during sleep in TopicCleanup thread");
                }
            }
            TopicCleanupService.LOGGER.info("TopicCleanupTask stopped");
        }
    }

    public TopicCleanupService(Admin admin, VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig, PubSubTopicRepository pubSubTopicRepository) {
        this.admin = admin;
        this.sleepIntervalBetweenTopicListFetchMs = veniceControllerMultiClusterConfig.getTopicCleanupSleepIntervalBetweenTopicListFetchMs();
        this.delayFactor = veniceControllerMultiClusterConfig.getTopicCleanupDelayFactor();
        this.minNumberOfUnusedKafkaTopicsToPreserve = veniceControllerMultiClusterConfig.getMinNumberOfUnusedKafkaTopicsToPreserve();
        this.multiClusterConfigs = veniceControllerMultiClusterConfig;
        this.pubSubTopicRepository = pubSubTopicRepository;
    }

    @Override // com.linkedin.venice.service.AbstractVeniceService
    public boolean startInner() throws Exception {
        this.cleanupThread.start();
        return true;
    }

    @Override // com.linkedin.venice.service.AbstractVeniceService
    public void stopInner() throws Exception {
        this.stop.set(true);
        this.cleanupThread.interrupt();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TopicManager getTopicManager() {
        return this.admin.getTopicManager();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TopicManager getTopicManager(String str) {
        return this.admin.getTopicManager(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Admin getAdmin() {
        return this.admin;
    }

    void cleanupVeniceTopics() {
        PriorityQueue<PubSubTopic> priorityQueue = new PriorityQueue<>((Comparator<? super PubSubTopic>) (pubSubTopic, pubSubTopic2) -> {
            return pubSubTopic.isRealTime() ? -1 : 0;
        });
        populateDeprecatedTopicQueue(priorityQueue);
        long currentTimeMillis = System.currentTimeMillis();
        while (!priorityQueue.isEmpty()) {
            PubSubTopic poll = priorityQueue.poll();
            try {
                try {
                    cleanupReplicaStatusesFromMetaSystemStore(poll);
                } catch (Exception e) {
                    LOGGER.error("Received exception: {} while trying to clean up replica statuses from meta system store for topic: {}, but topic deletion will continue", e, poll);
                }
                getTopicManager().ensureTopicIsDeletedAndBlockWithRetry(poll);
            } catch (ExecutionException e2) {
                LOGGER.warn("ExecutionException caught when trying to delete topic: {}", poll);
            }
            if (!poll.isRealTime() && System.currentTimeMillis() - currentTimeMillis > this.refreshQueueCycle) {
                priorityQueue.clear();
                populateDeprecatedTopicQueue(priorityQueue);
                if (priorityQueue.isEmpty()) {
                    return;
                } else {
                    currentTimeMillis = System.currentTimeMillis();
                }
            }
        }
    }

    private void populateDeprecatedTopicQueue(PriorityQueue<PubSubTopic> priorityQueue) {
        getAllVeniceStoreTopicsRetentions(getTopicManager()).forEach((str, map) -> {
            PubSubTopic topic = this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(str));
            if (map.containsKey(topic)) {
                if (this.admin.isTopicTruncatedBasedOnRetention(((Long) map.get(topic)).longValue())) {
                    priorityQueue.offer(topic);
                }
                map.remove(topic);
            }
            List<PubSubTopic> extractVersionTopicsToCleanup = extractVersionTopicsToCleanup(this.admin, map, this.minNumberOfUnusedKafkaTopicsToPreserve, this.delayFactor);
            if (extractVersionTopicsToCleanup.isEmpty()) {
                return;
            }
            priorityQueue.addAll(extractVersionTopicsToCleanup);
        });
    }

    public static Map<String, Map<PubSubTopic, Long>> getAllVeniceStoreTopicsRetentions(TopicManager topicManager) {
        Map<PubSubTopic, Long> allTopicRetentions = topicManager.getAllTopicRetentions();
        HashMap hashMap = new HashMap();
        for (Map.Entry<PubSubTopic, Long> entry : allTopicRetentions.entrySet()) {
            PubSubTopic key = entry.getKey();
            long longValue = entry.getValue().longValue();
            String storeName = key.getStoreName();
            if (!storeName.isEmpty()) {
                hashMap.compute(storeName, (str, map) -> {
                    if (map == null) {
                        map = new HashMap();
                    }
                    map.put(key, Long.valueOf(longValue));
                    return map;
                });
            }
        }
        return hashMap;
    }

    public static List<PubSubTopic> extractVersionTopicsToCleanup(Admin admin, Map<PubSubTopic, Long> map, int i, int i2) {
        if (map.isEmpty()) {
            return Collections.emptyList();
        }
        Set<PubSubTopic> keySet = map.keySet();
        Optional max = keySet.stream().filter(pubSubTopic -> {
            return Version.isVersionTopic(pubSubTopic.getName());
        }).map(pubSubTopic2 -> {
            return Integer.valueOf(Version.parseVersionFromKafkaTopicName(pubSubTopic2.getName()));
        }).max((v0, v1) -> {
            return Integer.compare(v0, v1);
        });
        if (!max.isPresent()) {
            return Collections.emptyList();
        }
        int intValue = ((Integer) max.get()).intValue();
        String storeName = keySet.iterator().next().getStoreName();
        VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
        boolean z = systemStoreType != null && systemStoreType.isStoreZkShared();
        long j = ((!z && !admin.getStoreConfigRepo().getStoreConfig(storeName).isPresent()) || z) ? intValue : intValue - i;
        return (List) keySet.stream().filter(pubSubTopic3 -> {
            return admin.isTopicTruncatedBasedOnRetention(((Long) map.get(pubSubTopic3)).longValue());
        }).filter(pubSubTopic4 -> {
            return ((long) Version.parseVersionFromKafkaTopicName(pubSubTopic4.getName())) <= j;
        }).filter(pubSubTopic5 -> {
            return admin.isParent() || !admin.isResourceStillAlive(pubSubTopic5.getName());
        }).filter(pubSubTopic6 -> {
            if (Version.isRealTimeTopic(pubSubTopic6.getName())) {
                return true;
            }
            if (storeToCountdownForDeletion.merge(pubSubTopic6.getName(), Integer.valueOf(i2), (num, num2) -> {
                return Integer.valueOf(num.intValue() - 1);
            }).intValue() > 0) {
                return false;
            }
            storeToCountdownForDeletion.remove(pubSubTopic6);
            return true;
        }).collect(Collectors.toList());
    }

    protected boolean cleanupReplicaStatusesFromMetaSystemStore(PubSubTopic pubSubTopic) {
        if (this.admin.isParent() || !pubSubTopic.isVersionTopic()) {
            return false;
        }
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(pubSubTopic.getName());
        int parseVersionFromKafkaTopicName = Version.parseVersionFromKafkaTopicName(pubSubTopic.getName());
        Optional<StoreConfig> storeConfig = this.admin.getStoreConfigRepo().getStoreConfig(parseStoreFromKafkaTopicName);
        if (!storeConfig.isPresent()) {
            throw new VeniceException("Failed to get store config for store: " + parseStoreFromKafkaTopicName);
        }
        String cluster = storeConfig.get().getCluster();
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(VeniceSystemStoreType.META_STORE.getSystemStoreName(parseStoreFromKafkaTopicName)));
        TopicManager topicManager = getTopicManager();
        if (!topicManager.containsTopic(topic)) {
            return false;
        }
        int size = topicManager.partitionsFor(pubSubTopic).size();
        MetaStoreWriter metaStoreWriter = this.admin.getMetaStoreWriter();
        for (int i = 0; i < size; i++) {
            metaStoreWriter.deleteStoreReplicaStatus(cluster, parseStoreFromKafkaTopicName, parseVersionFromKafkaTopicName, i);
        }
        LOGGER.info("Successfully removed store replica status from meta system store for store: {} , version: {} with partition count: {} in cluster: {}", parseStoreFromKafkaTopicName, Integer.valueOf(parseVersionFromKafkaTopicName), Integer.valueOf(size), cluster);
        return true;
    }
}
