package org.apache.flink.table.runtime.runners.python;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.AbstractPythonFunctionRunner;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.class */
public abstract class AbstractPythonStatelessFunctionRunner<IN> extends AbstractPythonFunctionRunner<IN> {
    private static final String INPUT_ID = "input";
    private static final String OUTPUT_ID = "output";
    private static final String TRANSFORM_ID = "transform";
    private static final String MAIN_INPUT_NAME = "input";
    private static final String MAIN_OUTPUT_NAME = "output";
    private static final String INPUT_CODER_ID = "input_coder";
    private static final String OUTPUT_CODER_ID = "output_coder";
    private static final String WINDOW_CODER_ID = "window_coder";
    private static final String WINDOW_STRATEGY = "windowing_strategy";
    private final String functionUrn;
    private final RowType inputType;
    private final RowType outputType;

    public AbstractPythonStatelessFunctionRunner(String str, FnDataReceiver<byte[]> fnDataReceiver, PythonEnvironmentManager pythonEnvironmentManager, RowType rowType, RowType rowType2, String str2, Map<String, String> map, FlinkMetricContainer flinkMetricContainer) {
        super(str, fnDataReceiver, pythonEnvironmentManager, StateRequestHandler.unsupported(), map, flinkMetricContainer);
        this.functionUrn = str2;
        this.inputType = (RowType) Preconditions.checkNotNull(rowType);
        this.outputType = (RowType) Preconditions.checkNotNull(rowType2);
    }

    @Override // org.apache.flink.python.AbstractPythonFunctionRunner
    public ExecutableStage createExecutableStage() throws Exception {
        RunnerApi.Components build = RunnerApi.Components.newBuilder().putPcollections("input", RunnerApi.PCollection.newBuilder().setWindowingStrategyId(WINDOW_STRATEGY).setCoderId(INPUT_CODER_ID).build()).putPcollections("output", RunnerApi.PCollection.newBuilder().setWindowingStrategyId(WINDOW_STRATEGY).setCoderId(OUTPUT_CODER_ID).build()).putTransforms(TRANSFORM_ID, RunnerApi.PTransform.newBuilder().setUniqueName(TRANSFORM_ID).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(this.functionUrn).setPayload(ByteString.copyFrom(getUserDefinedFunctionsProto().toByteArray())).build()).putInputs("input", "input").putOutputs("output", "output").build()).putWindowingStrategies(WINDOW_STRATEGY, RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId(WINDOW_CODER_ID).build()).putCoders(INPUT_CODER_ID, getInputCoderProto()).putCoders(OUTPUT_CODER_ID, getOutputCoderProto()).putCoders(WINDOW_CODER_ID, getWindowCoderProto()).build();
        return ImmutableExecutableStage.of(build, createPythonExecutionEnvironment(), PipelineNode.pCollection("input", build.getPcollectionsOrThrow("input")), Collections.EMPTY_LIST, Collections.EMPTY_LIST, Collections.EMPTY_LIST, Collections.singletonList(PipelineNode.pTransform(TRANSFORM_ID, build.getTransformsOrThrow(TRANSFORM_ID))), Collections.singletonList(PipelineNode.pCollection("output", build.getPcollectionsOrThrow("output"))), createValueOnlyWireCoderSetting());
    }

    public FlinkFnApi.UserDefinedFunction getUserDefinedFunctionProto(PythonFunctionInfo pythonFunctionInfo) {
        FlinkFnApi.UserDefinedFunction.Builder newBuilder = FlinkFnApi.UserDefinedFunction.newBuilder();
        newBuilder.setPayload(org.apache.flink.api.python.shaded.com.google.protobuf.ByteString.copyFrom(pythonFunctionInfo.getPythonFunction().getSerializedPythonFunction()));
        for (Object obj : pythonFunctionInfo.getInputs()) {
            FlinkFnApi.UserDefinedFunction.Input.Builder newBuilder2 = FlinkFnApi.UserDefinedFunction.Input.newBuilder();
            if (obj instanceof PythonFunctionInfo) {
                newBuilder2.setUdf(getUserDefinedFunctionProto((PythonFunctionInfo) obj));
            } else if (obj instanceof Integer) {
                newBuilder2.setInputOffset(((Integer) obj).intValue());
            } else {
                newBuilder2.setInputConstant(org.apache.flink.api.python.shaded.com.google.protobuf.ByteString.copyFrom((byte[]) obj));
            }
            newBuilder.addInputs(newBuilder2);
        }
        return newBuilder.build();
    }

    private RunnerApi.WireCoderSetting createValueOnlyWireCoderSetting() throws IOException {
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new byte[0]);
        WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        of.encode(valueInGlobalWindow, (OutputStream) byteArrayOutputStream);
        return RunnerApi.WireCoderSetting.newBuilder().setUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE)).setPayload(ByteString.copyFrom(byteArrayOutputStream.toByteArray())).build();
    }

    public RowType getInputType() {
        return this.inputType;
    }

    public RowType getOutputType() {
        return this.outputType;
    }

    private RunnerApi.Coder getInputCoderProto() {
        return getRowCoderProto(this.inputType);
    }

    private RunnerApi.Coder getOutputCoderProto() {
        return getRowCoderProto(this.outputType);
    }

    private RunnerApi.Coder getRowCoderProto(RowType rowType) {
        return RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(getInputOutputCoderUrn()).setPayload(ByteString.copyFrom(toProtoType(rowType).getRowSchema().toByteArray())).build()).build();
    }

    private FlinkFnApi.Schema.FieldType toProtoType(LogicalType logicalType) {
        return (FlinkFnApi.Schema.FieldType) logicalType.accept(new PythonTypeUtils.LogicalTypeToProtoTypeConverter());
    }

    public abstract String getInputOutputCoderUrn();

    private RunnerApi.Coder getWindowCoderProto() {
        return RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN).build()).build();
    }

    @VisibleForTesting
    public abstract FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto();
}
