package org.apache.flink.runtime.checkpoint.channel;

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.RefCountingFSDataInputStream;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.util.Preconditions;

@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.class */
class ChannelStateStreamReader implements Closeable {
    private final RefCountingFSDataInputStream stream;
    private final ChannelStateSerializer serializer;
    private final Queue<Long> offsets;
    private int remainingBytes;
    private boolean closed;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelStateStreamReader(AbstractChannelStateHandle<?> abstractChannelStateHandle, RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory refCountingFSDataInputStreamFactory) {
        this(refCountingFSDataInputStreamFactory.getOrCreate(abstractChannelStateHandle), abstractChannelStateHandle.getOffsets(), refCountingFSDataInputStreamFactory.getSerializer());
    }

    private ChannelStateStreamReader(RefCountingFSDataInputStream refCountingFSDataInputStream, List<Long> list, ChannelStateSerializer channelStateSerializer) {
        this.remainingBytes = -1;
        this.closed = false;
        this.stream = refCountingFSDataInputStream;
        this.stream.incRef();
        this.serializer = channelStateSerializer;
        this.offsets = new LinkedList(list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelStateReader.ReadResult readInto(Buffer buffer) throws IOException {
        return readInto(ChannelStateByteBuffer.wrap(buffer));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelStateReader.ReadResult readInto(BufferBuilder bufferBuilder) throws IOException {
        return readInto(ChannelStateByteBuffer.wrap(bufferBuilder));
    }

    private ChannelStateReader.ReadResult readInto(ChannelStateByteBuffer channelStateByteBuffer) throws IOException {
        Preconditions.checkState(!this.closed, "reader is closed");
        readWhilePossible(channelStateByteBuffer);
        if (haveMoreData()) {
            return ChannelStateReader.ReadResult.HAS_MORE_DATA;
        }
        this.closed = true;
        this.stream.decRef();
        return ChannelStateReader.ReadResult.NO_MORE_DATA;
    }

    private void readWhilePossible(ChannelStateByteBuffer channelStateByteBuffer) throws IOException {
        while (haveMoreData() && channelStateByteBuffer.isWritable()) {
            if (this.remainingBytes <= 0) {
                advanceOffset();
            }
            this.remainingBytes -= this.serializer.readData(this.stream, channelStateByteBuffer, this.remainingBytes);
        }
    }

    private boolean haveMoreData() {
        return this.remainingBytes > 0 || !this.offsets.isEmpty();
    }

    private void advanceOffset() throws IOException {
        this.stream.seek(this.offsets.poll().longValue());
        this.remainingBytes = this.serializer.readLength(this.stream);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.stream.close();
    }
}
