package org.apache.beam.runners.spark.translation.streaming;

import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.api.client.util.Sets;
import com.google.common.reflect.TypeToken;
import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.KafkaIO;
import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AssignWindowsDoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

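/**
 * Registry of {@link TransformEvaluator}s used when a pipeline is translated for Spark streaming.
 *
 * <p>Transforms with a streaming-specific evaluator are looked up directly; the remaining
 * supported transforms are delegated to a wrapped {@link SparkPipelineTranslator}, and a fixed
 * set of transforms is rejected as unsupported.
 */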
public final class StreamingTransformTranslator {
    // Accessor used by window() to read the "windowFn" value out of a Window.Bound.
    private static final TransformTranslator.FieldGetter WINDOW_FG = new TransformTranslator.FieldGetter(Window.Bound.class);
    // Streaming-specific evaluators keyed by transform class; populated in the static initializer.
    private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap();
    // Transform classes for which getTransformEvaluator throws UnsupportedOperationException;
    // assigned and populated in the static initializer.
    private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS;

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator$RDDOutputOperator.class */
    private static final class RDDOutputOperator<TransformT extends PTransform<?, ?>> implements VoidFunction<JavaRDD<WindowedValue<Object>>> {
        private final StreamingEvaluationContext context;
        private final AppliedPTransform<?, ?, ?> appliedPTransform;
        private final TransformEvaluator<TransformT> rddEvaluator;
        private final TransformT transform;

        private RDDOutputOperator(StreamingEvaluationContext streamingEvaluationContext, TransformEvaluator<TransformT> transformEvaluator, TransformT transformt) {
            this.context = streamingEvaluationContext;
            this.appliedPTransform = streamingEvaluationContext.getCurrentTransform();
            this.rddEvaluator = transformEvaluator;
            this.transform = transformt;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void call(JavaRDD<WindowedValue<Object>> javaRDD) throws Exception {
            AppliedPTransform<?, ?, ?> currentTransform = this.context.getCurrentTransform();
            this.context.setCurrentTransform(this.appliedPTransform);
            this.context.setInputRDD(this.transform, javaRDD);
            this.rddEvaluator.evaluate(this.transform, this.context);
            this.context.setCurrentTransform(currentTransform);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator$RDDTransform.class */
    private static final class RDDTransform<TransformT extends PTransform<?, ?>> implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
        private final StreamingEvaluationContext context;
        private final AppliedPTransform<?, ?, ?> appliedPTransform;
        private final TransformEvaluator<TransformT> rddEvaluator;
        private final TransformT transform;

        private RDDTransform(StreamingEvaluationContext streamingEvaluationContext, TransformEvaluator<TransformT> transformEvaluator, TransformT transformt) {
            this.context = streamingEvaluationContext;
            this.appliedPTransform = streamingEvaluationContext.getCurrentTransform();
            this.rddEvaluator = transformEvaluator;
            this.transform = transformt;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public JavaRDD<WindowedValue<Object>> call(JavaRDD<WindowedValue<Object>> javaRDD) throws Exception {
            AppliedPTransform<?, ?, ?> currentTransform = this.context.getCurrentTransform();
            this.context.setCurrentTransform(this.appliedPTransform);
            this.context.setInputRDD(this.transform, javaRDD);
            this.rddEvaluator.evaluate(this.transform, this.context);
            if (!this.context.hasOutputRDD(this.transform)) {
                this.context.setOutputRDD(this.transform, this.context.getSparkContext().emptyRDD());
            }
            JavaRDD<WindowedValue<Object>> outputRDD = this.context.getOutputRDD(this.transform);
            this.context.setCurrentTransform(currentTransform);
            return outputRDD;
        }
    }

    /**
     * {@link SparkPipelineTranslator} for streaming pipelines. Streaming-specific evaluators take
     * precedence; everything else is resolved with the help of the wrapped translator.
     */
    public static class Translator implements SparkPipelineTranslator {
        private final SparkPipelineTranslator rddTranslator;

        /**
         * @param sparkPipelineTranslator translator consulted for transforms without a
         *     streaming-specific evaluator
         */
        public Translator(SparkPipelineTranslator sparkPipelineTranslator) {
            this.rddTranslator = sparkPipelineTranslator;
        }

        /**
         * @param cls transform class to look up
         * @return {@code true} when a streaming evaluator is registered for {@code cls} or the
         *     wrapped translator has a translation for it
         */
        @Override
        public boolean hasTranslation(Class<? extends PTransform<?, ?>> cls) {
            if (EVALUATORS.containsKey(cls)) {
                return true;
            }
            return rddTranslator.hasTranslation(cls);
        }

        /**
         * @param cls transform class to translate
         * @return the evaluator chosen by {@code getTransformEvaluator} for {@code cls}
         */
        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translate(Class<TransformT> cls) {
            return getTransformEvaluator(cls, rddTranslator);
        }
    }

    /** Not instantiable: this class only exposes static members and nested types. */
    private StreamingTransformTranslator() {
    }

    /**
     * Evaluator for {@link ConsoleIO.Write.Unbound}: unwraps the windowed values of the transform's
     * stream and prints them, passing the transform's {@code getNum()} to the stream's
     * {@code print}.
     */
    private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
        return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
            @Override
            public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
                sec.getStream(transform)
                        .map(WindowingHelpers.unwindowFunction())
                        .print(transform.getNum());
            }
        };
    }

    /**
     * Evaluator for {@link KafkaIO.Read.Unbound}: opens a direct Kafka stream configured from the
     * transform, converts each {@link Tuple2} into a {@link KV}, wraps the values with
     * {@code WindowingHelpers.windowFunction()} and registers the result as the transform's stream.
     */
    private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
        return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
            @Override
            public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
                // Scala pair -> Beam key/value pair.
                Function<Tuple2<K, V>, KV<K, V>> toKv = new Function<Tuple2<K, V>, KV<K, V>>() {
                    public KV<K, V> call(Tuple2<K, V> pair) throws Exception {
                        return KV.of(pair._1(), pair._2());
                    }
                };
                sec.setStream(
                        transform,
                        KafkaUtils.createDirectStream(
                                        sec.getStreamingContext(),
                                        transform.getKeyClass(),
                                        transform.getValueClass(),
                                        transform.getKeyDecoderClass(),
                                        transform.getValueDecoderClass(),
                                        transform.getKafkaParams(),
                                        transform.getTopics())
                                .map(toKv)
                                .map(WindowingHelpers.windowFunction()));
            }
        };
    }

    /**
     * Evaluator for {@link Create.Values}: hands the transform's elements to the context as a
     * single-entry queue, together with the coder of the transform's output.
     */
    private static <T> TransformEvaluator<Create.Values<T>> create() {
        return new TransformEvaluator<Create.Values<T>>() {
            @Override
            public void evaluate(Create.Values<T> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
                Iterable elements = transform.getElements();
                // All elements go into one queue entry.
                sec.setDStreamFromQueue(
                        transform,
                        Collections.singletonList(elements),
                        sec.getOutput(transform).getCoder());
            }
        };
    }

    /**
     * Evaluator for {@link CreateStream.QueuedValues}: hands the transform's queued values to the
     * context as-is, together with the coder of the transform's output.
     */
    private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
        return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
            @Override
            public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
                sec.setDStreamFromQueue(
                        transform,
                        transform.getQueuedValues(),
                        sec.getOutput(transform).getCoder());
            }
        };
    }

    /**
     * Evaluator for {@link Flatten.FlattenPCollectionList}: unions the streams of all input
     * collections and registers the result as the transform's stream.
     */
    private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
            @Override
            public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
                PCollectionList inputs = sec.getInput(transform);
                // The union call takes the first stream separately from the rest.
                JavaDStream head = sec.getStream((PValue) inputs.get(0));
                final int inputCount = inputs.size();
                ArrayList tail = Lists.newArrayListWithCapacity(inputCount - 1);
                int index = 1;
                while (index < inputCount) {
                    tail.add(sec.getStream((PValue) inputs.get(index)));
                    index++;
                }
                sec.setStream(transform, sec.getStreamingContext().union(head, tail));
            }
        };
    }

    /**
     * Builds an evaluator that applies an RDD-level evaluator to a stream.
     *
     * <p>The RDD-level evaluator is obtained from {@code sparkPipelineTranslator} for the runtime
     * class of the transform. When the context has a stream for the transform, the evaluator is
     * applied to each RDD of that stream through {@link RDDTransform} and the resulting stream is
     * registered for the transform. Otherwise the RDD-level evaluator is run directly.
     *
     * @param sparkPipelineTranslator source of the RDD-level evaluators
     * @param <TransformT> type of the transform being evaluated
     * @return the stream-aware evaluator
     */
    private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> rddTransform(final SparkPipelineTranslator sparkPipelineTranslator) {
        return new TransformEvaluator<TransformT>() {
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"})
            public void evaluate(TransformT transform, EvaluationContext evaluationContext) {
                // getClass() only yields Class<? extends PTransform>; the instance is a TransformT,
                // so narrowing the class token is safe.
                TransformEvaluator<TransformT> rddEvaluator = sparkPipelineTranslator.translate((Class<TransformT>) transform.getClass());
                StreamingEvaluationContext sec = (StreamingEvaluationContext) evaluationContext;
                if (sec.hasStream(transform)) {
                    JavaDStream inputStream = sec.getStream((PTransform<?, ?>) transform);
                    sec.setStream(transform, inputStream.transform(new RDDTransform<TransformT>(sec, rddEvaluator, transform)));
                } else {
                    // No stream registered for this transform: evaluate it directly.
                    rddEvaluator.evaluate(transform, evaluationContext);
                }
            }
        };
    }

    /**
     * Builds an evaluator that runs an RDD-level evaluator on every RDD of a stream without
     * producing an output stream.
     *
     * <p>The RDD-level evaluator is obtained from {@code sparkPipelineTranslator} for the runtime
     * class of the transform. When the context has a stream for the transform, the evaluator is
     * registered on that stream through {@link RDDOutputOperator}. Otherwise the RDD-level
     * evaluator is run directly.
     *
     * @param sparkPipelineTranslator source of the RDD-level evaluators
     * @param <TransformT> type of the transform being evaluated
     * @return the stream-aware evaluator
     */
    private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> foreachRDD(final SparkPipelineTranslator sparkPipelineTranslator) {
        return new TransformEvaluator<TransformT>() {
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"})
            public void evaluate(TransformT transform, EvaluationContext evaluationContext) {
                // getClass() only yields Class<? extends PTransform>; the instance is a TransformT,
                // so narrowing the class token is safe.
                TransformEvaluator<TransformT> rddEvaluator = sparkPipelineTranslator.translate((Class<TransformT>) transform.getClass());
                StreamingEvaluationContext sec = (StreamingEvaluationContext) evaluationContext;
                if (sec.hasStream(transform)) {
                    JavaDStream inputStream = sec.getStream((PTransform<?, ?>) transform);
                    inputStream.foreachRDD(new RDDOutputOperator<TransformT>(sec, rddEvaluator, transform));
                } else {
                    // No stream registered for this transform: evaluate it directly.
                    rddEvaluator.evaluate(transform, evaluationContext);
                }
            }
        };
    }

    /**
     * Evaluator for {@link Window.Bound}.
     *
     * <p>Works in two steps. First, when the transform's window function is a {@link FixedWindows}
     * or a {@link SlidingWindows}, the stream is windowed with the matching durations (size, plus
     * period for sliding windows); any other window function skips this step. Second, the stream
     * obtained from the context is mapped through an {@link AssignWindowsDoFn} built from the
     * window function, and the result is registered for the transform.
     */
    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
        return new TransformEvaluator<Window.Bound<T>>() {
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"})
            public void evaluate(Window.Bound<T> bound, EvaluationContext evaluationContext) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext) evaluationContext;
                // Held as the general WindowFn type so both instanceof branches below are valid.
                WindowFn<?, ?> windowFn = (WindowFn<?, ?>) StreamingTransformTranslator.WINDOW_FG.get("windowFn", bound);
                JavaDStream stream = sec.getStream((PTransform<?, ?>) bound);
                if (windowFn instanceof FixedWindows) {
                    FixedWindows fixedWindows = (FixedWindows) windowFn;
                    sec.setStream(bound, stream.window(Durations.milliseconds(fixedWindows.getSize().getMillis())));
                } else if (windowFn instanceof SlidingWindows) {
                    SlidingWindows slidingWindows = (SlidingWindows) windowFn;
                    sec.setStream(bound, stream.window(Durations.milliseconds(slidingWindows.getSize().getMillis()), Durations.milliseconds(slidingWindows.getPeriod().getMillis())));
                }
                // NOTE(review): whether getStream(bound) here observes the stream stored by
                // setStream(bound, ...) above depends on StreamingEvaluationContext, which is not
                // visible in this file - confirm.
                JavaDStream current = sec.getStream((PTransform<?, ?>) bound);
                sec.setStream(bound, current.mapPartitions(new DoFnFunction(new AssignWindowsDoFn(windowFn), sec.getRuntimeContext(), null)));
            }
        };
    }

    /**
     * Selects the evaluator for a transform class.
     *
     * <p>A registered streaming evaluator wins. Otherwise classes listed as unsupported are
     * rejected, transforms whose output type is {@link PDone} get a {@code foreachRDD} evaluator,
     * and all others get an {@code rddTransform} evaluator.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code private}; it is
     * called from the nested {@code Translator}.
     *
     * @param cls transform class to find an evaluator for
     * @param sparkPipelineTranslator translator supplying RDD-level evaluators
     * @param <TransformT> type of the transform
     * @return the evaluator for {@code cls}
     * @throws UnsupportedOperationException when {@code cls} is listed as unsupported
     */
    @SuppressWarnings("unchecked")
    public static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> getTransformEvaluator(Class<TransformT> cls, SparkPipelineTranslator sparkPipelineTranslator) {
        TransformEvaluator<TransformT> registered = (TransformEvaluator<TransformT>) EVALUATORS.get(cls);
        if (registered == null) {
            if (UNSUPPORTED_EVALUATORS.contains(cls)) {
                throw new UnsupportedOperationException("Dataflow transformation " + cls.getCanonicalName() + " is currently unsupported by the Spark streaming pipeline");
            }
            // The transform's output type decides between foreachRDD and rddTransform.
            if (PDone.class.equals(getPTransformOutputClazz(cls))) {
                return foreachRDD(sparkPipelineTranslator);
            } else {
                return rddTransform(sparkPipelineTranslator);
            }
        }
        return registered;
    }

    /**
     * Determines the raw output type a transform class declares for {@link PTransform}.
     *
     * <p>The second type parameter of {@link PTransform} itself is resolved in the context of
     * {@code cls}. This avoids assuming that the direct superclass of {@code cls} is a
     * parameterized {@link PTransform}: a transform that extends a non-generic intermediate class
     * would otherwise fail with a {@link ClassCastException}, and one that extends a differently
     * parameterized intermediate class would read the wrong type argument.
     *
     * @param cls transform class to inspect
     * @param <TransformT> type of the transform
     * @return raw class of the transform's output type
     */
    private static <TransformT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<TransformT> cls) {
        // Index 1 is PTransform's second declared type parameter, i.e. its output type.
        return TypeToken.of(cls).resolveType(PTransform.class.getTypeParameters()[1]).getRawType();
    }

    static {
        EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
        EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
        EVALUATORS.put(Create.Values.class, create());
        EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
        EVALUATORS.put(Window.Bound.class, window());
        EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
        UNSUPPORTED_EVALUATORS = Sets.newHashSet();
        UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
        UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
        UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
        UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
        UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
        UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
    }
}
