package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunners;
import org.apache.beam.runners.direct.repackaged.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.TimerInternals;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/ParDoEvaluator.class */
public class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
    private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
    private final AppliedPTransform<?, ?, ?> transform;
    private final BundleOutputManager outputManager;
    private final DirectExecutionContext.DirectStepContext stepContext;
    private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements = ImmutableList.builder();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoEvaluator$BundleOutputManager.class */
    /**
     * {@link DoFnRunners.OutputManager} that routes each output to the bundle registered for its tag.
     *
     * <p>Outputs whose tag has no registered bundle are not rejected; they are accumulated in a
     * per-tag list held by this instance.
     */
    public static class BundleOutputManager implements DoFnRunners.OutputManager {
        /** Bundle for each declared output tag; the caller's map is stored as-is, not copied. */
        private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;

        /** Values emitted to tags that are absent from {@link #bundles}, grouped by tag. */
        private final Map<TupleTag<?>, List<?>> undeclaredOutputs = new HashMap<>();

        /**
         * Creates an output manager backed by the given tag-to-bundle map.
         *
         * @param map the bundle to receive the outputs of each declared tag
         * @return a new manager that writes into {@code map}'s bundles
         */
        public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> map) {
            return new BundleOutputManager(map);
        }

        private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> map) {
            this.bundles = map;
        }

        /**
         * Adds {@code windowedValue} to the bundle registered for {@code tupleTag}, or, when no bundle
         * is registered for that tag, to the list of undeclared outputs kept for the tag.
         *
         * @param tupleTag the tag the value was emitted to
         * @param windowedValue the emitted value
         */
        // The maps are keyed by TupleTag<?>, so the element type of the looked-up bundle or list is
        // not expressible statically. The casts assume the entry stored under a TupleTag<T> carries
        // T elements, which is how this class itself populates undeclaredOutputs.
        @SuppressWarnings("unchecked")
        @Override
        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            UncommittedBundle<T> bundle = (UncommittedBundle<T>) this.bundles.get(tupleTag);
            if (bundle != null) {
                bundle.add(windowedValue);
                return;
            }
            List<WindowedValue<T>> undeclared =
                    (List<WindowedValue<T>>) this.undeclaredOutputs.get(tupleTag);
            if (undeclared == null) {
                // First value seen for this undeclared tag: start its list.
                undeclared = new ArrayList<>();
                this.undeclaredOutputs.put(tupleTag, undeclared);
            }
            undeclared.add(windowedValue);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoEvaluator$DoFnRunnerFactory.class */
    /**
     * Factory for the {@link PushbackSideInputDoFnRunner} that a {@link ParDoEvaluator} drives.
     *
     * <p>The multi-argument {@code ParDoEvaluator.create} overload calls {@link #createRunner} once
     * and hands the resulting runner to the evaluator it constructs.
     */
    public interface DoFnRunnerFactory<InputT, OutputT> {
        /**
         * Builds the runner for {@code doFn}.
         *
         * <p>NOTE(review): the parameter names {@code list} and {@code list2} are decompiler-generated;
         * the roles described below are taken from how the visible call sites use them and should be
         * confirmed against the upstream source.
         *
         * @param pipelineOptions options of the running pipeline
         * @param doFn the function the runner will execute
         * @param list the views that {@code ParDoEvaluator.create} also uses to build
         *     {@code readyCheckingSideInputReader}
         * @param readyCheckingSideInputReader reader for the side inputs
         * @param outputManager receiver of the values the function emits
         * @param tupleTag tag typed with the function's output type; presumably the main output tag
         * @param list2 further output tags; presumably the additional (non-main) outputs
         * @param directStepContext step context passed through to the runner
         * @param windowingStrategy windowing strategy passed through to the runner
         * @return the runner to use for this function
         */
        PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(PipelineOptions pipelineOptions, DoFn<InputT, OutputT> doFn, List<PCollectionView<?>> list, ReadyCheckingSideInputReader readyCheckingSideInputReader, DoFnRunners.OutputManager outputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list2, DirectExecutionContext.DirectStepContext directStepContext, WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy);
    }

    /**
     * Returns the stock {@link DoFnRunnerFactory}.
     *
     * <p>The runner it produces is the result of {@code DoFnRunners.simpleRunner(...)} wrapped by
     * {@code SimplePushbackSideInputDoFnRunner.create(...)} together with the views and the side
     * input reader it was given.
     *
     * @return a new factory instance on every call
     */
    public static <InputT, OutputT> DoFnRunnerFactory<InputT, OutputT> defaultRunnerFactory() {
        return new DoFnRunnerFactory<InputT, OutputT>() {
            @Override
            public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
                    PipelineOptions options,
                    DoFn<InputT, OutputT> fn,
                    List<PCollectionView<?>> views,
                    ReadyCheckingSideInputReader sideInputReader,
                    DoFnRunners.OutputManager outputs,
                    TupleTag<OutputT> outputTag,
                    List<TupleTag<?>> otherTags,
                    DirectExecutionContext.DirectStepContext context,
                    WindowingStrategy<?, ? extends BoundedWindow> strategy) {
                // Inner call builds the plain runner; the outer call adds the pushback wrapper.
                return SimplePushbackSideInputDoFnRunner.create(
                        DoFnRunners.simpleRunner(
                                options, fn, sideInputReader, outputs, outputTag, otherTags, context, strategy),
                        views,
                        sideInputReader);
            }
        };
    }

    /**
     * Creates an evaluator, building its output manager and runner from the supplied pieces.
     *
     * <p>Steps, in order: create one output bundle per entry of {@code map}, fetch the pipeline
     * options and a side input reader for {@code list} from {@code evaluationContext}, ask
     * {@code doFnRunnerFactory} for the runner, then delegate to the four-argument {@code create}.
     *
     * @param evaluationContext source of bundles, pipeline options and the side input reader
     * @param directStepContext step context handed to the runner and kept by the evaluator
     * @param appliedPTransform the transform being evaluated
     * @param windowingStrategy windowing strategy handed to the runner factory
     * @param doFn the function to run
     * @param structuralKey key used when a keyed output bundle is created
     * @param list views the side input reader is created for
     * @param tupleTag output tag handed to the runner factory
     * @param list2 further output tags handed to the runner factory
     * @param map output collection for each tag; a bundle is created for every entry
     * @param doFnRunnerFactory factory that produces the runner
     * @return the new evaluator
     */
    public static <InputT, OutputT> ParDoEvaluator<InputT> create(EvaluationContext evaluationContext, DirectExecutionContext.DirectStepContext directStepContext, AppliedPTransform<?, ?, ?> appliedPTransform, WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy, DoFn<InputT, OutputT> doFn, StructuralKey<?> structuralKey, List<PCollectionView<?>> list, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list2, Map<TupleTag<?>, PCollection<?>> map, DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory) {
        BundleOutputManager outputs = createOutputManager(evaluationContext, structuralKey, map);
        PipelineOptions options = evaluationContext.getPipelineOptions();
        ReadyCheckingSideInputReader sideInputReader = evaluationContext.createSideInputReader(list);
        PushbackSideInputDoFnRunner<InputT, OutputT> runner =
                doFnRunnerFactory.createRunner(
                        options,
                        doFn,
                        list,
                        sideInputReader,
                        outputs,
                        tupleTag,
                        list2,
                        directStepContext,
                        windowingStrategy);
        return create(runner, directStepContext, appliedPTransform, outputs);
    }

    /**
     * Creates an evaluator around an already-built runner and output manager.
     *
     * <p>Constructing the evaluator calls {@code startBundle()} on the runner.
     *
     * @param pushbackSideInputDoFnRunner runner that will process the elements
     * @param directStepContext step context the evaluator reads state and timer updates from
     * @param appliedPTransform the transform being evaluated
     * @param bundleOutputManager manager holding the output bundles
     * @return the new evaluator
     */
    public static <InputT, OutputT> ParDoEvaluator<InputT> create(PushbackSideInputDoFnRunner<InputT, OutputT> pushbackSideInputDoFnRunner, DirectExecutionContext.DirectStepContext directStepContext, AppliedPTransform<?, ?, ?> appliedPTransform, BundleOutputManager bundleOutputManager) {
        // Note the constructor takes its arguments in a different order than this factory.
        ParDoEvaluator<InputT> evaluator =
                new ParDoEvaluator<InputT>(
                        pushbackSideInputDoFnRunner,
                        appliedPTransform,
                        bundleOutputManager,
                        directStepContext);
        return evaluator;
    }

    /**
     * Creates a {@link BundleOutputManager} holding one freshly created bundle per output.
     *
     * <p>For each entry of {@code map}, a keyed bundle (using {@code structuralKey}) is created when
     * {@code evaluationContext.isKeyed} reports the collection as keyed; otherwise an unkeyed
     * bundle is created.
     *
     * @param evaluationContext context that decides keyedness and creates the bundles
     * @param structuralKey key passed to {@code createKeyedBundle} for keyed outputs
     * @param map the output collection for each tag
     * @return an output manager over the created bundles
     */
    static BundleOutputManager createOutputManager(EvaluationContext evaluationContext, StructuralKey<?> structuralKey, Map<TupleTag<?>, PCollection<?>> map) {
        // Typed map instead of a raw HashMap so the hand-off to BundleOutputManager.create is checked.
        Map<TupleTag<?>, UncommittedBundle<?>> bundlesByTag = new HashMap<>();
        for (Map.Entry<TupleTag<?>, PCollection<?>> entry : map.entrySet()) {
            PCollection<?> output = entry.getValue();
            UncommittedBundle<?> bundle;
            if (evaluationContext.isKeyed((PValue) output)) {
                bundle = evaluationContext.createKeyedBundle(structuralKey, output);
            } else {
                bundle = evaluationContext.createBundle(output);
            }
            bundlesByTag.put(entry.getKey(), bundle);
        }
        return BundleOutputManager.create(bundlesByTag);
    }

    /**
     * Stores the collaborators and immediately starts a bundle on the runner.
     *
     * <p>Any exception raised by {@code startBundle()} is rethrown through
     * {@code UserCodeException.wrap}.
     */
    private ParDoEvaluator(PushbackSideInputDoFnRunner<InputT, ?> pushbackSideInputDoFnRunner, AppliedPTransform<?, ?, ?> appliedPTransform, BundleOutputManager bundleOutputManager, DirectExecutionContext.DirectStepContext directStepContext) {
        this.stepContext = directStepContext;
        this.outputManager = bundleOutputManager;
        this.transform = appliedPTransform;
        this.fnRunner = pushbackSideInputDoFnRunner;
        try {
            this.fnRunner.startBundle();
        } catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
    }

    /**
     * Returns the output manager this evaluator was constructed with.
     *
     * @return the same {@link BundleOutputManager} instance, not a copy
     */
    public BundleOutputManager getOutputManager() {
        return outputManager;
    }

    /**
     * Passes the element to the runner and records whatever the runner hands back.
     *
     * <p>The elements returned by {@code processElementInReadyWindows} are appended to the
     * unprocessed-element builder, whose contents {@code finishBundle()} includes in its result.
     * Any exception raised along the way is rethrown through {@code UserCodeException.wrap}.
     *
     * @param windowedValue the input element
     */
    @Override
    public void processElement(WindowedValue<InputT> windowedValue) {
        try {
            Iterable<WindowedValue<InputT>> returned =
                    fnRunner.processElementInReadyWindows(windowedValue);
            unprocessedElements.addAll(returned);
        } catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
    }

    /**
     * Forwards a timer firing to the runner.
     *
     * <p>The timer id, timestamp and time domain are read from {@code timerData}; the window is
     * passed through unchanged. Any exception raised is rethrown through
     * {@code UserCodeException.wrap}.
     *
     * @param timerData the timer that fired
     * @param boundedWindow the window passed to the runner alongside the timer
     */
    public void onTimer(TimerInternals.TimerData timerData, BoundedWindow boundedWindow) {
        try {
            fnRunner.onTimer(
                    timerData.getTimerId(),
                    boundedWindow,
                    timerData.getTimestamp(),
                    timerData.getDomain());
        } catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
    }

    /**
     * Finishes the bundle on the runner and assembles the result of this evaluation.
     *
     * <p>After {@code fnRunner.finishBundle()}, the step context's state is committed. The result
     * is built with a watermark hold and the committed state when {@code commitState()} returned
     * non-null, and without a hold otherwise. In both cases it carries the output manager's
     * bundles, the step context's timer update, and the elements collected by
     * {@code processElement} as unprocessed.
     *
     * <p>Any exception raised inside the method, including while building the result, is rethrown
     * through {@code UserCodeException.wrap}.
     *
     * @return the result describing outputs, state, timers and unprocessed elements
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluator
    public TransformResult<InputT> finishBundle() {
        try {
            this.fnRunner.finishBundle();
            // NOTE(review): the raw CopyOnAccessInMemoryStateInternals type and the single chained
            // expression below are decompiler output; how the chain is typed may depend on both, so
            // confirm against the builder's real signatures before splitting or parameterising.
            CopyOnAccessInMemoryStateInternals commitState = this.stepContext.commitState();
            // A null commitState selects the builder without a watermark hold and without state.
            return (commitState != null ? StepTransformResult.withHold(this.transform, commitState.getEarliestWatermarkHold()).withState(commitState) : StepTransformResult.withoutHold(this.transform)).addOutput(this.outputManager.bundles.values()).withTimerUpdate(this.stepContext.getTimerUpdate()).addUnprocessedElements(this.unprocessedElements.build()).build();
        } catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
    }
}
