package com.linkedin.davinci.storage;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.stats.AggVersionedStorageEngineStats;
import com.linkedin.davinci.stats.RocksDBMemoryStats;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.StorageEngineFactory;
import com.linkedin.davinci.store.blackhole.BlackHoleStorageEngineFactory;
import com.linkedin.davinci.store.memory.InMemoryStorageEngineFactory;
import com.linkedin.davinci.store.rocksdb.RocksDBStorageEngineFactory;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;

/* loaded from: input_file:com/linkedin/davinci/storage/StorageService.class */
public class StorageService extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger(StorageService.class);
    private final StorageEngineRepository storageEngineRepository;
    private final VeniceConfigLoader configLoader;
    private final VeniceServerConfig serverConfig;
    private final Map<PersistenceType, StorageEngineFactory> persistenceTypeToStorageEngineFactoryMap;
    private final AggVersionedStorageEngineStats aggVersionedStorageEngineStats;
    private final RocksDBMemoryStats rocksDBMemoryStats;
    private final InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer;
    private final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer;
    private final ReadOnlyStoreRepository storeRepository;

    public StorageService(VeniceConfigLoader veniceConfigLoader, AggVersionedStorageEngineStats aggVersionedStorageEngineStats, RocksDBMemoryStats rocksDBMemoryStats, InternalAvroSpecificSerializer<StoreVersionState> internalAvroSpecificSerializer, InternalAvroSpecificSerializer<PartitionState> internalAvroSpecificSerializer2, ReadOnlyStoreRepository readOnlyStoreRepository, boolean z, boolean z2, Function<String, Boolean> function) {
        String dataBasePath = veniceConfigLoader.getVeniceServerConfig().getDataBasePath();
        if (!Utils.directoryExists(dataBasePath)) {
            if (!veniceConfigLoader.getVeniceServerConfig().isAutoCreateDataPath()) {
                throw new VeniceException("Data directory '" + dataBasePath + "' does not exist and autocreate.data.path is disabled.");
            }
            File file = new File(dataBasePath);
            LOGGER.info("Creating data directory {}", file.getAbsolutePath());
            file.mkdirs();
        }
        this.configLoader = veniceConfigLoader;
        this.serverConfig = veniceConfigLoader.getVeniceServerConfig();
        this.storageEngineRepository = new StorageEngineRepository();
        this.persistenceTypeToStorageEngineFactoryMap = new HashMap();
        this.aggVersionedStorageEngineStats = aggVersionedStorageEngineStats;
        this.rocksDBMemoryStats = rocksDBMemoryStats;
        this.storeVersionStateSerializer = internalAvroSpecificSerializer;
        this.partitionStateSerializer = internalAvroSpecificSerializer2;
        this.storeRepository = readOnlyStoreRepository;
        initInternalStorageEngineFactories();
        if (z || z2) {
            restoreAllStores(veniceConfigLoader, z, z2, function);
        }
    }

    public StorageService(VeniceConfigLoader veniceConfigLoader, AggVersionedStorageEngineStats aggVersionedStorageEngineStats, RocksDBMemoryStats rocksDBMemoryStats, InternalAvroSpecificSerializer<StoreVersionState> internalAvroSpecificSerializer, InternalAvroSpecificSerializer<PartitionState> internalAvroSpecificSerializer2, ReadOnlyStoreRepository readOnlyStoreRepository, boolean z, boolean z2) {
        this(veniceConfigLoader, aggVersionedStorageEngineStats, rocksDBMemoryStats, internalAvroSpecificSerializer, internalAvroSpecificSerializer2, readOnlyStoreRepository, z, z2, str -> {
            return true;
        });
    }

    public StorageService(VeniceConfigLoader veniceConfigLoader, AggVersionedStorageEngineStats aggVersionedStorageEngineStats, RocksDBMemoryStats rocksDBMemoryStats, InternalAvroSpecificSerializer<StoreVersionState> internalAvroSpecificSerializer, InternalAvroSpecificSerializer<PartitionState> internalAvroSpecificSerializer2, ReadOnlyStoreRepository readOnlyStoreRepository) {
        this(veniceConfigLoader, aggVersionedStorageEngineStats, rocksDBMemoryStats, internalAvroSpecificSerializer, internalAvroSpecificSerializer2, readOnlyStoreRepository, true, true);
    }

    private void initInternalStorageEngineFactories() {
        this.persistenceTypeToStorageEngineFactoryMap.put(PersistenceType.IN_MEMORY, new InMemoryStorageEngineFactory(this.serverConfig));
        this.persistenceTypeToStorageEngineFactoryMap.put(PersistenceType.ROCKS_DB, new RocksDBStorageEngineFactory(this.serverConfig, this.rocksDBMemoryStats, this.storeVersionStateSerializer, this.partitionStateSerializer));
        this.persistenceTypeToStorageEngineFactoryMap.put(PersistenceType.BLACK_HOLE, new BlackHoleStorageEngineFactory());
    }

    private void restoreAllStores(VeniceConfigLoader veniceConfigLoader, boolean z, boolean z2, Function<String, Boolean> function) {
        LOGGER.info("Start restoring all the stores persisted previously");
        for (Map.Entry<PersistenceType, StorageEngineFactory> entry : this.persistenceTypeToStorageEngineFactoryMap.entrySet()) {
            PersistenceType key = entry.getKey();
            StorageEngineFactory value = entry.getValue();
            LOGGER.info("Start restoring all the stores with type: {}", key);
            for (String str : value.getPersistedStoreNames()) {
                LOGGER.info("Start restoring store: {} with type: {}", str, key);
                VeniceStoreVersionConfig storeConfig = veniceConfigLoader.getStoreConfig(str, key);
                storeConfig.setRestoreDataPartitions(z);
                storeConfig.setRestoreMetadataPartition(z2);
                if (function.apply(str).booleanValue()) {
                    try {
                        LOGGER.info("Loaded the following partitions: {}, for store: {}", Arrays.toString(openStore(storeConfig, () -> {
                            return null;
                        }).getPartitionIds().toArray()), str);
                        LOGGER.info("Done restoring store: {} with type: {}", str, key);
                    } catch (Exception e) {
                        if (!ExceptionUtils.recursiveClassEquals(e, new Class[]{RocksDBException.class})) {
                            throw new VeniceException("Error caught during opening store " + str, e);
                        }
                        LOGGER.error("Could not load the following store : " + str, e);
                        this.aggVersionedStorageEngineStats.recordRocksDBOpenFailure(str);
                    }
                } else {
                    LOGGER.info("Starting deleting local storage engine: {} with type: {}", str, key);
                    value.removeStorageEngine(str);
                    LOGGER.info("Done deleting local storage engine: {} with type: {}", str, key);
                }
            }
            LOGGER.info("Done restoring all the stores with type: {}", key);
        }
        LOGGER.info("Done restoring all the stores persisted previously");
    }

    public synchronized AbstractStorageEngine openStoreForNewPartition(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, Supplier<StoreVersionState> supplier) {
        LOGGER.info("Opening store for {} partition {}", veniceStoreVersionConfig.getStoreVersionName(), Integer.valueOf(i));
        AbstractStorageEngine openStore = openStore(veniceStoreVersionConfig, supplier);
        synchronized (openStore) {
            Iterator<Integer> it = getSubPartition(veniceStoreVersionConfig.getStoreVersionName(), i).iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                if (!openStore.containsPartition(intValue)) {
                    openStore.addStoragePartition(intValue);
                }
            }
        }
        LOGGER.info("Opened store for {} partition {}", veniceStoreVersionConfig.getStoreVersionName(), Integer.valueOf(i));
        return openStore;
    }

    public BiConsumer<String, StoreVersionState> getStoreVersionStateSyncer() {
        return (str, storeVersionState) -> {
            AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(str);
            if (localStorageEngine != null) {
                localStorageEngine.updateStoreVersionStateCache(storeVersionState);
            }
        };
    }

    public StorageEngineFactory getInternalStorageEngineFactory(VeniceStoreVersionConfig veniceStoreVersionConfig) {
        PersistenceType storePersistenceType = veniceStoreVersionConfig.getStorePersistenceType();
        if (this.persistenceTypeToStorageEngineFactoryMap.containsKey(storePersistenceType)) {
            return this.persistenceTypeToStorageEngineFactoryMap.get(storePersistenceType);
        }
        throw new VeniceException("Unrecognized persistence type " + storePersistenceType);
    }

    public Optional<Statistics> getRocksDBAggregatedStatistics() {
        return this.persistenceTypeToStorageEngineFactoryMap.containsKey(PersistenceType.ROCKS_DB) ? ((RocksDBStorageEngineFactory) this.persistenceTypeToStorageEngineFactoryMap.get(PersistenceType.ROCKS_DB)).getAggStatistics() : Optional.empty();
    }

    public synchronized AbstractStorageEngine openStore(VeniceStoreVersionConfig veniceStoreVersionConfig, Supplier<StoreVersionState> supplier) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(storeVersionName);
        if (localStorageEngine != null) {
            return localStorageEngine;
        }
        long nanoTime = System.nanoTime();
        if (!veniceStoreVersionConfig.isStorePersistenceTypeKnown()) {
            veniceStoreVersionConfig.setStorePersistenceType(veniceStoreVersionConfig.getPersistenceType());
        }
        LOGGER.info("Creating/Opening Storage Engine {} with type: {}", storeVersionName, veniceStoreVersionConfig.getStorePersistenceType());
        StorageEngineFactory internalStorageEngineFactory = getInternalStorageEngineFactory(veniceStoreVersionConfig);
        AbstractStorageEngine storageEngine = internalStorageEngineFactory.getStorageEngine(veniceStoreVersionConfig, isReplicationMetadataEnabled(storeVersionName, internalStorageEngineFactory.getPersistenceType()));
        storageEngine.updateStoreVersionStateCache(supplier.get());
        this.storageEngineRepository.addLocalStorageEngine(storageEngine);
        this.aggVersionedStorageEngineStats.setStorageEngine(storeVersionName, storageEngine);
        LOGGER.info("time spent on creating new storage Engine for store {}: {} ms", storeVersionName, Double.valueOf(LatencyUtils.getLatencyInMS(nanoTime)));
        return storageEngine;
    }

    public synchronized void dropStorePartition(VeniceStoreVersionConfig veniceStoreVersionConfig, int i) {
        dropStorePartition(veniceStoreVersionConfig, i, true);
    }

    public synchronized void dropStorePartition(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, boolean z) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(storeVersionName);
        if (localStorageEngine == null) {
            LOGGER.warn("Storage engine {} does not exist, ignoring drop partition request.", storeVersionName);
            return;
        }
        Iterator<Integer> it = getSubPartition(storeVersionName, i).iterator();
        while (it.hasNext()) {
            localStorageEngine.dropPartition(it.next().intValue());
        }
        Set<Integer> partitionIds = localStorageEngine.getPartitionIds();
        LOGGER.info("Dropped partition {} of {}, remaining partitions={}", Integer.valueOf(i), storeVersionName, partitionIds);
        if (partitionIds.isEmpty() && z) {
            removeStorageEngine(storeVersionName);
        }
    }

    public synchronized void closeStorePartition(VeniceStoreVersionConfig veniceStoreVersionConfig, int i) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(storeVersionName);
        if (localStorageEngine == null) {
            LOGGER.warn("Storage engine {} does not exist, ignoring close partition request.", storeVersionName);
            return;
        }
        Iterator<Integer> it = getSubPartition(storeVersionName, i).iterator();
        while (it.hasNext()) {
            localStorageEngine.closePartition(it.next().intValue());
        }
    }

    public synchronized void removeStorageEngine(String str) {
        AbstractStorageEngine removeLocalStorageEngine = getStorageEngineRepository().removeLocalStorageEngine(str);
        if (removeLocalStorageEngine == null) {
            LOGGER.warn("Storage engine {} does not exist, ignoring remove request.", str);
            return;
        }
        removeLocalStorageEngine.drop();
        VeniceStoreVersionConfig storeConfig = this.configLoader.getStoreConfig(str);
        storeConfig.setStorePersistenceType(removeLocalStorageEngine.getType());
        getInternalStorageEngineFactory(storeConfig).removeStorageEngine(removeLocalStorageEngine);
    }

    public synchronized void closeStorageEngine(String str) {
        AbstractStorageEngine removeLocalStorageEngine = getStorageEngineRepository().removeLocalStorageEngine(str);
        if (removeLocalStorageEngine == null) {
            LOGGER.warn("Storage engine {} does not exist, ignoring close request.", str);
            return;
        }
        removeLocalStorageEngine.close();
        VeniceStoreVersionConfig storeConfig = this.configLoader.getStoreConfig(str);
        storeConfig.setStorePersistenceType(removeLocalStorageEngine.getType());
        getInternalStorageEngineFactory(storeConfig).closeStorageEngine(removeLocalStorageEngine);
    }

    public void cleanupAllStores(VeniceConfigLoader veniceConfigLoader) {
        restoreAllStores(veniceConfigLoader, true, true, str -> {
            return true;
        });
        LOGGER.info("Start cleaning up all the stores persisted previously");
        this.storageEngineRepository.getAllLocalStorageEngines().stream().forEach(abstractStorageEngine -> {
            String storeName = abstractStorageEngine.getStoreName();
            LOGGER.info("Start deleting store: {}", storeName);
            Iterator<Integer> it = abstractStorageEngine.getPartitionIds().iterator();
            while (it.hasNext()) {
                dropStorePartition(veniceConfigLoader.getStoreConfig(storeName), it.next().intValue());
            }
            LOGGER.info("Deleted store: {}", storeName);
        });
        LOGGER.info("Done cleaning up all the stores persisted previously");
    }

    public List<Integer> getUserPartitions(String str) {
        int amplificationFactor = PartitionUtils.getAmplificationFactor(this.storeRepository, str);
        AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(str);
        if (localStorageEngine != null) {
            return PartitionUtils.getUserPartitions(localStorageEngine.getPartitionIds(), amplificationFactor);
        }
        LOGGER.warn("Local storage engine does not exist for topic: {}", str);
        return Collections.emptyList();
    }

    public void closeAllStorageEngines() {
        LOGGER.info("Storage service has {} storage engines before cleanup.", Integer.valueOf(this.storageEngineRepository.getAllLocalStorageEngines().size()));
        Iterator<AbstractStorageEngine> it = this.storageEngineRepository.getAllLocalStorageEngines().iterator();
        while (it.hasNext()) {
            closeStorageEngine(it.next().getStoreName());
        }
        LOGGER.info("Storage service has {} storage engines after cleanup.", Integer.valueOf(this.storageEngineRepository.getAllLocalStorageEngines().size()));
    }

    public StorageEngineRepository getStorageEngineRepository() {
        return this.storageEngineRepository;
    }

    public AbstractStorageEngine getStorageEngine(String str) {
        return getStorageEngineRepository().getLocalStorageEngine(str);
    }

    public boolean startInner() throws Exception {
        return true;
    }

    public void stopInner() throws VeniceException {
        VeniceException veniceException = null;
        try {
            this.storageEngineRepository.close();
        } catch (VeniceException e) {
            veniceException = e;
        }
        for (Map.Entry<PersistenceType, StorageEngineFactory> entry : this.persistenceTypeToStorageEngineFactoryMap.entrySet()) {
            PersistenceType key = entry.getKey();
            LOGGER.info("Closing {} storage engine factory", key);
            try {
                entry.getValue().close();
            } catch (VeniceException e2) {
                LOGGER.error("Error closing " + key, e2);
                veniceException = e2;
            }
        }
        if (veniceException != null) {
            throw veniceException;
        }
    }

    private List<Integer> getSubPartition(String str, int i) {
        return PartitionUtils.getSubPartitions(i, PartitionUtils.getAmplificationFactor(this.storeRepository, str));
    }

    private boolean isReplicationMetadataEnabled(String str, PersistenceType persistenceType) {
        if (this.serverConfig.isDaVinciClient() || !Objects.equals(persistenceType, PersistenceType.ROCKS_DB)) {
            return false;
        }
        try {
            String parseStoreFromVersionTopic = Version.parseStoreFromVersionTopic(str);
            int parseVersionFromKafkaTopicName = Version.parseVersionFromKafkaTopicName(str);
            try {
                Optional version = this.storeRepository.getStoreOrThrow(parseStoreFromVersionTopic).getVersion(parseVersionFromKafkaTopicName);
                if (version.isPresent()) {
                    return ((Version) version.get()).isActiveActiveReplicationEnabled();
                }
                LOGGER.warn("Version {} of store {} does not exist in storeRepository.", Integer.valueOf(parseVersionFromKafkaTopicName), parseStoreFromVersionTopic);
                return false;
            } catch (VeniceNoStoreException e) {
                LOGGER.warn("Store {} does not exist in storeRepository.", parseStoreFromVersionTopic);
                return false;
            }
        } catch (IllegalArgumentException e2) {
            return false;
        }
    }
}
