package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.class */
public class BoundedSourceWrapper<OutputT> extends RichParallelSourceFunction<WindowedValue<OutputT>> implements StoppableFunction {
    private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
    private String stepName;
    private final SerializablePipelineOptions serializedOptions;
    private List<? extends BoundedSource<OutputT>> splitSources;
    private transient List<BoundedSource.BoundedReader<OutputT>> readers;
    private volatile boolean isRunning = true;

    public BoundedSourceWrapper(String str, PipelineOptions pipelineOptions, BoundedSource<OutputT> boundedSource, int i) throws Exception {
        this.stepName = str;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        this.splitSources = boundedSource.split(boundedSource.getEstimatedSizeBytes(pipelineOptions) / i, pipelineOptions);
    }

    public void run(SourceFunction.SourceContext<WindowedValue<OutputT>> sourceContext) throws Exception {
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.splitSources.size(); i++) {
            if (i % numberOfParallelSubtasks == indexOfThisSubtask) {
                arrayList.add(this.splitSources.get(i));
            }
        }
        LOG.info("Bounded Flink Source {}/{} is reading from sources: {}", new Object[]{Integer.valueOf(indexOfThisSubtask), Integer.valueOf(numberOfParallelSubtasks), arrayList});
        ReaderInvocationUtil readerInvocationUtil = new ReaderInvocationUtil(this.stepName, this.serializedOptions.get(), new FlinkMetricContainer(getRuntimeContext()));
        this.readers = new ArrayList();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.readers.add(((BoundedSource) it.next()).createReader(this.serializedOptions.get()));
        }
        if (this.readers.size() == 1) {
            BoundedSource.BoundedReader<OutputT> boundedReader = this.readers.get(0);
            if (readerInvocationUtil.invokeStart(boundedReader)) {
                emitElement(sourceContext, boundedReader);
            }
            while (this.isRunning && readerInvocationUtil.invokeAdvance(boundedReader)) {
                emitElement(sourceContext, boundedReader);
            }
        } else {
            int i2 = 0;
            for (BoundedSource.BoundedReader<OutputT> boundedReader2 : this.readers) {
                if (readerInvocationUtil.invokeStart(boundedReader2)) {
                    emitElement(sourceContext, boundedReader2);
                }
            }
            boolean z = false;
            while (this.isRunning && !this.readers.isEmpty()) {
                BoundedSource.BoundedReader<OutputT> boundedReader3 = this.readers.get(i2);
                if (!readerInvocationUtil.invokeAdvance(boundedReader3)) {
                    this.readers.remove(i2);
                    i2--;
                    if (this.readers.isEmpty()) {
                        break;
                    }
                } else {
                    emitElement(sourceContext, boundedReader3);
                    z = true;
                }
                i2 = (i2 + 1) % this.readers.size();
                if (i2 == 0 && !z) {
                    Thread.sleep(50L);
                } else if (i2 == 0) {
                    z = false;
                }
            }
        }
        sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
    }

    private void emitElement(SourceFunction.SourceContext<WindowedValue<OutputT>> sourceContext, BoundedSource.BoundedReader<OutputT> boundedReader) {
        synchronized (sourceContext.getCheckpointLock()) {
            Object current = boundedReader.getCurrent();
            Instant currentTimestamp = boundedReader.getCurrentTimestamp();
            sourceContext.collectWithTimestamp(WindowedValue.of(current, currentTimestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), currentTimestamp.getMillis());
        }
    }

    public void close() throws Exception {
        super.close();
        if (this.readers != null) {
            Iterator<BoundedSource.BoundedReader<OutputT>> it = this.readers.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    public void stop() {
        this.isRunning = false;
    }

    @VisibleForTesting
    public List<? extends BoundedSource<OutputT>> getSplitSources() {
        return this.splitSources;
    }
}
