package org.apache.flink.runtime.operators;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/DataSinkTask.class */
/**
 * Task that reads all records of its single logical input and writes them to a configured
 * {@link OutputFormat}.
 *
 * <p>The input is either one input gate or the union of all input gates, optionally passed
 * through an {@link ExternalSorter} when the task configuration requests the {@code SORT} local
 * strategy.
 *
 * <p>Life cycle as implemented here:
 *
 * <ol>
 *   <li>{@link #invoke()} instantiates and configures the output format, sets up the input
 *       reader, opens the format, writes every record and finally closes the format.
 *   <li>{@link #cancel()} raises the cancellation flag, closes the format and, if the format
 *       implements {@link CleanupWhenUnsuccessful}, asks it to clean up partial results.
 * </ol>
 *
 * <p>{@code format}, {@code taskCanceled} and {@code cleanupCalled} are {@code volatile};
 * presumably {@link #cancel()} is invoked from a different thread than {@link #invoke()}
 * (NOTE(review): confirm against the task framework).
 *
 * @param <IT> type of the records consumed by this sink
 */
public class DataSinkTask<IT> extends AbstractInvokable {

    private static final Logger LOG = LoggerFactory.getLogger(DataSinkTask.class);

    /** The output format; reset to {@code null} by {@link #invoke()} after a regular close. */
    private volatile OutputFormat<IT> format;

    /** Reader over the input gate (or union of gates) feeding this sink. */
    private MutableReader<?> inputReader;

    /** Record iterator on top of {@link #inputReader}. */
    private MutableObjectIterator<IT> reader;

    /** Factory for the serializer of the input records. */
    private TypeSerializerFactory<IT> inputTypeSerializerFactory;

    /** The local strategy (sorter) in front of the sink, or {@code null} if there is none. */
    private CloseableInputProvider<IT> localStrategy;

    /** Task configuration; created in {@link #initOutputFormat()}. */
    private TaskConfig config;

    /** Set by {@link #cancel()}; polled by the write loops in {@link #invoke()}. */
    private volatile boolean taskCanceled;

    /** Guards against invoking {@code tryCleanupOnError()} from both invoke() and cancel(). */
    private volatile boolean cleanupCalled;

    /**
     * Creates the task for the given environment.
     *
     * @param environment the environment handed to {@link AbstractInvokable}
     */
    public DataSinkTask(Environment environment) {
        super(environment);
    }

    /**
     * Runs the sink: initializes format and input, writes all records, and releases resources.
     *
     * <p>Only a failure of the input-reader setup is reported as "Initializing the input streams
     * failed"; failures of later phases propagate with their own type and message. Exceptions
     * raised while producing output are dropped if the task has been canceled in the meantime,
     * except for {@link CancelTaskException}, which is always forwarded.
     *
     * @throws Exception if initialization fails, or if writing fails while the task is not
     *     canceled
     */
    @Override
    public void invoke() throws Exception {
        LOG.debug(getLogString("Start registering input and output"));

        initOutputFormat();

        // Keep this try narrow: only reader set-up problems may carry this message.
        try {
            initInputReaders();
        } catch (Exception e) {
            throw new RuntimeException(
                    describeFailure("Initializing the input streams failed", e), e);
        }

        LOG.debug(getLogString("Finished registering input and output"));
        LOG.debug(getLogString("Starting data sink operator"));

        final DistributedRuntimeUDFContext ctx = createRuntimeContext();
        final Counter numRecordsIn = setUpRecordsInCounter(ctx);

        if (this.format instanceof RichOutputFormat) {
            ((RichOutputFormat<IT>) this.format).setRuntimeContext(ctx);
            LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
        }

        final boolean objectReuseEnabled = getExecutionConfig().isObjectReuseEnabled();

        try {
            final MutableObjectIterator<IT> input = initLocalStrategy();
            final TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer();
            final OutputFormat<IT> out = this.format;

            // Canceled during set-up: skip the work; the finally block releases everything.
            if (this.taskCanceled) {
                return;
            }

            LOG.debug(getLogString("Starting to produce output"));

            out.open(
                    getEnvironment().getTaskInfo().getIndexOfThisSubtask(),
                    getEnvironment().getTaskInfo().getNumberOfParallelSubtasks());

            if (objectReuseEnabled) {
                IT record = serializer.createInstance();
                // A null result marks the end of the input and must terminate the loop.
                while (!this.taskCanceled && (record = input.next(record)) != null) {
                    numRecordsIn.inc();
                    out.writeRecord(record);
                }
            } else {
                IT record;
                while (!this.taskCanceled && (record = input.next()) != null) {
                    numRecordsIn.inc();
                    out.writeRecord(record);
                }
            }

            // Regular close happens inside the try so that a failing close() fails the task.
            // Clearing the field tells the finally block that nothing is left to close.
            if (!this.taskCanceled) {
                this.format.close();
                this.format = null;
            }
        } catch (Exception ex) {
            cleanupOnError(this.format);

            final Exception unwrapped = ExceptionInChainedStubException.exceptionUnwrap(ex);
            if (unwrapped instanceof CancelTaskException) {
                // Cancellation is always forwarded unchanged.
                throw unwrapped;
            }
            if (!this.taskCanceled) {
                if (LOG.isErrorEnabled()) {
                    LOG.error(
                            getLogString("Error in user code: " + unwrapped.getMessage()),
                            unwrapped);
                }
                throw unwrapped;
            }
            // Task was canceled: the exception is dropped on purpose.
        } finally {
            releaseResources();
        }

        if (this.taskCanceled) {
            LOG.debug(getLogString("Data sink operator cancelled"));
        } else {
            LOG.debug(getLogString("Finished data sink operator"));
        }
    }

    /**
     * Flags the task as canceled, closes the output format and triggers its error cleanup.
     *
     * <p>All steps are best effort; failures are logged and never propagated.
     */
    @Override
    public void cancel() throws Exception {
        this.taskCanceled = true;

        // Work on a snapshot: invoke() may reset the field to null at any time.
        final OutputFormat<IT> current = this.format;
        if (current != null) {
            try {
                current.close();
            } catch (Throwable t) {
                // Best effort only, but leave a trace instead of swallowing silently.
                LOG.debug(getLogString("Error closing the output format while cancelling"), t);
            }
            cleanupOnError(current);
        }

        LOG.debug(getLogString("Cancelling data sink operator"));
    }

    /**
     * Obtains the records-in counter of the operator's I/O metric group and lets the operator
     * reuse the task-level input/output metrics. Falls back to a detached {@link SimpleCounter}
     * when the metric setup fails, so that metrics problems never fail the task.
     */
    private Counter setUpRecordsInCounter(DistributedRuntimeUDFContext ctx) {
        try {
            final OperatorIOMetricGroup ioMetricGroup =
                    ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
            ioMetricGroup.reuseInputMetricsForTask();
            ioMetricGroup.reuseOutputMetricsForTask();
            return ioMetricGroup.getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            return new SimpleCounter();
        }
    }

    /**
     * Sets up the local strategy configured for input 0 and returns the iterator to read from.
     *
     * @return the plain reader for {@code NONE}, the sorter's iterator for {@code SORT}
     * @throws RuntimeException if the sorter cannot be initialized or the strategy is unsupported
     */
    private MutableObjectIterator<IT> initLocalStrategy() {
        switch (this.config.getInputLocalStrategy(0)) {
            case NONE:
                this.localStrategy = null;
                return this.reader;

            case SORT:
                try {
                    final TypeComparatorFactory<IT> comparatorFactory =
                            this.config.getInputComparator(0, getUserCodeClassLoader());
                    if (comparatorFactory == null) {
                        throw new Exception(
                                "Missing comparator factory for local strategy on input 0");
                    }

                    final ExternalSorter<IT> sorter =
                            ExternalSorter.newBuilder(
                                            getEnvironment().getMemoryManager(),
                                            this,
                                            this.inputTypeSerializerFactory.getSerializer(),
                                            comparatorFactory.createComparator())
                                    .maxNumFileHandles(this.config.getFilehandlesInput(0))
                                    .enableSpilling(
                                            getEnvironment().getIOManager(),
                                            this.config.getSpillingThresholdInput(0))
                                    .memoryFraction(this.config.getRelativeMemoryInput(0))
                                    .objectReuse(getExecutionConfig().isObjectReuseEnabled())
                                    .largeRecords(this.config.getUseLargeRecordHandler())
                                    .build(this.reader);

                    // Publish the sorter first so that it gets closed even if getIterator() fails.
                    this.localStrategy = sorter;
                    return sorter.getIterator();
                } catch (Exception e) {
                    throw new RuntimeException(
                            describeFailure("Initializing the input processing failed", e), e);
                }

            default:
                throw new RuntimeException("Invalid local strategy for DataSinkTask");
        }
    }

    /**
     * Calls {@code tryCleanupOnError()} on the given format if it implements
     * {@link CleanupWhenUnsuccessful} and no cleanup has been triggered before. Failures are
     * logged, never thrown.
     *
     * <p>NOTE(review): the check-then-set on {@code cleanupCalled} is not atomic, so concurrent
     * callers could both run the cleanup; this matches the previous behavior.
     */
    private void cleanupOnError(OutputFormat<IT> target) {
        try {
            if (!this.cleanupCalled && target instanceof CleanupWhenUnsuccessful) {
                this.cleanupCalled = true;
                ((CleanupWhenUnsuccessful) target).tryCleanupOnError();
            }
        } catch (Throwable t) {
            LOG.error("Cleanup on error failed.", t);
        }
    }

    /**
     * Closes whatever is still open: the output format (only non-null after an error or a
     * cancellation), the local strategy, and the input reader. Never throws for close failures.
     */
    private void releaseResources() {
        final OutputFormat<IT> openFormat = this.format;
        if (openFormat != null) {
            try {
                openFormat.close();
            } catch (Throwable t) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn(getLogString("Error closing the output format"), t);
                }
            }
        }

        if (this.localStrategy != null) {
            try {
                this.localStrategy.close();
            } catch (Throwable t) {
                LOG.error("Error closing local strategy", t);
            }
        }

        BatchTask.clearReaders(new MutableReader<?>[] {this.inputReader});
    }

    /**
     * Builds {@code prefix + ": " + message}, or the prefix followed by
     * {@link ScopeFormat#SCOPE_SEPARATOR} when the exception carries no message.
     */
    private static String describeFailure(String prefix, Exception cause) {
        final String detail = cause.getMessage();
        return prefix + (detail == null ? ScopeFormat.SCOPE_SEPARATOR : ": " + detail);
    }

    /**
     * Creates the task configuration, instantiates the unique output format from it and calls
     * the format's {@code configure()} with the user-code class loader installed as the thread's
     * context class loader. The previous context class loader is always restored.
     *
     * @throws RuntimeException if the format has the wrong type or {@code configure()} fails
     */
    private void initOutputFormat() {
        final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        this.config = new TaskConfig(getTaskConfiguration());

        final InputOutputFormatContainer formatContainer =
                new InputOutputFormatContainer(this.config, userCodeClassLoader);

        final Pair<OperatorID, OutputFormat<IT>> operatorIdAndFormat;
        try {
            operatorIdAndFormat = formatContainer.getUniqueOutputFormat();
            this.format = operatorIdAndFormat.getValue();

            if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException(
                        "The class '"
                                + this.format.getClass().getName()
                                + "' is not a subclass of '"
                                + OutputFormat.class.getName()
                                + "' as is required.");
            }
        } catch (ClassCastException e) {
            throw new RuntimeException(
                    "The stub class is not a proper subclass of " + OutputFormat.class.getName(),
                    e);
        }

        final Thread thread = Thread.currentThread();
        final ClassLoader previousClassLoader = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(userCodeClassLoader);
            this.format.configure(formatContainer.getParameters(operatorIdAndFormat.getKey()));
        } catch (Throwable t) {
            // Report anything thrown here as originating from the user's configure() method.
            throw new RuntimeException(
                    "The user defined 'configure()' method in the Output Format caused an error: "
                            + t.getMessage(),
                    t);
        } finally {
            thread.setContextClassLoader(previousClassLoader);
        }
    }

    /**
     * Creates the input reader (single gate for group size 1, union of all gates for larger
     * groups), the input serializer factory and the record iterator, then verifies that the group
     * size matches the configured number of inputs.
     *
     * @throws Exception if the group size is smaller than 1 or inconsistent with the number of
     *     inputs
     */
    private void initInputReaders() throws Exception {
        final int groupSize = this.config.getGroupSize(0);

        if (groupSize == 1) {
            // Non-union case: read directly from the first input gate.
            this.inputReader =
                    new MutableRecordReader<>(
                            getEnvironment().getInputGate(0),
                            getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else if (groupSize > 1) {
            // Union case: one logical input made of all physical input gates.
            this.inputReader =
                    new MutableRecordReader<>(
                            new UnionInputGate(getEnvironment().getAllInputGates()),
                            getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }

        this.inputTypeSerializerFactory =
                this.config.getInputSerializer(0, getUserCodeClassLoader());

        // The reader is only known as MutableReader<?>, so the iterator is created raw.
        @SuppressWarnings({"unchecked", "rawtypes"})
        final MutableObjectIterator<IT> iterator =
                new ReaderIterator(
                        this.inputReader, this.inputTypeSerializerFactory.getSerializer());
        this.reader = iterator;

        // Sanity check: this task has exactly one logical input, so its gate count must equal
        // the configured number of inputs.
        if (groupSize != this.config.getNumInputs()) {
            throw new Exception(
                    "Illegal configuration: Number of input gates and group sizes are not consistent.");
        }
    }

    /**
     * Formats a log message via {@link BatchTask#constructLogString}, passing the task name and
     * this task along with the message.
     *
     * @param str the raw message
     * @return the formatted log line
     */
    private String getLogString(String str) {
        return BatchTask.constructLogString(
                str, getEnvironment().getTaskInfo().getTaskName(), this);
    }

    /**
     * Creates the runtime context handed to rich output formats. The operator metric group is
     * obtained (or created) under the task's name.
     *
     * @return a new {@link DistributedRuntimeUDFContext} built from the task environment
     */
    public DistributedRuntimeUDFContext createRuntimeContext() {
        final Environment environment = getEnvironment();

        return new DistributedRuntimeUDFContext(
                environment.getTaskInfo(),
                environment.getUserCodeClassLoader(),
                getExecutionConfig(),
                environment.getDistributedCacheEntries(),
                environment.getAccumulatorRegistry().getUserMap(),
                getEnvironment()
                        .getMetricGroup()
                        .getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()),
                environment.getExternalResourceInfoProvider(),
                environment.getJobID());
    }
}
