package com.linkedin.davinci.ingestion;

import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/ingestion/DefaultIngestionBackend.class */
public class DefaultIngestionBackend implements DaVinciIngestionBackend, VeniceIngestionBackend {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) DefaultIngestionBackend.class);
    private final StorageMetadataService storageMetadataService;
    private final StorageService storageService;
    private final KafkaStoreIngestionService storeIngestionService;
    private final Map<String, AtomicReference<AbstractStorageEngine>> topicStorageEngineReferenceMap = new VeniceConcurrentHashMap();

    public DefaultIngestionBackend(StorageMetadataService storageMetadataService, KafkaStoreIngestionService kafkaStoreIngestionService, StorageService storageService) {
        this.storageMetadataService = storageMetadataService;
        this.storeIngestionService = kafkaStoreIngestionService;
        this.storageService = storageService;
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public void startConsumption(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, Optional<LeaderFollowerStateType> optional) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        LOGGER.info("Retrieving storage engine for store {} partition {}", storeVersionName, Integer.valueOf(i));
        Utils.waitStoreVersionOrThrow(storeVersionName, getStoreIngestionService().getMetadataRepo());
        AbstractStorageEngine openStoreForNewPartition = this.storageService.openStoreForNewPartition(veniceStoreVersionConfig, i, () -> {
            return this.storageMetadataService.getStoreVersionState(storeVersionName);
        });
        this.topicStorageEngineReferenceMap.compute(storeVersionName, (str, atomicReference) -> {
            if (atomicReference != null) {
                atomicReference.set(openStoreForNewPartition);
            }
            return atomicReference;
        });
        LOGGER.info("Retrieved storage engine for store {} partition {}. Starting consumption in ingestion service", storeVersionName, Integer.valueOf(i));
        getStoreIngestionService().startConsumption(veniceStoreVersionConfig, i, optional);
        LOGGER.info("Completed starting consumption in ingestion service for store {} partition {}", storeVersionName, Integer.valueOf(i));
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public void stopConsumption(VeniceStoreVersionConfig veniceStoreVersionConfig, int i) {
        getStoreIngestionService().stopConsumption(veniceStoreVersionConfig, i);
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public void killConsumptionTask(String str) {
        getStoreIngestionService().killConsumptionTask(str);
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public void shutdownIngestionTask(String str) {
        getStoreIngestionService().shutdownStoreIngestionTask(str);
    }

    @Override // com.linkedin.davinci.ingestion.DaVinciIngestionBackend
    public void removeStorageEngine(String str) {
        getStorageService().removeStorageEngine(str);
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, int i2, boolean z) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        getStoreIngestionService().getMetaSystemStoreReplicaStatusNotifier().ifPresent(metaSystemStoreReplicaStatusNotifier -> {
            metaSystemStoreReplicaStatusNotifier.drop(storeVersionName, i);
        });
        getStoreIngestionService().stopConsumptionAndWait(veniceStoreVersionConfig, i, 1, i2);
        getStorageService().dropStorePartition(veniceStoreVersionConfig, i, z);
    }

    @Override // com.linkedin.davinci.ingestion.VeniceIngestionBackend
    public void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker) {
        LOGGER.info("Promoting partition: {} of topic: {} to leader.", Integer.valueOf(i), veniceStoreVersionConfig.getStoreVersionName());
        getStoreIngestionService().promoteToLeader(veniceStoreVersionConfig, i, leaderSessionIdChecker);
    }

    @Override // com.linkedin.davinci.ingestion.VeniceIngestionBackend
    public void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker) {
        LOGGER.info("Demoting partition: {} of topic: {} to standby.", Integer.valueOf(i), veniceStoreVersionConfig.getStoreVersionName());
        getStoreIngestionService().demoteToStandby(veniceStoreVersionConfig, i, leaderSessionIdChecker);
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public void addIngestionNotifier(VeniceNotifier veniceNotifier) {
        getStoreIngestionService().addIngestionNotifier(veniceNotifier);
    }

    @Override // com.linkedin.davinci.ingestion.VeniceIngestionBackend
    public void addPushStatusNotifier(VeniceNotifier veniceNotifier) {
        getStoreIngestionService().addIngestionNotifier(veniceNotifier);
    }

    @Override // com.linkedin.davinci.ingestion.VeniceIngestionBackend
    public void replaceAndAddTestPushStatusNotifier(VeniceNotifier veniceNotifier) {
        getStoreIngestionService().replaceAndAddTestNotifier(veniceNotifier);
    }

    @Override // com.linkedin.davinci.ingestion.DaVinciIngestionBackend
    public void setStorageEngineReference(String str, AtomicReference<AbstractStorageEngine> atomicReference) {
        if (atomicReference == null) {
            this.topicStorageEngineReferenceMap.remove(str);
        } else {
            this.topicStorageEngineReferenceMap.put(str, atomicReference);
        }
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public StorageMetadataService getStorageMetadataService() {
        return this.storageMetadataService;
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public KafkaStoreIngestionService getStoreIngestionService() {
        return this.storeIngestionService;
    }

    @Override // com.linkedin.davinci.ingestion.IngestionBackendBase
    public StorageService getStorageService() {
        return this.storageService;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }
}
