package com.linkedin.venice.cleaner;

import com.linkedin.davinci.kafka.consumer.StoreIngestionService;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.LeakedResourceCleanerStats;
import io.tehuti.metrics.MetricsRepository;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Background service that periodically walks the storage engines present on this host and removes
 * the ones that are considered leaked.
 *
 * <p>A resource (storage engine, named after its version topic) is removed when either:
 * <ul>
 *   <li>its store is known to the store repository, the store has at least one version, the
 *       resource's version is not among them, and no ingestion is running for the resource; or</li>
 *   <li>its store is unknown to the store repository and has been observed as unknown for longer
 *       than {@code nonExistentStoreCleanupInterval} milliseconds.</li>
 * </ul>
 *
 * <p>The scan runs on a dedicated daemon thread that is created in {@link #startInner()} and
 * interrupted in {@link #stopInner()}.
 */
public class LeakedResourceCleaner extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger(LeakedResourceCleaner.class);

    /** Default grace period before resources of an unknown store are removed: 24 hours, in ms. */
    private static final long DEFAULT_NON_EXISTENT_STORE_CLEANUP_INTERVAL_MS = 24L * 60 * 60 * 1000;

    private final long pollIntervalMs;
    private LeakedResourceCleanerRunnable cleaner;
    private Thread runner;
    private final StorageEngineRepository storageEngineRepository;
    private final ReadOnlyStoreRepository storeRepository;
    private final StoreIngestionService ingestionService;
    private final StorageService storageService;
    private final LeakedResourceCleanerStats stats;

    // Written through the setter and read by the cleaner thread, hence volatile (same as "stop").
    private volatile long nonExistentStoreCleanupInterval = DEFAULT_NON_EXISTENT_STORE_CLEANUP_INTERVAL_MS;

    /**
     * The periodic scan itself. Sleeps for the poll interval, inspects every local storage engine,
     * and repeats until {@link #setStop()} is called or the thread is interrupted.
     */
    private class LeakedResourceCleanerRunnable implements Runnable {
        private volatile boolean stop = false;

        /**
         * Store name -> wall-clock time (ms) at which the store was first seen as missing from the
         * store repository. Only touched from the cleaner thread.
         *
         * NOTE(review): entries are keyed by store name but removed after cleaning a single
         * resource, so a second resource of the same missing store restarts its grace period.
         */
        private final Map<String, Long> nonExistentStoreToCheckedTimestamp = new HashMap<>();

        /** Requests the loop to exit after the current iteration. */
        protected void setStop() {
            this.stop = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!stop) {
                try {
                    Thread.sleep(pollIntervalMs);
                    sweepLocalStorageEngines();
                } catch (InterruptedException e) {
                    LOGGER.info("Received interruptedException while running LeakedResourceCleanerRunnable, will exit");
                    // Restore the interrupt status so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // A failure while listing engines must not terminate the cleaner thread for
                    // the remaining lifetime of the process; log it and try again next round.
                    LOGGER.error(
                        "Received exception while scanning local storage engines for leaked resources, will retry after the next poll interval",
                        e);
                }
            }
        }

        /** Inspects every storage engine currently registered in the local repository. */
        private void sweepLocalStorageEngines() {
            for (AbstractStorageEngine storageEngine : storageEngineRepository.getAllLocalStorageEngines()) {
                cleanUpIfLeaked(storageEngine.getStoreName());
            }
        }

        /**
         * Decides whether a single resource is leaked and removes it if so. Any exception is logged
         * and swallowed so that one bad resource does not abort the rest of the sweep.
         *
         * @param resourceName name of the storage engine, expected to be a version topic name
         */
        private void cleanUpIfLeaked(String resourceName) {
            try {
                String storeName = Version.parseStoreFromKafkaTopicName(resourceName);
                int versionNumber = Version.parseVersionFromKafkaTopicName(resourceName);
                try {
                    Store store = storeRepository.getStoreOrThrow(storeName);
                    // The store is known, so any pending "missing store" timer is obsolete.
                    nonExistentStoreToCheckedTimestamp.remove(storeName);
                    if (store.getVersions().isEmpty()) {
                        LOGGER.warn(
                            "Found no version for store: {}, but a lingering resource: {}, which is suspicious, so here will skip the cleanup.",
                            storeName,
                            resourceName);
                    } else if (!store.getVersion(versionNumber).isPresent()
                        && !ingestionService.containsRunningConsumption(resourceName)) {
                        LOGGER.info(
                            "Resource: {} doesn't have either the corresponding version stored in ZK, or a running ingestion task, so it will be cleaned up.",
                            resourceName);
                        removeLeakedResource(resourceName);
                    }
                } catch (VeniceNoStoreException ignored) {
                    // Not an error here: an unknown store is exactly the second leak scenario.
                    handleMissingStore(storeName, resourceName);
                }
            } catch (Exception e) {
                LOGGER.error("Received exception while verifying/cleaning up resource: {}", resourceName, e);
            }
        }

        /**
         * Handles a resource whose store is unknown to the store repository: the first sighting
         * only starts the grace period; once the grace period has elapsed the resource is removed.
         */
        private void handleMissingStore(String storeName, String resourceName) {
            long now = System.currentTimeMillis();
            Long firstSeenMissing = nonExistentStoreToCheckedTimestamp.get(storeName);
            if (firstSeenMissing == null) {
                nonExistentStoreToCheckedTimestamp.put(storeName, now);
                return;
            }
            // Subtraction form avoids overflow when a very large interval is configured.
            if (now - firstSeenMissing > nonExistentStoreCleanupInterval) {
                LOGGER.info("Store: {} is not hosted by this host, it's resources will be cleaned up.", storeName);
                removeLeakedResource(resourceName);
                nonExistentStoreToCheckedTimestamp.remove(storeName);
            }
        }

        /** Drops the storage engine for the resource, logs the removal and bumps the leak metric. */
        private void removeLeakedResource(String resourceName) {
            storageService.removeStorageEngine(resourceName);
            LOGGER.info("Resource: {} has been cleaned up.", resourceName);
            stats.recordLeakedVersion();
        }
    }

    /**
     * @param storageEngineRepository source of the storage engines present on this host
     * @param pollIntervalMs pause, in milliseconds, before each sweep
     * @param readOnlyStoreRepository used to look up stores and their versions
     * @param storeIngestionService used to check whether a resource still has a running ingestion
     * @param storageService used to remove leaked storage engines
     * @param metricsRepository repository the cleaner's stats are registered with
     */
    public LeakedResourceCleaner(
        StorageEngineRepository storageEngineRepository,
        long pollIntervalMs,
        ReadOnlyStoreRepository readOnlyStoreRepository,
        StoreIngestionService storeIngestionService,
        StorageService storageService,
        MetricsRepository metricsRepository) {
        this.storageEngineRepository = storageEngineRepository;
        this.pollIntervalMs = pollIntervalMs;
        this.storeRepository = readOnlyStoreRepository;
        this.ingestionService = storeIngestionService;
        this.storageService = storageService;
        this.stats = new LeakedResourceCleanerStats(metricsRepository);
    }

    /**
     * Creates and starts the daemon thread that runs the cleaner loop.
     *
     * @return always {@code true}
     */
    public boolean startInner() {
        this.cleaner = new LeakedResourceCleanerRunnable();
        this.runner = new Thread(this.cleaner);
        this.runner.setName("Storage Leaked Resource cleaner");
        this.runner.setDaemon(true);
        this.runner.start();
        return true;
    }

    /**
     * Signals the cleaner loop to stop and interrupts its thread. Safe to call even if
     * {@link #startInner()} was never invoked.
     */
    public void stopInner() {
        if (this.cleaner != null) {
            this.cleaner.setStop();
        }
        if (this.runner != null) {
            this.runner.interrupt();
        }
    }

    /**
     * Overrides the grace period applied to resources whose store is unknown.
     *
     * @param nonExistentStoreCleanupInterval grace period in milliseconds
     */
    void setNonExistentStoreCleanupInterval(long nonExistentStoreCleanupInterval) {
        this.nonExistentStoreCleanupInterval = nonExistentStoreCleanupInterval;
    }
}
