package org.apache.beam.runners.spark.translation.streaming;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.class */
public class StreamingEvaluationContext extends EvaluationContext {
    private final JavaStreamingContext jssc;
    private final long timeout;
    private final Map<PValue, DStreamHolder<?>> pstreams;
    private final Set<DStreamHolder<?>> leafStreams;
    private PipelineResult.State state;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext$DStreamHolder.class */
    public class DStreamHolder<T> {
        private Iterable<Iterable<T>> values;
        private Coder<T> coder;
        private JavaDStream<WindowedValue<T>> dStream;

        DStreamHolder(Iterable<Iterable<T>> iterable, Coder<T> coder) {
            this.values = iterable;
            this.coder = coder;
        }

        DStreamHolder(JavaDStream<WindowedValue<T>> javaDStream) {
            this.dStream = javaDStream;
        }

        JavaDStream<WindowedValue<T>> getDStream() {
            if (this.dStream == null) {
                LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
                Iterator<Iterable<T>> it = this.values.iterator();
                while (it.hasNext()) {
                    StreamingEvaluationContext.this.setOutputRDDFromValues(StreamingEvaluationContext.this.currentTransform.getTransform(), it.next(), this.coder);
                    linkedBlockingQueue.offer(StreamingEvaluationContext.this.getOutputRDD(StreamingEvaluationContext.this.currentTransform.getTransform()));
                }
                this.dStream = StreamingEvaluationContext.this.jssc.queueStream(linkedBlockingQueue, true);
            }
            return this.dStream;
        }
    }

    public StreamingEvaluationContext(JavaSparkContext javaSparkContext, Pipeline pipeline, JavaStreamingContext javaStreamingContext, long j) {
        super(javaSparkContext, pipeline);
        this.pstreams = new LinkedHashMap();
        this.leafStreams = new LinkedHashSet();
        this.state = PipelineResult.State.RUNNING;
        this.jssc = javaStreamingContext;
        this.timeout = j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> void setDStreamFromQueue(PTransform<?, ?> pTransform, Iterable<Iterable<T>> iterable, Coder<T> coder) {
        this.pstreams.put((PValue) getOutput(pTransform), new DStreamHolder<>(iterable, coder));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> void setStream(PTransform<?, ?> pTransform, JavaDStream<WindowedValue<T>> javaDStream) {
        PValue output = getOutput(pTransform);
        DStreamHolder<?> dStreamHolder = new DStreamHolder<>(javaDStream);
        this.pstreams.put(output, dStreamHolder);
        this.leafStreams.add(dStreamHolder);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasStream(PTransform<?, ?> pTransform) {
        return this.pstreams.containsKey(getInput(pTransform));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> pTransform) {
        return getStream((PValue) getInput(pTransform));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JavaDStreamLike<?, ?, ?> getStream(PValue pValue) {
        DStreamHolder<?> dStreamHolder = this.pstreams.get(pValue);
        JavaDStream<WindowedValue<?>> dStream = dStreamHolder.getDStream();
        this.leafStreams.remove(dStreamHolder);
        return dStream;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> void setInputRDD(PTransform<? extends PInput, ?> pTransform, JavaRDDLike<WindowedValue<T>, ?> javaRDDLike) {
        setRDD((PValue) getInput(pTransform), javaRDDLike);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> pTransform) {
        return getRDD((PValue) getOutput(pTransform));
    }

    public JavaStreamingContext getStreamingContext() {
        return this.jssc;
    }

    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public void computeOutputs() {
        Iterator<DStreamHolder<?>> it = this.leafStreams.iterator();
        while (it.hasNext()) {
            computeOutput(it.next());
        }
    }

    private static <T> void computeOutput(DStreamHolder<T> dStreamHolder) {
        dStreamHolder.getDStream().foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext.1
            public void call(JavaRDD<WindowedValue<T>> javaRDD) throws Exception {
                javaRDD.rdd().cache();
                javaRDD.count();
            }
        });
    }

    @Override // org.apache.beam.runners.spark.translation.EvaluationContext, org.apache.beam.runners.spark.EvaluationResult
    public void close() {
        if (this.timeout > 0) {
            this.jssc.awaitTerminationOrTimeout(this.timeout);
        } else {
            this.jssc.awaitTermination();
        }
        this.jssc.stop(false, false);
        this.state = PipelineResult.State.DONE;
        super.close();
    }

    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public PipelineResult.State getState() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> pTransform) {
        return (InputT) super.getInput(pTransform);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> pTransform) {
        return (OutputT) super.getOutput(pTransform);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public JavaSparkContext getSparkContext() {
        return super.getSparkContext();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public SparkRuntimeContext getRuntimeContext() {
        return super.getRuntimeContext();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public void setCurrentTransform(AppliedPTransform<?, ?, ?> appliedPTransform) {
        super.setCurrentTransform(appliedPTransform);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public AppliedPTransform<?, ?, ?> getCurrentTransform() {
        return super.getCurrentTransform();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public <T> void setOutputRDD(PTransform<?, ?> pTransform, JavaRDDLike<WindowedValue<T>, ?> javaRDDLike) {
        super.setOutputRDD(pTransform, javaRDDLike);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public <T> void setOutputRDDFromValues(PTransform<?, ?> pTransform, Iterable<T> iterable, Coder<T> coder) {
        super.setOutputRDDFromValues(pTransform, iterable, coder);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.spark.translation.EvaluationContext
    public boolean hasOutputRDD(PTransform<? extends PInput, ?> pTransform) {
        return super.hasOutputRDD(pTransform);
    }
}
