package org.apache.beam.runners.flink;

import java.util.Map;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.flink.FlinkStreamingViewOverrides;
import org.apache.beam.runners.flink.p0002.p00110.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.class */
public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
    private final FlinkStreamingTranslationContext streamingContext;
    private int depth = 0;
    private FlinkRunner flinkRunner;

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator$ReflectiveOneToOneOverrideFactory.class */
    private static class ReflectiveOneToOneOverrideFactory<InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>> extends SingleInputOutputOverrideFactory<PCollection<InputT>, PCollection<OutputT>, TransformT> {
        private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
        private final FlinkRunner runner;

        private ReflectiveOneToOneOverrideFactory(Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> cls, FlinkRunner flinkRunner) {
            this.replacement = cls;
            this.runner = flinkRunner;
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> appliedPTransform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), (PTransform) InstanceBuilder.ofType(this.replacement).withArg(FlinkRunner.class, this.runner).withArg(appliedPTransform.getTransform().getClass(), appliedPTransform.getTransform()).build());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator$SplittableParDoOverrideFactory.class */
    static class SplittableParDoOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
        SplittableParDoOverrideFactory() {
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> appliedPTransform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), new SplittableParDo(appliedPTransform.getTransform()));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> map, PCollectionTuple pCollectionTuple) {
            return ReplacementOutputs.tagged(map, pCollectionTuple);
        }

        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PValue>) map, (PCollectionTuple) pOutput);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator$StreamTransformTranslator.class */
    public static abstract class StreamTransformTranslator<T extends PTransform> {
        abstract void translateNode(T t, FlinkStreamingTranslationContext flinkStreamingTranslationContext);

        boolean canTranslate(T t, FlinkStreamingTranslationContext flinkStreamingTranslationContext) {
            return true;
        }
    }

    public FlinkStreamingPipelineTranslator(FlinkRunner flinkRunner, StreamExecutionEnvironment streamExecutionEnvironment, PipelineOptions pipelineOptions) {
        this.streamingContext = new FlinkStreamingTranslationContext(streamExecutionEnvironment, pipelineOptions);
        this.flinkRunner = flinkRunner;
    }

    @Override // org.apache.beam.runners.flink.FlinkPipelineTranslator
    public void translate(Pipeline pipeline) {
        ImmutableList build = ImmutableList.builder().add((ImmutableList.Builder) PTransformOverride.of(PTransformMatchers.splittableParDoMulti(), new SplittableParDoOverrideFactory())).add((ImmutableList.Builder) PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class), new ReflectiveOneToOneOverrideFactory(FlinkStreamingViewOverrides.StreamingViewAsIterable.class, this.flinkRunner))).add((ImmutableList.Builder) PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class), new ReflectiveOneToOneOverrideFactory(FlinkStreamingViewOverrides.StreamingViewAsList.class, this.flinkRunner))).add((ImmutableList.Builder) PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class), new ReflectiveOneToOneOverrideFactory(FlinkStreamingViewOverrides.StreamingViewAsMap.class, this.flinkRunner))).add((ImmutableList.Builder) PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class), new ReflectiveOneToOneOverrideFactory(FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, this.flinkRunner))).add((ImmutableList.Builder) PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class), new ReflectiveOneToOneOverrideFactory(FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, this.flinkRunner))).add((ImmutableList.Builder) PTransformOverride.of(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), new ReflectiveOneToOneOverrideFactory(FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class, this.flinkRunner))).build();
        UnconsumedReads.ensureAllReadsConsumed(pipeline);
        pipeline.replaceAll(build);
        super.translate(pipeline);
    }

    public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        StreamTransformTranslator<?> translator;
        LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
        this.depth++;
        PTransform<?, ?> transform = node.getTransform();
        if (transform == null || (translator = FlinkStreamingTransformTranslators.getTranslator(transform)) == null || !applyCanTranslate(transform, node, translator)) {
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }
        applyStreamingTransform(transform, node, translator);
        LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
        return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
    }

    public void leaveCompositeTransform(TransformHierarchy.Node node) {
        this.depth--;
        LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
    }

    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
        PTransform<?, ?> transform = node.getTransform();
        StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
        if (translator == null || !applyCanTranslate(transform, node, translator)) {
            LOG.info(node.getTransform().getClass().toString());
            throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
        }
        applyStreamingTransform(transform, node, translator);
    }

    public void visitValue(PValue pValue, TransformHierarchy.Node node) {
    }

    private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> pTransform, TransformHierarchy.Node node, StreamTransformTranslator<?> streamTransformTranslator) {
        this.streamingContext.setCurrentTransform(node.toAppliedPTransform());
        streamTransformTranslator.translateNode(pTransform, this.streamingContext);
    }

    private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> pTransform, TransformHierarchy.Node node, StreamTransformTranslator<?> streamTransformTranslator) {
        this.streamingContext.setCurrentTransform(node.toAppliedPTransform());
        return streamTransformTranslator.canTranslate(pTransform, this.streamingContext);
    }
}
