package org.apache.flink.table.runtime.arrow.sources;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.python.shaded.org.apache.arrow.memory.BufferAllocator;
import org.apache.flink.api.python.shaded.org.apache.arrow.vector.VectorLoader;
import org.apache.flink.api.python.shaded.org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.runtime.arrow.ArrowReader;
import org.apache.flink.table.runtime.arrow.ArrowUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/arrow/sources/AbstractArrowSourceFunction.class */
public abstract class AbstractArrowSourceFunction<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>, CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractArrowSourceFunction.class);
    final DataType dataType;
    private final byte[][] arrowData;
    private transient BufferAllocator allocator;
    private transient VectorSchemaRoot root;
    private volatile transient boolean running;
    private transient Deque<Tuple2<Integer, Integer>> indexesToEmit;
    private transient ListState<Tuple2<Integer, Integer>> checkpointedState;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractArrowSourceFunction(DataType dataType, byte[][] bArr) {
        this.dataType = (DataType) Preconditions.checkNotNull(dataType);
        this.arrowData = (byte[][]) Preconditions.checkNotNull(bArr);
    }

    public void open(Configuration configuration) throws Exception {
        this.allocator = ArrowUtils.getRootAllocator().newChildAllocator("ArrowSourceFunction", 0L, Long.MAX_VALUE);
        this.root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(this.dataType.getLogicalType()), this.allocator);
        this.running = true;
    }

    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (this.root != null) {
                this.root.close();
                this.root = null;
            }
            if (this.allocator != null) {
                this.allocator.close();
                this.allocator = null;
            }
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized.");
        this.checkpointedState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("arrow-source-state", new TupleSerializer(Tuple2.class, new TypeSerializer[]{IntSerializer.INSTANCE, IntSerializer.INSTANCE})));
        this.indexesToEmit = new ArrayDeque();
        if (functionInitializationContext.isRestored()) {
            Iterator it = ((Iterable) this.checkpointedState.get()).iterator();
            while (it.hasNext()) {
                this.indexesToEmit.add((Tuple2) it.next());
            }
            LOG.info("Subtask {} restored state: {}.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), this.indexesToEmit);
            return;
        }
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        int i = indexOfThisSubtask;
        while (true) {
            int i2 = i;
            if (i2 >= this.arrowData.length) {
                LOG.info("Subtask {} has no restore state, initialized with {}.", Integer.valueOf(indexOfThisSubtask), this.indexesToEmit);
                return;
            } else {
                this.indexesToEmit.add(Tuple2.of(Integer.valueOf(i2), 0));
                i = i2 + numberOfParallelSubtasks;
            }
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState(this.checkpointedState != null, "The " + getClass().getSimpleName() + " state has not been properly initialized.");
        this.checkpointedState.clear();
        Iterator<Tuple2<Integer, Integer>> it = this.indexesToEmit.iterator();
        while (it.hasNext()) {
            this.checkpointedState.add(it.next());
        }
    }

    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        VectorLoader vectorLoader = new VectorLoader(this.root);
        while (this.running && !this.indexesToEmit.isEmpty()) {
            Tuple2<Integer, Integer> peek = this.indexesToEmit.peek();
            ArrowRecordBatch loadBatch = loadBatch(((Integer) peek.f0).intValue());
            vectorLoader.load(loadBatch);
            loadBatch.close();
            ArrowReader<OUT> createArrowReader = createArrowReader(this.root);
            int rowCount = this.root.getRowCount();
            int intValue = ((Integer) peek.f1).intValue();
            while (intValue < rowCount) {
                OUT read = createArrowReader.read(intValue);
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(read);
                    intValue++;
                    peek.setField(Integer.valueOf(intValue), 1);
                }
            }
            synchronized (sourceContext.getCheckpointLock()) {
                this.indexesToEmit.pop();
            }
        }
    }

    public void cancel() {
        this.running = false;
    }

    abstract ArrowReader<OUT> createArrowReader(VectorSchemaRoot vectorSchemaRoot);

    private ArrowRecordBatch loadBatch(int i) throws IOException {
        return MessageSerializer.deserializeRecordBatch(new ReadChannel(Channels.newChannel(new ByteArrayInputStream(this.arrowData[i]))), this.allocator);
    }

    static {
        ArrowUtils.checkArrowUsable();
    }
}
