package com.linkedin.davinci.ingestion.main;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.ingestion.HttpClientTransport;
import com.linkedin.davinci.ingestion.isolated.IsolatedIngestionServer;
import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.IngestionStorageMetadata;
import com.linkedin.venice.ingestion.protocol.IngestionTaskCommand;
import com.linkedin.venice.ingestion.protocol.IngestionTaskReport;
import com.linkedin.venice.ingestion.protocol.ProcessShutdownCommand;
import com.linkedin.venice.ingestion.protocol.enums.IngestionAction;
import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType;
import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType;
import com.linkedin.venice.meta.IngestionMetadataUpdateType;
import com.linkedin.venice.utils.ForkedJavaProcess;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/ingestion/main/MainIngestionRequestClient.class */
/**
 * Client used to talk to the forked (isolated) ingestion process over an {@link HttpClientTransport}.
 *
 * <p>It can fork the {@link IsolatedIngestionServer} process, send {@link IngestionTaskCommand}s for a
 * topic or topic-partition, push metadata updates, request component shutdown and send heartbeats.
 */
public class MainIngestionRequestClient implements Closeable {
    private static final Logger LOGGER = LogManager.getLogger(MainIngestionRequestClient.class);
    /** Attempt budget handed to the transport for commands that are retried. */
    private static final int REQUEST_MAX_ATTEMPT = 10;
    /** Number of times forking and initializing the isolated process is attempted before giving up. */
    private static final int FORK_PROCESS_MAX_ATTEMPT = 3;
    /** Number of failed heartbeats tolerated after the first one before the health check gives up. */
    private static final int HEALTH_CHECK_MAX_RETRY = 100;
    /** Pause between two consecutive health-check heartbeats. */
    private static final long HEALTH_CHECK_RETRY_INTERVAL_MS = 1000L;

    // Not final: it can be swapped through setHttpClientTransport.
    private HttpClientTransport httpClientTransport;
    private final int heartbeatRequestTimeoutSeconds;

    /**
     * Builds the client and its transport from the given configuration.
     *
     * @param veniceConfigLoader source of the SSL factory, the ingestion service port and the request
     *        and heartbeat timeouts (defaults: 120 and 5 seconds respectively)
     */
    public MainIngestionRequestClient(VeniceConfigLoader veniceConfigLoader) {
        this.heartbeatRequestTimeoutSeconds = veniceConfigLoader.getCombinedProperties().getInt("server.ingestion.isolation.heartbeat.request.timeout.seconds", 5);
        this.httpClientTransport = new HttpClientTransport(IsolatedIngestionUtils.getSSLFactory(veniceConfigLoader), veniceConfigLoader.getVeniceServerConfig().getIngestionServicePort(), veniceConfigLoader.getCombinedProperties().getInt("server.ingestion.isolation.request.timeout.seconds", 120));
    }

    /**
     * Forks the isolated ingestion process and waits until it binds its port and answers a heartbeat.
     * The whole sequence is attempted up to {@value #FORK_PROCESS_MAX_ATTEMPT} times.
     *
     * @param veniceConfigLoader configuration used to build the forked process configs and JVM arguments
     * @return the forked process
     * @throws VeniceException if the last attempt fails; the cause is the exception of that attempt
     */
    public synchronized Process startForkedIngestionProcess(VeniceConfigLoader veniceConfigLoader) {
        int ingestionServicePort = veniceConfigLoader.getVeniceServerConfig().getIngestionServicePort();

        // The JVM arguments are configured as a single ';'-separated string; empty segments are skipped.
        ArrayList<String> jvmArgs = new ArrayList<>();
        String rawJvmArgs = veniceConfigLoader.getCombinedProperties().getString("server.forked.process.jvm.arg.list", "");
        for (String jvmArg : rawJvmArgs.split(";")) {
            if (!jvmArg.isEmpty()) {
                jvmArgs.add(jvmArg);
            }
        }

        String forkedProcessConfig = IsolatedIngestionUtils.buildAndSaveConfigsForForkedIngestionProcess(veniceConfigLoader);
        IsolatedIngestionUtils.saveForkedIngestionKafkaClusterMapConfig(veniceConfigLoader);

        int failedAttempts = 0;
        while (true) {
            try {
                // Runs before every attempt, including retries.
                IsolatedIngestionUtils.destroyLingeringIsolatedIngestionProcess(veniceConfigLoader);
                ForkedJavaProcess forkedProcess = ForkedJavaProcess.exec(IsolatedIngestionServer.class, Collections.singletonList(forkedProcessConfig), jvmArgs, false);
                LOGGER.info("Forked new isolated ingestion process at PID: " + forkedProcess.pid());
                IsolatedIngestionUtils.saveForkedIngestionProcessMetadata(veniceConfigLoader, forkedProcess);
                IsolatedIngestionUtils.waitPortBinding(ingestionServicePort, 100);
                waitHealthCheck();
                LOGGER.info("Isolated ingestion service initialization finished.");
                return forkedProcess;
            } catch (Exception e) {
                failedAttempts++;
                if (failedAttempts >= FORK_PROCESS_MAX_ATTEMPT) {
                    throw new VeniceException("Exception caught during initialization of ingestion service:", e);
                }
                LOGGER.warn("Caught exception when initializing forked process in attempt " + failedAttempts + "/" + FORK_PROCESS_MAX_ATTEMPT, e);
            }
        }
    }

    /**
     * Sends a START_CONSUMPTION command for the given topic-partition.
     *
     * @return the {@code isPositive} flag of the report, or {@code false} if no report was returned
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public boolean startConsumption(String topicName, int partitionId) {
        return sendPartitionCommand(IngestionCommandType.START_CONSUMPTION, topicName, partitionId, REQUEST_MAX_ATTEMPT);
    }

    /**
     * Sends a STOP_CONSUMPTION command for the given topic-partition.
     *
     * @return the {@code isPositive} flag of the report, or {@code false} if no report was returned
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public boolean stopConsumption(String topicName, int partitionId) {
        return sendPartitionCommand(IngestionCommandType.STOP_CONSUMPTION, topicName, partitionId, REQUEST_MAX_ATTEMPT);
    }

    /**
     * Sends a KILL_CONSUMPTION command for the given topic, with a single attempt.
     *
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public void killConsumptionTask(String topicName) {
        sendTopicCommand(IngestionCommandType.KILL_CONSUMPTION, topicName, 1);
    }

    /**
     * Sends a SHUTDOWN_INGESTION_TASK command for the given topic, with a single attempt.
     *
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public void shutdownIngestionTask(String topicName) {
        sendTopicCommand(IngestionCommandType.SHUTDOWN_INGESTION_TASK, topicName, 1);
    }

    /**
     * Sends a REMOVE_STORAGE_ENGINE command for the given topic.
     *
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public void removeStorageEngine(String topicName) {
        sendTopicCommand(IngestionCommandType.REMOVE_STORAGE_ENGINE, topicName, REQUEST_MAX_ATTEMPT);
    }

    /**
     * Sends an OPEN_STORAGE_ENGINE command for the given topic.
     *
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public void openStorageEngine(String topicName) {
        sendTopicCommand(IngestionCommandType.OPEN_STORAGE_ENGINE, topicName, REQUEST_MAX_ATTEMPT);
    }

    /**
     * Sends a REMOVE_PARTITION command for the given topic-partition.
     *
     * @return the {@code isPositive} flag of the report, or {@code false} if no report was returned
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public boolean unsubscribeTopicPartition(String topicName, int partitionId) {
        // The partition is included in the log/exception text, like every other partition-scoped command.
        return sendPartitionCommand(IngestionCommandType.REMOVE_PARTITION, topicName, partitionId, REQUEST_MAX_ATTEMPT);
    }

    /**
     * Sends a PROMOTE_TO_LEADER command for the given topic-partition.
     *
     * @return the {@code isPositive} flag of the report, or {@code false} if no report was returned
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public boolean promoteToLeader(String topicName, int partitionId) {
        return sendPartitionCommand(IngestionCommandType.PROMOTE_TO_LEADER, topicName, partitionId, REQUEST_MAX_ATTEMPT);
    }

    /**
     * Sends a DEMOTE_TO_STANDBY command for the given topic-partition.
     *
     * @return the {@code isPositive} flag of the report, or {@code false} if no report was returned
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public boolean demoteToStandby(String topicName, int partitionId) {
        return sendPartitionCommand(IngestionCommandType.DEMOTE_TO_STANDBY, topicName, partitionId, REQUEST_MAX_ATTEMPT);
    }

    /**
     * Sends a RESET_PARTITION command for the given topic-partition.
     *
     * @throws VeniceException if the request fails or the report flags an exception
     */
    public void resetTopicPartition(String topicName, int partitionId) {
        sendPartitionCommand(IngestionCommandType.RESET_PARTITION, topicName, partitionId, REQUEST_MAX_ATTEMPT);
    }

    /**
     * Sends an UPDATE_METADATA request to the forked process. This call never throws: any exception
     * is logged and reported as {@code false}.
     *
     * @param ingestionStorageMetadata metadata update to send
     * @return the {@code isPositive} flag of the report, or {@code false} if an exception occurred
     */
    public boolean updateMetadata(IngestionStorageMetadata ingestionStorageMetadata) {
        try {
            // Kept inside the try so that a failure while building the log line also yields false.
            LOGGER.info("Sending UPDATE_METADATA request to child process: " + IngestionMetadataUpdateType.valueOf(ingestionStorageMetadata.metadataUpdateType) + " for topic: " + ingestionStorageMetadata.topicName + " partition: " + ingestionStorageMetadata.partitionId);
            return this.httpClientTransport.sendRequest(IngestionAction.UPDATE_METADATA, ingestionStorageMetadata).isPositive;
        } catch (Exception e) {
            // Best effort, but keep the cause in the log so the failure can be diagnosed.
            LOGGER.warn("Encounter exception when sending metadata updates to child process for topic: " + ingestionStorageMetadata.topicName + ", partition: " + ingestionStorageMetadata.partitionId, e);
            return false;
        }
    }

    /**
     * Asks the forked process to shut down one of its components. This call never throws: any
     * exception raised while sending the request is logged and otherwise ignored.
     *
     * @param ingestionComponentType component to shut down
     */
    public void shutdownForkedProcessComponent(IngestionComponentType ingestionComponentType) {
        ProcessShutdownCommand shutdownCommand = new ProcessShutdownCommand();
        shutdownCommand.componentType = ingestionComponentType.getValue();
        LOGGER.info("Sending shutdown component request to forked process for component: " + ingestionComponentType.name());
        try {
            this.httpClientTransport.sendRequest(IngestionAction.SHUTDOWN_COMPONENT, shutdownCommand);
        } catch (Exception e) {
            // Best effort, but keep the cause in the log so the failure can be diagnosed.
            LOGGER.warn("Encounter exception when shutting down component: " + ingestionComponentType.name(), e);
        }
    }

    /**
     * Sends a heartbeat request using the configured heartbeat timeout.
     *
     * @return {@code true} if the request completed without throwing, {@code false} otherwise
     */
    public boolean sendHeartbeatRequest() {
        try {
            this.httpClientTransport.sendRequest(IngestionAction.HEARTBEAT, IsolatedIngestionUtils.getDummyCommand(), this.heartbeatRequestTimeoutSeconds);
            return true;
        } catch (Exception e) {
            // Deliberately logged without the stack trace: waitHealthCheck polls this method repeatedly.
            LOGGER.warn("Unable to get heartbeat from ingestion service");
            return false;
        }
    }

    /** Closes the underlying transport. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.httpClientTransport.close();
    }

    /** Replaces the transport used by this client. */
    protected void setHttpClientTransport(HttpClientTransport httpClientTransport) {
        this.httpClientTransport = httpClientTransport;
    }

    /** Builds a command of the given type that targets a topic only. */
    private static IngestionTaskCommand buildCommand(IngestionCommandType commandType, String topicName) {
        IngestionTaskCommand command = new IngestionTaskCommand();
        command.commandType = commandType.getValue();
        command.topicName = topicName;
        return command;
    }

    /** Builds and sends a topic-level command; the partition is left unset and omitted from log text. */
    private boolean sendTopicCommand(IngestionCommandType commandType, String topicName, int maxAttempts) {
        return sendIngestionCommandWithRetry(buildCommand(commandType, topicName), topicName, Optional.empty(), maxAttempts);
    }

    /** Builds and sends a partition-level command; the partition is included in log text. */
    private boolean sendPartitionCommand(IngestionCommandType commandType, String topicName, int partitionId, int maxAttempts) {
        IngestionTaskCommand command = buildCommand(commandType, topicName);
        command.partitionId = partitionId;
        return sendIngestionCommandWithRetry(command, topicName, Optional.of(partitionId), maxAttempts);
    }

    /**
     * Sends a command to the forked process through the transport's retrying request.
     *
     * @param ingestionTaskCommand command to send
     * @param topicName topic name, used for log and exception text only
     * @param partitionId partition, used for log and exception text only; omitted from the text when empty
     * @param maxAttempts attempt budget passed to the transport
     * @return the {@code isPositive} flag of the report, or {@code false} if the report is {@code null}
     * @throws VeniceException if sending fails (with the cause attached) or if the report has
     *         {@code exceptionThrown} set (with the report message in the text)
     */
    private boolean sendIngestionCommandWithRetry(IngestionTaskCommand ingestionTaskCommand, String topicName, Optional<Integer> partitionId, int maxAttempts) {
        String commandName = IngestionCommandType.valueOf(ingestionTaskCommand.commandType).toString();
        String target = " for topic: " + topicName + partitionId.map(id -> ", partition: " + id).orElse("");
        LOGGER.info("Sending request: " + commandName + " to forked process" + target);

        IngestionTaskReport report;
        try {
            report = this.httpClientTransport.sendRequestWithRetry(IngestionAction.COMMAND, ingestionTaskCommand, maxAttempts);
        } catch (Exception e) {
            throw new VeniceException("Caught exception when sending command: " + commandName + target, e);
        }

        // Inspected outside the try block so that a remote execution failure is not caught above and
        // re-wrapped as if it were a transport failure.
        if (report == null) {
            return false;
        }
        if (report.exceptionThrown) {
            throw new VeniceException("Caught exception when executing command in isolated process: " + commandName + target + " " + report.message);
        }
        return report.isPositive;
    }

    /**
     * Polls the ingestion service with heartbeats until one succeeds, sleeping
     * {@value #HEALTH_CHECK_RETRY_INTERVAL_MS} ms after each failure.
     *
     * @throws VeniceException once more than {@value #HEALTH_CHECK_MAX_RETRY} heartbeats have failed
     */
    private void waitHealthCheck() {
        long startTimeMs = System.currentTimeMillis();
        int failedAttempts = 0;
        while (true) {
            try {
                if (!sendHeartbeatRequest()) {
                    throw new VeniceException("Got non-OK response from ingestion service.");
                }
                LOGGER.info("Ingestion service server health check passed in " + (System.currentTimeMillis() - startTimeMs) + " ms.");
                return;
            } catch (Exception e) {
                failedAttempts++;
                if (failedAttempts > HEALTH_CHECK_MAX_RETRY) {
                    LOGGER.info("Fail to pass health-check for ingestion service after " + HEALTH_CHECK_MAX_RETRY + " retries.");
                    throw e;
                }
                Utils.sleep(HEALTH_CHECK_RETRY_INTERVAL_MS);
            }
        }
    }
}
