package org.apache.beam.runners.direct.portable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.portable.DirectGroupByKey;
import org.apache.beam.runners.direct.portable.StepTransformResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.class */
/**
 * A {@link TransformEvaluatorFactory} whose evaluators group the {@link KV} elements of a bundle
 * by key.
 *
 * <p>Each evaluator buffers the values it receives per {@link StructuralKey} and, when the bundle
 * is finished, emits one keyed output bundle per distinct key. Every output bundle holds a single
 * {@link KeyedWorkItem} carrying all values buffered for that key.
 */
public class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
    private final RunnerApi.Components components;
    private final BundleFactory bundleFactory;
    private final ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph;

    /**
     * Buffers the elements of one input bundle by key and outputs them as {@link KeyedWorkItem}s.
     *
     * @param <K> the key type of the input elements
     * @param <V> the value type of the input elements
     */
    public class GroupByKeyOnlyEvaluator<K, V> implements TransformEvaluator<KV<K, V>> {
        private final Coder<K> keyCoder;
        private final Map<StructuralKey<K>, List<WindowedValue<V>>> groupingMap;
        private final PipelineNode.PCollectionNode outputPCollection;
        private final StepTransformResult.Builder<KV<K, V>> resultBuilder;

        private GroupByKeyOnlyEvaluator(PipelineNode.PTransformNode pTransformNode) {
            this.keyCoder = getKeyCoder(pTransformNode);
            this.groupingMap = new HashMap<>();
            this.outputPCollection =
                    Iterables.getOnlyElement(
                            GroupByKeyOnlyEvaluatorFactory.this.graph.getProduced(pTransformNode));
            this.resultBuilder = StepTransformResult.withoutHold(pTransformNode);
        }

        /**
         * Resolves the key coder of the transform's single per-element input.
         *
         * @param pTransformNode the transform whose input key coder is wanted
         * @return the key coder of the input's {@link KvCoder}
         * @throws IllegalArgumentException if the wire coder is not a
         *     {@link WindowedValue.WindowedValueCoder} or its value coder is not a {@link KvCoder}
         * @throws RuntimeException wrapping any {@link IOException} raised while resolving the coder
         */
        @SuppressWarnings("unchecked") // The instanceof checks below guard the coder casts.
        private Coder<K> getKeyCoder(PipelineNode.PTransformNode pTransformNode) {
            PipelineNode.PCollectionNode inputPCollection =
                    Iterables.getOnlyElement(
                            GroupByKeyOnlyEvaluatorFactory.this.graph.getPerElementInputs(pTransformNode));
            try {
                RunnerApi.Components.Builder builder =
                        GroupByKeyOnlyEvaluatorFactory.this.components.toBuilder();
                // Obtain the wire coder id BEFORE calling builder.build(): addRunnerWireCoder is
                // handed the builder (presumably to register the coder in it), and Java evaluates
                // a call's target before its arguments, so nesting this call as the argument of
                // getCoder(...) would build the components before the builder had been updated.
                String wireCoderId = WireCoders.addRunnerWireCoder(inputPCollection, builder);
                Coder<?> wireCoder =
                        RehydratedComponents.forComponents(builder.build()).getCoder(wireCoderId);
                // Validate before casting so a mismatch reports the descriptive message below.
                Preconditions.checkArgument(wireCoder instanceof WindowedValue.WindowedValueCoder, "Wire %s must be a %s", Coder.class.getSimpleName(), WindowedValue.WindowedValueCoder.class.getSimpleName());
                Coder<?> valueCoder =
                        ((WindowedValue.WindowedValueCoder<?>) wireCoder).getValueCoder();
                Preconditions.checkArgument(valueCoder instanceof KvCoder, "Input elements to %s must be encoded with a %s", DirectGroupByKey.DirectGroupByKeyOnly.class.getSimpleName(), KvCoder.class.getSimpleName());
                return ((KvCoder<K, ?>) valueCoder).getKeyCoder();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Buffers the element's value under the structural form of its key, keeping the
         * element's windowing metadata via {@link WindowedValue#withValue}.
         *
         * @param windowedValue the keyed input element
         */
        @Override // org.apache.beam.runners.direct.portable.TransformEvaluator
        public void processElement(WindowedValue<KV<K, V>> windowedValue) {
            KV<K, V> kv = windowedValue.getValue();
            StructuralKey<K> groupingKey = StructuralKey.of(kv.getKey(), this.keyCoder);
            this.groupingMap
                    .computeIfAbsent(groupingKey, unused -> new ArrayList<>())
                    .add(windowedValue.withValue(kv.getValue()));
        }

        /**
         * Emits one keyed bundle per buffered key, each containing a single {@link KeyedWorkItem}
         * (placed in the global window) with all values buffered for that key.
         *
         * @return the result listing every bundle produced for this input bundle
         */
        @Override // org.apache.beam.runners.direct.portable.TransformEvaluator
        public TransformResult<KV<K, V>> finishBundle() {
            for (Map.Entry<StructuralKey<K>, List<WindowedValue<V>>> entry : this.groupingMap.entrySet()) {
                K key = entry.getKey().getKey();
                KeyedWorkItem<K, V> workItem = KeyedWorkItems.elementsWorkItem(key, entry.getValue());
                UncommittedBundle<KeyedWorkItem<K, V>> bundle =
                        GroupByKeyOnlyEvaluatorFactory.this.bundleFactory.createKeyedBundle(
                                StructuralKey.of(key, this.keyCoder), this.outputPCollection);
                bundle.add(WindowedValue.valueInGlobalWindow(workItem));
                this.resultBuilder.addOutput(bundle);
            }
            return this.resultBuilder.build();
        }
    }

    /**
     * Creates a factory bound to the given pipeline graph and components.
     *
     * @param executableGraph graph used to look up each transform's inputs and outputs
     * @param components pipeline components from which input coders are resolved
     * @param bundleFactory factory used to create the keyed output bundles
     */
    public GroupByKeyOnlyEvaluatorFactory(ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> executableGraph, RunnerApi.Components components, BundleFactory bundleFactory) {
        this.components = components;
        this.bundleFactory = bundleFactory;
        this.graph = executableGraph;
    }

    /**
     * Creates a fresh evaluator for the given transform; the input bundle is not consulted.
     *
     * @param pTransformNode the transform to evaluate
     * @param committedBundle the input bundle (unused)
     * @return a new evaluator for {@code pTransformNode}
     */
    @Override // org.apache.beam.runners.direct.portable.TransformEvaluatorFactory
    public <InputT> TransformEvaluator<InputT> forApplication(PipelineNode.PTransformNode pTransformNode, CommittedBundle<?> committedBundle) {
        // The interface is generic in InputT while this factory always produces a KV evaluator;
        // the raw cast bridges the two and relies on callers only routing KV inputs here.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator = (TransformEvaluator) createEvaluator(pTransformNode);
        return evaluator;
    }

    /** No resources are held by this factory, so there is nothing to release. */
    @Override // org.apache.beam.runners.direct.portable.TransformEvaluatorFactory
    public void cleanup() {
    }

    /** Instantiates the inner evaluator with its key/value type parameters inferred. */
    private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(PipelineNode.PTransformNode pTransformNode) {
        return new GroupByKeyOnlyEvaluator<>(pTransformNode);
    }
}
