package org.apache.flink.table.runtime.operators.python;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Abstract operator that feeds a projection of each input element to a Python function runner
 * and collects the runner's results in a queue.
 *
 * <p>On {@link #open()} the operator derives two row types from its configuration:
 * {@link #udfInputType} (the fields of {@link #inputType} selected by {@link #udfInputOffsets})
 * and {@link #udfOutputType} (the fields of {@link #outputType} that follow the first
 * {@code forwardedFields.length} fields). For every stream record it calls
 * {@link #bufferInput(Object)}, delegates to the superclass and then calls {@code emitResults()}.
 *
 * @param <IN> type of the values carried by the incoming stream records
 * @param <OUT> output type passed through to {@link AbstractPythonFunctionOperator}
 * @param <UDFIN> type handed to the wrapped Python function runner, produced by
 *     {@link #getUdfInput(Object)}
 * @param <UDFOUT> type of the results received from the runner and stored in
 *     {@link #udfResultQueue}
 */
@Internal
public abstract class AbstractPythonScalarFunctionOperator<IN, OUT, UDFIN, UDFOUT> extends AbstractPythonFunctionOperator<IN, OUT> {
    private static final long serialVersionUID = 1L;

    /** The Python functions of this operator; the first entry supplies the Python environment. */
    protected final PythonFunctionInfo[] scalarFunctions;

    /** Row type from which {@link #udfInputType} is projected. */
    protected final RowType inputType;

    /** Row type whose trailing fields (after the forwarded ones) form {@link #udfOutputType}. */
    protected final RowType outputType;

    /** Indexes into the fields of {@link #inputType} that make up the UDF input row. */
    protected final int[] udfInputOffsets;

    /**
     * Only the length of this array is used in this class: it is the number of leading fields of
     * {@link #outputType} that are skipped when deriving {@link #udfOutputType}.
     * NOTE(review): presumably the indexes of input fields forwarded to the output - confirm
     * against the subclasses.
     */
    protected final int[] forwardedFields;

    /** Projection of {@link #inputType} onto {@link #udfInputOffsets}; set in {@link #open()}. */
    protected transient RowType udfInputType;

    /** Fields of {@link #outputType} after the forwarded ones; set in {@link #open()}. */
    protected transient RowType udfOutputType;

    /**
     * Unbounded queue created in {@link #open()}. It is not read or written in this class.
     * NOTE(review): presumably filled by {@link #bufferInput(Object)} in subclasses - confirm.
     */
    protected transient LinkedBlockingQueue<IN> forwardedInputQueue;

    /** Unbounded queue created in {@link #open()} that receives every result of the runner. */
    protected transient LinkedBlockingQueue<UDFOUT> udfResultQueue;

    /**
     * Runner decorator that accepts {@code IN} elements, converts each one with
     * {@link AbstractPythonScalarFunctionOperator#getUdfInput(Object)} and forwards the converted
     * value to the wrapped runner. All lifecycle calls are delegated unchanged.
     *
     * <p>The class is deliberately non-static: it needs the enclosing operator instance to call
     * {@code getUdfInput}.
     */
    private class ProjectUdfInputPythonScalarFunctionRunner implements PythonFunctionRunner<IN> {
        /** The runner that actually processes the projected UDF input. */
        private final PythonFunctionRunner<UDFIN> pythonFunctionRunner;

        ProjectUdfInputPythonScalarFunctionRunner(PythonFunctionRunner<UDFIN> pythonFunctionRunner) {
            this.pythonFunctionRunner = pythonFunctionRunner;
        }

        @Override
        public void open() throws Exception {
            pythonFunctionRunner.open();
        }

        @Override
        public void close() throws Exception {
            pythonFunctionRunner.close();
        }

        @Override
        public void startBundle() throws Exception {
            pythonFunctionRunner.startBundle();
        }

        @Override
        public void finishBundle() throws Exception {
            pythonFunctionRunner.finishBundle();
        }

        /** Projects {@code in} to the UDF input and passes it to the wrapped runner. */
        @Override
        public void processElement(IN in) throws Exception {
            pythonFunctionRunner.processElement(getUdfInput(in));
        }
    }

    /**
     * Stores the operator configuration. Every reference argument must be non-null; a
     * {@code null} is rejected by {@link Preconditions#checkNotNull(Object)}.
     *
     * @param configuration passed unchanged to the superclass constructor
     * @param pythonFunctionInfoArr stored as {@link #scalarFunctions}
     * @param rowType stored as {@link #inputType}
     * @param rowType2 stored as {@link #outputType}
     * @param iArr stored as {@link #udfInputOffsets}
     * @param iArr2 stored as {@link #forwardedFields}
     */
    public AbstractPythonScalarFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, int[] iArr, int[] iArr2) {
        super(configuration);
        this.scalarFunctions = Preconditions.checkNotNull(pythonFunctionInfoArr);
        this.inputType = Preconditions.checkNotNull(rowType);
        this.outputType = Preconditions.checkNotNull(rowType2);
        this.udfInputOffsets = Preconditions.checkNotNull(iArr);
        this.forwardedFields = Preconditions.checkNotNull(iArr2);
    }

    /**
     * Creates the two queues, derives {@link #udfInputType} and {@link #udfOutputType}, and then
     * opens the superclass.
     *
     * @throws Exception whatever {@code super.open()} throws
     */
    @Override
    public void open() throws Exception {
        forwardedInputQueue = new LinkedBlockingQueue<>();
        udfResultQueue = new LinkedBlockingQueue<>();

        // Select the UDF input columns from the input row type, in the order given by the offsets.
        final List<RowType.RowField> inputFields = inputType.getFields();
        final List<RowType.RowField> udfInputFields =
                Arrays.stream(udfInputOffsets)
                        .mapToObj(inputFields::get)
                        .collect(Collectors.toList());
        udfInputType = new RowType(udfInputFields);

        // The output row starts with forwardedFields.length fields; everything after is UDF output.
        udfOutputType = new RowType(outputType.getFields().subList(forwardedFields.length, outputType.getFieldCount()));

        // The transient state above is initialised before the superclass is opened.
        // NOTE(review): presumably super.open() creates the runner, which needs this state - confirm.
        super.open();
    }

    /**
     * Buffers the record's value, lets the superclass process the record and then emits results.
     *
     * @param streamRecord the incoming record
     * @throws Exception whatever the superclass or {@code emitResults()} throws
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        bufferInput(streamRecord.getValue());
        super.processElement(streamRecord);
        emitResults();
    }

    /**
     * Returns the Python environment of the first function in {@link #scalarFunctions}.
     *
     * <p>NOTE(review): an empty {@code scalarFunctions} array makes this method throw
     * {@link ArrayIndexOutOfBoundsException}; the constructor only rejects {@code null}.
     */
    @Override
    public PythonEnv getPythonEnv() {
        return scalarFunctions[0].getPythonFunction().getPythonEnv();
    }

    /**
     * Creates the runner used by this operator: the subclass-provided UDF runner wrapped in a
     * decorator that projects each input element with {@link #getUdfInput(Object)}. Every result
     * delivered by the UDF runner is appended to {@link #udfResultQueue}.
     *
     * @return a runner accepting {@code IN} elements
     * @throws IOException declared by the overridden method
     */
    @Override
    public PythonFunctionRunner<IN> createPythonFunctionRunner() throws IOException {
        // The field is read on every call rather than captured once. The queue is unbounded,
        // so put() does not wait for capacity.
        final FnDataReceiver<UDFOUT> udfResultReceiver = udfResult -> udfResultQueue.put(udfResult);
        return new ProjectUdfInputPythonScalarFunctionRunner(
                createPythonFunctionRunner(udfResultReceiver, createPythonEnvironmentManager()));
    }

    /**
     * Called with the value of every incoming record before the record is processed by the
     * superclass.
     *
     * @param in the value of the incoming stream record
     */
    public abstract void bufferInput(IN in);

    /**
     * Converts an input element into the value that is handed to the UDF runner.
     *
     * @param in the input element
     * @return the value passed to the wrapped runner's {@code processElement}
     */
    public abstract UDFIN getUdfInput(IN in);

    /**
     * Creates the runner that processes the projected UDF input.
     *
     * @param fnDataReceiver receiver for the runner's results; this class passes one that appends
     *     to {@link #udfResultQueue}
     * @param pythonEnvironmentManager the result of {@code createPythonEnvironmentManager()}
     * @return the runner to be wrapped by {@link #createPythonFunctionRunner()}
     */
    public abstract PythonFunctionRunner<UDFIN> createPythonFunctionRunner(FnDataReceiver<UDFOUT> fnDataReceiver, PythonEnvironmentManager pythonEnvironmentManager);
}
