package org.apache.flink.streaming.api.operators.python;

import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.env.ProcessPythonEnvironmentManager;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryReservationException;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.class */
public abstract class AbstractPythonFunctionOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private transient PythonFunctionRunner<IN> pythonFunctionRunner;
    private transient AtomicBoolean bundleStarted;
    private transient int elementCount;
    private transient int maxBundleSize;
    private transient long maxBundleTimeMills;
    private transient long lastFinishBundleTime;
    private transient ScheduledFuture<?> checkFinishBundleTimer;
    private transient Runnable bundleFinishedCallback;
    private transient long reservedMemory;
    private final PythonConfig config;

    public AbstractPythonFunctionOperator(Configuration configuration) {
        this.config = new PythonConfig((Configuration) Preconditions.checkNotNull(configuration));
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void open() throws Exception {
        try {
            this.bundleStarted = new AtomicBoolean(false);
            reserveMemoryForPythonWorker();
            this.maxBundleSize = this.config.getMaxBundleSize();
            if (this.maxBundleSize <= 0) {
                this.maxBundleSize = ((Integer) PythonOptions.MAX_BUNDLE_SIZE.defaultValue()).intValue();
                LOG.error("Invalid value for the maximum bundle size. Using default value of " + this.maxBundleSize + '.');
            } else {
                LOG.info("The maximum bundle size is configured to {}.", Integer.valueOf(this.maxBundleSize));
            }
            this.maxBundleTimeMills = this.config.getMaxBundleTimeMills();
            if (this.maxBundleTimeMills <= 0) {
                this.maxBundleTimeMills = ((Long) PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue()).longValue();
                LOG.error("Invalid value for the maximum bundle time. Using default value of " + this.maxBundleTimeMills + '.');
            } else {
                LOG.info("The maximum bundle time is configured to {} milliseconds.", Long.valueOf(this.maxBundleTimeMills));
            }
            this.pythonFunctionRunner = createPythonFunctionRunner();
            this.pythonFunctionRunner.open();
            this.elementCount = 0;
            this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
            long max = Math.max(this.maxBundleTimeMills, serialVersionUID);
            this.checkFinishBundleTimer = getProcessingTimeService().scheduleAtFixedRate(j -> {
                checkInvokeFinishBundleByTime();
            }, max, max);
        } finally {
            super.open();
        }
    }

    public void close() throws Exception {
        try {
            invokeFinishBundle();
        } finally {
            super.close();
        }
    }

    public void dispose() throws Exception {
        try {
            if (this.checkFinishBundleTimer != null) {
                this.checkFinishBundleTimer.cancel(true);
                this.checkFinishBundleTimer = null;
            }
            if (this.pythonFunctionRunner != null) {
                this.pythonFunctionRunner.close();
                this.pythonFunctionRunner = null;
            }
            if (this.reservedMemory > 0) {
                getContainingTask().getEnvironment().getMemoryManager().releaseMemory(this, MemoryType.OFF_HEAP, this.reservedMemory);
                this.reservedMemory = -1L;
            }
        } finally {
            super.dispose();
        }
    }

    public void endInput() throws Exception {
        invokeFinishBundle();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        checkInvokeStartBundle();
        this.pythonFunctionRunner.processElement(streamRecord.getValue());
        checkInvokeFinishBundleByCount();
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        try {
            invokeFinishBundle();
        } finally {
            super.prepareSnapshotPreBarrier(j);
        }
    }

    public void processWatermark(Watermark watermark) throws Exception {
        if (watermark.getTimestamp() == Long.MAX_VALUE) {
            invokeFinishBundle();
            super.processWatermark(watermark);
        } else if (this.bundleStarted.get()) {
            this.bundleFinishedCallback = () -> {
                try {
                    super.processWatermark(watermark);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to process watermark after finished bundle.", e);
                }
            };
        } else {
            super.processWatermark(watermark);
        }
    }

    public abstract PythonFunctionRunner<IN> createPythonFunctionRunner() throws Exception;

    public abstract PythonEnv getPythonEnv();

    public abstract void emitResults();

    private void reserveMemoryForPythonWorker() throws MemoryReservationException {
        long bytes = MemorySize.parse(this.config.getPythonFrameworkMemorySize()).add(MemorySize.parse(this.config.getPythonDataBufferMemorySize())).getBytes();
        MemoryManager memoryManager = getContainingTask().getEnvironment().getMemoryManager();
        long computeMemorySize = memoryManager.computeMemorySize(getOperatorConfig().getManagedMemoryFraction());
        if (bytes > computeMemorySize) {
            LOG.warn("Required Python worker memory {} exceeds the available managed off-heap memory {}. Skipping reserving off-heap memory from the MemoryManager. This does not affect the functionality. However, it may affect the stability of a job as the memory used by the Python worker is not managed by Flink.", Long.valueOf(bytes), Long.valueOf(computeMemorySize));
            this.reservedMemory = -1L;
        } else {
            memoryManager.reserveMemory(this, MemoryType.OFF_HEAP, bytes);
            LOG.info("Reserved memory {} for Python worker.", Long.valueOf(bytes));
            this.reservedMemory = bytes;
        }
    }

    private void checkInvokeStartBundle() throws Exception {
        if (this.bundleStarted.compareAndSet(false, true)) {
            this.pythonFunctionRunner.startBundle();
        }
    }

    private void checkInvokeFinishBundleByCount() throws Exception {
        this.elementCount++;
        if (this.elementCount >= this.maxBundleSize) {
            invokeFinishBundle();
        }
    }

    private void checkInvokeFinishBundleByTime() throws Exception {
        if (getProcessingTimeService().getCurrentProcessingTime() - this.lastFinishBundleTime >= this.maxBundleTimeMills) {
            invokeFinishBundle();
        }
    }

    private void invokeFinishBundle() throws Exception {
        if (this.bundleStarted.compareAndSet(true, false)) {
            this.pythonFunctionRunner.finishBundle();
            emitResults();
            this.elementCount = 0;
            this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
            if (this.bundleFinishedCallback != null) {
                this.bundleFinishedCallback.run();
                this.bundleFinishedCallback = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public PythonEnvironmentManager createPythonEnvironmentManager() throws IOException {
        PythonDependencyInfo create = PythonDependencyInfo.create(this.config, getRuntimeContext().getDistributedCache());
        PythonEnv pythonEnv = getPythonEnv();
        if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
            return new ProcessPythonEnvironmentManager(create, getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(), System.getenv());
        }
        throw new UnsupportedOperationException(String.format("Execution type '%s' is not supported.", pythonEnv.getExecType()));
    }
}
