package org.apache.flink.api.common.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.util.TypeComparable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.types.Value;
import org.apache.flink.util.Visitor;

/* loaded from: input_file:org/apache/flink/api/common/operators/CollectionExecutor.class */
public class CollectionExecutor {
    private static final boolean DEFAULT_MUTABLE_OBJECT_SAFE_MODE = true;
    private final ExecutionConfig executionConfig;
    private int iterationSuperstep;
    private final Map<Operator<?>, List<?>> intermediateResults = new HashMap();
    private final Map<String, Accumulator<?, ?>> accumulators = new HashMap();
    private final Map<String, Value> previousAggregates = new HashMap();
    private final Map<String, Aggregator<?>> aggregators = new HashMap();
    private final Map<String, Future<Path>> cachedFiles = new HashMap();
    private final ClassLoader classLoader = getClass().getClassLoader();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/api/common/operators/CollectionExecutor$CompletedFuture.class */
    public static final class CompletedFuture implements Future<Path> {
        private final Path result;

        public CompletedFuture(Path path) {
            try {
                this.result = path.isAbsolute() ? new Path(path.toUri().getPath()) : new Path(((LocalFileSystem) path.getFileSystem()).getWorkingDirectory(), path);
            } catch (Exception e) {
                throw new RuntimeException("DistributedCache supports only local files for Collection Environments");
            }
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return true;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public Path get() throws InterruptedException, ExecutionException {
            return this.result;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public Path get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            return get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/api/common/operators/CollectionExecutor$DynamicPathCollector.class */
    public static final class DynamicPathCollector implements Visitor<Operator<?>> {
        private final Set<Operator<?>> visited = new HashSet();
        private final Set<Operator<?>> dynamicPathOperations;

        public DynamicPathCollector(Set<Operator<?>> set) {
            this.dynamicPathOperations = set;
        }

        @Override // org.apache.flink.util.Visitor
        public boolean preVisit(Operator<?> operator) {
            return this.visited.add(operator);
        }

        @Override // org.apache.flink.util.Visitor
        public void postVisit(Operator<?> operator) {
            if (operator instanceof SingleInputOperator) {
                SingleInputOperator singleInputOperator = (SingleInputOperator) operator;
                if (this.dynamicPathOperations.contains(singleInputOperator.getInput())) {
                    this.dynamicPathOperations.add(operator);
                    return;
                }
                Iterator<Operator<?>> it = singleInputOperator.getBroadcastInputs().values().iterator();
                while (it.hasNext()) {
                    if (this.dynamicPathOperations.contains(it.next())) {
                        this.dynamicPathOperations.add(operator);
                        return;
                    }
                }
                return;
            }
            if (!(operator instanceof DualInputOperator)) {
                if (operator.getClass() == BulkIterationBase.PartialSolutionPlaceHolder.class || operator.getClass() == DeltaIterationBase.WorksetPlaceHolder.class || operator.getClass() == DeltaIterationBase.SolutionSetPlaceHolder.class) {
                    this.dynamicPathOperations.add(operator);
                    return;
                } else {
                    if (!(operator instanceof GenericDataSourceBase)) {
                        throw new RuntimeException("Cannot handle operator type " + operator.getClass().getName());
                    }
                    return;
                }
            }
            DualInputOperator dualInputOperator = (DualInputOperator) operator;
            if (this.dynamicPathOperations.contains(dualInputOperator.getFirstInput())) {
                this.dynamicPathOperations.add(operator);
                return;
            }
            if (this.dynamicPathOperations.contains(dualInputOperator.getSecondInput())) {
                this.dynamicPathOperations.add(operator);
                return;
            }
            Iterator<Operator<?>> it2 = dualInputOperator.getBroadcastInputs().values().iterator();
            while (it2.hasNext()) {
                if (this.dynamicPathOperations.contains(it2.next())) {
                    this.dynamicPathOperations.add(operator);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/api/common/operators/CollectionExecutor$IterationRuntimeUDFContext.class */
    public class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
        public IterationRuntimeUDFContext(String str, int i, int i2, ClassLoader classLoader, ExecutionConfig executionConfig, Map<String, Future<Path>> map, Map<String, Accumulator<?, ?>> map2) {
            super(str, i, i2, classLoader, executionConfig, map, map2);
        }

        @Override // org.apache.flink.api.common.functions.IterationRuntimeContext
        public int getSuperstepNumber() {
            return CollectionExecutor.this.iterationSuperstep;
        }

        @Override // org.apache.flink.api.common.functions.IterationRuntimeContext
        public <T extends Aggregator<?>> T getIterationAggregator(String str) {
            return (T) CollectionExecutor.this.aggregators.get(str);
        }

        @Override // org.apache.flink.api.common.functions.IterationRuntimeContext
        public <T extends Value> T getPreviousIterationAggregate(String str) {
            return (T) CollectionExecutor.this.previousAggregates.get(str);
        }
    }

    public CollectionExecutor(ExecutionConfig executionConfig) {
        this.executionConfig = executionConfig;
    }

    public JobExecutionResult execute(Plan plan) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        initCache(plan.getCachedFiles());
        Iterator<? extends GenericDataSinkBase<?>> it = plan.getDataSinks().iterator();
        while (it.hasNext()) {
            execute(it.next());
        }
        return new JobExecutionResult(null, System.currentTimeMillis() - currentTimeMillis, AccumulatorHelper.toResultMap(this.accumulators));
    }

    private void initCache(Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> set) {
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : set) {
            this.cachedFiles.put(entry.getKey(), new CompletedFuture(new Path(entry.getValue().filePath)));
        }
    }

    private List<?> execute(Operator<?> operator) throws Exception {
        return execute(operator, 0);
    }

    private List<?> execute(Operator<?> operator, int i) throws Exception {
        List<?> emptyList;
        List<?> list = this.intermediateResults.get(operator);
        if (list != null) {
            return list;
        }
        if (operator instanceof BulkIterationBase) {
            emptyList = executeBulkIteration((BulkIterationBase) operator);
        } else if (operator instanceof DeltaIterationBase) {
            emptyList = executeDeltaIteration((DeltaIterationBase) operator);
        } else if (operator instanceof SingleInputOperator) {
            emptyList = executeUnaryOperator((SingleInputOperator) operator, i);
        } else if (operator instanceof DualInputOperator) {
            emptyList = executeBinaryOperator((DualInputOperator) operator, i);
        } else if (operator instanceof GenericDataSourceBase) {
            emptyList = executeDataSource((GenericDataSourceBase) operator, i);
        } else {
            if (!(operator instanceof GenericDataSinkBase)) {
                throw new RuntimeException("Cannot execute operator " + operator.getClass().getName());
            }
            executeDataSink((GenericDataSinkBase) operator, i);
            emptyList = Collections.emptyList();
        }
        this.intermediateResults.put(operator, emptyList);
        return emptyList;
    }

    private <IN> void executeDataSink(GenericDataSinkBase<?> genericDataSinkBase, int i) throws Exception {
        RuntimeContext runtimeContext;
        Operator<?> input = genericDataSinkBase.getInput();
        if (input == null) {
            throw new InvalidProgramException("The data sink " + genericDataSinkBase.getName() + " has no input.");
        }
        List<?> execute = execute(input);
        if (RichOutputFormat.class.isAssignableFrom(genericDataSinkBase.getUserCodeWrapper().getUserCodeClass())) {
            runtimeContext = i == 0 ? new RuntimeUDFContext(genericDataSinkBase.getName(), 1, 0, getClass().getClassLoader(), this.executionConfig, this.cachedFiles, this.accumulators) : new IterationRuntimeUDFContext(genericDataSinkBase.getName(), 1, 0, this.classLoader, this.executionConfig, this.cachedFiles, this.accumulators);
        } else {
            runtimeContext = null;
        }
        genericDataSinkBase.executeOnCollections(execute, runtimeContext, this.executionConfig);
    }

    private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> genericDataSourceBase, int i) throws Exception {
        RuntimeContext runtimeContext;
        if (RichInputFormat.class.isAssignableFrom(genericDataSourceBase.getUserCodeWrapper().getUserCodeClass())) {
            runtimeContext = i == 0 ? new RuntimeUDFContext(genericDataSourceBase.getName(), 1, 0, getClass().getClassLoader(), this.executionConfig, this.cachedFiles, this.accumulators) : new IterationRuntimeUDFContext(genericDataSourceBase.getName(), 1, 0, this.classLoader, this.executionConfig, this.cachedFiles, this.accumulators);
        } else {
            runtimeContext = null;
        }
        return (List<OUT>) genericDataSourceBase.executeOnCollections(runtimeContext, this.executionConfig);
    }

    private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> singleInputOperator, int i) throws Exception {
        RuntimeUDFContext runtimeUDFContext;
        Operator<?> input = singleInputOperator.getInput();
        if (input == null) {
            throw new InvalidProgramException("The unary operation " + singleInputOperator.getName() + " has no input.");
        }
        List<?> execute = execute(input, i);
        if (RichFunction.class.isAssignableFrom(singleInputOperator.getUserCodeWrapper().getUserCodeClass())) {
            runtimeUDFContext = i == 0 ? new RuntimeUDFContext(singleInputOperator.getName(), 1, 0, getClass().getClassLoader(), this.executionConfig, this.cachedFiles, this.accumulators) : new IterationRuntimeUDFContext(singleInputOperator.getName(), 1, 0, this.classLoader, this.executionConfig, this.cachedFiles, this.accumulators);
            for (Map.Entry<String, Operator<?>> entry : singleInputOperator.getBroadcastInputs().entrySet()) {
                runtimeUDFContext.setBroadcastVariable(entry.getKey(), execute(entry.getValue()));
            }
        } else {
            runtimeUDFContext = null;
        }
        return (List<OUT>) singleInputOperator.executeOnCollections(execute, runtimeUDFContext, this.executionConfig);
    }

    private <IN1, IN2, OUT> List<OUT> executeBinaryOperator(DualInputOperator<?, ?, ?, ?> dualInputOperator, int i) throws Exception {
        RuntimeUDFContext runtimeUDFContext;
        Operator<?> firstInput = dualInputOperator.getFirstInput();
        Operator<?> secondInput = dualInputOperator.getSecondInput();
        if (firstInput == null) {
            throw new InvalidProgramException("The binary operation " + dualInputOperator.getName() + " has no first input.");
        }
        if (secondInput == null) {
            throw new InvalidProgramException("The binary operation " + dualInputOperator.getName() + " has no second input.");
        }
        List<?> execute = execute(firstInput, i);
        List<?> execute2 = execute(secondInput, i);
        if (RichFunction.class.isAssignableFrom(dualInputOperator.getUserCodeWrapper().getUserCodeClass())) {
            runtimeUDFContext = i == 0 ? new RuntimeUDFContext(dualInputOperator.getName(), 1, 0, this.classLoader, this.executionConfig, this.cachedFiles, this.accumulators) : new IterationRuntimeUDFContext(dualInputOperator.getName(), 1, 0, this.classLoader, this.executionConfig, this.cachedFiles, this.accumulators);
            for (Map.Entry<String, Operator<?>> entry : dualInputOperator.getBroadcastInputs().entrySet()) {
                runtimeUDFContext.setBroadcastVariable(entry.getKey(), execute(entry.getValue()));
            }
        } else {
            runtimeUDFContext = null;
        }
        return (List<OUT>) dualInputOperator.executeOnCollections(execute, execute2, runtimeUDFContext, this.executionConfig);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T> List<T> executeBulkIteration(BulkIterationBase<?> bulkIterationBase) throws Exception {
        Operator<?> input = bulkIterationBase.getInput();
        if (input == null) {
            throw new InvalidProgramException("The iteration " + bulkIterationBase.getName() + " has no input (initial partial solution).");
        }
        if (bulkIterationBase.getNextPartialSolution() == null) {
            throw new InvalidProgramException("The iteration " + bulkIterationBase.getName() + " has no next partial solution defined (is not closed).");
        }
        List<?> execute = execute(input);
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        DynamicPathCollector dynamicPathCollector = new DynamicPathCollector(linkedHashSet);
        bulkIterationBase.getNextPartialSolution().accept(dynamicPathCollector);
        if (bulkIterationBase.getTerminationCriterion() != null) {
            bulkIterationBase.getTerminationCriterion().accept(dynamicPathCollector);
        }
        for (AggregatorWithName<?> aggregatorWithName : bulkIterationBase.getAggregators().getAllRegisteredAggregators()) {
            this.aggregators.put(aggregatorWithName.getName(), aggregatorWithName.getAggregator());
        }
        String convergenceCriterionAggregatorName = bulkIterationBase.getAggregators().getConvergenceCriterionAggregatorName();
        ConvergenceCriterion<?> convergenceCriterion = bulkIterationBase.getAggregators().getConvergenceCriterion();
        List<?> list = execute;
        int maximumNumberOfIterations = bulkIterationBase.getMaximumNumberOfIterations();
        for (int i = 1; i <= maximumNumberOfIterations; i++) {
            this.intermediateResults.put(bulkIterationBase.getPartialSolution(), list);
            this.iterationSuperstep = i;
            list = execute(bulkIterationBase.getNextPartialSolution(), i);
            if (bulkIterationBase.getTerminationCriterion() != null) {
                execute(bulkIterationBase.getTerminationCriterion(), i);
            }
            if (convergenceCriterion != null && convergenceCriterionAggregatorName != null) {
                if (convergenceCriterion.isConverged(i, this.aggregators.get(convergenceCriterionAggregatorName).getAggregate())) {
                    break;
                }
            }
            Iterator it = linkedHashSet.iterator();
            while (it.hasNext()) {
                this.intermediateResults.remove((Operator) it.next());
            }
            for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
                this.previousAggregates.put(entry.getKey(), entry.getValue().getAggregate());
                entry.getValue().reset();
            }
        }
        this.previousAggregates.clear();
        this.aggregators.clear();
        return (List<T>) list;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T> List<T> executeDeltaIteration(DeltaIterationBase<?, ?> deltaIterationBase) throws Exception {
        Operator<?> initialSolutionSet = deltaIterationBase.getInitialSolutionSet();
        Operator<?> initialWorkset = deltaIterationBase.getInitialWorkset();
        if (initialSolutionSet == null) {
            throw new InvalidProgramException("The delta iteration " + deltaIterationBase.getName() + " has no initial solution set.");
        }
        if (initialWorkset == null) {
            throw new InvalidProgramException("The delta iteration " + deltaIterationBase.getName() + " has no initial workset.");
        }
        if (deltaIterationBase.getSolutionSetDelta() == null) {
            throw new InvalidProgramException("The iteration " + deltaIterationBase.getName() + " has no solution set delta defined (is not closed).");
        }
        if (deltaIterationBase.getNextWorkset() == null) {
            throw new InvalidProgramException("The iteration " + deltaIterationBase.getName() + " has no workset defined (is not closed).");
        }
        List<?> execute = execute(initialSolutionSet);
        List<?> execute2 = execute(initialWorkset);
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        DynamicPathCollector dynamicPathCollector = new DynamicPathCollector(linkedHashSet);
        deltaIterationBase.getSolutionSetDelta().accept(dynamicPathCollector);
        deltaIterationBase.getNextWorkset().accept(dynamicPathCollector);
        TypeInformation<?> firstInputType = deltaIterationBase.getOperatorInfo().getFirstInputType();
        int[] solutionSetKeyFields = deltaIterationBase.getSolutionSetKeyFields();
        TypeComparator<T> createComparator = ((CompositeType) firstInputType).createComparator(solutionSetKeyFields, new boolean[solutionSetKeyFields.length], 0, this.executionConfig);
        HashMap hashMap = new HashMap(execute.size());
        for (Object obj : execute) {
            hashMap.put(new TypeComparable(obj, createComparator), obj);
        }
        List<?> list = execute2;
        for (AggregatorWithName<?> aggregatorWithName : deltaIterationBase.getAggregators().getAllRegisteredAggregators()) {
            this.aggregators.put(aggregatorWithName.getName(), aggregatorWithName.getAggregator());
        }
        int maximumNumberOfIterations = deltaIterationBase.getMaximumNumberOfIterations();
        for (int i = 1; i <= maximumNumberOfIterations; i++) {
            ArrayList arrayList = new ArrayList(hashMap.size());
            arrayList.addAll(hashMap.values());
            this.intermediateResults.put(deltaIterationBase.getSolutionSet(), arrayList);
            this.intermediateResults.put(deltaIterationBase.getWorkset(), list);
            this.iterationSuperstep = i;
            List<?> execute3 = execute(deltaIterationBase.getSolutionSetDelta(), i);
            this.intermediateResults.put(deltaIterationBase.getSolutionSetDelta(), execute3);
            for (Object obj2 : execute3) {
                hashMap.put(new TypeComparable(obj2, createComparator), obj2);
            }
            list = execute(deltaIterationBase.getNextWorkset(), i);
            if (list.isEmpty()) {
                break;
            }
            Iterator it = linkedHashSet.iterator();
            while (it.hasNext()) {
                this.intermediateResults.remove((Operator) it.next());
            }
            for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
                this.previousAggregates.put(entry.getKey(), entry.getValue().getAggregate());
                entry.getValue().reset();
            }
        }
        this.previousAggregates.clear();
        this.aggregators.clear();
        ArrayList arrayList2 = new ArrayList(hashMap.size());
        arrayList2.addAll(hashMap.values());
        return arrayList2;
    }
}
