package org.apache.flink.table.planner.plan.nodes.exec.processor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecBoundedStreamScan;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMultipleInput;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion;
import org.apache.flink.table.planner.plan.nodes.exec.processor.utils.InputOrderCalculator;
import org.apache.flink.table.planner.plan.nodes.exec.processor.utils.InputPriorityConflictResolver;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDataStreamScan;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultipleInput;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.class */
public class MultipleInputNodeCreationProcessor implements ExecNodeGraphProcessor {
    private final boolean isStreaming;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor$ExecNodeWrapper.class */
    public static class ExecNodeWrapper {
        private final ExecNode<?> execNode;
        private final List<ExecNodeWrapper> inputs;
        private final List<ExecNodeWrapper> outputs;
        private MultipleInputGroup group;

        private ExecNodeWrapper(ExecNode<?> execNode) {
            this.execNode = execNode;
            this.inputs = new ArrayList();
            this.outputs = new ArrayList();
            this.group = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor$MultipleInputGroup.class */
    public static class MultipleInputGroup {
        private final List<ExecNodeWrapper> members;
        private ExecNodeWrapper root;

        private MultipleInputGroup(ExecNodeWrapper execNodeWrapper) {
            this.members = new ArrayList();
            this.members.add(execNodeWrapper);
            this.root = execNodeWrapper;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addMember(ExecNodeWrapper execNodeWrapper) {
            Preconditions.checkState(execNodeWrapper.group == null, "The given exec node wrapper is already in a multiple input group. This is a bug.");
            this.members.add(execNodeWrapper);
            execNodeWrapper.group = this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void removeMember(ExecNodeWrapper execNodeWrapper) {
            if (execNodeWrapper == this.root) {
                removeRoot();
            } else {
                Preconditions.checkState(this.members.remove(execNodeWrapper), "The given exec node wrapper does not exist in the multiple input group. This is a bug.");
                execNodeWrapper.group = null;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void removeRoot() {
            Preconditions.checkNotNull(this.root, "Multiple input group does not have a root. This is a bug.");
            HashSet hashSet = new HashSet();
            for (ExecNodeWrapper execNodeWrapper : this.root.inputs) {
                if (this.members.contains(execNodeWrapper)) {
                    hashSet.add(execNodeWrapper);
                }
            }
            Preconditions.checkState(hashSet.size() < 2, "There are two or more inputs of the root remaining in the multiple input group. This is a bug.");
            this.members.remove(this.root);
            this.root.group = null;
            if (hashSet.isEmpty()) {
                this.root = null;
            } else {
                this.root = (ExecNodeWrapper) hashSet.iterator().next();
            }
        }
    }

    public MultipleInputNodeCreationProcessor(boolean z) {
        this.isStreaming = z;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.processor.ExecNodeGraphProcessor
    public ExecNodeGraph process(ExecNodeGraph execNodeGraph, ProcessorContext processorContext) {
        if (!this.isStreaming) {
            new InputPriorityConflictResolver(execNodeGraph.getRootNodes(), InputProperty.DamBehavior.BLOCKING, ShuffleMode.PIPELINED, processorContext.getPlanner().getTableConfig().getConfiguration()).detectAndResolve();
        }
        List<ExecNodeWrapper> wrapExecNodes = wrapExecNodes(execNodeGraph.getRootNodes());
        List<ExecNodeWrapper> list = topologicalSort(wrapExecNodes);
        createMultipleInputGroups(list);
        optimizeMultipleInputGroups(list, processorContext);
        return new ExecNodeGraph(createMultipleInputNodes(wrapExecNodes));
    }

    private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?>> list) {
        final HashMap hashMap = new HashMap();
        AbstractExecNodeExactlyOnceVisitor abstractExecNodeExactlyOnceVisitor = new AbstractExecNodeExactlyOnceVisitor() { // from class: org.apache.flink.table.planner.plan.nodes.exec.processor.MultipleInputNodeCreationProcessor.1
            @Override // org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor
            protected void visitNode(ExecNode<?> execNode) {
                ExecNodeWrapper execNodeWrapper = (ExecNodeWrapper) hashMap.computeIfAbsent(execNode, execNode2 -> {
                    return new ExecNodeWrapper(execNode);
                });
                Iterator<ExecEdge> it = execNode.getInputEdges().iterator();
                while (it.hasNext()) {
                    ExecNode<?> source = it.next().getSource();
                    ExecNodeWrapper execNodeWrapper2 = (ExecNodeWrapper) hashMap.computeIfAbsent(source, execNode3 -> {
                        return new ExecNodeWrapper(source);
                    });
                    execNodeWrapper.inputs.add(execNodeWrapper2);
                    execNodeWrapper2.outputs.add(execNodeWrapper);
                }
                visitInputs(execNode);
            }
        };
        list.forEach(execNode -> {
            execNode.accept(abstractExecNodeExactlyOnceVisitor);
        });
        ArrayList arrayList = new ArrayList();
        Iterator<ExecNode<?>> it = list.iterator();
        while (it.hasNext()) {
            ExecNodeWrapper execNodeWrapper = (ExecNodeWrapper) hashMap.get(it.next());
            Preconditions.checkNotNull(execNodeWrapper, "Root node is not wrapped. This is a bug.");
            arrayList.add(execNodeWrapper);
        }
        return arrayList;
    }

    private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> list) {
        ArrayList arrayList = new ArrayList();
        LinkedList linkedList = new LinkedList(list);
        HashMap hashMap = new HashMap();
        while (!linkedList.isEmpty()) {
            ExecNodeWrapper execNodeWrapper = (ExecNodeWrapper) linkedList.poll();
            arrayList.add(execNodeWrapper);
            for (ExecNodeWrapper execNodeWrapper2 : execNodeWrapper.inputs) {
                if (((Integer) hashMap.compute(execNodeWrapper2, (execNodeWrapper3, num) -> {
                    return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
                })).intValue() == execNodeWrapper2.outputs.size()) {
                    linkedList.offer(execNodeWrapper2);
                }
            }
        }
        return arrayList;
    }

    private void createMultipleInputGroups(List<ExecNodeWrapper> list) {
        for (ExecNodeWrapper execNodeWrapper : list) {
            if (canBeMultipleInputNodeMember(execNodeWrapper)) {
                MultipleInputGroup canBeInSameGroupWithOutputs = canBeInSameGroupWithOutputs(execNodeWrapper);
                if (canBeInSameGroupWithOutputs != null) {
                    canBeInSameGroupWithOutputs.addMember(execNodeWrapper);
                } else if (canBeRootOfMultipleInputGroup(execNodeWrapper)) {
                    execNodeWrapper.group = new MultipleInputGroup(execNodeWrapper);
                }
            }
        }
    }

    private boolean canBeMultipleInputNodeMember(ExecNodeWrapper execNodeWrapper) {
        return (execNodeWrapper.inputs.isEmpty() || (execNodeWrapper.execNode instanceof CommonExecExchange)) ? false : true;
    }

    private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper execNodeWrapper) {
        MultipleInputGroup multipleInputGroup;
        if (execNodeWrapper.outputs.isEmpty() || (multipleInputGroup = ((ExecNodeWrapper) execNodeWrapper.outputs.get(0)).group) == null) {
            return null;
        }
        Iterator it = execNodeWrapper.outputs.iterator();
        while (it.hasNext()) {
            if (((ExecNodeWrapper) it.next()).group != multipleInputGroup) {
                return null;
            }
        }
        return multipleInputGroup;
    }

    private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper execNodeWrapper) {
        return execNodeWrapper.inputs.size() >= 2;
    }

    private void optimizeMultipleInputGroups(List<ExecNodeWrapper> list, ProcessorContext processorContext) {
        for (int size = list.size() - 1; size >= 0; size--) {
            ExecNodeWrapper execNodeWrapper = list.get(size);
            if (execNodeWrapper.group != null && isEntranceOfMultipleInputGroup(execNodeWrapper)) {
                boolean z = false;
                if (execNodeWrapper.execNode instanceof CommonExecUnion) {
                    z = execNodeWrapper.inputs.stream().noneMatch(execNodeWrapper2 -> {
                        return isChainableSource(execNodeWrapper2.execNode, processorContext);
                    });
                } else if (execNodeWrapper.inputs.size() == 1) {
                    ExecNode execNode = ((ExecNodeWrapper) execNodeWrapper.inputs.get(0)).execNode;
                    z = ((execNode instanceof CommonExecExchange) || isChainableSource(execNode, processorContext)) ? false : true;
                }
                if (z | execNodeWrapper.inputs.stream().anyMatch(execNodeWrapper3 -> {
                    return (execNodeWrapper3.execNode instanceof CommonExecExchange) && execNodeWrapper3.execNode.getInputProperties().get(0).getRequiredDistribution().getType() == InputProperty.DistributionType.SINGLETON;
                })) {
                    execNodeWrapper.group.removeMember(execNodeWrapper);
                }
            }
        }
        for (ExecNodeWrapper execNodeWrapper4 : list) {
            MultipleInputGroup multipleInputGroup = execNodeWrapper4.group;
            if (multipleInputGroup != null && execNodeWrapper4 == execNodeWrapper4.group.root) {
                boolean z2 = execNodeWrapper4.execNode instanceof CommonExecUnion;
                if (multipleInputGroup.members.size() == 1) {
                    if (z2 || execNodeWrapper4.inputs.stream().noneMatch(execNodeWrapper5 -> {
                        return isChainableSource(execNodeWrapper5.execNode, processorContext);
                    })) {
                        execNodeWrapper4.group.removeRoot();
                    }
                } else if (z2) {
                    int i = 0;
                    ArrayList arrayList = new ArrayList();
                    ArrayList arrayList2 = new ArrayList();
                    for (int i2 = 0; i2 < execNodeWrapper4.inputs.size(); i2++) {
                        List<ExecNodeWrapper> inputWrappersInSameGroup = getInputWrappersInSameGroup((ExecNodeWrapper) execNodeWrapper4.inputs.get(i2), execNodeWrapper4.group);
                        arrayList2.add(inputWrappersInSameGroup);
                        if (inputWrappersInSameGroup.stream().filter(execNodeWrapper6 -> {
                            return execNodeWrapper6.inputs.size() >= 2 && !(execNodeWrapper6.execNode instanceof CommonExecUnion);
                        }).count() > 0) {
                            i++;
                        } else {
                            arrayList.add(Integer.valueOf(i2));
                        }
                    }
                    if (i < 2) {
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            for (ExecNodeWrapper execNodeWrapper7 : (List) arrayList2.get(((Integer) it.next()).intValue())) {
                                if (execNodeWrapper7.group != null) {
                                    execNodeWrapper7.group.removeMember(execNodeWrapper7);
                                }
                            }
                        }
                        execNodeWrapper4.group.removeRoot();
                    }
                } else if (execNodeWrapper4.inputs.size() == 1) {
                    execNodeWrapper4.group.removeRoot();
                }
            }
        }
    }

    private List<ExecNodeWrapper> getInputWrappersInSameGroup(ExecNodeWrapper execNodeWrapper, MultipleInputGroup multipleInputGroup) {
        ArrayList arrayList = new ArrayList();
        LinkedList linkedList = new LinkedList();
        HashSet hashSet = new HashSet();
        linkedList.add(execNodeWrapper);
        hashSet.add(execNodeWrapper);
        while (!linkedList.isEmpty()) {
            ExecNodeWrapper execNodeWrapper2 = (ExecNodeWrapper) linkedList.poll();
            if (execNodeWrapper2.group == multipleInputGroup) {
                arrayList.add(execNodeWrapper2);
                for (ExecNodeWrapper execNodeWrapper3 : execNodeWrapper2.inputs) {
                    if (!hashSet.contains(execNodeWrapper3)) {
                        linkedList.add(execNodeWrapper3);
                        hashSet.add(execNodeWrapper3);
                    }
                }
            }
        }
        return arrayList;
    }

    private boolean isEntranceOfMultipleInputGroup(ExecNodeWrapper execNodeWrapper) {
        Preconditions.checkNotNull(execNodeWrapper.group, "Exec node wrapper does not have a multiple input group. This is a bug.");
        Iterator it = execNodeWrapper.inputs.iterator();
        while (it.hasNext()) {
            if (((ExecNodeWrapper) it.next()).group == execNodeWrapper.group) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public static boolean isChainableSource(ExecNode<?> execNode, ProcessorContext processorContext) {
        if (execNode instanceof BatchExecBoundedStreamScan) {
            return ((BatchExecBoundedStreamScan) execNode).getDataStream().getTransformation() instanceof SourceTransformation;
        }
        if (execNode instanceof StreamExecDataStreamScan) {
            return ((StreamExecDataStreamScan) execNode).getDataStream().getTransformation() instanceof SourceTransformation;
        }
        if (execNode instanceof CommonExecTableSourceScan) {
            return execNode.translateToPlan(((ProcessorContext) Preconditions.checkNotNull(processorContext)).getPlanner()) instanceof SourceTransformation;
        }
        return false;
    }

    private List<ExecNode<?>> createMultipleInputNodes(List<ExecNodeWrapper> list) {
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        Iterator<ExecNodeWrapper> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(getMultipleInputNode(it.next(), hashMap));
        }
        return arrayList;
    }

    private ExecNode<?> getMultipleInputNode(ExecNodeWrapper execNodeWrapper, Map<ExecNodeWrapper, ExecNode<?>> map) {
        if (map.containsKey(execNodeWrapper)) {
            return map.get(execNodeWrapper);
        }
        for (int i = 0; i < execNodeWrapper.inputs.size(); i++) {
            execNodeWrapper.execNode.replaceInputEdge(i, ExecEdge.builder().source(getMultipleInputNode((ExecNodeWrapper) execNodeWrapper.inputs.get(i), map)).target(execNodeWrapper.execNode).build());
        }
        ExecNode<?> createMultipleInputNode = (execNodeWrapper.group == null || execNodeWrapper != execNodeWrapper.group.root) ? execNodeWrapper.execNode : createMultipleInputNode(execNodeWrapper.group, map);
        map.put(execNodeWrapper, createMultipleInputNode);
        return createMultipleInputNode;
    }

    private ExecNode<?> createMultipleInputNode(MultipleInputGroup multipleInputGroup, Map<ExecNodeWrapper, ExecNode<?>> map) {
        ArrayList arrayList = new ArrayList();
        for (ExecNodeWrapper execNodeWrapper : multipleInputGroup.members) {
            for (int i = 0; i < execNodeWrapper.inputs.size(); i++) {
                ExecNodeWrapper execNodeWrapper2 = (ExecNodeWrapper) execNodeWrapper.inputs.get(i);
                if (!multipleInputGroup.members.contains(execNodeWrapper2)) {
                    Preconditions.checkState(map.containsKey(execNodeWrapper2), "Input of a multiple input member is not visited. This is a bug.");
                    arrayList.add(Tuple2.of(map.get(execNodeWrapper2), execNodeWrapper.execNode.getInputProperties().get(i)));
                }
            }
        }
        return this.isStreaming ? createStreamMultipleInputNode(multipleInputGroup, arrayList) : createBatchMultipleInputNode(multipleInputGroup, arrayList);
    }

    private StreamExecMultipleInput createStreamMultipleInputNode(MultipleInputGroup multipleInputGroup, List<Tuple2<ExecNode<?>, InputProperty>> list) {
        ExecNode execNode = multipleInputGroup.root.execNode;
        ArrayList arrayList = new ArrayList();
        Iterator<Tuple2<ExecNode<?>, InputProperty>> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().f0);
        }
        StreamExecMultipleInput streamExecMultipleInput = new StreamExecMultipleInput((List) arrayList.stream().map(execNode2 -> {
            return InputProperty.DEFAULT;
        }).collect(Collectors.toList()), execNode, ExecNodeUtil.getMultipleInputDescription(execNode, arrayList, new ArrayList()));
        ArrayList arrayList2 = new ArrayList(arrayList.size());
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            arrayList2.add(ExecEdge.builder().source((ExecNode) it2.next()).target(streamExecMultipleInput).build());
        }
        streamExecMultipleInput.setInputEdges(arrayList2);
        return streamExecMultipleInput;
    }

    private BatchExecMultipleInput createBatchMultipleInputNode(MultipleInputGroup multipleInputGroup, List<Tuple2<ExecNode<?>, InputProperty>> list) {
        HashSet hashSet = new HashSet();
        Iterator<Tuple2<ExecNode<?>, InputProperty>> it = list.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().f0);
        }
        Map<ExecNode<?>, Integer> calculate = new InputOrderCalculator(multipleInputGroup.root.execNode, hashSet, InputProperty.DamBehavior.BLOCKING).calculate();
        ExecNode execNode = multipleInputGroup.root.execNode;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (Tuple2<ExecNode<?>, InputProperty> tuple2 : list) {
            ExecNode execNode2 = (ExecNode) tuple2.f0;
            InputProperty inputProperty = (InputProperty) tuple2.f1;
            arrayList.add(execNode2);
            arrayList2.add(InputProperty.builder().requiredDistribution(inputProperty.getRequiredDistribution()).damBehavior(inputProperty.getDamBehavior()).priority(calculate.get(execNode2).intValue()).build());
        }
        BatchExecMultipleInput batchExecMultipleInput = new BatchExecMultipleInput(arrayList2, execNode, ExecNodeUtil.getMultipleInputDescription(execNode, arrayList, arrayList2));
        ArrayList arrayList3 = new ArrayList(arrayList.size());
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            arrayList3.add(ExecEdge.builder().source((ExecNode) it2.next()).target(batchExecMultipleInput).build());
        }
        batchExecMultipleInput.setInputEdges(arrayList3);
        return batchExecMultipleInput;
    }
}
