package org.apache.cassandra.cql3.continuous.paging;

import com.datastax.dse.byos.shade.com.google.common.util.concurrent.RateLimiter;
import io.netty.channel.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.cassandra.transport.Frame;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPageWriter.class */
public class ContinuousPageWriter {
    private static final Logger logger;
    private final int queueSizeInPages;
    private final ArrayBlockingQueue<Frame> pages;
    private final Writer writer;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPageWriter$Writer.class */
    public static final class Writer implements Runnable {
        private final Channel channel;
        private final ArrayBlockingQueue<Frame> queue;
        private final AtomicBoolean completed = new AtomicBoolean(false);
        private final AtomicReference<Frame> error = new AtomicReference<>(null);
        private final RateLimiter limiter;
        private final CompletableFuture<Boolean> completionFuture;
        private volatile boolean canceled;

        public Writer(Channel channel, ArrayBlockingQueue<Frame> arrayBlockingQueue, int i) {
            this.channel = channel;
            this.queue = arrayBlockingQueue;
            this.limiter = RateLimiter.create(i > 0 ? i : Double.MAX_VALUE);
            this.completionFuture = new CompletableFuture<>();
            channel.closeFuture().addListener(channelFuture -> {
                if (ContinuousPageWriter.logger.isTraceEnabled()) {
                    ContinuousPageWriter.logger.trace("Socket {} closed by the client", channel);
                }
                cancel(null);
            });
        }

        public void cancel(@Nullable Frame frame) {
            if (this.canceled) {
                if (frame != null) {
                    frame.release();
                    return;
                }
                return;
            }
            ContinuousPageWriter.logger.trace("Cancelling continuous page writer");
            this.canceled = true;
            if (frame != null && !this.error.compareAndSet(null, frame)) {
                ContinuousPageWriter.logger.debug("Failed to set final error when cancelling session, another error was already there");
                frame.release();
            }
            complete();
        }

        public void complete() {
            this.completed.compareAndSet(false, true);
            schedule(0L);
        }

        public boolean completed() {
            return this.completed.get();
        }

        public void schedule(long j) {
            if (j > 0) {
                this.channel.eventLoop().schedule(this, j, TimeUnit.MICROSECONDS);
            } else if (this.channel.eventLoop().inEventLoop()) {
                run();
            } else {
                this.channel.eventLoop().execute(this);
            }
        }

        public void setError(Frame frame) {
            if (completed()) {
                ContinuousPageWriter.logger.warn("Got continuous paging error for client but writer was already completed, so could not pass it to the client");
            } else if (this.error.compareAndSet(null, frame)) {
                complete();
                return;
            }
            frame.release();
        }

        public boolean aborted() {
            return this.canceled || this.error.get() != null;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                processPendingPages();
                if (completed() && !this.completionFuture.isDone() && this.queue.isEmpty()) {
                    if (this.error.get() != null) {
                        sendError();
                    }
                    this.completionFuture.complete(Boolean.valueOf(this.channel.isWritable()));
                }
            } catch (Throwable th) {
                JVMStabilityInspector.inspectThrowable(th);
                ContinuousPageWriter.logger.error("Error processing pages in Netty event loop", th);
            }
        }

        private void sendError() {
            if (ContinuousPageWriter.logger.isTraceEnabled()) {
                ContinuousPageWriter.logger.trace("Sending continuous paging error to client");
            }
            this.channel.write(this.error.get());
            this.channel.flush();
        }

        private void processPendingPages() {
            long j = 1;
            boolean aborted = aborted();
            while (true) {
                boolean z = aborted;
                if (!z && !this.channel.isWritable()) {
                    break;
                }
                if (!z && !this.limiter.tryAcquire()) {
                    j = ((long) (TimeUnit.SECONDS.toMicros(1L) / this.limiter.getRate())) / 10;
                    break;
                }
                Frame poll = this.queue.poll();
                if (poll == null) {
                    break;
                }
                processPage(poll);
                aborted = aborted();
            }
            if (this.queue.isEmpty()) {
                return;
            }
            schedule(j);
        }

        private void processPage(Frame frame) {
            if (aborted()) {
                frame.release();
            } else {
                this.channel.write(frame);
                this.channel.flush();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ContinuousPageWriter(Supplier<Channel> supplier, int i, int i2) {
        if (!$assertionsDisabled && i2 < 1) {
            throw new AssertionError("queue size must be at least one");
        }
        this.queueSizeInPages = i2;
        this.pages = new ArrayBlockingQueue<>(i2);
        this.writer = new Writer(supplier.get(), this.pages, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Boolean> completionFuture() {
        return this.writer.completionFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendPage(Frame frame, boolean z) {
        if (this.writer.canceled) {
            logger.trace("Discarding page because writer was cancelled");
            frame.release();
            return;
        }
        if (!$assertionsDisabled && this.writer.completed()) {
            throw new AssertionError("Received unexpected page when writer was already completed");
        }
        try {
            this.pages.add(frame);
            if (z) {
                this.writer.schedule(0L);
            } else {
                this.writer.complete();
            }
        } catch (Throwable th) {
            logger.warn("Failed to add continuous paging result to queue: {}", th.getMessage());
            frame.release();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasSpace() {
        return this.pages.size() < this.queueSizeInPages;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean halfQueueAvailable() {
        return this.pages.size() < (this.queueSizeInPages / 2) + 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int pendingPages() {
        return this.pages.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendError(Frame frame) {
        this.writer.setError(frame);
    }

    public void cancel(Frame frame) {
        this.writer.cancel(frame);
    }

    public boolean completed() {
        return this.writer.completed();
    }

    public boolean aborted() {
        return this.writer.aborted();
    }

    static {
        $assertionsDisabled = !ContinuousPageWriter.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger((Class<?>) ContinuousPageWriter.class);
    }
}
