package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamRecordWriter.class */
public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
    private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
    private final StreamRecordWriter<T>.OutputFlusher outputFlusher;
    private final boolean flushAlways;
    private Throwable flusherException;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamRecordWriter$OutputFlusher.class */
    private class OutputFlusher extends Thread {
        private final long timeout;
        private volatile boolean running;

        OutputFlusher(String str, long j) {
            super(str);
            this.running = true;
            setDaemon(true);
            this.timeout = j;
        }

        public void terminate() {
            this.running = false;
            interrupt();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    try {
                        Thread.sleep(this.timeout);
                    } catch (InterruptedException e) {
                        if (this.running) {
                            throw new Exception(e);
                        }
                    }
                    StreamRecordWriter.this.flush();
                } catch (Throwable th) {
                    StreamRecordWriter.this.notifyFlusherException(th);
                    return;
                }
            }
        }
    }

    public StreamRecordWriter(ResultPartitionWriter resultPartitionWriter, ChannelSelector<T> channelSelector, long j) {
        this(resultPartitionWriter, channelSelector, j, null);
    }

    public StreamRecordWriter(ResultPartitionWriter resultPartitionWriter, ChannelSelector<T> channelSelector, long j, String str) {
        super(resultPartitionWriter, channelSelector);
        Preconditions.checkArgument(j >= -1);
        if (j == -1) {
            this.flushAlways = false;
            this.outputFlusher = null;
        } else if (j == 0) {
            this.flushAlways = true;
            this.outputFlusher = null;
        } else {
            this.flushAlways = false;
            this.outputFlusher = new OutputFlusher(str == null ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + str, j);
            this.outputFlusher.start();
        }
    }

    public void emit(T t) throws IOException, InterruptedException {
        checkErroneous();
        super.emit(t);
        if (this.flushAlways) {
            flush();
        }
    }

    public void broadcastEmit(T t) throws IOException, InterruptedException {
        checkErroneous();
        super.broadcastEmit(t);
        if (this.flushAlways) {
            flush();
        }
    }

    public void close() {
        if (this.outputFlusher != null) {
            this.outputFlusher.terminate();
            try {
                this.outputFlusher.join();
            } catch (InterruptedException e) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyFlusherException(Throwable th) {
        if (this.flusherException == null) {
            this.flusherException = th;
        }
    }

    private void checkErroneous() throws IOException {
        if (this.flusherException != null) {
            throw new IOException("An exception happened while flushing the outputs", this.flusherException);
        }
    }
}
