package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.class */
public class RestartPipelinedRegionStrategy extends FailoverStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
    private final ExecutionGraph executionGraph;
    private final Executor executor;
    private final HashMap<ExecutionVertex, FailoverRegion> vertexToRegion;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy$Factory.class */
    public static class Factory implements FailoverStrategy.Factory {
        @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory
        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartPipelinedRegionStrategy(executionGraph);
        }
    }

    public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph) {
        this(executionGraph, executionGraph.getFutureExecutor());
    }

    public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph, Executor executor) {
        this.executionGraph = (ExecutionGraph) Preconditions.checkNotNull(executionGraph);
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        this.vertexToRegion = new HashMap<>();
    }

    @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public void onTaskFailure(Execution execution, Throwable th) {
        ExecutionVertex vertex = execution.getVertex();
        FailoverRegion failoverRegion = this.vertexToRegion.get(vertex);
        if (failoverRegion == null) {
            this.executionGraph.failGlobal((Throwable) new FlinkException("Can not find a failover region for the execution " + vertex.getTaskNameWithSubtaskIndex(), th));
        } else {
            LOG.info("Recovering task failure for {} #{} ({}) via restart of failover region", new Object[]{execution.getVertex().getTaskNameWithSubtaskIndex(), Integer.valueOf(execution.getAttemptNumber()), execution.getAttemptId()});
            failoverRegion.onExecutionFail(execution, th);
        }
    }

    @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public void notifyNewVertices(List<ExecutionJobVertex> list) {
        generateAllFailoverRegion(list);
    }

    @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public String getStrategyName() {
        return "Pipelined Region Failover";
    }

    private void generateAllFailoverRegion(List<ExecutionJobVertex> list) {
        IdentityHashMap identityHashMap = new IdentityHashMap();
        IdentityHashMap identityHashMap2 = new IdentityHashMap();
        for (ExecutionJobVertex executionJobVertex : list) {
            if (executionJobVertex.getCoLocationGroup() != null) {
                makeAllOneRegion(list);
                return;
            }
            List<IntermediateResult> inputs = executionJobVertex.getInputs();
            int size = inputs.size();
            boolean z = false;
            Iterator<IntermediateResult> it = inputs.iterator();
            while (true) {
                if (it.hasNext()) {
                    if (it.next().getResultType().isPipelined()) {
                        z = true;
                        break;
                    }
                } else {
                    break;
                }
            }
            if (z) {
                for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
                    ArrayList arrayList = null;
                    for (int i = 0; i < size; i++) {
                        if (inputs.get(i).getResultType().isPipelined()) {
                            for (ExecutionEdge executionEdge : executionVertex.getInputEdges(i)) {
                                ExecutionVertex producer = executionEdge.getSource().getProducer();
                                ArrayList arrayList2 = (ArrayList) identityHashMap.get(producer);
                                if (arrayList == null) {
                                    if (producer == null) {
                                        throw new FlinkRuntimeException("bug in the logic to construct the pipelined failover regions");
                                    }
                                    arrayList = arrayList2;
                                    arrayList.add(executionVertex);
                                    identityHashMap.put(executionVertex, arrayList);
                                } else if (arrayList2 != arrayList) {
                                    arrayList2.addAll(arrayList);
                                    identityHashMap2.remove(arrayList);
                                    arrayList = arrayList2;
                                    Iterator it2 = arrayList2.iterator();
                                    while (it2.hasNext()) {
                                        identityHashMap.put((ExecutionVertex) it2.next(), arrayList);
                                    }
                                }
                            }
                        }
                    }
                }
            } else {
                for (ExecutionVertex executionVertex2 : executionJobVertex.getTaskVertices()) {
                    ArrayList arrayList3 = new ArrayList(1);
                    arrayList3.add(executionVertex2);
                    identityHashMap.put(executionVertex2, arrayList3);
                    identityHashMap2.put(arrayList3, null);
                }
            }
        }
        LOG.info("Creating {} individual failover regions for job {} ({})", this.executionGraph.getJobName(), this.executionGraph.getJobID());
        for (List list2 : identityHashMap2.keySet()) {
            FailoverRegion failoverRegion = new FailoverRegion(this.executionGraph, this.executor, list2);
            Iterator it3 = list2.iterator();
            while (it3.hasNext()) {
                this.vertexToRegion.put((ExecutionVertex) it3.next(), failoverRegion);
            }
        }
    }

    private void makeAllOneRegion(List<ExecutionJobVertex> list) {
        LOG.warn("Cannot decompose ExecutionGraph into individual failover regions due to use of Co-Location constraints (iterations). Job will fail over as one holistic unit.");
        ArrayList arrayList = new ArrayList();
        for (ExecutionJobVertex executionJobVertex : list) {
            arrayList.ensureCapacity(arrayList.size() + executionJobVertex.getParallelism());
            for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
                arrayList.add(executionVertex);
            }
        }
        FailoverRegion failoverRegion = new FailoverRegion(this.executionGraph, this.executor, arrayList);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.vertexToRegion.put((ExecutionVertex) it.next(), failoverRegion);
        }
    }

    @VisibleForTesting
    public FailoverRegion getFailoverRegion(ExecutionVertex executionVertex) {
        return this.vertexToRegion.get(executionVertex);
    }
}
