package com.linkedin.davinci.ingestion;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.ingestion.main.MainIngestionMonitorService;
import com.linkedin.davinci.ingestion.main.MainIngestionRequestClient;
import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService;
import com.linkedin.davinci.ingestion.main.MainPartitionIngestionStatus;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
import com.linkedin.davinci.notifier.RelayNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType;
import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.AutoCloseableSingleLock;
import io.tehuti.metrics.MetricsRepository;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/ingestion/IsolatedIngestionBackend.class */
public class IsolatedIngestionBackend extends DefaultIngestionBackend implements DaVinciIngestionBackend, VeniceIngestionBackend {
    private static final Logger LOGGER = LogManager.getLogger(IsolatedIngestionBackend.class);

    /** Wait, in milliseconds, between attempts in {@link #executeCommandWithRetry} (1000 ms). */
    private static final int RETRY_WAIT_TIME_IN_MS = 1000;

    /** Client used to send ingestion commands to the forked ingestion process. */
    private final MainIngestionRequestClient mainIngestionRequestClient;

    /** Monitor service that tracks, per topic partition, where ingestion is currently hosted. */
    private final MainIngestionMonitorService mainIngestionMonitorService;

    /** Configuration loader supplied at construction time; also used to look up per-store configs. */
    private final VeniceConfigLoader configLoader;

    /** Pool on which completion reports are handled (see {@link #getIsolatedIngestionNotifier}). */
    private final ExecutorService completionReportHandlingExecutor;

    /** Handle of the forked ingestion process; replaceable through the setter, hence not final. */
    private Process isolatedIngestionServiceProcess;

    /** Set once by {@link #prepareForShutdown()}; the reference itself never changes after construction. */
    private final AtomicBoolean isShuttingDown;

    /**
     * Creates the backend: builds the request client, forks the isolated ingestion process, and then
     * creates and starts the monitor service that listens for ingestion reports.
     *
     * <p>If the monitor service cannot be created or started, the resources acquired earlier in this
     * constructor (the completion-handling thread pool and the forked process) are released before the
     * failure is rethrown, so a failed construction does not leave an orphaned child process behind.
     *
     * @param veniceConfigLoader configuration source; also provides the two ports that are logged
     * @param readOnlyStoreRepository store repository handed to the monitor service
     * @param metricsRepository metrics repository handed to the monitor service
     * @param storageMetadataService metadata service; it is cast to
     *     {@link MainIngestionStorageMetadataService}, so any other implementation fails construction
     * @param kafkaStoreIngestionService ingestion service handed to the superclass and the monitor service
     * @param storageService storage service handed to the superclass
     * @throws VeniceException if the monitor service cannot be created or started
     */
    public IsolatedIngestionBackend(VeniceConfigLoader veniceConfigLoader, ReadOnlyStoreRepository readOnlyStoreRepository, MetricsRepository metricsRepository, StorageMetadataService storageMetadataService, KafkaStoreIngestionService kafkaStoreIngestionService, StorageService storageService) {
        super(storageMetadataService, kafkaStoreIngestionService, storageService);
        this.completionReportHandlingExecutor = Executors.newFixedThreadPool(10);
        this.isShuttingDown = new AtomicBoolean(false);
        int ingestionServicePort = veniceConfigLoader.getVeniceServerConfig().getIngestionServicePort();
        int ingestionApplicationPort = veniceConfigLoader.getVeniceServerConfig().getIngestionApplicationPort();
        this.configLoader = veniceConfigLoader;
        this.mainIngestionRequestClient = new MainIngestionRequestClient(veniceConfigLoader);
        this.isolatedIngestionServiceProcess = this.mainIngestionRequestClient.startForkedIngestionProcess(veniceConfigLoader);
        try {
            this.mainIngestionMonitorService = new MainIngestionMonitorService(this, veniceConfigLoader);
            this.mainIngestionMonitorService.setStoreRepository(readOnlyStoreRepository);
            this.mainIngestionMonitorService.setMetricsRepository(metricsRepository);
            this.mainIngestionMonitorService.setStoreIngestionService(kafkaStoreIngestionService);
            this.mainIngestionMonitorService.setStorageMetadataService((MainIngestionStorageMetadataService) storageMetadataService);
            this.mainIngestionMonitorService.startInner();
            LOGGER.info("Ingestion Report Listener started.");
            LOGGER.info("Created isolated ingestion backend with service port: {}, listener port: {}", Integer.valueOf(ingestionServicePort), Integer.valueOf(ingestionApplicationPort));
        } catch (Exception e) {
            VeniceException failure = new VeniceException("Unable to start ingestion report listener.", e);
            // The caller never receives a reference to this half-built instance, so nobody else can
            // release what was acquired above; do it here and keep any cleanup error as suppressed.
            try {
                this.completionReportHandlingExecutor.shutdownNow();
                if (this.isolatedIngestionServiceProcess != null) {
                    this.isolatedIngestionServiceProcess.destroy();
                }
            } catch (RuntimeException cleanupFailure) {
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
    }

    /**
     * Starts consumption for a partition by handing a remote and a local variant of the command to
     * {@link #executeCommandWithRetry}, which decides where the command actually runs.
     *
     * @param veniceStoreVersionConfig store-version config; its store-version name is used as the topic
     * @param i partition id
     * @param optional leader/follower state forwarded only to the local (superclass) implementation
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.IngestionBackendBase
    public void startConsumption(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, Optional<LeaderFollowerStateType> optional) {
        // Remote variant: ask the forked process to start consuming.
        final Supplier<Boolean> remoteStart =
            () -> Boolean.valueOf(this.mainIngestionRequestClient.startConsumption(veniceStoreVersionConfig.getStoreVersionName(), i));
        // Local variant: fall back to the superclass behaviour in this process.
        final Runnable localStart = () -> super.startConsumption(veniceStoreVersionConfig, i, optional);
        executeCommandWithRetry(
            veniceStoreVersionConfig.getStoreVersionName(),
            i,
            IngestionCommandType.START_CONSUMPTION,
            remoteStart,
            localStart);
    }

    /**
     * Stops consumption for a partition by handing a remote and a local variant of the command to
     * {@link #executeCommandWithRetry}.
     *
     * @param veniceStoreVersionConfig store-version config; its store-version name is used as the topic
     * @param i partition id
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.IngestionBackendBase
    public void stopConsumption(VeniceStoreVersionConfig veniceStoreVersionConfig, int i) {
        // Remote variant: ask the forked process to stop consuming.
        final Supplier<Boolean> remoteStop =
            () -> Boolean.valueOf(getMainIngestionRequestClient().stopConsumption(veniceStoreVersionConfig.getStoreVersionName(), i));
        // Local variant: superclass behaviour in this process.
        final Runnable localStop = () -> super.stopConsumption(veniceStoreVersionConfig, i);
        executeCommandWithRetry(
            veniceStoreVersionConfig.getStoreVersionName(),
            i,
            IngestionCommandType.STOP_CONSUMPTION,
            remoteStop,
            localStop);
    }

    /**
     * Promotes a partition to leader through {@link #executeCommandWithRetry}.
     *
     * <p>When the forked process accepts the promotion, the monitor service is told that the partition
     * is now in leader state. The local variant delegates to the superclass.
     *
     * @param veniceStoreVersionConfig store-version config; its store-version name is used as the topic
     * @param i partition id
     * @param leaderSessionIdChecker forwarded only to the local (superclass) implementation
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.VeniceIngestionBackend
    public void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker) {
        final String topic = veniceStoreVersionConfig.getStoreVersionName();
        final Supplier<Boolean> remotePromotion = () -> {
            if (!this.mainIngestionRequestClient.promoteToLeader(topic, i)) {
                return Boolean.FALSE;
            }
            // Only record the leader state once the forked process has accepted the command.
            getMainIngestionMonitorService().setTopicPartitionToLeaderState(topic, i);
            return Boolean.TRUE;
        };
        final Runnable localPromotion = () -> super.promoteToLeader(veniceStoreVersionConfig, i, leaderSessionIdChecker);
        executeCommandWithRetry(topic, i, IngestionCommandType.PROMOTE_TO_LEADER, remotePromotion, localPromotion);
    }

    /**
     * Demotes a partition to standby through {@link #executeCommandWithRetry}.
     *
     * <p>When the forked process accepts the demotion, the monitor service is told that the partition
     * is now in follower state. The local variant delegates to the superclass.
     *
     * @param veniceStoreVersionConfig store-version config; its store-version name is used as the topic
     * @param i partition id
     * @param leaderSessionIdChecker forwarded only to the local (superclass) implementation
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.VeniceIngestionBackend
    public void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker) {
        final String topic = veniceStoreVersionConfig.getStoreVersionName();
        final Supplier<Boolean> remoteDemotion = () -> {
            if (!this.mainIngestionRequestClient.demoteToStandby(topic, i)) {
                return Boolean.FALSE;
            }
            // Only record the follower state once the forked process has accepted the command.
            getMainIngestionMonitorService().setTopicIngestionToFollowerState(topic, i);
            return Boolean.TRUE;
        };
        final Runnable localDemotion = () -> super.demoteToStandby(veniceStoreVersionConfig, i, leaderSessionIdChecker);
        executeCommandWithRetry(topic, i, IngestionCommandType.DEMOTE_TO_STANDBY, remoteDemotion, localDemotion);
    }

    /**
     * Removes a partition via {@link #executeCommandWithRetry} and, once the monitor service reports
     * no remaining partitions for the topic, drops the topic-level state and the remote storage engine.
     *
     * <p>Both variants finish by cleaning the partition's state in the monitor service and resetting
     * the partition through the request client; the remote variant does so only when the forked
     * process accepted the removal.
     *
     * @param veniceStoreVersionConfig store-version config; its store-version name is used as the topic
     * @param i partition id
     * @param i2 forwarded unchanged to {@link #removeTopicPartitionLocally}
     * @param z forwarded unchanged to {@link #removeTopicPartitionLocally}
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.IngestionBackendBase
    public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, int i2, boolean z) {
        final String topic = veniceStoreVersionConfig.getStoreVersionName();

        final Supplier<Boolean> remoteRemoval = () -> {
            if (!getMainIngestionRequestClient().removeTopicPartition(topic, i)) {
                return Boolean.FALSE;
            }
            getMainIngestionMonitorService().cleanupTopicPartitionState(topic, i);
            getMainIngestionRequestClient().resetTopicPartition(topic, i);
            return Boolean.TRUE;
        };
        final Runnable localRemoval = () -> {
            removeTopicPartitionLocally(veniceStoreVersionConfig, i, i2, z);
            getMainIngestionMonitorService().cleanupTopicPartitionState(topic, i);
            getMainIngestionRequestClient().resetTopicPartition(topic, i);
        };
        executeCommandWithRetry(topic, i, IngestionCommandType.REMOVE_PARTITION, remoteRemoval, localRemoval);

        if (getMainIngestionMonitorService().getTopicPartitionCount(topic) != 0) {
            return;
        }
        // Last partition is gone: drop everything that is tracked per topic.
        LOGGER.info("No serving partitions exist for topic: {}, dropping the topic storage.", topic);
        getMainIngestionMonitorService().cleanupTopicState(topic);
        getMainIngestionRequestClient().removeStorageEngine(topic);
    }

    /**
     * Removes the storage engine for a topic: first through the request client (forked process),
     * then through the superclass, and finally clears the topic's state in the monitor service.
     *
     * @param str topic name
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.DaVinciIngestionBackend
    public void removeStorageEngine(String str) {
        final String topic = str;
        mainIngestionRequestClient.removeStorageEngine(topic);
        super.removeStorageEngine(topic);
        mainIngestionMonitorService.cleanupTopicState(topic);
    }

    /**
     * Kills the consumption task for a topic: first through the request client (forked process),
     * then through the superclass, and finally clears the topic's state in the monitor service.
     *
     * @param str topic name
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.IngestionBackendBase
    public void killConsumptionTask(String str) {
        final String topic = str;
        mainIngestionRequestClient.killConsumptionTask(topic);
        super.killConsumptionTask(topic);
        mainIngestionMonitorService.cleanupTopicState(topic);
    }

    /**
     * Shuts down the ingestion task for a topic: first through the request client (forked process),
     * then through the superclass, and finally clears the topic's state in the monitor service.
     *
     * @param str topic name
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.IngestionBackendBase
    public void shutdownIngestionTask(String str) {
        final String topic = str;
        mainIngestionRequestClient.shutdownIngestionTask(topic);
        super.shutdownIngestionTask(topic);
        mainIngestionMonitorService.cleanupTopicState(topic);
    }

    /**
     * Registers an ingestion notifier with the superclass and registers a wrapped copy, built by
     * {@link #getIsolatedIngestionNotifier}, with the monitor service. A {@code null} notifier is ignored.
     *
     * @param veniceNotifier notifier to register; may be {@code null}
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.IngestionBackendBase
    public void addIngestionNotifier(VeniceNotifier veniceNotifier) {
        if (veniceNotifier == null) {
            return;
        }
        super.addIngestionNotifier(veniceNotifier);
        final VeniceNotifier wrapped = getIsolatedIngestionNotifier(veniceNotifier);
        mainIngestionMonitorService.addIngestionNotifier(wrapped);
    }

    /**
     * Registers two relay wrappers around a push-status notifier. A {@code null} notifier is ignored.
     *
     * <ul>
     *   <li>The wrapper added to the store ingestion service overrides {@code restarted} with an
     *       empty body.</li>
     *   <li>The wrapper added to the monitor service overrides {@code completed} with an empty body.</li>
     * </ul>
     *
     * NOTE(review): the two empty overrides look like deliberate suppression of duplicate status
     * reports between the main and forked process - confirm against {@code RelayNotifier}.
     *
     * @param veniceNotifier notifier to wrap and register; may be {@code null}
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, com.linkedin.davinci.ingestion.VeniceIngestionBackend
    public void addPushStatusNotifier(VeniceNotifier veniceNotifier) {
        if (veniceNotifier == null) {
            return;
        }

        final RelayNotifier mainProcessNotifier = new RelayNotifier(veniceNotifier) { // from class: com.linkedin.davinci.ingestion.IsolatedIngestionBackend.1
            @Override // com.linkedin.davinci.notifier.RelayNotifier, com.linkedin.davinci.notifier.VeniceNotifier
            public void restarted(String str, int i, long j, String str2) {
                // Intentionally a no-op.
            }
        };
        getStoreIngestionService().addIngestionNotifier(mainProcessNotifier);

        final RelayNotifier monitorNotifier = new RelayNotifier(veniceNotifier) { // from class: com.linkedin.davinci.ingestion.IsolatedIngestionBackend.2
            @Override // com.linkedin.davinci.notifier.RelayNotifier, com.linkedin.davinci.notifier.VeniceNotifier
            public void completed(String str, int i, long j, String str2) {
                // Intentionally a no-op.
            }
        };
        mainIngestionMonitorService.addPushStatusNotifier(monitorNotifier);
    }

    /**
     * Raises the shutting-down flag. {@link #executeCommandWithRetry} reads it through
     * {@link #isShuttingDown()} and stops retrying commands rejected by the forked process.
     */
    @Override // com.linkedin.davinci.ingestion.VeniceIngestionBackend
    public void prepareForShutdown() {
        isShuttingDown.set(true);
    }

    /**
     * Replaces the handle of the forked ingestion process held by this backend.
     *
     * @param process new process handle; stored as-is without validation
     */
    public void setIsolatedIngestionServiceProcess(Process process) {
        isolatedIngestionServiceProcess = process;
    }

    /**
     * @return the handle of the forked ingestion process currently held by this backend
     */
    public Process getIsolatedIngestionServiceProcess() {
        return isolatedIngestionServiceProcess;
    }

    /**
     * @return the monitor service created by the constructor
     */
    public MainIngestionMonitorService getMainIngestionMonitorService() {
        return mainIngestionMonitorService;
    }

    /**
     * @return the request client created by the constructor
     */
    public MainIngestionRequestClient getMainIngestionRequestClient() {
        return mainIngestionRequestClient;
    }

    /**
     * @return {@code true} once {@link #prepareForShutdown()} has been called
     */
    public boolean isShuttingDown() {
        return isShuttingDown.get();
    }

    /**
     * Drops a partition using the superclass implementation directly, bypassing the remote/local
     * routing done by this class's {@code dropStoragePartitionGracefully} override.
     *
     * @param veniceStoreVersionConfig store-version config forwarded to the superclass
     * @param i partition id
     * @param i2 forwarded unchanged to the superclass
     * @param z forwarded unchanged to the superclass
     */
    void removeTopicPartitionLocally(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, int i2, boolean z) {
        super.dropStoragePartitionGracefully(veniceStoreVersionConfig, i, i2, z);
    }

    /**
     * Releases everything owned by this backend on a best-effort basis; never throws an
     * {@link Exception}.
     *
     * <p>The steps run in the original order: stop the completion-handling executor and the monitor
     * service, ask the forked process to shut down its ingestion and storage components, destroy
     * the forked process, close the request client, and finally close the superclass. Each stage is
     * guarded separately so that a failure in an earlier stage no longer prevents the later ones
     * from running. In particular, the forked process is always destroyed. Failures are logged at
     * WARN level.
     */
    @Override // com.linkedin.davinci.ingestion.DefaultIngestionBackend, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        final String componentName = getClass().getSimpleName();

        // Stage 1: graceful shutdown of local helpers and of the components inside the forked process.
        try {
            this.completionReportHandlingExecutor.shutdownNow();
            this.mainIngestionMonitorService.stopInner();
            this.mainIngestionRequestClient.shutdownForkedProcessComponent(IngestionComponentType.KAFKA_INGESTION_SERVICE);
            this.mainIngestionRequestClient.shutdownForkedProcessComponent(IngestionComponentType.STORAGE_SERVICE);
        } catch (Exception e) {
            LOGGER.warn("Unable to close {}", componentName, e);
        }

        // Stage 2: the child process must go away even if the graceful stage failed.
        try {
            // The handle is replaceable through the setter, so guard against null.
            final Process forkedProcess = this.isolatedIngestionServiceProcess;
            if (forkedProcess != null) {
                forkedProcess.destroy();
            }
        } catch (Exception e) {
            LOGGER.warn("Unable to close {}", componentName, e);
        }

        // Stage 3: release the request client.
        try {
            this.mainIngestionRequestClient.close();
        } catch (Exception e) {
            LOGGER.warn("Unable to close {}", componentName, e);
        }

        // Stage 4: let the superclass release its own resources.
        try {
            super.close();
        } catch (Exception e) {
            LOGGER.warn("Unable to close {}", componentName, e);
        }
    }

    /**
     * Tells whether the monitor service reports the partition's ingestion status as
     * {@code MainPartitionIngestionStatus.MAIN}.
     *
     * @param str topic name
     * @param i partition id
     * @return {@code true} if the reported status equals {@code MAIN}
     */
    boolean isTopicPartitionHostedInMainProcess(String str, int i) {
        final MainPartitionIngestionStatus status = getMainIngestionMonitorService().getTopicPartitionIngestionStatus(str, i);
        return status.equals(MainPartitionIngestionStatus.MAIN);
    }

    /**
     * Tells whether the monitor service knows the partition at all, i.e. its reported ingestion
     * status is anything other than {@code MainPartitionIngestionStatus.NOT_EXIST}.
     *
     * @param str topic name
     * @param i partition id
     * @return {@code false} only when the reported status equals {@code NOT_EXIST}
     */
    boolean isTopicPartitionHosted(String str, int i) {
        final MainPartitionIngestionStatus status = getMainIngestionMonitorService().getTopicPartitionIngestionStatus(str, i);
        final boolean unknownPartition = status.equals(MainPartitionIngestionStatus.NOT_EXIST);
        return !unknownPartition;
    }

    /**
     * @return the executor on which completion reports are processed
     */
    ExecutorService getCompletionHandlingExecutor() {
        return completionReportHandlingExecutor;
    }

    /**
     * @return the configuration loader supplied to the constructor
     */
    VeniceConfigLoader getConfigLoader() {
        return configLoader;
    }

    /**
     * Starts consumption using the superclass implementation directly, bypassing the remote/local
     * routing done by this class's {@code startConsumption} override.
     *
     * @param veniceStoreVersionConfig store-version config forwarded to the superclass
     * @param i partition id
     * @param optional leader/follower state forwarded to the superclass
     */
    void startConsumptionLocally(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, Optional<LeaderFollowerStateType> optional) {
        super.startConsumption(veniceStoreVersionConfig, i, optional);
    }

    /**
     * Wraps a notifier so that a {@code completed} report resumes ingestion of that partition in the
     * main process.
     *
     * <p>On {@code completed}, if the monitor service does not know the partition, an error is logged
     * and nothing else happens. Otherwise a task is submitted to the completion-handling executor
     * that loads the store config, disables restoring of data and metadata partitions on it, and
     * starts consumption locally. If that fails with an {@link Exception}, the wrapped notifier's
     * {@code error} callback is invoked. In every case the partition is then marked as locally
     * ingested in the monitor service, exactly once, from a {@code finally} block.
     *
     * @param veniceNotifier notifier to wrap; it receives the {@code error} callback on failure
     * @return a relay notifier with the {@code completed} override described above
     */
    VeniceNotifier getIsolatedIngestionNotifier(final VeniceNotifier veniceNotifier) {
        return new RelayNotifier(veniceNotifier) { // from class: com.linkedin.davinci.ingestion.IsolatedIngestionBackend.3
            @Override // com.linkedin.davinci.notifier.RelayNotifier, com.linkedin.davinci.notifier.VeniceNotifier
            public void completed(String str, int i, long j, String str2, Optional<LeaderFollowerStateType> optional) {
                if (!IsolatedIngestionBackend.this.isTopicPartitionHosted(str, i)) {
                    IsolatedIngestionBackend.LOGGER.error("Partition: {} of topic: {} is not assigned to this host, will not resume the ingestion on main process.", Integer.valueOf(i), str);
                    return;
                }
                IsolatedIngestionBackend.this.getCompletionHandlingExecutor().submit(() -> {
                    try {
                        VeniceStoreVersionConfig storeConfig = IsolatedIngestionBackend.this.getConfigLoader().getStoreConfig(str);
                        // NOTE(review): presumably the local storage is already in place at this point,
                        // so nothing needs to be restored before resuming - confirm.
                        storeConfig.setRestoreDataPartitions(false);
                        storeConfig.setRestoreMetadataPartition(false);
                        IsolatedIngestionBackend.this.startConsumptionLocally(storeConfig, i, optional);
                    } catch (Exception e) {
                        veniceNotifier.error(str, i, "Failed to resume the ingestion in main process for topic: " + str, e);
                    } finally {
                        // Runs once on success, on a reported failure, and on any other Throwable.
                        IsolatedIngestionBackend.this.getMainIngestionMonitorService().setVersionPartitionToLocalIngestion(str, i);
                    }
                });
            }
        };
    }

    /**
     * Executes an ingestion command in whichever process hosts the partition, retrying while the
     * forked process rejects it.
     *
     * <p>Before every attempt the monitor service is consulted. The command is run locally through
     * {@code runnable} as soon as the partition is hosted in the main process, or when it is not
     * hosted at all and the command is not {@code START_CONSUMPTION}. Otherwise the command is sent
     * to the forked process through {@code supplier}:
     *
     * <ol>
     *   <li>For {@code START_CONSUMPTION}, the method first waits for the store version and marks the
     *       partition as ingested in the isolated process.</li>
     *   <li>{@code supplier} runs while the read side of the fork-process action lock is held. A
     *       {@code true} result ends the call. If it throws and the command is
     *       {@code START_CONSUMPTION}, the partition state is cleaned up (still under the lock)
     *       before the exception propagates.</li>
     *   <li>After a rejection ({@code false}): a {@code STOP_CONSUMPTION} for a partition that the
     *       local ingestion service reports as consuming is executed locally; if the backend is
     *       shutting down the command is dropped; otherwise the method sleeps
     *       {@link #RETRY_WAIT_TIME_IN_MS} ms and tries again. It returns silently if
     *       {@code Utils.sleep} reports {@code false} (presumably an interrupted sleep - confirm).</li>
     * </ol>
     *
     * <p>The lock is managed with try-with-resources so it is released exactly once on every path,
     * and only the remote call itself is covered by the lock.
     *
     * @param str topic (store-version) name
     * @param i partition id
     * @param ingestionCommandType command being executed; used for routing decisions and logging
     * @param supplier remote variant of the command; returns {@code true} when the forked process accepted it
     * @param runnable local variant of the command
     */
    void executeCommandWithRetry(String str, int i, IngestionCommandType ingestionCommandType, Supplier<Boolean> supplier, Runnable runnable) {
        while (!isTopicPartitionHostedInMainProcess(str, i) && (isTopicPartitionHosted(str, i) || ingestionCommandType == IngestionCommandType.START_CONSUMPTION)) {
            LOGGER.info("Sending command {} of topic: {}, partition: {} to fork process.", ingestionCommandType, str, Integer.valueOf(i));
            if (ingestionCommandType.equals(IngestionCommandType.START_CONSUMPTION)) {
                Utils.waitStoreVersionOrThrow(str, getStoreIngestionService().getMetadataRepo());
                getMainIngestionMonitorService().setVersionPartitionToIsolatedIngestion(str, i);
            }
            try (AutoCloseableLock ignored = AutoCloseableSingleLock.of(getMainIngestionMonitorService().getForkProcessActionLock().readLock())) {
                try {
                    if (supplier.get().booleanValue()) {
                        return;
                    }
                } catch (Exception e) {
                    if (ingestionCommandType.equals(IngestionCommandType.START_CONSUMPTION)) {
                        LOGGER.warn("Clean up ingestion status for topic: {}, partition: {}.", str, Integer.valueOf(i));
                        getMainIngestionMonitorService().cleanupTopicPartitionState(str, i);
                    }
                    throw e;
                }
            }
            // From here on the lock has been released: the forked process rejected the command.
            if (ingestionCommandType.equals(IngestionCommandType.STOP_CONSUMPTION) && getStoreIngestionService().isPartitionConsuming(str, i)) {
                LOGGER.warn("Expect topic: {}, partition: {} in forked process but found in main process, will execute command {} locally.", str, Integer.valueOf(i), ingestionCommandType);
                runnable.run();
                return;
            }
            if (isShuttingDown()) {
                LOGGER.warn("Command {} rejected by remote ingestion process, but will not retry since server is shutting down.", ingestionCommandType);
                return;
            }
            LOGGER.info("Command {} rejected by remote ingestion process, will retry in {} ms.", ingestionCommandType, Integer.valueOf(RETRY_WAIT_TIME_IN_MS));
            if (!Utils.sleep(RETRY_WAIT_TIME_IN_MS)) {
                return;
            }
        }
        LOGGER.info("Executing command {} of topic: {}, partition: {} in main process process.", ingestionCommandType, str, Integer.valueOf(i));
        runnable.run();
    }
}
