package org.apache.beam.runners.spark;

import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/SparkRunner.class */
public final class SparkRunner extends PipelineRunner<EvaluationResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
    private final SparkPipelineOptions mOptions;

    /* loaded from: input_file:org/apache/beam/runners/spark/SparkRunner$Evaluator.class */
    public static abstract class Evaluator extends Pipeline.PipelineVisitor.Defaults {
        protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
        protected final SparkPipelineTranslator translator;

        /* JADX INFO: Access modifiers changed from: protected */
        public Evaluator(SparkPipelineTranslator sparkPipelineTranslator) {
            this.translator = sparkPipelineTranslator;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformTreeNode transformTreeNode) {
            if (transformTreeNode.getTransform() != null) {
                Class<?> cls = transformTreeNode.getTransform().getClass();
                if (this.translator.hasTranslation(cls)) {
                    LOG.info("Entering directly-translatable composite transform: '{}'", transformTreeNode.getFullName());
                    LOG.debug("Composite transform class: '{}'", cls);
                    doVisitTransform(transformTreeNode);
                    return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
                }
            }
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }

        public void visitPrimitiveTransform(TransformTreeNode transformTreeNode) {
            doVisitTransform(transformTreeNode);
        }

        protected abstract <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode transformTreeNode);
    }

    public static SparkRunner create() {
        SparkPipelineOptions sparkPipelineOptions = (SparkPipelineOptions) PipelineOptionsFactory.as(SparkPipelineOptions.class);
        sparkPipelineOptions.setRunner(SparkRunner.class);
        return new SparkRunner(sparkPipelineOptions);
    }

    public static SparkRunner create(SparkPipelineOptions sparkPipelineOptions) {
        return new SparkRunner(sparkPipelineOptions);
    }

    public static SparkRunner fromOptions(PipelineOptions pipelineOptions) {
        return new SparkRunner((SparkPipelineOptions) PipelineOptionsValidator.validate(SparkPipelineOptions.class, pipelineOptions));
    }

    public <OutputT extends POutput, InputT extends PInput> OutputT apply(PTransform<InputT, OutputT> pTransform, InputT inputt) {
        return pTransform instanceof GroupByKey ? (OutputT) ((PCollection) inputt).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) pTransform)) : pTransform instanceof Create.Values ? (OutputT) super.apply(new SinglePrimitiveOutputPTransform((Create.Values) pTransform), inputt) : (OutputT) super.apply(pTransform, inputt);
    }

    private SparkRunner(SparkPipelineOptions sparkPipelineOptions) {
        this.mOptions = sparkPipelineOptions;
    }

    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    public EvaluationResult m1run(Pipeline pipeline) {
        try {
            if (this.mOptions.isStreaming() && !(this.mOptions instanceof SparkStreamingPipelineOptions)) {
                throw new RuntimeException("A streaming job must be configured with " + SparkStreamingPipelineOptions.class.getSimpleName() + ", found " + this.mOptions.getClass().getSimpleName());
            }
            LOG.info("Executing pipeline using the SparkRunner.");
            JavaSparkContext sparkContext = SparkContextFactory.getSparkContext(this.mOptions.getSparkMaster(), this.mOptions.getAppName());
            if (!this.mOptions.isStreaming()) {
                EvaluationContext evaluationContext = new EvaluationContext(sparkContext, pipeline);
                pipeline.traverseTopologically(new SparkPipelineEvaluator(evaluationContext, new TransformTranslator.Translator()));
                evaluationContext.computeOutputs();
                LOG.info("Pipeline execution complete.");
                return evaluationContext;
            }
            StreamingTransformTranslator.Translator translator = new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
            StreamingWindowPipelineDetector streamingWindowPipelineDetector = new StreamingWindowPipelineDetector(translator);
            pipeline.traverseTopologically(streamingWindowPipelineDetector);
            if (!streamingWindowPipelineDetector.isWindowing()) {
                throw new IllegalStateException("Spark streaming pipeline must be windowed!");
            }
            Duration batchDuration = streamingWindowPipelineDetector.getBatchDuration();
            LOG.info("Setting Spark streaming batchInterval to {} msec", Long.valueOf(batchDuration.milliseconds()));
            EvaluationContext createStreamingEvaluationContext = createStreamingEvaluationContext(sparkContext, pipeline, batchDuration);
            pipeline.traverseTopologically(new SparkPipelineEvaluator(createStreamingEvaluationContext, translator));
            createStreamingEvaluationContext.computeOutputs();
            LOG.info("Streaming pipeline construction complete. Starting execution..");
            ((StreamingEvaluationContext) createStreamingEvaluationContext).getStreamingContext().start();
            return createStreamingEvaluationContext;
        } catch (Exception e) {
            if (!(e instanceof SparkException) || e.getCause() == null) {
                throw new RuntimeException(e);
            }
            if (!(e.getCause() instanceof SparkProcessContext.SparkProcessException) || e.getCause().getCause() == null) {
                throw new RuntimeException(e.getCause());
            }
            throw new RuntimeException(e.getCause().getCause());
        }
    }

    private EvaluationContext createStreamingEvaluationContext(JavaSparkContext javaSparkContext, Pipeline pipeline, Duration duration) {
        return new StreamingEvaluationContext(javaSparkContext, pipeline, new JavaStreamingContext(javaSparkContext, duration), ((SparkStreamingPipelineOptions) this.mOptions).getTimeout().longValue());
    }
}
