package org.apache.flink.python;

import java.io.File;
import java.util.List;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for {@link PythonFunctionRunner} implementations that exchange serialized elements
 * with a remote bundle obtained from a Beam {@link StageBundleFactory}.
 *
 * <p>Input elements are serialized with the serializer returned by
 * {@link #mo2927getInputTypeSerializer()} and pushed to the bundle's main input receiver; byte
 * arrays coming back from the bundle are deserialized with the serializer returned by
 * {@link #mo2926getOutputTypeSerializer()} and forwarded to the result receiver supplied at
 * construction time.
 *
 * <p>The serialization buffers are shared, reusable instance fields, so a single instance is not
 * written to be used by several threads at once.
 *
 * @param <IN> type of the elements passed to {@link #processElement(Object)}
 * @param <OUT> type of the elements handed to the result receiver
 */
@Internal
public abstract class AbstractPythonFunctionRunner<IN, OUT> implements PythonFunctionRunner<IN> {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractPythonFunctionRunner.class);

    /** Key under which the main input receiver is looked up in the remote bundle. */
    private static final String MAIN_INPUT_ID = "input";

    private final String taskName;
    private final FnDataReceiver<OUT> resultReceiver;
    private final PythonEnvironmentManager environmentManager;
    private transient JobBundleFactory jobBundleFactory;
    private transient StageBundleFactory stageBundleFactory;
    private final StateRequestHandler stateRequestHandler;
    private transient BundleProgressHandler progressHandler;

    /** Bundle currently being processed; set by {@link #startBundle()}, cleared by {@link #finishBundle()}. */
    private transient RemoteBundle remoteBundle;
    private transient FnDataReceiver<WindowedValue<?>> mainInputReceiver;
    private transient TypeSerializer<IN> inputTypeSerializer;
    private transient TypeSerializer<OUT> outputTypeSerializer;

    /** Reusable buffer (and its view) from which results are deserialized. */
    private transient ByteArrayInputStreamWithPos bais;
    private transient DataInputViewStreamWrapper baisWrapper;

    /** Reusable buffer (and its view) into which inputs are serialized. */
    private transient ByteArrayOutputStreamWithPos baos;
    private transient DataOutputViewStreamWrapper baosWrapper;

    /** Files deleted on {@link #close()}; nothing in this class assigns it. */
    private transient List<File> pythonInternalLibs;

    /**
     * Creates a runner. All arguments are mandatory.
     *
     * @param taskName name used as both the job id and job name of the created {@link JobInfo}
     * @param resultReceiver receiver of the deserialized results
     * @param environmentManager manager that is opened/closed with this runner and provides the
     *     retrieval token and the execution environment
     * @param stateRequestHandler handler passed to every remote bundle
     * @throws NullPointerException if any argument is {@code null}
     */
    public AbstractPythonFunctionRunner(
            String taskName,
            FnDataReceiver<OUT> resultReceiver,
            PythonEnvironmentManager environmentManager,
            StateRequestHandler stateRequestHandler) {
        this.taskName = Preconditions.checkNotNull(taskName);
        this.resultReceiver = Preconditions.checkNotNull(resultReceiver);
        this.environmentManager = Preconditions.checkNotNull(environmentManager);
        this.stateRequestHandler = Preconditions.checkNotNull(stateRequestHandler);
    }

    /**
     * Allocates the serialization buffers, opens the environment manager and creates the job and
     * stage bundle factories.
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void open() throws Exception {
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper(this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.baosWrapper = new DataOutputViewStreamWrapper(this.baos);
        this.inputTypeSerializer = mo2927getInputTypeSerializer();
        this.outputTypeSerializer = mo2926getOutputTypeSerializer();

        this.environmentManager.open();

        PortablePipelineOptions pipelineOptions = PipelineOptionsFactory.as(PortablePipelineOptions.class);
        pipelineOptions.setSdkWorkerParallelism(1L);
        this.jobBundleFactory = createJobBundleFactory(PipelineOptionsTranslation.toProto(pipelineOptions));
        this.stageBundleFactory = createStageBundleFactory();
        this.progressHandler = BundleProgressHandler.ignored();
    }

    /**
     * Builds the stage bundle factory for the executable stage of this runner.
     *
     * @throws RuntimeException wrapping any failure; its message is the boot log reported by the
     *     environment manager
     */
    private StageBundleFactory createStageBundleFactory() throws Exception {
        try {
            return this.jobBundleFactory.forStage(createExecutableStage());
        } catch (Throwable cause) {
            throw new RuntimeException(this.environmentManager.getBootLog(), cause);
        }
    }

    /**
     * Releases everything acquired by this runner: deletes the internal library files (best
     * effort), closes the job bundle factory and closes the environment manager.
     *
     * <p>Each step runs even if a previous one failed, so that a failing bundle factory cannot
     * prevent the environment manager from being closed.
     *
     * @throws Exception the first failure encountered; later failures are attached to it as
     *     suppressed exceptions
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void close() throws Exception {
        Exception failure = null;

        try {
            if (this.pythonInternalLibs != null) {
                // Best effort: the boolean result of File#delete is intentionally ignored.
                this.pythonInternalLibs.forEach(File::delete);
            }
        } catch (Exception e) {
            failure = e;
        } finally {
            this.pythonInternalLibs = null;
        }

        try {
            if (this.jobBundleFactory != null) {
                this.jobBundleFactory.close();
            }
        } catch (Exception e) {
            failure = firstOrSuppressed(e, failure);
        } finally {
            this.jobBundleFactory = null;
        }

        try {
            this.environmentManager.close();
        } catch (Exception e) {
            failure = firstOrSuppressed(e, failure);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Returns {@code previous} with {@code newFailure} added as suppressed, or {@code newFailure}
     * itself when there is no (distinct) previous failure.
     */
    private static Exception firstOrSuppressed(Exception newFailure, Exception previous) {
        if (previous == null || previous == newFailure) {
            return newFailure;
        }
        previous.addSuppressed(newFailure);
        return previous;
    }

    /**
     * Requests a new remote bundle whose outputs are deserialized and forwarded to the result
     * receiver, and looks up the bundle's main input receiver.
     *
     * @throws RuntimeException if the bundle cannot be obtained or has no main input receiver
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void startBundle() {
        try {
            // An anonymous class is required here: OutputReceiverFactory#create is a generic method
            // and therefore cannot be implemented by a lambda.
            OutputReceiverFactory receiverFactory = new OutputReceiverFactory() {
                @Override // org.apache.beam.runners.fnexecution.control.OutputReceiverFactory
                public FnDataReceiver<WindowedValue<byte[]>> create(String pCollectionId) {
                    return result -> {
                        byte[] payload = result.getValue();
                        AbstractPythonFunctionRunner.this.bais.setBuffer(payload, 0, payload.length);
                        AbstractPythonFunctionRunner.this.resultReceiver.accept(
                                AbstractPythonFunctionRunner.this.outputTypeSerializer.deserialize(
                                        AbstractPythonFunctionRunner.this.baisWrapper));
                    };
                }
            };
            this.remoteBundle = this.stageBundleFactory.getBundle(receiverFactory, this.stateRequestHandler, this.progressHandler);
            this.mainInputReceiver = (FnDataReceiver) Preconditions.checkNotNull(
                    this.remoteBundle.getInputReceivers().get(MAIN_INPUT_ID),
                    "Failed to retrieve main input receiver.");
        } catch (Throwable cause) {
            throw new RuntimeException("Failed to start remote bundle", cause);
        }
    }

    /**
     * Closes the current remote bundle. The bundle reference is cleared whether or not closing
     * succeeds.
     *
     * @throws RuntimeException wrapping any failure raised while closing, including the
     *     {@link NullPointerException} raised when no bundle has been started
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void finishBundle() {
        try {
            this.remoteBundle.close();
        } catch (Throwable cause) {
            // This method declares no checked exceptions, so failures are wrapped the same way as
            // in startBundle() and processElement().
            throw new RuntimeException("Failed to close remote bundle", cause);
        } finally {
            this.remoteBundle = null;
        }
    }

    /**
     * Serializes the element into the reusable output buffer and sends the resulting bytes, as a
     * value in the global window, to the main input receiver.
     *
     * @param in element to send
     * @throws RuntimeException wrapping any failure raised while serializing or sending
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void processElement(IN in) {
        try {
            this.baos.reset();
            this.inputTypeSerializer.serialize(in, this.baosWrapper);
            byte[] serialized = this.baos.toByteArray();
            this.mainInputReceiver.accept(WindowedValue.valueInGlobalWindow(serialized));
        } catch (Throwable cause) {
            throw new RuntimeException("Failed to process element when sending data to Python SDK harness.", cause);
        }
    }

    /**
     * Creates the job bundle factory from a {@link JobInfo} that uses the task name as job id and
     * job name, and the retrieval token supplied by the environment manager.
     *
     * @param pipelineOptions pipeline options in their proto representation
     */
    @VisibleForTesting
    public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) throws Exception {
        JobInfo jobInfo = JobInfo.create(
                this.taskName,
                this.taskName,
                this.environmentManager.createRetrievalToken(),
                pipelineOptions);
        return DefaultJobBundleFactory.create(jobInfo);
    }

    /**
     * Returns the environment created by the environment manager.
     *
     * <p>Originally {@code protected}; the access modifier was widened by the decompiler.
     */
    public RunnerApi.Environment createPythonExecutionEnvironment() throws Exception {
        return this.environmentManager.createEnvironment();
    }

    /** Creates the executable stage for which the stage bundle factory is requested. */
    public abstract ExecutableStage createExecutableStage() throws Exception;

    /**
     * Returns the serializer used for input elements.
     *
     * <p>Renamed from {@code getInputTypeSerializer} by the decompiler.
     */
    public abstract TypeSerializer<IN> mo2927getInputTypeSerializer();

    /**
     * Returns the serializer used for output elements.
     *
     * <p>Renamed from {@code getOutputTypeSerializer} by the decompiler.
     */
    public abstract TypeSerializer<OUT> mo2926getOutputTypeSerializer();
}
