package com.datastax.bdp.fs.pipes;

import com.datastax.bdp.fs.exec.SerialExecutionContext;
import com.datastax.bdp.fs.exec.SerialExecutionContextProvider;
import com.datastax.bdp.fs.shaded.io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import scala.MatchError;
import scala.Predef$;
import scala.StringContext;
import scala.concurrent.Await$;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: DataSourceInputStream.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055f\u0001B\u0001\u0003\u00015\u0011Q\u0003R1uCN{WO]2f\u0013:\u0004X\u000f^*ue\u0016\fWN\u0003\u0002\u0004\t\u0005)\u0001/\u001b9fg*\u0011QAB\u0001\u0003MNT!a\u0002\u0005\u0002\u0007\t$\u0007O\u0003\u0002\n\u0015\u0005AA-\u0019;bgR\f\u0007PC\u0001\f\u0003\r\u0019w.\\\u0002\u0001'\t\u0001a\u0002\u0005\u0002\u0010)5\t\u0001C\u0003\u0002\u0012%\u0005\u0011\u0011n\u001c\u0006\u0002'\u0005!!.\u0019<b\u0013\t)\u0002CA\u0006J]B,Ho\u0015;sK\u0006l\u0007\u0002C\f\u0001\u0005\u0003\u0005\u000b\u0011\u0002\r\u0002\u0015\u0011\fG/Y*pkJ\u001cW\r\u0005\u0002\u001a55\t!!\u0003\u0002\u001c\u0005\tQA)\u0019;b'>,(oY3\t\u0011u\u0001!\u0011!Q\u0001\ny\tq\u0001^5nK>,H\u000f\u0005\u0002 M5\t\u0001E\u0003\u0002\"E\u0005AA-\u001e:bi&|gN\u0003\u0002$I\u0005Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0003\u0015\nQa]2bY\u0006L!a\n\u0011\u0003\u0011\u0011+(/\u0019;j_:D\u0001\"\u000b\u0001\u0003\u0002\u0003\u0006IAK\u0001\fEV4g-\u001a:D_VtG\u000f\u0005\u0002,Y5\tA%\u0003\u0002.I\t\u0019\u0011J\u001c;\t\u000b=\u0002A\u0011\u0001\u0019\u0002\rqJg.\u001b;?)\u0011\t$g\r\u001b\u0011\u0005e\u0001\u0001\"B\f/\u0001\u0004A\u0002bB\u000f/!\u0003\u0005\rA\b\u0005\bS9\u0002\n\u00111\u0001+\u0011\u001d1\u0004A1A\u0005\n]\nQ!];fk\u0016,\u0012\u0001\u000f\t\u0004suzT\"\u0001\u001e\u000b\u0005\rZ$B\u0001\u001f\u0013\u0003\u0011)H/\u001b7\n\u0005yR$a\u0005'j].,GM\u00117pG.LgnZ)vKV,\u0007c\u0001!C\t6\t\u0011I\u0003\u0002=I%\u00111)\u0011\u0002\u0004)JL\bCA#L\u001b\u00051%BA$I\u0003\u0019\u0011WO\u001a4fe*\u0011\u0011JS\u0001\u0006]\u0016$H/\u001f\u0006\u0002#%\u0011AJ\u0012\u0002\b\u0005f$XMQ;g\u0011\u0019q\u0005\u0001)A\u0005q\u00051\u0011/^3vK\u0002B\u0011b\u0012\u0001A\u0002\u0003\u0007I\u0011\u0002)\u0016\u0003\u0011C\u0011B\u0015\u0001A\u0002\u0003\u0007I\u0011B*\u0002\u0015\t,hMZ3s?\u0012*\u0017\u000f\u0006\u0002U/B\u00111&V\u0005\u0003-\u0012\u0012A!\u00168ji\"9\u0001,UA\u0001\u0002\u0004!\u0015a\u0001=%c!1!\f\u0001Q!\n\u0011\u000bqAY;gM\u0016\u0014\b\u0005C\u0004]\u0001\u0001\u0007I\u0011B/\u0002\r\rdwn]3e+\u0005q\u0006CA\u0016`\u0013\t\u0001GEA\u0004C_>dW-\u00198\t\u000f\t\u0004\u0001\u0019!C\u0005G\u0006Q1\r\\8tK\u0012|F%Z9\u0015\u0005Q#\u0007b\u0002-b\u0003\u0003\u0005\rA\u0018\u0005\u0007M\u0002\u0001\u000b\u0015\u00020\u0002\u000f\rdwn]3eA!\u0012Q\r\u001b\t\u0003W%L!A\u001b\u0013\u0003\u0011Y|G.\u0019;jY\u0016Dq\u0001\u001c\u0001A\u0002\u0013%Q,A\u0002f_\u001aDqA\u001c\u0001A\u0002\u0013%q.A\u0004f_\u001a|F%Z9\u0015\u0005Q\u0003\bb\u0002-n\u0003\u0003\u0005\rA\u0018\u0005\u0007e\u0002\u0001\u000b\u0015\u00020\u0002\t\u0015|g\r\t\u0015\u0003c\"Dq!\u001e\u0001A\u0002\u0013%Q,A\u0004sK\u0006$\u0017N\\4\t\u000f]\u0004\u0001\u0019!C\u0005q\u0006Y!/Z1eS:<w\fJ3r)\t!\u0016\u0010C\u0004Ym\u0006\u0005\t\u0019\u00010\t\rm\u0004\u0001\u0015)\u0003_\u0003!\u0011X-\u00193j]\u001e\u0004\u0003bB?\u0001\u0005\u0004%YA`\u0001\u0004K\u000e\u0004X#A@\u0011\t\u0005\u0005\u0011qA\u0007\u0003\u0003\u0007Q1!!\u0002\u0005\u0003\u0011)\u00070Z2\n\t\u0005%\u00111\u0001\u0002\u001f'\u0016\u0014\u0018.\u00197Fq\u0016\u001cW\u000f^5p]\u000e{g\u000e^3yiB\u0013xN^5eKJDq!!\u0004\u0001A\u0003%q0\u0001\u0003fGB\u0004\u0003\"CA\t\u0001\t\u0007I1BA\n\u0003\t)7-\u0006\u0002\u0002\u0016A!\u0011\u0011AA\f\u0013\u0011\tI\"a\u0001\u0003-M+'/[1m\u000bb,7-\u001e;j_:\u001cuN\u001c;fqRD\u0001\"!\b\u0001A\u0003%\u0011QC\u0001\u0004K\u000e\u0004\u0003bBA\u0011\u0001\u0011%\u00111E\u0001\u000bG2,\u0017M])vKV,G#\u0001+\t\u000f\u0005\u001d\u0002\u0001\"\u0003\u0002$\u0005iQ.Y=cKJ+\u0017\rZ'pe\u0016D\u0011\"a\u000b\u0001\u0005\u0004%I!!\f\u0002\u001bQLW.Z8vi6KG\u000e\\5t+\t\ty\u0003E\u0002,\u0003cI1!a\r%\u0005\u0011auN\\4\t\u0011\u0005]\u0002\u0001)A\u0005\u0003_\ta\u0002^5nK>,H/T5mY&\u001c\b\u0005C\u0004\u0002<\u0001!I!a\t\u0002\u001bI,G.Z1tK\n+hMZ3s\u0011\u001d\ty\u0004\u0001C\u0005\u0003\u0003\n\u0001#];fk\u0016D\u0015m]'pe\u0016$\u0015\r^1\u0015\u0003yCq!!\u0012\u0001\t\u0013\t\t%A\tck\u001a4WM\u001d%bg6{'/\u001a#bi\u0006Dq!!\u0013\u0001\t\u0013\t\u0019#A\u000fnCf\u0014WMR3uG\"tU\r\u001f;Ck\u001a4WM\u001d$s_6\fV/Z;f\u0011\u001d\ti\u0005\u0001C!\u0003\u001f\nAA]3bIR9!&!\u0015\u0002b\u0005\u0015\u0004\u0002CA*\u0003\u0017\u0002\r!!\u0016\u0002\u0003\t\u0004RaKA,\u00037J1!!\u0017%\u0005\u0015\t%O]1z!\rY\u0013QL\u0005\u0004\u0003?\"#\u0001\u0002\"zi\u0016Dq!a\u0019\u0002L\u0001\u0007!&A\u0002pM\u001aDq!a\u001a\u0002L\u0001\u0007!&A\u0002mK:Dq!!\u0014\u0001\t\u0003\nY\u0007F\u0002+\u0003[B\u0001\"a\u0015\u0002j\u0001\u0007\u0011Q\u000b\u0005\b\u0003\u001b\u0002A\u0011IA9)\u0005Q\u0003bBA;\u0001\u0011\u0005\u00131E\u0001\u0006G2|7/Z\u0004\n\u0003s\u0012\u0011\u0011!E\u0001\u0003w\nQ\u0003R1uCN{WO]2f\u0013:\u0004X\u000f^*ue\u0016\fW\u000eE\u0002\u001a\u0003{2\u0001\"\u0001\u0002\u0002\u0002#\u0005\u0011qP\n\u0005\u0003{\n\t\tE\u0002,\u0003\u0007K1!!\"%\u0005\u0019\te.\u001f*fM\"9q&! \u0005\u0002\u0005%ECAA>\u0011)\ti)! \u0012\u0002\u0013\u0005\u0011qR\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000f\n\u001a\u0016\u0005\u0005E%f\u0001\u0010\u0002\u0014.\u0012\u0011Q\u0013\t\u0005\u0003/\u000b\t+\u0004\u0002\u0002\u001a*!\u00111TAO\u0003%)hn\u00195fG.,GMC\u0002\u0002 \u0012\n!\"\u00198o_R\fG/[8o\u0013\u0011\t\u0019+!'\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\r\u0003\u0006\u0002(\u0006u\u0014\u0013!C\u0001\u0003S\u000b1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012\u001aTCAAVU\rQ\u00131\u0013")
/* loaded from: input_file:com/datastax/bdp/fs/pipes/DataSourceInputStream.class */
public class DataSourceInputStream extends InputStream {
    public final DataSource com$datastax$bdp$fs$pipes$DataSourceInputStream$$dataSource;
    private final Duration timeout;
    public final int com$datastax$bdp$fs$pipes$DataSourceInputStream$$bufferCount;
    private final LinkedBlockingQueue<Try<ByteBuf>> com$datastax$bdp$fs$pipes$DataSourceInputStream$$queue;
    private ByteBuf buffer;
    private volatile boolean com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed;
    private volatile boolean com$datastax$bdp$fs$pipes$DataSourceInputStream$$eof;
    private boolean com$datastax$bdp$fs$pipes$DataSourceInputStream$$reading;
    private final SerialExecutionContextProvider ecp;
    private final SerialExecutionContext com$datastax$bdp$fs$pipes$DataSourceInputStream$$ec;
    private final long timeoutMillis;

    public LinkedBlockingQueue<Try<ByteBuf>> com$datastax$bdp$fs$pipes$DataSourceInputStream$$queue() {
        return this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$queue;
    }

    private ByteBuf buffer() {
        return this.buffer;
    }

    private void buffer_$eq(ByteBuf byteBuf) {
        this.buffer = byteBuf;
    }

    public boolean com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed() {
        return this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed;
    }

    private void com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed_$eq(boolean z) {
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed = z;
    }

    private boolean com$datastax$bdp$fs$pipes$DataSourceInputStream$$eof() {
        return this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$eof;
    }

    public void com$datastax$bdp$fs$pipes$DataSourceInputStream$$eof_$eq(boolean z) {
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$eof = z;
    }

    public boolean com$datastax$bdp$fs$pipes$DataSourceInputStream$$reading() {
        return this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$reading;
    }

    public void com$datastax$bdp$fs$pipes$DataSourceInputStream$$reading_$eq(boolean z) {
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$reading = z;
    }

    private SerialExecutionContextProvider ecp() {
        return this.ecp;
    }

    public SerialExecutionContext com$datastax$bdp$fs$pipes$DataSourceInputStream$$ec() {
        return this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$ec;
    }

    private void clearQueue() {
        while (!com$datastax$bdp$fs$pipes$DataSourceInputStream$$queue().isEmpty()) {
            com$datastax$bdp$fs$pipes$DataSourceInputStream$$queue().poll().foreach(new DataSourceInputStream$$anonfun$clearQueue$1(this));
        }
    }

    public void com$datastax$bdp$fs$pipes$DataSourceInputStream$$maybeReadMore() {
        Future$.MODULE$.apply(new DataSourceInputStream$$anonfun$com$datastax$bdp$fs$pipes$DataSourceInputStream$$maybeReadMore$1(this), com$datastax$bdp$fs$pipes$DataSourceInputStream$$ec());
    }

    private long timeoutMillis() {
        return this.timeoutMillis;
    }

    private void releaseBuffer() {
        if (buffer() != null) {
            buffer().release();
        }
    }

    private boolean queueHasMoreData() {
        return (com$datastax$bdp$fs$pipes$DataSourceInputStream$$queue().isEmpty() && com$datastax$bdp$fs$pipes$DataSourceInputStream$$eof()) ? false : true;
    }

    private boolean bufferHasMoreData() {
        return buffer() != null && buffer().readableBytes() > 0;
    }

    private void maybeFetchNextBufferFromQueue() {
        if (com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed()) {
            throw new IllegalStateException("Input stream already closed");
        }
        if (bufferHasMoreData() || !queueHasMoreData()) {
            return;
        }
        releaseBuffer();
        Try<ByteBuf> poll = com$datastax$bdp$fs$pipes$DataSourceInputStream$$queue().poll(timeoutMillis(), TimeUnit.MILLISECONDS);
        if (poll instanceof Success) {
            buffer_$eq((ByteBuf) ((Success) poll).value());
            com$datastax$bdp$fs$pipes$DataSourceInputStream$$maybeReadMore();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(poll instanceof Failure)) {
                if (poll != null) {
                    throw new MatchError(poll);
                }
                throw new TimeoutException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"No data received from ", " after ", " ms"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$dataSource, BoxesRunTime.boxToLong(timeoutMillis())})));
            }
            Throwable exception = ((Failure) poll).exception();
            com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed_$eq(true);
            releaseBuffer();
            clearQueue();
            throw new IOException(exception);
        }
    }

    @Override // java.io.InputStream
    public synchronized int read(byte[] bArr, int i, int i2) {
        if (bArr == null) {
            throw new NullPointerException();
        }
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException();
        }
        if (i2 == 0) {
            return 0;
        }
        int i3 = i2;
        int i4 = i;
        while (true) {
            maybeFetchNextBufferFromQueue();
            int min = package$.MODULE$.min(i3, buffer().readableBytes());
            buffer().readBytes(bArr, i4, min);
            i3 -= min;
            i4 += min;
            if (i3 <= 0 || (!bufferHasMoreData() && !queueHasMoreData())) {
                break;
            }
        }
        int i5 = i4 - i;
        if (i5 == 0) {
            return -1;
        }
        return i5;
    }

    @Override // java.io.InputStream
    public synchronized int read(byte[] bArr) {
        return read(bArr, 0, bArr.length);
    }

    @Override // java.io.InputStream
    public synchronized int read() {
        do {
            maybeFetchNextBufferFromQueue();
            if (buffer().readableBytes() != 0) {
                break;
            }
        } while (queueHasMoreData());
        if (buffer().readableBytes() == 0) {
            return -1;
        }
        return buffer().readByte() & 255;
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed()) {
            return;
        }
        com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed_$eq(true);
        releaseBuffer();
        Await$.MODULE$.result(this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$dataSource.close(), this.timeout);
        clearQueue();
    }

    public DataSourceInputStream(DataSource dataSource, Duration duration, int i) {
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$dataSource = dataSource;
        this.timeout = duration;
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$bufferCount = i;
        Predef$.MODULE$.require(i > 0, new DataSourceInputStream$$anonfun$1(this));
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$queue = new LinkedBlockingQueue<>(i + 1);
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$closed = false;
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$eof = false;
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$reading = false;
        this.ecp = dataSource.executionContextProvider();
        this.com$datastax$bdp$fs$pipes$DataSourceInputStream$$ec = ecp().queuedSerialExecutionContext();
        com$datastax$bdp$fs$pipes$DataSourceInputStream$$maybeReadMore();
        this.timeoutMillis = duration.isFinite() ? duration.toMillis() : Long.MAX_VALUE;
    }
}
