package com.datastax.oss.dsbulk.executor.api.subscription;

import com.datastax.oss.driver.api.core.AsyncPagingIterable;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.detach.AttachmentPoint;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
import com.datastax.oss.dsbulk.executor.api.exception.BulkExecutionException;
import com.datastax.oss.dsbulk.executor.api.listener.DefaultExecutionContext;
import com.datastax.oss.dsbulk.executor.api.listener.ExecutionContext;
import com.datastax.oss.dsbulk.executor.api.listener.ExecutionListener;
import com.datastax.oss.dsbulk.executor.api.result.Result;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jctools.queues.SpscArrayQueue;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/dsbulk/executor/api/subscription/ResultSubscription.class */
public abstract class ResultSubscription<R extends Result, P extends AsyncPagingIterable<Row, P>> implements Subscription {
    private static final Logger LOG = LoggerFactory.getLogger(ResultSubscription.class);
    private static final int MAX_ENQUEUED_PAGES = 4;
    private Subscriber<? super R> subscriber;
    final Statement<?> statement;

    @NonNull
    final AttachmentPoint attachmentPoint;

    @Nullable
    final ExecutionListener listener;

    @Nullable
    private final Semaphore maxConcurrentRequests;

    @Nullable
    final RateLimiter rateLimiter;

    @Nullable
    final RateLimiter bytesRateLimiter;
    private final boolean failFast;
    final int batchSize;
    private final AtomicLong requested = new AtomicLong(0);
    final Queue<ResultSubscription<R, P>.Page> pages = new SpscArrayQueue(MAX_ENQUEUED_PAGES);
    private volatile ResultSubscription<R, P>.Page last = null;
    private final AtomicInteger pagesSize = new AtomicInteger(0);
    private final AtomicInteger draining = new AtomicInteger(0);
    private final DefaultExecutionContext global = new DefaultExecutionContext();
    private final CompletableFuture<Void> initial = new CompletableFuture<>();
    private volatile boolean cancelled = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/oss/dsbulk/executor/api/subscription/ResultSubscription$Page.class */
    public class Page {
        final Iterator<R> rows;
        final Callable<CompletionStage<? extends P>> nextPage;
        final CompletableFuture<Void> fullyConsumed;

        private Page(Callable<CompletionStage<? extends P>> callable) {
            this.nextPage = callable;
            this.rows = Collections.emptyIterator();
            this.fullyConsumed = ResultSubscription.this.initial;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Page(Iterator<R> it, Callable<CompletionStage<? extends P>> callable) {
            this.nextPage = callable;
            this.rows = it;
            this.fullyConsumed = new CompletableFuture<>();
        }

        boolean hasMorePages() {
            return this.nextPage != null;
        }

        CompletionStage<? extends P> nextPage() {
            try {
                return this.nextPage.call();
            } catch (Exception e) {
                CompletableFuture completableFuture = new CompletableFuture();
                completableFuture.completeExceptionally(e);
                return completableFuture;
            }
        }

        boolean hasMoreRows() {
            return this.rows.hasNext();
        }

        R nextRow() {
            return this.rows.next();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ResultSubscription(@NonNull Subscriber<? super R> subscriber, @NonNull Statement<?> statement, @NonNull AttachmentPoint attachmentPoint, @Nullable ExecutionListener executionListener, @Nullable Semaphore semaphore, @Nullable RateLimiter rateLimiter, @Nullable RateLimiter rateLimiter2, boolean z) {
        this.subscriber = subscriber;
        this.statement = statement;
        this.attachmentPoint = attachmentPoint;
        this.listener = executionListener;
        this.maxConcurrentRequests = semaphore;
        this.rateLimiter = rateLimiter;
        this.bytesRateLimiter = rateLimiter2;
        this.failFast = z;
        if (statement instanceof BatchStatement) {
            this.batchSize = ((BatchStatement) statement).size();
        } else {
            this.batchSize = 1;
        }
    }

    public void start(Callable<CompletionStage<? extends P>> callable) {
        this.global.start();
        if (this.listener != null) {
            this.listener.onExecutionStarted(this.statement, this.global);
        }
        fetchNextPage(new Page(callable));
    }

    public void request(long j) {
        if (this.cancelled) {
            return;
        }
        if (j < 1) {
            doOnError(new IllegalArgumentException(this.subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
            return;
        }
        Operators.addCap(this.requested, j);
        if (!this.initial.isDone()) {
            this.initial.complete(null);
        }
        drain();
    }

    public void cancel() {
        if (this.cancelled) {
            return;
        }
        this.cancelled = true;
        if (this.draining.getAndIncrement() == 0) {
            clear();
        }
    }

    private void drain() {
        long j;
        if (this.draining.getAndIncrement() != 0) {
            return;
        }
        int i = 1;
        do {
            long j2 = this.requested.get();
            long j3 = 0;
            while (true) {
                j = j3;
                if (j == j2) {
                    break;
                }
                if (this.cancelled) {
                    clear();
                    return;
                }
                R tryNext = tryNext();
                if (tryNext == null) {
                    break;
                }
                if (tryNext.isSuccess() || !this.failFast) {
                    doOnNext(tryNext);
                }
                if (isExhausted()) {
                    stop(tryNext.getError().orElse(null));
                    clear();
                    return;
                }
                j3 = j + 1;
            }
            if (isExhausted()) {
                stop(null);
                clear();
                return;
            } else if (this.cancelled) {
                clear();
                return;
            } else {
                if (j != 0) {
                    Operators.subCap(this.requested, j);
                }
                i = this.draining.addAndGet(-i);
            }
        } while (i != 0);
    }

    private R tryNext() {
        ResultSubscription<R, P>.Page dequeue;
        ResultSubscription<R, P>.Page peek = this.pages.peek();
        if (peek == null) {
            return null;
        }
        if (peek.hasMoreRows()) {
            return (R) peek.nextRow();
        }
        if (peek.hasMorePages() && (dequeue = dequeue()) != null && dequeue.hasMoreRows()) {
            return (R) dequeue.nextRow();
        }
        return null;
    }

    private boolean isExhausted() {
        if (this.cancelled) {
            return true;
        }
        ResultSubscription<R, P>.Page peek = this.pages.peek();
        return (peek == null || peek.hasMoreRows() || peek.hasMorePages()) ? false : true;
    }

    private void fetchNextPage(ResultSubscription<R, P>.Page page) {
        DefaultExecutionContext defaultExecutionContext = new DefaultExecutionContext();
        onBeforeRequestStarted();
        defaultExecutionContext.start();
        onRequestStarted(defaultExecutionContext);
        page.nextPage().whenComplete((asyncPagingIterable, th) -> {
            if (this.maxConcurrentRequests != null) {
                this.maxConcurrentRequests.release();
            }
            defaultExecutionContext.stop();
            if (th == null) {
                onRequestSuccessful(asyncPagingIterable, defaultExecutionContext);
            } else {
                onRequestFailed(th, defaultExecutionContext);
            }
        }).handle((asyncPagingIterable2, th2) -> {
            ResultSubscription<R, P>.Page errorPage;
            if (th2 == null) {
                errorPage = toPage(asyncPagingIterable2, defaultExecutionContext);
            } else {
                if (th2 instanceof CompletionException) {
                    th2 = th2.getCause();
                }
                errorPage = toErrorPage(th2);
            }
            return errorPage;
        }).thenCombine(page.fullyConsumed, (page2, r3) -> {
            return page2;
        }).thenAccept(page3 -> {
            enqueue(page3);
            if (page3.hasMorePages() && !this.cancelled) {
                fetchNextPage(page3);
            }
            drain();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onBeforeRequestStarted() {
        if (this.maxConcurrentRequests != null) {
            this.maxConcurrentRequests.acquireUninterruptibly();
        }
    }

    abstract void onRequestStarted(ExecutionContext executionContext);

    abstract void onRequestSuccessful(P p, ExecutionContext executionContext);

    abstract void onRequestFailed(Throwable th, ExecutionContext executionContext);

    private void enqueue(ResultSubscription<R, P>.Page page) {
        if (!this.pages.offer(page)) {
            throw new AssertionError("Queue is full, this should not happen");
        }
        this.last = page;
        if (this.pagesSize.incrementAndGet() < MAX_ENQUEUED_PAGES) {
            page.fullyConsumed.complete(null);
        }
    }

    private ResultSubscription<R, P>.Page dequeue() {
        if (this.pages.poll() == null) {
            throw new AssertionError("Queue is empty, this should not happen");
        }
        this.pagesSize.decrementAndGet();
        this.last.fullyConsumed.complete(null);
        return this.pages.peek();
    }

    private void doOnNext(R r) {
        try {
            onBeforeResultEmitted(r);
            this.subscriber.onNext(r);
        } catch (Throwable th) {
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext.", th);
            cancel();
        }
    }

    void onBeforeResultEmitted(R r) {
    }

    private void stop(@Nullable BulkExecutionException bulkExecutionException) {
        this.global.stop();
        if (this.listener != null) {
            if (bulkExecutionException != null) {
                this.listener.onExecutionFailed(bulkExecutionException, this.global);
            } else {
                this.listener.onExecutionSuccessful(this.statement, this.global);
            }
        }
        if (!this.failFast || bulkExecutionException == null) {
            doOnComplete();
        } else {
            doOnError(bulkExecutionException);
        }
    }

    private void doOnComplete() {
        try {
            this.subscriber.onComplete();
        } catch (Throwable th) {
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", th);
        }
        cancel();
    }

    public void doOnError(Throwable th) {
        try {
            this.subscriber.onError(th);
        } catch (Throwable th2) {
            th2.addSuppressed(th);
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", th2);
        }
        cancel();
    }

    private void clear() {
        this.pages.clear();
        this.subscriber = null;
    }

    abstract ResultSubscription<R, P>.Page toPage(P p, ExecutionContext executionContext);

    private ResultSubscription<R, P>.Page toErrorPage(Throwable th) {
        return new Page(Collections.singleton(toErrorResult(new BulkExecutionException(th, this.statement))).iterator(), (Callable) null);
    }

    abstract R toErrorResult(BulkExecutionException bulkExecutionException);
}
