package org.apache.helix.task;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.antlr.v4.runtime.tree.xpath.XPath;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.UserContentStore;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/task/TaskDriver.class */
public class TaskDriver {
    private static final int DEFAULT_TIMEOUT = 300000;
    private static final long DEFAULT_SLEEP = 1000;
    private static final String TASK_START_TIME_KEY = "START_TIME";
    protected long _configsLimitation;
    private final HelixDataAccessor _accessor;
    private final HelixPropertyStore<ZNRecord> _propertyStore;
    private final HelixAdmin _admin;
    private final String _clusterName;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TaskDriver.class);
    private static final Set<TaskState> ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION = new HashSet(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING, TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPING, TaskState.STOPPED));
    private static final long DEFAULT_CONFIGS_LIMITATION = HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.TASK_CONFIG_LIMITATION, 100000);

    /* loaded from: input_file:org/apache/helix/task/TaskDriver$DriverCommand.class */
    public enum DriverCommand {
        start,
        stop,
        delete,
        resume,
        list,
        flush,
        clean
    }

    public TaskDriver(HelixManager helixManager) {
        this(helixManager.getClusterManagmentTool(), helixManager.getHelixDataAccessor(), helixManager.getHelixPropertyStore(), helixManager.getClusterName());
    }

    @Deprecated
    public TaskDriver(RealmAwareZkClient realmAwareZkClient, String str) {
        this(realmAwareZkClient, new ZkBaseDataAccessor(realmAwareZkClient), str);
    }

    @Deprecated
    public TaskDriver(RealmAwareZkClient realmAwareZkClient, ZkBaseDataAccessor<ZNRecord> zkBaseDataAccessor, String str) {
        this(new ZKHelixAdmin(realmAwareZkClient), new ZKHelixDataAccessor(str, zkBaseDataAccessor), new ZkHelixPropertyStore(zkBaseDataAccessor, PropertyPathBuilder.propertyStore(str), (List<String>) null), str);
    }

    @Deprecated
    public TaskDriver(HelixAdmin helixAdmin, HelixDataAccessor helixDataAccessor, ConfigAccessor configAccessor, HelixPropertyStore<ZNRecord> helixPropertyStore, String str) {
        this(helixAdmin, helixDataAccessor, helixPropertyStore, str);
    }

    public TaskDriver(HelixAdmin helixAdmin, HelixDataAccessor helixDataAccessor, HelixPropertyStore<ZNRecord> helixPropertyStore, String str) {
        this._configsLimitation = DEFAULT_CONFIGS_LIMITATION;
        this._admin = helixAdmin;
        this._accessor = helixDataAccessor;
        this._propertyStore = helixPropertyStore;
        this._clusterName = str;
    }

    public void start(Workflow workflow) {
        LOG.info("Starting workflow {}", workflow.getName());
        workflow.validate();
        validateZKNodeLimitation(workflow.getJobConfigs().keySet().size() + 1);
        WorkflowConfig build = new WorkflowConfig.Builder(workflow.getWorkflowConfig()).setWorkflowId(workflow.getName()).build();
        HashMap hashMap = new HashMap();
        for (String str : workflow.getJobConfigs().keySet()) {
            JobConfig.Builder fromMap = JobConfig.Builder.fromMap(workflow.getJobConfigs().get(str));
            if (workflow.getTaskConfigs() != null && workflow.getTaskConfigs().containsKey(str)) {
                fromMap.addTaskConfigs(workflow.getTaskConfigs().get(str));
            }
            JobConfig build2 = fromMap.build();
            if (build2.getJobType() != null) {
                hashMap.put(str, build2.getJobType());
            }
            addJobConfig(str, build2);
        }
        build.setJobTypes(hashMap);
        if (TaskUtil.createWorkflowConfig(this._accessor, workflow.getName(), build)) {
            return;
        }
        HashSet hashSet = new HashSet();
        for (String str2 : workflow.getJobConfigs().keySet()) {
            if (!TaskUtil.removeJobConfig(this._accessor, str2)) {
                hashSet.add(str2);
            }
        }
        throw new HelixException(String.format("Failed to add workflow configuration for workflow %s. It's possible that a workflow of the same name already exists or there was a connection issue. JobConfig deletion attempted but failed for the following jobs: %s", workflow.getName(), hashSet));
    }

    public void updateWorkflow(String str, WorkflowConfig workflowConfig) {
        if (workflowConfig.getWorkflowId() == null || workflowConfig.getWorkflowId().isEmpty()) {
            workflowConfig.getRecord().setSimpleField(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name(), str);
        }
        if (str == null || !str.equals(workflowConfig.getWorkflowId())) {
            throw new HelixException(String.format("Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}", str, workflowConfig.getWorkflowId()));
        }
        WorkflowConfig workflowConfig2 = TaskUtil.getWorkflowConfig(this._accessor, str);
        if (workflowConfig2 == null) {
            throw new HelixException("Workflow " + str + " does not exist!");
        }
        if (workflowConfig2.isTerminable()) {
            throw new HelixException("Workflow " + str + " is terminable, not allow to change its configuration!");
        }
        workflowConfig.setJobDag(workflowConfig2.getJobDag());
        if (TaskUtil.setWorkflowConfig(this._accessor, str, workflowConfig)) {
            return;
        }
        LOG.error("Failed to update workflow configuration for workflow {}", str);
    }

    public void createQueue(JobQueue jobQueue) {
        start(jobQueue);
    }

    public void flushQueue(String str) {
        cleanupQueue(str);
    }

    public void deleteJob(String str, String str2) {
        deleteNamespacedJob(str, TaskUtil.getNamespacedJobName(str, str2), false);
    }

    public void deleteJob(String str, String str2, boolean z) {
        deleteNamespacedJob(str, TaskUtil.getNamespacedJobName(str, str2), z);
    }

    public void deleteNamespacedJob(String str, String str2, boolean z) {
        WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(this._accessor, str);
        if (z) {
            LOG.info("Forcefully removing job: {} from queue: {}", str2, str);
            if (!TaskUtil.removeJob(this._accessor, this._propertyStore, str2)) {
                LOG.info("Failed to delete job: {} from queue: {}", str2, str);
                throw new HelixException("Failed to delete job: " + str2 + " from queue: " + str);
            }
            if (workflowConfig != null) {
                if (workflowConfig.getScheduleConfig() != null && workflowConfig.getScheduleConfig().isRecurring()) {
                    deleteJobFromLastScheduledQueue(str, TaskUtil.getDenamespacedJobName(str, str2));
                    return;
                }
                return;
            }
            return;
        }
        if (workflowConfig == null) {
            throw new IllegalArgumentException(String.format("JobQueue %s's config is not found!", str));
        }
        if (!workflowConfig.isJobQueue()) {
            throw new IllegalArgumentException(String.format("%s is not a queue!", str));
        }
        boolean z2 = workflowConfig.getScheduleConfig() != null && workflowConfig.getScheduleConfig().isRecurring();
        String denamespacedJobName = TaskUtil.getDenamespacedJobName(str, str2);
        if (z2) {
            deleteJobFromLastScheduledQueue(str, denamespacedJobName);
        }
        deleteJobFromQueue(str, denamespacedJobName);
    }

    private void deleteJobFromLastScheduledQueue(String str, String str2) {
        WorkflowContext workflowContext = TaskUtil.getWorkflowContext(this._propertyStore, str);
        String str3 = null;
        if (workflowContext != null) {
            str3 = workflowContext.getLastScheduledSingleWorkflow();
        }
        if (str3 == null || TaskUtil.getWorkflowConfig(this._accessor, str3) == null) {
            return;
        }
        deleteJobFromQueue(str3, str2);
    }

    private void deleteJobFromQueue(String str, String str2) {
        WorkflowContext workflowContext = TaskUtil.getWorkflowContext(this._propertyStore, str);
        String name = workflowContext != null ? workflowContext.getWorkflowState().name() : TaskState.NOT_STARTED.name();
        if (name.equals(TaskState.IN_PROGRESS.name())) {
            throw new IllegalStateException("Queue " + str + " is still running!");
        }
        if (name.equals(TaskState.COMPLETED.name()) || name.equals(TaskState.FAILED.name()) || name.equals(TaskState.ABORTED.name())) {
            LOG.warn("Queue {} has already reached its final state, skip deleting job from it.", str);
        } else {
            if (TaskUtil.removeJobsFromWorkflow(this._accessor, this._propertyStore, str, Collections.singleton(TaskUtil.getNamespacedJobName(str, str2)), true)) {
                return;
            }
            LOG.error("Failed to delete job {} from queue {}.", str2, str);
            throw new HelixException("Failed to delete job " + str2 + " from queue " + str);
        }
    }

    public void enqueueJob(String str, String str2, JobConfig.Builder builder) {
        enqueueJobs(str, Collections.singletonList(str2), Collections.singletonList(builder));
    }

    public void enqueueJobs(String str, List<String> list, List<JobConfig.Builder> list2) {
        WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(this._accessor, str);
        if (workflowConfig == null) {
            throw new IllegalArgumentException("Queue " + str + " config does not yet exist!");
        }
        if (workflowConfig.isTerminable()) {
            throw new IllegalArgumentException(str + " is not a queue!");
        }
        int capacity = workflowConfig.getCapacity();
        int size = workflowConfig.getJobDag().size();
        if (capacity > 0 && size >= capacity) {
            WorkflowContext workflowContext = TaskUtil.getWorkflowContext(this._propertyStore, str);
            if (workflowContext != null && !TaskUtil.removeJobsFromWorkflow(this._accessor, this._propertyStore, str, TaskUtil.getExpiredJobs(this._accessor, this._propertyStore, workflowConfig, workflowContext), true)) {
                LOG.warn("Failed to clean up expired and completed jobs from queue {}", str);
            }
            if (TaskUtil.getWorkflowConfig(this._accessor, str).getJobDag().size() >= capacity) {
                throw new HelixException(String.format("Failed to enqueue job, queue %s is full.", str));
            }
        }
        if (TaskUtil.getWorkflowConfig(this._accessor, str).getJobDag().size() + list.size() >= capacity) {
            throw new IllegalStateException(String.format("Queue %s already reaches its max capacity %d, failed to add %s", str, Integer.valueOf(capacity), list.toString()));
        }
        validateZKNodeLimitation(1);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (int i = 0; i < list2.size(); i++) {
            try {
                JobConfig build = list2.get(i).setWorkflow(str).build();
                String namespacedJobName = TaskUtil.getNamespacedJobName(str, list.get(i));
                addJobConfig(namespacedJobName, build);
                arrayList.add(build);
                arrayList2.add(namespacedJobName);
                arrayList3.add(build.getJobType());
            } catch (HelixException e) {
                LOG.error("Failed to add job configs {}. Remove them all!", list.toString());
                Iterator<String> it2 = list.iterator();
                while (it2.hasNext()) {
                    TaskUtil.removeJobConfig(this._accessor, TaskUtil.getNamespacedJobName(str, it2.next()));
                }
            }
        }
        if (this._accessor.getBaseDataAccessor().update(this._accessor.keyBuilder().resourceConfig(str).getPath(), zNRecord -> {
            if (zNRecord == null) {
                throw new HelixException(String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", str));
            }
            JobDag fromJson = JobDag.fromJson(zNRecord.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
            Set<String> allNodes = fromJson.getAllNodes();
            if (capacity > 0 && allNodes.size() + arrayList.size() >= capacity) {
                Iterator it3 = list.iterator();
                while (it3.hasNext()) {
                    TaskUtil.removeJobConfig(this._accessor, TaskUtil.getNamespacedJobName(str, (String) it3.next()));
                }
                throw new IllegalStateException(String.format("Queue %s already reaches its max capacity %d, failed to add %s", str, Integer.valueOf(capacity), list.toString()));
            }
            String str2 = null;
            for (int i2 = 0; i2 < arrayList2.size(); i2++) {
                String str3 = (String) arrayList2.get(i2);
                if (allNodes.contains(str3)) {
                    throw new IllegalStateException(String.format("Could not add to queue %s, job %s already exists", str, list.get(i2)));
                }
                fromJson.addNode(str3);
                String str4 = null;
                if (str2 == null) {
                    Iterator<String> it4 = allNodes.iterator();
                    while (true) {
                        if (!it4.hasNext()) {
                            break;
                        }
                        String next = it4.next();
                        if (!next.equals(str3) && fromJson.getDirectChildren(next).isEmpty()) {
                            str4 = next;
                            break;
                        }
                    }
                } else {
                    str4 = str2;
                }
                if (str4 != null) {
                    fromJson.addParentToChild(str4, str3);
                    str2 = str3;
                }
            }
            Map<String, String> mapField = zNRecord.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
            Iterator it5 = arrayList3.iterator();
            while (it5.hasNext()) {
                String str5 = (String) it5.next();
                if (str5 != null) {
                    if (mapField == null) {
                        mapField = new HashMap();
                    }
                    mapField.put(str, str5);
                }
            }
            if (mapField != null) {
                zNRecord.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), mapField);
            }
            try {
                zNRecord.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), fromJson.toJson());
                return zNRecord;
            } catch (Exception e2) {
                throw new IllegalStateException(String.format("Could not add jobs %s to queue %s", list.toString(), str), e2);
            }
        }, AccessOption.PERSISTENT)) {
            return;
        }
        LOG.error("Failed to update WorkflowConfig, remove all jobs {}", list.toString());
        Iterator<String> it3 = list.iterator();
        while (it3.hasNext()) {
            TaskUtil.removeJobConfig(this._accessor, it3.next());
        }
        throw new HelixException("Failed to enqueue job");
    }

    public void addTask(String str, String str2, TaskConfig taskConfig) throws TimeoutException, InterruptedException {
        addTask(str, str2, taskConfig, ProducerMetadata.TOPIC_EXPIRY_MS);
    }

    public void addTask(String str, String str2, TaskConfig taskConfig, long j) throws TimeoutException, InterruptedException {
        TaskState jobState;
        if (j < 1000) {
            throw new IllegalArgumentException(String.format("Timeout is less than the minimum acceptable timeout value which is %s ms", 1000L));
        }
        long currentTimeMillis = System.currentTimeMillis() + j;
        validateConfigsForTaskModifications(str, str2, taskConfig);
        String namespacedJobName = TaskUtil.getNamespacedJobName(str, str2);
        Iterator<String> it2 = TaskUtil.getJobConfig(this._accessor, namespacedJobName).getMapConfigs().keySet().iterator();
        while (it2.hasNext()) {
            if (it2.next().equals(taskConfig.getId())) {
                throw new HelixException("Task cannot be added because another task with the same ID already exists!");
            }
        }
        WorkflowContext workflowContext = getWorkflowContext(str);
        JobContext jobContext = getJobContext(namespacedJobName);
        if (workflowContext != null && jobContext != null && (jobState = workflowContext.getJobState(namespacedJobName)) != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
            throw new HelixException("Job " + namespacedJobName + " is in illegal state for task addition. Job State is " + jobState);
        }
        updateTaskInJobConfig(str, str2, zNRecord -> {
            if (zNRecord != null) {
                zNRecord.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
            } else {
                LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
            }
            return zNRecord;
        });
        WorkflowContext workflowContext2 = (WorkflowContext) this._accessor.getProperty(this._accessor.keyBuilder().workflowContextZNode(str));
        JobContext jobContext2 = (JobContext) this._accessor.getProperty(this._accessor.keyBuilder().jobContextZNode(str, str2));
        if (workflowContext2 == null || jobContext2 == null) {
            return;
        }
        String id = taskConfig.getId();
        while (System.currentTimeMillis() <= currentTimeMillis) {
            JobContext jobContext3 = (JobContext) this._accessor.getProperty(this._accessor.keyBuilder().jobContextZNode(str, str2));
            WorkflowContext workflowContext3 = (WorkflowContext) this._accessor.getProperty(this._accessor.keyBuilder().workflowContextZNode(str));
            if (jobContext3.getTaskIdPartitionMap().containsKey(id) && workflowContext3.getJobState(namespacedJobName) == TaskState.IN_PROGRESS) {
                return;
            } else {
                Thread.sleep(1000L);
            }
        }
        throw new TimeoutException("An unexpected issue happened while task being added to the job!");
    }

    public void deleteTask(String str, String str2, String str3) throws TimeoutException, InterruptedException {
        deleteTask(str, str2, str3, ProducerMetadata.TOPIC_EXPIRY_MS);
    }

    public void deleteTask(String str, final String str2, final String str3, long j) throws TimeoutException, InterruptedException {
        TaskState jobState;
        long currentTimeMillis = System.currentTimeMillis() + j;
        String namespacedJobName = TaskUtil.getNamespacedJobName(str, str2);
        JobConfig jobConfig = getJobConfig(namespacedJobName);
        if (jobConfig == null) {
            throw new IllegalArgumentException("Job " + namespacedJobName + " config does not exist!");
        }
        TaskConfig taskConfig = null;
        for (Map.Entry<String, TaskConfig> entry : jobConfig.getTaskConfigMap().entrySet()) {
            if (entry.getKey().equals(str3)) {
                taskConfig = entry.getValue();
            }
        }
        validateConfigsForTaskModifications(str, str2, taskConfig);
        WorkflowContext workflowContext = getWorkflowContext(str);
        JobContext jobContext = getJobContext(namespacedJobName);
        if (workflowContext != null && jobContext != null && (jobState = workflowContext.getJobState(namespacedJobName)) != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
            throw new HelixException("Job " + namespacedJobName + " is in illegal state for task deletion. Job State is " + jobState);
        }
        updateTaskInJobConfig(str, str2, new DataUpdater<ZNRecord>() { // from class: org.apache.helix.task.TaskDriver.1
            @Override // org.apache.helix.zookeeper.zkclient.DataUpdater
            public ZNRecord update(ZNRecord zNRecord) {
                if (zNRecord != null) {
                    Map<String, Map<String, String>> mapFields = zNRecord.getMapFields();
                    if (mapFields == null) {
                        TaskDriver.LOG.warn("Could not update the jobConfig: {} Znode MapField is null.", str2);
                        return null;
                    }
                    HashMap hashMap = new HashMap();
                    for (Map.Entry<String, Map<String, String>> entry2 : mapFields.entrySet()) {
                        if (!entry2.getKey().equals(str3)) {
                            hashMap.put(entry2.getKey(), entry2.getValue());
                        }
                    }
                    zNRecord.setMapFields(hashMap);
                }
                return zNRecord;
            }
        });
        WorkflowContext workflowContext2 = (WorkflowContext) this._accessor.getProperty(this._accessor.keyBuilder().workflowContextZNode(str));
        JobContext jobContext2 = (JobContext) this._accessor.getProperty(this._accessor.keyBuilder().jobContextZNode(str, str2));
        if (workflowContext2 == null || jobContext2 == null) {
            return;
        }
        while (System.currentTimeMillis() <= currentTimeMillis) {
            JobContext jobContext3 = (JobContext) this._accessor.getProperty(this._accessor.keyBuilder().jobContextZNode(str, str2));
            if (!jobContext3.getTaskIdPartitionMap().containsKey(str3)) {
                return;
            } else {
                Thread.sleep(1000L);
            }
        }
        throw new TimeoutException("An unexpected issue happened while task being deleted from the job!");
    }

    private void validateConfigsForTaskModifications(String str, String str2, TaskConfig taskConfig) {
        WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(this._accessor, str);
        String namespacedJobName = TaskUtil.getNamespacedJobName(str, str2);
        JobConfig jobConfig = TaskUtil.getJobConfig(this._accessor, namespacedJobName);
        if (workflowConfig == null) {
            throw new IllegalArgumentException(String.format("Workflow config for workflow %s does not exist!", str));
        }
        if (jobConfig == null) {
            throw new IllegalArgumentException(String.format("Job config for job %s does not exist!", namespacedJobName));
        }
        if (taskConfig == null) {
            throw new IllegalArgumentException("TaskConfig is null!");
        }
        if (taskConfig.getId() == null) {
            throw new HelixException("Task cannot be added or deleted because taskID is null!");
        }
        if (jobConfig.getTargetResource() != null) {
            throw new HelixException(String.format("Job %s is a targeted job. New task cannot be added/deleted to/from this job!", namespacedJobName));
        }
        if ((taskConfig.getCommand() == null) == (jobConfig.getCommand() == null)) {
            throw new HelixException(String.format("Command must exist in either jobconfig (%s) or taskconfig (%s), not both!", str2, taskConfig.getId()));
        }
    }

    private void updateTaskInJobConfig(String str, String str2, DataUpdater<ZNRecord> dataUpdater) {
        String namespacedJobName = TaskUtil.getNamespacedJobName(str, str2);
        if (this._accessor.getBaseDataAccessor().update(this._accessor.keyBuilder().resourceConfig(namespacedJobName).getPath(), dataUpdater, AccessOption.PERSISTENT)) {
            return;
        }
        LOG.error("Failed to update task in the job {}", namespacedJobName);
        throw new HelixException("Failed to update task in the job");
    }

    @Deprecated
    public void cleanupJobQueue(String str) {
        cleanupQueue(str);
    }

    public void cleanupQueue(String str) {
        WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(this._accessor, str);
        if (workflowConfig == null) {
            throw new IllegalArgumentException("Queue " + str + " does not yet exist!");
        }
        if (!workflowConfig.isJobQueue() || workflowConfig.isTerminable()) {
            throw new IllegalArgumentException(str + " is not a queue!");
        }
        WorkflowContext workflowContext = TaskUtil.getWorkflowContext(this._propertyStore, str);
        if (workflowContext == null || workflowContext.getWorkflowState() == null) {
            throw new IllegalStateException("Queue " + str + " does not have a valid work state!");
        }
        HashSet hashSet = new HashSet();
        for (String str2 : workflowConfig.getJobDag().getAllNodes()) {
            TaskState jobState = workflowContext.getJobState(str2);
            if ((jobState != null && jobState == TaskState.ABORTED) || jobState == TaskState.COMPLETED || jobState == TaskState.FAILED) {
                hashSet.add(str2);
            }
        }
        TaskUtil.removeJobsFromWorkflow(this._accessor, this._propertyStore, str, hashSet, true);
    }

    private void addJobConfig(String str, JobConfig jobConfig) {
        LOG.info("Add job configuration " + str);
        if (!TaskUtil.createJobConfig(this._accessor, str, new JobConfig(str, jobConfig))) {
            throw new HelixException("Failed to add job configuration for job " + str + ". It's possible that a job of the same name already exists or there was a connection issue");
        }
    }

    public void resume(String str) {
        setWorkflowTargetState(str, TargetState.START);
    }

    public void stop(String str) {
        setWorkflowTargetState(str, TargetState.STOP);
    }

    public void waitToStop(String str, long j) throws InterruptedException {
        setWorkflowTargetState(str, TargetState.STOP);
        long currentTimeMillis = System.currentTimeMillis() + j;
        while (System.currentTimeMillis() <= currentTimeMillis) {
            WorkflowContext workflowContext = getWorkflowContext(str);
            if (workflowContext != null && TaskState.STOPPED.equals(workflowContext.getWorkflowState())) {
                return;
            } else {
                Thread.sleep(1000L);
            }
        }
        throw new HelixException(String.format("Fail to stop the workflow/queue %s with in %d milliseconds.", str, Long.valueOf(j)));
    }

    public void delete(String str) {
        delete(str, false);
    }

    public void delete(String str, boolean z) {
        WorkflowContext workflowContext = TaskUtil.getWorkflowContext(this._propertyStore, str);
        if (z) {
            LOG.info("Forcefully removing workflow: " + str);
            removeWorkflowFromZK(str);
        } else {
            setWorkflowTargetState(str, TargetState.DELETE);
        }
        if (workflowContext == null || workflowContext.getScheduledWorkflows() == null) {
            return;
        }
        for (String str2 : workflowContext.getScheduledWorkflows()) {
            if (z) {
                WorkflowContext workflowContext2 = TaskUtil.getWorkflowContext(this._propertyStore, str2);
                if (workflowContext2 != null && workflowContext2.getFinishTime() != -1) {
                    removeWorkflowFromZK(str2);
                }
            } else {
                setWorkflowTargetState(str2, TargetState.DELETE);
            }
        }
    }

    private void removeWorkflowFromZK(String str) {
        HashSet hashSet = new HashSet();
        WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(this._accessor, str);
        if (workflowConfig != null) {
            hashSet.addAll(workflowConfig.getJobDag().getAllNodes());
        }
        if (TaskUtil.removeWorkflow(this._accessor, this._propertyStore, str, hashSet)) {
            return;
        }
        LOG.info("Failed to delete the workflow " + str);
        throw new HelixException("Failed to delete the workflow " + str);
    }

    public void deleteAndWaitForCompletion(String str, long j) throws InterruptedException {
        delete(str);
        long currentTimeMillis = System.currentTimeMillis() + j;
        BaseDataAccessor<ZNRecord> baseDataAccessor = this._accessor.getBaseDataAccessor();
        PropertyKey.Builder keyBuilder = this._accessor.keyBuilder();
        String path = keyBuilder.idealStates(str).getPath();
        String path2 = keyBuilder.resourceConfig(str).getPath();
        String path3 = keyBuilder.workflowContext(str).getPath();
        while (System.currentTimeMillis() <= currentTimeMillis) {
            if (!baseDataAccessor.exists(path, AccessOption.PERSISTENT) && !baseDataAccessor.exists(path2, AccessOption.PERSISTENT) && !baseDataAccessor.exists(path3, AccessOption.PERSISTENT)) {
                return;
            } else {
                Thread.sleep(1000L);
            }
        }
        StringBuilder sb = new StringBuilder();
        if (baseDataAccessor.exists(path, AccessOption.PERSISTENT)) {
            sb.append("IdealState ");
        }
        if (baseDataAccessor.exists(path2, AccessOption.PERSISTENT)) {
            sb.append("WorkflowConfig ");
        }
        if (baseDataAccessor.exists(path3, AccessOption.PERSISTENT)) {
            sb.append("WorkflowContext ");
        }
        throw new HelixException(String.format("Failed to delete the workflow/queue %s within %d milliseconds. The following components still remain: %s", str, Long.valueOf(j), sb.toString()));
    }

    private void setWorkflowTargetState(String str, TargetState targetState) {
        String lastScheduledSingleWorkflow;
        setSingleWorkflowTargetState(str, targetState);
        WorkflowContext workflowContext = TaskUtil.getWorkflowContext(this._propertyStore, str);
        if (workflowContext == null || (lastScheduledSingleWorkflow = workflowContext.getLastScheduledSingleWorkflow()) == null) {
            return;
        }
        setSingleWorkflowTargetState(lastScheduledSingleWorkflow, targetState);
    }

    private void setSingleWorkflowTargetState(String str, TargetState targetState) {
        LOG.info("Set " + str + " to target state " + targetState);
        if (TaskUtil.getWorkflowConfig(this._accessor, str) == null) {
            LOG.warn("WorkflowConfig for {} not found!", str);
            return;
        }
        WorkflowContext workflowContext = TaskUtil.getWorkflowContext(this._propertyStore, str);
        if (targetState != TargetState.DELETE && workflowContext != null && workflowContext.getFinishTime() != -1) {
            LOG.info("Workflow {} is already completed, skip to update its target state {}", str, targetState);
            return;
        }
        this._accessor.getBaseDataAccessor().update(TaskUtil.getWorkflowConfigKey(this._accessor, str).getPath(), zNRecord -> {
            if (zNRecord != null) {
                zNRecord.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(), targetState.name());
            } else {
                LOG.warn("TargetState DataUpdater: Fails to update target state for {}. CurrentData is null.", str);
            }
            return zNRecord;
        }, AccessOption.PERSISTENT);
    }

    public WorkflowConfig getWorkflowConfig(String str) {
        return TaskUtil.getWorkflowConfig(this._accessor, str);
    }

    public WorkflowContext getWorkflowContext(String str) {
        return TaskUtil.getWorkflowContext(this._propertyStore, str);
    }

    public JobConfig getJobConfig(String str) {
        return TaskUtil.getJobConfig(this._accessor, str);
    }

    public JobContext getJobContext(String str) {
        return TaskUtil.getJobContext(this._propertyStore, str);
    }

    public static JobContext getJobContext(HelixManager helixManager, String str) {
        return TaskUtil.getJobContext(helixManager, str);
    }

    public static WorkflowConfig getWorkflowConfig(HelixManager helixManager, String str) {
        return TaskUtil.getWorkflowConfig(helixManager, str);
    }

    public static WorkflowContext getWorkflowContext(HelixManager helixManager, String str) {
        return TaskUtil.getWorkflowContext(helixManager, str);
    }

    public static JobConfig getJobConfig(HelixManager helixManager, String str) {
        return TaskUtil.getJobConfig(helixManager, str);
    }

    public Map<String, WorkflowConfig> getWorkflows() {
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : this._accessor.getChildValuesMap(this._accessor.keyBuilder().resourceConfigs(), true).entrySet()) {
            try {
                hashMap.put(entry.getKey(), WorkflowConfig.fromHelixProperty((HelixProperty) entry.getValue()));
            } catch (IllegalArgumentException e) {
            }
        }
        return hashMap;
    }

    public TaskState pollForWorkflowState(String str, long j, TaskState... taskStateArr) throws InterruptedException {
        WorkflowContext workflowContext;
        long currentTimeMillis = System.currentTimeMillis();
        HashSet hashSet = new HashSet(Arrays.asList(taskStateArr));
        long j2 = j > 100 ? 100L : j;
        do {
            Thread.sleep(j2);
            workflowContext = getWorkflowContext(str);
            if (workflowContext != null && workflowContext.getWorkflowState() != null && hashSet.contains(workflowContext.getWorkflowState())) {
                break;
            }
        } while (System.currentTimeMillis() < currentTimeMillis + j);
        if (workflowContext != null && hashSet.contains(workflowContext.getWorkflowState())) {
            return workflowContext.getWorkflowState();
        }
        Object[] objArr = new Object[3];
        objArr[0] = str;
        objArr[1] = Arrays.asList(taskStateArr);
        objArr[2] = workflowContext == null ? "null" : workflowContext.getWorkflowState().toString();
        throw new HelixException(String.format("Workflow %s context is empty or not in states: %s, current state: %s.", objArr));
    }

    public TaskState pollForWorkflowState(String str, TaskState... taskStateArr) throws InterruptedException {
        return pollForWorkflowState(str, ProducerMetadata.TOPIC_EXPIRY_MS, taskStateArr);
    }

    public TaskState pollForJobState(String str, String str2, long j, TaskState... taskStateArr) throws InterruptedException {
        WorkflowContext workflowContext;
        WorkflowContext workflowContext2;
        WorkflowConfig workflowConfig = getWorkflowConfig(str);
        if (workflowConfig == null) {
            throw new HelixException(String.format("Workflow %s does not exists!", str));
        }
        long j2 = j > 50 ? 50L : j;
        if (workflowConfig.isRecurring()) {
            while (true) {
                Thread.sleep(j2);
                workflowContext2 = getWorkflowContext(str);
                if (workflowContext2 != null && workflowContext2.getLastScheduledSingleWorkflow() != null) {
                    break;
                }
            }
            str2 = str2.substring(str.length() + 1);
            str = workflowContext2.getLastScheduledSingleWorkflow();
        }
        HashSet hashSet = new HashSet(Arrays.asList(taskStateArr));
        long currentTimeMillis = System.currentTimeMillis();
        do {
            Thread.sleep(j2);
            workflowContext = getWorkflowContext(str);
            if (workflowContext != null && workflowContext.getJobState(str2) != null && hashSet.contains(workflowContext.getJobState(str2))) {
                break;
            }
        } while (System.currentTimeMillis() < currentTimeMillis + j);
        if (workflowContext != null && hashSet.contains(workflowContext.getJobState(str2))) {
            return workflowContext.getJobState(str2);
        }
        WorkflowConfig workflowConfig2 = getWorkflowConfig(str);
        JobConfig jobConfig = getJobConfig(str2);
        JobContext jobContext = getJobContext(str2);
        Object[] objArr = new Object[8];
        objArr[0] = str;
        objArr[1] = str2;
        objArr[2] = hashSet;
        objArr[3] = workflowContext == null ? "null" : workflowContext;
        objArr[4] = workflowContext != null ? workflowContext.getJobState(str2) : "null";
        objArr[5] = workflowConfig2;
        objArr[6] = jobConfig;
        objArr[7] = jobContext;
        throw new HelixException(String.format("Workflow %s context is null or job %s is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s", objArr));
    }

    public TaskState pollForJobState(String str, String str2, TaskState... taskStateArr) throws InterruptedException {
        return pollForJobState(str, str2, ProducerMetadata.TOPIC_EXPIRY_MS, taskStateArr);
    }

    public long getLastScheduledTaskTimestamp(String str) {
        return getLastScheduledTaskExecutionInfo(str).getStartTimeStamp().longValue();
    }

    public TaskExecutionInfo getLastScheduledTaskExecutionInfo(String str) {
        JobContext jobContext;
        long j = -1;
        String str2 = null;
        Integer num = null;
        TaskPartitionState taskPartitionState = null;
        WorkflowContext workflowContext = getWorkflowContext(str);
        if (workflowContext != null) {
            for (Map.Entry<String, TaskState> entry : workflowContext.getJobStates().entrySet()) {
                if (!entry.getValue().equals(TaskState.NOT_STARTED) && (jobContext = getJobContext(entry.getKey())) != null) {
                    for (Integer num2 : jobContext.getPartitionSet()) {
                        String str3 = jobContext.getMapField(num2.intValue()).get(TASK_START_TIME_KEY);
                        if (str3 != null) {
                            long parseLong = Long.parseLong(str3);
                            if (parseLong > j) {
                                j = parseLong;
                                str2 = entry.getKey();
                                num = num2;
                                taskPartitionState = jobContext.getPartitionState(num2.intValue());
                            }
                        }
                    }
                }
            }
        }
        return new TaskExecutionInfo(str2, num, taskPartitionState, Long.valueOf(j));
    }

    @Deprecated
    public String getUserContent(String str, UserContentStore.Scope scope, String str2, String str3, String str4) {
        return TaskUtil.getUserContent(this._propertyStore, str, scope, str2, str3, str4);
    }

    public Map<String, String> getWorkflowUserContentMap(String str) {
        return TaskUtil.getWorkflowJobUserContentMap(this._propertyStore, str);
    }

    public Map<String, String> getJobUserContentMap(String str, String str2) {
        return TaskUtil.getWorkflowJobUserContentMap(this._propertyStore, TaskUtil.getNamespacedJobName(str, str2));
    }

    public Map<String, String> getTaskUserContentMap(String str, String str2, String str3) {
        String namespacedJobName = TaskUtil.getNamespacedJobName(str, str2);
        return TaskUtil.getTaskUserContentMap(this._propertyStore, namespacedJobName, TaskUtil.getNamespacedTaskName(namespacedJobName, str3));
    }

    public void addOrUpdateWorkflowUserContentMap(String str, Map<String, String> map) {
        TaskUtil.addOrUpdateWorkflowJobUserContentMap(this._propertyStore, str, map);
    }

    public void addOrUpdateJobUserContentMap(String str, String str2, Map<String, String> map) {
        TaskUtil.addOrUpdateWorkflowJobUserContentMap(this._propertyStore, TaskUtil.getNamespacedJobName(str, str2), map);
    }

    public void addOrUpdateTaskUserContentMap(String str, String str2, String str3, Map<String, String> map) {
        String namespacedJobName = TaskUtil.getNamespacedJobName(str, str2);
        TaskUtil.addOrUpdateTaskUserContentMap(this._propertyStore, namespacedJobName, TaskUtil.getNamespacedTaskName(namespacedJobName, str3), map);
    }

    private void validateZKNodeLimitation(int i) {
        if (this._accessor.getChildNames(this._accessor.keyBuilder().resourceConfigs()).size() + i > this._configsLimitation) {
            throw new HelixException("Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
        }
    }

    public int getTargetTaskThreadPoolSize(String str) {
        return getInstanceConfig(str).getTargetTaskThreadPoolSize();
    }

    public void setTargetTaskThreadPoolSize(String str, int i) {
        InstanceConfig instanceConfig = getInstanceConfig(str);
        instanceConfig.setTargetTaskThreadPoolSize(i);
        this._accessor.setProperty(this._accessor.keyBuilder().instanceConfig(str), instanceConfig);
    }

    private InstanceConfig getInstanceConfig(String str) {
        InstanceConfig instanceConfig = (InstanceConfig) this._accessor.getProperty(this._accessor.keyBuilder().instanceConfig(str));
        if (instanceConfig == null) {
            throw new IllegalArgumentException("Failed to find InstanceConfig with provided instance name " + str + XPath.NOT);
        }
        return instanceConfig;
    }

    public int getGlobalTargetTaskThreadPoolSize() {
        return getClusterConfig().getGlobalTargetTaskThreadPoolSize();
    }

    public void setGlobalTargetTaskThreadPoolSize(int i) {
        ClusterConfig clusterConfig = getClusterConfig();
        clusterConfig.setGlobalTargetTaskThreadPoolSize(i);
        this._accessor.setProperty(this._accessor.keyBuilder().clusterConfig(), clusterConfig);
    }

    private ClusterConfig getClusterConfig() {
        ClusterConfig clusterConfig = (ClusterConfig) this._accessor.getProperty(this._accessor.keyBuilder().clusterConfig());
        if (clusterConfig == null) {
            throw new IllegalStateException("Failed to find ClusterConfig for cluster " + this._clusterName + XPath.NOT);
        }
        return clusterConfig;
    }

    public int getCurrentTaskThreadPoolSize(String str) {
        LiveInstance liveInstance = (LiveInstance) this._accessor.getProperty(this._accessor.keyBuilder().liveInstance(str));
        if (liveInstance == null) {
            throw new IllegalArgumentException("Failed to find LiveInstance with provided instance name " + str + XPath.NOT);
        }
        return liveInstance.getCurrentTaskThreadPoolSize();
    }
}
