package org.apache.cassandra.cql3.continuous.paging;

import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.reactivex.Single;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.cassandra.concurrent.TPC;
import org.apache.cassandra.concurrent.TPCUtils;
import org.apache.cassandra.config.ContinuousPagingConfig;
import org.apache.cassandra.cql3.PagingResult;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.ResultSet;
import org.apache.cassandra.cql3.selection.ResultBuilder;
import org.apache.cassandra.cql3.selection.Selection;
import org.apache.cassandra.cql3.statements.RequestValidations;
import org.apache.cassandra.db.aggregation.GroupMaker;
import org.apache.cassandra.exceptions.ClientWriteException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.metrics.ContinuousPagingMetrics;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.CBUtil;
import org.apache.cassandra.transport.Frame;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPagingService.class */
public class ContinuousPagingService {
    // Logger shared by the service and its nested classes.
    private static final Logger logger;
    // Metrics updated by sessions (total duration, failures, time spent blocked, ...).
    public static final ContinuousPagingMetrics metrics;
    // Registered sessions, keyed by SessionKey (client address + stream id).
    private static final ConcurrentHashMap<SessionKey, ContinuousPagingSession> sessions;

    // Per-core session counters; the index is the core id assigned to a session.
    @VisibleForTesting
    static final AtomicIntegerArray numSessions;
    // Decompiler-surfaced synthetic assertion flag; presumably true when assertions are disabled (initializer not visible here).
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPagingService$ContinuousPagingSession.class */
    public static final class ContinuousPagingSession {
        // Result metadata handed to every Page created by this session.
        private final ResultSet.ResultMetadata resultMetaData;
        private final QueryState queryState;
        // Selectors and group maker are passed to each Builder created when the session resumes.
        private final Selection.Selectors selectors;
        private final GroupMaker groupMaker;
        private final QueryOptions options;
        // Writer built over continuousPagingState.channel; receives page, error and cancellation frames.
        private final ContinuousPageWriter pageWriter;
        private final ContinuousPagingState continuousPagingState;
        private final QueryOptions.PagingOptions pagingOptions;
        private final SessionKey key;
        // Running estimate of the encoded row size, refreshed every time a page is processed.
        private int avgRowSize;

        // Number of pages handed to the page writer so far.
        @VisibleForTesting
        int numPagesSent = 0;
        // Number of pages the client asked for; Integer.MAX_VALUE when nextPages() is not positive.
        private int numPagesRequested;
        // Page currently being filled; null once the session has released it.
        private Page currentPage;
        private final AtomicReference<State> state;
        // timeSource.nanoTime() taken when the session was paused, or -1 when it is not paused.
        private volatile long paused;

        // Paging state captured when the most recent page was processed.
        @Nullable
        private volatile PagingState pagingState;
        // Decompiler-surfaced synthetic flag set by the static initializer: true when assertions are disabled.
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPagingService$ContinuousPagingSession$Builder.class */
        /**
         * Result builder that forwards every callback to the {@link ContinuousPagingSession} it was
         * created for.
         */
        public static class Builder extends ResultBuilder {

            /** Session receiving all callbacks of this builder. */
            @VisibleForTesting
            final ContinuousPagingSession session;

            Builder(ContinuousPagingSession continuousPagingSession, Selection.Selectors selectors, GroupMaker groupMaker) {
                super(selectors, groupMaker);
                this.session = continuousPagingSession;
            }

            /** Hands the completed row to the session and returns the session's answer unchanged. */
            @Override // org.apache.cassandra.cql3.selection.ResultBuilder
            public boolean onRowCompleted(List<ByteBuffer> list, boolean z) {
                final ContinuousPagingSession target = this.session;
                return target.onRowCompleted(list, z);
            }

            /** Asks the session whether no page was sent and the current page holds no rows. */
            @Override // org.apache.cassandra.cql3.selection.ResultBuilder
            public boolean resultIsEmpty() {
                final ContinuousPagingSession target = this.session;
                return target.resultIsEmpty();
            }

            /** Failure completion: the error is sent through the session. */
            @Override // org.apache.cassandra.cql3.selection.ResultBuilder
            public void complete(Throwable th) {
                final ContinuousPagingSession target = this.session;
                target.sendError(th);
            }

            /**
             * Normal completion: completes the parent builder, sends the last page and records the
             * total duration of the query in the metrics.
             */
            @Override // org.apache.cassandra.cql3.selection.ResultBuilder
            public void complete() {
                final ContinuousPagingSession target = this.session;
                final Logger log = ContinuousPagingService.logger;
                if (log.isTraceEnabled()) {
                    log.trace("Completing continuous paging session {}", target.key);
                }

                super.complete();
                target.sendLastPage();

                final ContinuousPagingState pagingState = target.continuousPagingState;
                final long elapsedNanos = pagingState.timeSource.nanoTime() - pagingState.executor.queryStartTimeInNanos();
                final boolean localQuery = pagingState.executor.isLocalQuery();
                ContinuousPagingService.metrics.addTotalDuration(localQuery, elapsedNanos);

                if (log.isTraceEnabled()) {
                    final Long elapsedMillis = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
                    log.trace("Completed continuous paging session {} after {} milliseconds", target.key, elapsedMillis);
                }
            }

            @Override
            public String toString() {
                final Object sessionKey = this.session.key;
                return String.format("Continuous paging session %s", sessionKey);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPagingService$ContinuousPagingSession$EncodedPage.class */
        /**
         * A ROWS result message whose rows are already encoded in a buffer. Only the header is
         * encoded when the message is written; the row bytes are copied from {@link #buff}.
         */
        public static class EncodedPage extends ResultMessage {
            final ResultSet.ResultMetadata metadata;
            final int numRows;
            final ByteBuf buff;

            /** Codec used to write an {@link EncodedPage}; decoding is not supported. */
            public static final Message.Codec<EncodedPage> codec = new Message.Codec<EncodedPage>() {
                /**
                 * Never expected to be called: fails with an {@link AssertionError} when assertions
                 * are enabled and returns {@code null} otherwise.
                 */
                @Override // org.apache.cassandra.transport.CBCodec
                public EncodedPage decode(ByteBuf byteBuf, ProtocolVersion protocolVersion) {
                    // A real assert statement replaces the hand-declared synthetic flag and static
                    // initializer, which an anonymous class may not declare before Java 16.
                    assert false : "should never be called";
                    return null;
                }

                /** Writes the message kind id, the result set header and then the pre-encoded rows. */
                @Override // org.apache.cassandra.transport.CBCodec
                public void encode(EncodedPage encodedPage, ByteBuf byteBuf, ProtocolVersion protocolVersion) {
                    byteBuf.writeInt(encodedPage.kind.id);
                    ResultSet.codec.encodeHeader(encodedPage.metadata, byteBuf, encodedPage.numRows, protocolVersion);
                    byteBuf.writeBytes(encodedPage.buff);
                }

                /** Size of the kind id (4 bytes) plus the header plus the readable row bytes. */
                @Override // org.apache.cassandra.transport.CBCodec
                public int encodedSize(EncodedPage encodedPage, ProtocolVersion protocolVersion) {
                    return 4 + ResultSet.codec.encodedHeaderSize(encodedPage.metadata, protocolVersion) + encodedPage.buff.readableBytes();
                }
            };

            private EncodedPage(ResultSet.ResultMetadata resultMetadata, int i, ByteBuf byteBuf) {
                super(ResultMessage.Kind.ROWS);
                this.metadata = resultMetadata;
                this.numRows = i;
                this.buff = byteBuf;
            }

            @Override
            public String toString() {
                return String.format("ENCODED PAGE (%d ROWS)", Integer.valueOf(this.numRows));
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPagingService$ContinuousPagingSession$Page.class */
        /**
         * A page of encoded rows backed by a buffer obtained from {@code CBUtil.allocator}. Rows are
         * appended with {@link #addRow(List)} and the page is turned into a frame with
         * {@link #makeFrame(PagingResult)}.
         */
        public static class Page {
            final ResultSet.ResultMetadata metadata;
            final QueryState state;
            final QueryOptions.PagingOptions pagingOptions;
            final ProtocolVersion version;
            final SessionKey sessionKey;
            // Buffer holding the encoded rows; null after release() or after a failed reallocation.
            ByteBuf buf;
            int numRows;
            int seqNo;
            int maxPageSize;

            /**
             * @param i  initial capacity of the row buffer
             * @param i2 sequence number of the page
             * @param i3 maximum page size, used as a bound when the buffer has to grow
             */
            Page(int i, ResultSet.ResultMetadata resultMetadata, QueryState queryState, QueryOptions queryOptions, SessionKey sessionKey, int i2, int i3) {
                this.metadata = resultMetadata;
                this.state = queryState;
                this.pagingOptions = queryOptions.getPagingOptions();
                this.version = queryOptions.getProtocolVersion();
                this.sessionKey = sessionKey;
                this.buf = CBUtil.allocator.buffer(i);
                this.seqNo = i2;
                this.maxPageSize = i3;
            }

            /**
             * Builds the frame for this page: stores the paging result in the metadata, wraps the
             * buffer in an {@link EncodedPage} and attaches client warnings and, when tracing, the
             * tracing session id.
             */
            Frame makeFrame(PagingResult pagingResult) {
                this.metadata.setPagingResult(pagingResult);
                EncodedPage encodedPage = new EncodedPage(this.metadata, this.numRows, this.buf);
                encodedPage.setWarnings(ClientWarn.instance.getWarnings());
                if (Tracing.isTracing()) {
                    encodedPage.setTracingId(Tracing.instance.getSessionId());
                }
                return makeFrame(encodedPage, EncodedPage.codec, this.version, this.state.getStreamId());
            }

            /** Sets the stream id on the response and encodes it into a newly created frame. */
            static <M extends Message.Response> Frame makeFrame(M m, Message.Codec codec, ProtocolVersion protocolVersion, int i) {
                m.setStreamId(i);
                Frame makeFrame = Message.ProtocolEncoder.makeFrame(m, codec.encodedSize(m, protocolVersion), protocolVersion);
                codec.encode(m, makeFrame.body, protocolVersion);
                return makeFrame;
            }

            /**
             * Appends a row to the page. When the row does not fit, the writer index is restored and
             * the content is copied into a larger buffer before the row is encoded again.
             */
            void addRow(List<ByteBuffer> list) {
                int writerIndex = this.buf.writerIndex();
                if (ResultSet.codec.encodeRow(list, this.metadata, this.buf, true)) {
                    this.numRows++;
                    return;
                }
                // The row did not fit: discard whatever was partially written.
                this.buf.writerIndex(writerIndex);
                int encodedRowSize = ResultSet.codec.encodedRowSize(list, this.metadata);
                // Grow to at least what is needed, preferring to double the capacity up to maxPageSize.
                int max = Math.max(this.buf.readableBytes() + encodedRowSize, Math.min(this.buf.capacity() * 2, this.maxPageSize));
                if (ContinuousPagingService.logger.isTraceEnabled()) {
                    ContinuousPagingService.logger.trace("Reallocating page buffer from {}/{} to {} for row size {} - {}", new Object[]{Integer.valueOf(this.buf.readableBytes()), Integer.valueOf(this.buf.capacity()), Integer.valueOf(max), Integer.valueOf(encodedRowSize), this.sessionKey});
                }
                ByteBuf previous = this.buf;
                try {
                    // Cleared first so that a failed allocation does not leave a reference to the
                    // buffer released in the finally clause.
                    this.buf = null;
                    this.buf = CBUtil.allocator.buffer(max);
                    this.buf.writeBytes(previous);
                    ResultSet.codec.encodeRow(list, this.metadata, this.buf, false);
                    this.numRows++;
                } finally {
                    // The old buffer is released exactly once on both the normal and the error path.
                    previous.release();
                }
            }

            /** Number of readable bytes currently held by the buffer. */
            int size() {
                return this.buf.readableBytes();
            }

            boolean isEmpty() {
                return this.numRows == 0;
            }

            /** Resets the page for a new sequence number, keeping the existing buffer. */
            void reuse(int i) {
                this.numRows = 0;
                this.seqNo = i;
                this.buf.clear();
            }

            /**
             * Releases the buffer. Does nothing when the buffer is already gone, which is the case
             * after a previous release or when reallocation in {@link #addRow(List)} failed.
             */
            void release() {
                ByteBuf current = this.buf;
                if (current == null) {
                    return;
                }
                this.buf = null;
                current.release();
            }

            int seqNo() {
                return this.seqNo;
            }

            /**
             * Averages the given estimate with the average row size of this page; returns the
             * estimate unchanged when the page has no buffer or no rows.
             */
            int avgRowSize(int i) {
                return (this.buf == null || this.numRows == 0) ? i : ((this.buf.readableBytes() / this.numRows) + i) / 2;
            }

            @Override
            public String toString() {
                return String.format("[Page rows: %d]", Integer.valueOf(this.numRows));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPagingService$ContinuousPagingSession$State.class */
        /** Lifecycle states of a continuous paging session. */
        public enum State {
            // Initial state assigned by the session constructor.
            RUNNING,
            // Set by cancel() when it wins the transition from RUNNING.
            CANCEL_REQUESTED,
            // Set by stop(); the current page is released right after the transition.
            STOPPED
        }

        ContinuousPagingSession(Selection.Selectors selectors, GroupMaker groupMaker, ResultSet.ResultMetadata resultMetadata, QueryState queryState, QueryOptions queryOptions, ContinuousPagingState continuousPagingState, SessionKey sessionKey) {
            this.selectors = selectors;
            this.groupMaker = groupMaker;
            this.resultMetaData = resultMetadata;
            this.queryState = queryState;
            this.options = queryOptions;
            this.continuousPagingState = continuousPagingState;
            this.key = sessionKey;
            this.pagingOptions = queryOptions.getPagingOptions();
            this.numPagesRequested = this.pagingOptions.nextPages() <= 0 ? Integer.MAX_VALUE : this.pagingOptions.nextPages();
            this.state = new AtomicReference<>(State.RUNNING);
            this.paused = -1L;
            this.pageWriter = new ContinuousPageWriter(continuousPagingState.channel, this.pagingOptions.maxPagesPerSecond(), continuousPagingState.config.max_session_pages);
            this.avgRowSize = continuousPagingState.averageRowSize;
            allocatePage(1);
        }

        /** Returns the core id reported by the executor of this session's paging state. */
        int coreId() {
            return this.continuousPagingState.executor.coreId();
        }

        /** Returns the number of pending pages reported by the page writer. */
        int pendingPages() {
            return this.pageWriter.pendingPages();
        }

        /**
         * Requests cancellation of the session. Only a RUNNING session can be cancelled; in that
         * case an error frame is handed to the page writer and a resume check is scheduled.
         *
         * @return the completion future of the page writer, whether or not the request took effect
         */
        public CompletableFuture<Void> cancel() {
            final boolean requested = this.state.compareAndSet(State.RUNNING, State.CANCEL_REQUESTED);
            if (!requested) {
                if (ContinuousPagingService.logger.isTraceEnabled()) {
                    ContinuousPagingService.logger.trace("Could not cancel continuous paging session {}, not running ({})", this.key, this.state.get());
                }
                return this.pageWriter.completionFuture();
            }

            final ErrorMessage cancelled = ErrorMessage.fromException(new RuntimeException("Session cancelled by the user"), th -> true);
            final Frame cancelFrame = Page.makeFrame(cancelled, cancelled.type.codec, this.options.getProtocolVersion(), this.queryState.getStreamId());
            this.pageWriter.cancel(cancelFrame);
            this.continuousPagingState.executor.schedule(this::maybeResume);
            ContinuousPagingService.logger.trace("Continuous paging session {} cancelled by the user", this.key);
            return this.pageWriter.completionFuture();
        }

        /**
         * Adds {@code i} to the number of pages requested and schedules a resume check.
         *
         * @param i number of additional pages; asserted to be positive
         * @return {@code false} when the addition did not increase the counter (int overflow), in
         *         which case the counter is set to {@code Integer.MAX_VALUE}; {@code true} otherwise
         */
        boolean updateBackpressure(int i) {
            if (!$assertionsDisabled && i <= 0) {
                throw new AssertionError("nextPages should be positive");
            }

            final int previous = this.numPagesRequested;
            final int updated = previous + i;
            this.numPagesRequested = updated;

            if (updated > previous) {
                if (ContinuousPagingService.logger.isTraceEnabled()) {
                    ContinuousPagingService.logger.trace("Updated numPagesRequested to {} for continuous paging session {}", Integer.valueOf(this.numPagesRequested), this.key);
                }
                this.continuousPagingState.executor.schedule(this::maybeResume);
                return true;
            }

            // The sum wrapped around: report it and fall back to "unlimited".
            final Object[] details = {Integer.valueOf(previous), Integer.valueOf(i), Integer.valueOf(this.numPagesRequested), this.key, Integer.MAX_VALUE};
            ContinuousPagingService.logger.warn("Tried to increase numPagesRequested from {} by {} but got {} for session {}, setting it to {}", details);
            this.numPagesRequested = Integer.MAX_VALUE;
            return false;
        }

        /** Returns the continuous paging configuration held by this session's paging state. */
        public ContinuousPagingConfig config() {
            return this.continuousPagingState.config;
        }

        /**
         * Prepares the current page for sequence number {@code i}: creates it on first use and
         * reuses the existing page (and its buffer) afterwards.
         */
        private void allocatePage(int i) {
            final boolean tracing = ContinuousPagingService.logger.isTraceEnabled();

            if (this.currentPage == null) {
                final int pageLimit = maxPageSize();
                // Estimated page size plus a safety margin, never more than the configured maximum.
                final int estimated = this.pagingOptions.pageSize().inEstimatedBytes(this.avgRowSize) + safePageMargin();
                final int bufferSize = Math.min(pageLimit, estimated);
                if (tracing) {
                    ContinuousPagingService.logger.trace("Allocating page with buffer size {}, avg row size {} for {}", new Object[]{Integer.valueOf(bufferSize), Integer.valueOf(this.avgRowSize), this.key});
                }
                this.currentPage = new Page(bufferSize, this.resultMetaData, this.queryState, this.options, this.key, i, pageLimit);
                return;
            }

            if (tracing) {
                ContinuousPagingService.logger.trace("Reusing page with buffer size {}, avg row size {} for {}", new Object[]{Integer.valueOf(this.currentPage.buf.capacity()), Integer.valueOf(this.avgRowSize), this.key});
            }
            this.currentPage.reuse(i);
        }

        /**
         * Returns the configured maximum page size in bytes.
         *
         * <p>The conversion from megabytes is done in {@code long} arithmetic and capped at
         * {@code Integer.MAX_VALUE}, so a setting of 2048 MB or more no longer wraps around to a
         * negative or too-small value.
         */
        int maxPageSize() {
            long bytes = (long) this.continuousPagingState.config.max_page_size_mb * 1024L * 1024L;
            return (int) Math.min(bytes, (long) Integer.MAX_VALUE);
        }

        /** Safety margin in bytes used when sizing and filling pages: twice the current average row size. */
        private int safePageMargin() {
            return 2 * this.avgRowSize;
        }

        /**
         * Sends the current page to the page writer unless cancellation was requested.
         *
         * @param z  whether this is the last page
         * @param z2 whether a further row is pending; forwarded to the executor when the paging
         *           state is captured
         */
        private void processPage(boolean z, boolean z2) {
            if (this.state.get() == State.CANCEL_REQUESTED) {
                return;
            }
            if (!$assertionsDisabled && this.currentPage.isEmpty() && !z) {
                throw new AssertionError();
            }

            // Refresh the estimate and capture the paging state before building the frame.
            this.avgRowSize = this.currentPage.avgRowSize(this.avgRowSize);
            this.pagingState = this.continuousPagingState.executor.state(z2);

            if (ContinuousPagingService.logger.isTraceEnabled()) {
                final Object[] details = {
                    Integer.valueOf(this.numPagesSent + 1),
                    Integer.valueOf(this.currentPage.numRows),
                    Integer.valueOf(this.avgRowSize),
                    Boolean.valueOf(z),
                    Boolean.valueOf(z2),
                    this.pagingState,
                    this.key
                };
                ContinuousPagingService.logger.trace("Sending page nr. {} with {} rows, average row size: {}, last: {}, nextRowPending: {}, pagingState {} for session {}", details);
            }

            final PagingResult result = new PagingResult(this.pagingState, this.currentPage.seqNo, z);
            final Frame frame = this.currentPage.makeFrame(result);
            this.pageWriter.sendPage(frame, !z);
            this.numPagesSent++;
        }

        /**
         * Adds a completed row to the current page and sends the page once it is complete, close
         * to the maximum size, or no further row is pending.
         *
         * @param list the row values
         * @param z    whether a further row is pending
         * @return {@code true} while the page can take more rows or the session is still RUNNING
         *         after a page was sent; {@code false} otherwise
         */
        boolean onRowCompleted(List<ByteBuffer> list, boolean z) {
            if (this.state.get() == State.CANCEL_REQUESTED) {
                cancelSession();
                return false;
            }

            if (this.currentPage == null) {
                // Without a page the session is expected to be stopped already.
                if (!$assertionsDisabled && this.state.get() != State.STOPPED) {
                    throw new AssertionError("Invalid state with null page: " + this.state.get());
                }
                return false;
            }

            this.currentPage.addRow(list);

            final boolean keepFilling = z
                    && !pageCompleted(this.currentPage.numRows, this.currentPage.size(), this.avgRowSize)
                    && !pageIsCloseToMax();
            if (keepFilling) {
                return true;
            }

            final boolean lastPage = !z || isLastPage(this.currentPage.seqNo());
            if (lastPage) {
                ContinuousPagingService.removeSession(this.key);
            }
            processPage(lastPage, z);

            if (this.numPagesSent == Integer.MAX_VALUE) {
                sendError(new ClientWriteException(String.format("Reached maximum number of pages (%d), stopping session to avoid overflow", Integer.valueOf(this.numPagesSent))));
            } else if (lastPage || this.state.get() != State.RUNNING) {
                stop();
            } else {
                allocatePage(this.currentPage.seqNo() + 1);
                maybePause();
            }

            return this.state.get() == State.RUNNING;
        }

        /** Records the total duration of the query in the metrics and stops the session. */
        private void cancelSession() {
            final boolean localQuery = this.continuousPagingState.executor.isLocalQuery();
            final long elapsedNanos = this.continuousPagingState.timeSource.nanoTime() - this.continuousPagingState.executor.queryStartTimeInNanos();
            ContinuousPagingService.metrics.addTotalDuration(localQuery, elapsedNanos);
            stop();
        }

        /**
         * Pauses the session when the page writer queue is full, when all requested pages were
         * sent, or when a query with a positive local start time has run for at least
         * {@code max_local_query_time_ms}. Pausing records the pause time, schedules a resume check
         * and throws a {@link ContinuousBackPressureException}; otherwise the method simply returns.
         */
        void maybePause() {
            final long localStart = this.continuousPagingState.executor.localStartTimeInMillis();

            final String reason;
            if (!this.pageWriter.hasSpace()) {
                reason = String.format("Continuous paging queue is full (%d pages in the queue)", Integer.valueOf(this.pageWriter.pendingPages()));
            } else if (this.numPagesSent >= this.numPagesRequested) {
                reason = String.format("Continuous paging backpressure was triggered, requested %d sent %d", Integer.valueOf(this.numPagesRequested), Integer.valueOf(this.numPagesSent));
            } else if (localStart > 0 && this.continuousPagingState.timeSource.currentTimeMillis() - localStart >= this.continuousPagingState.config.max_local_query_time_ms) {
                reason = String.format("Locally optimized query running longer than %d milliseconds", Integer.valueOf(this.continuousPagingState.config.max_local_query_time_ms));
            } else {
                // Nothing is holding the session back.
                return;
            }

            final ContinuousBackPressureException backPressure = new ContinuousBackPressureException(reason);
            if (ContinuousPagingService.logger.isTraceEnabled()) {
                ContinuousPagingService.logger.trace("Pausing session {}: {}", this.key, backPressure.getMessage());
            }

            this.paused = this.continuousPagingState.timeSource.nanoTime();
            ContinuousPagingService.metrics.serverBlocked.inc();
            this.continuousPagingState.executor.schedule(this::maybeResume, this.continuousPagingState.config.paused_check_interval_ms, TimeUnit.MILLISECONDS);
            throw backPressure;
        }

        /**
         * Resume check for a paused session. Does nothing when the session is not paused or is
         * stopped. When the session cannot resume yet, the check is rescheduled until
         * {@code client_timeout_sec} has elapsed, after which an error is sent. When it can resume,
         * the session is either cancelled (if cancellation was requested) or rescheduled on the
         * executor with the last paging state.
         */
        private void maybeResume() {
            if (this.paused == -1 || this.state.get() == State.STOPPED) {
                return;
            }

            final long now = this.continuousPagingState.timeSource.nanoTime();

            final boolean canResume;
            if (this.state.get() == State.CANCEL_REQUESTED) {
                canResume = true;
            } else if (this.numPagesRequested == Integer.MAX_VALUE) {
                // No page limit: wait until half of the queue is available.
                canResume = this.pageWriter.halfQueueAvailable();
            } else {
                canResume = this.pageWriter.hasSpace() && this.numPagesSent < this.numPagesRequested;
            }

            if (!canResume) {
                final long timeoutNanos = TimeUnit.SECONDS.toNanos(this.continuousPagingState.config.client_timeout_sec);
                if (now - this.paused < timeoutNanos) {
                    this.continuousPagingState.executor.schedule(this::maybeResume, this.continuousPagingState.config.paused_check_interval_ms, TimeUnit.MILLISECONDS);
                } else {
                    ContinuousPagingService.metrics.serverBlockedLatency.addNano(now - this.paused);
                    sendError(new ClientWriteException(String.format("Paused for longer than %d seconds and unable to write pages to client", Integer.valueOf(this.continuousPagingState.config.client_timeout_sec))));
                }
                return;
            }

            if (ContinuousPagingService.logger.isTraceEnabled()) {
                final Object[] details = {this.key, Integer.valueOf(this.numPagesRequested), Integer.valueOf(this.numPagesSent), Integer.valueOf(this.pageWriter.pendingPages()), this.pagingState};
                ContinuousPagingService.logger.trace("Resuming session {}, pages requested: {}, pages sent: {}, queue size: {}, paging state {}", details);
            }

            ContinuousPagingService.metrics.serverBlockedLatency.addNano(now - this.paused);
            this.paused = -1L;

            if (this.state.get() != State.CANCEL_REQUESTED) {
                this.continuousPagingState.executor.schedule(this.pagingState, new Builder(this, this.selectors, this.groupMaker));
            } else {
                cancelSession();
            }
        }

        /**
         * Asks the configured page size whether a page is complete.
         *
         * @param i  number of rows in the page
         * @param i2 current size of the page in bytes
         * @param i3 average row size in bytes
         */
        private boolean pageCompleted(int i, int i2, int i3) {
            // Size passed to the check: current size plus one average row, minus one byte.
            final int projectedSize = (i2 + i3) - 1;
            return this.pagingOptions.pageSize().isComplete(i, projectedSize);
        }

        /** Returns true when the page sequence number {@code i} has reached the maximum number of pages in the paging options. */
        private boolean isLastPage(int i) {
            return i >= this.pagingOptions.maxPages();
        }

        /**
         * Returns true when a current page exists and the space left before the maximum page size
         * is smaller than the safety margin.
         */
        private boolean pageIsCloseToMax() {
            if (this.currentPage == null) {
                return false;
            }
            final int remaining = maxPageSize() - this.currentPage.size();
            return remaining < safePageMargin();
        }

        /**
         * Moves the session to STOPPED and releases the current page. The session is asserted to be
         * RUNNING or CANCEL_REQUESTED; if the state changes concurrently, an error is logged and
         * nothing is released.
         */
        private void stop() {
            final State previous = this.state.get();
            if (!$assertionsDisabled && previous != State.RUNNING && previous != State.CANCEL_REQUESTED) {
                throw new AssertionError("Invalid state when stopping: " + previous);
            }

            if (this.state.compareAndSet(previous, State.STOPPED)) {
                if (ContinuousPagingService.logger.isTraceEnabled()) {
                    ContinuousPagingService.logger.trace("Continuous paging session {} stopped, previously in state {}", this.key, previous);
                }
                release();
                return;
            }

            ContinuousPagingService.logger.error("Failed to stop session {} ({})", this.key, this.state.get());
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Releases the current page, if there is one, and forgets it. */
        public void release() {
            final Page page = this.currentPage;
            if (page == null) {
                return;
            }
            page.release();
            this.currentPage = null;
        }

        /** Returns true when no page has been sent and the current page exists and holds no rows. */
        boolean resultIsEmpty() {
            if (this.numPagesSent != 0) {
                return false;
            }
            if (this.currentPage == null) {
                return false;
            }
            return this.currentPage.isEmpty();
        }

        /**
         * Reports an error to the client: updates the matching metric, stops the session when it
         * still has a page, unregisters it and hands an error frame to the page writer.
         */
        void sendError(Throwable th) {
            if (ContinuousPagingService.logger.isTraceEnabled()) {
                final Object[] details = {th.getClass(), th.getMessage(), this.key};
                ContinuousPagingService.logger.trace("Sending error {}/{} for session {}", details);
            }

            if (!(th instanceof ClientWriteException)) {
                ContinuousPagingService.metrics.failures.mark();
            } else {
                ContinuousPagingService.metrics.clientWriteExceptions.mark();
            }

            // A session that no longer has a page is not stopped again.
            if (this.currentPage != null) {
                stop();
            }
            ContinuousPagingService.removeSession(this.key);

            final ErrorMessage error = ErrorMessage.fromException(th);
            final Frame errorFrame = Page.makeFrame(error, error.type.codec, this.options.getProtocolVersion(), this.queryState.getStreamId());
            this.pageWriter.sendError(errorFrame);
        }

        /** Unless already stopped: unregisters the session, sends the current page as the last one and stops. */
        void sendLastPage() {
            if (this.state.get() == State.STOPPED) {
                return;
            }
            ContinuousPagingService.removeSession(this.key);
            processPage(true, false);
            stop();
        }

        // Decompiler-surfaced assertion setup: the flag is true when assertions are
        // disabled for ContinuousPagingService.
        static {
            $assertionsDisabled = !ContinuousPagingService.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/cql3/continuous/paging/ContinuousPagingService$SessionKey.class */
    /**
     * Key identifying a session: the client address together with the stream id. Two keys are
     * equal when both parts are equal.
     */
    public static final class SessionKey {
        private final InetSocketAddress address;
        private final int streamId;

        SessionKey(InetSocketAddress inetSocketAddress, int i) {
            this.address = inetSocketAddress;
            this.streamId = i;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof SessionKey) {
                final SessionKey other = (SessionKey) obj;
                return this.streamId == other.streamId && Objects.equals(this.address, other.address);
            }
            return false;
        }

        @Override
        public int hashCode() {
            // Same value as Objects.hash(address, streamId), written out step by step.
            int result = 1;
            result = 31 * result + Objects.hashCode(this.address);
            result = 31 * result + Integer.hashCode(this.streamId);
            return result;
        }

        @Override
        public String toString() {
            return String.format("%s/%d", this.address, Integer.valueOf(this.streamId));
        }
    }

    /**
     * Creates and registers a continuous paging session for the given query and returns the
     * builder that feeds rows into it.
     *
     * <p>A per-core session slot is reserved first through {@code canCreateSession}. The slot is
     * given back when the session cannot be constructed or when a session with the same key is
     * already registered.
     *
     * @throws RequestValidationException when too many sessions are running or a session with
     *         the same key already exists (raised via {@code RequestValidations.invalidRequest})
     * @throws RequestExecutionException declared by the original signature
     */
    public static ResultBuilder createSession(Selection.Selectors selectors, GroupMaker groupMaker, ResultSet.ResultMetadata resultMetadata, ContinuousPagingState continuousPagingState, QueryState queryState, QueryOptions queryOptions) throws RequestValidationException, RequestExecutionException {
        if (!$assertionsDisabled && !queryOptions.continuousPagesRequested()) {
            throw new AssertionError();
        }
        SessionKey sessionKey = new SessionKey(queryState.getClientState().getRemoteAddress(), queryState.getStreamId());
        if (!canCreateSession(continuousPagingState)) {
            metrics.creationFailures.mark();
            metrics.tooManySessions.mark();
            long liveSessions = liveSessions();
            if (logger.isDebugEnabled()) {
                logger.debug("Too many continuous paging sessions are already running: {}", Long.valueOf(liveSessions));
            }
            throw RequestValidations.invalidRequest("Invalid request, too many continuous paging sessions are already running: %d", Long.valueOf(liveSessions));
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Starting continuous paging session {} with paging options {}, local {}, total number of sessions running: {}", new Object[]{sessionKey, queryOptions.getPagingOptions(), Boolean.valueOf(continuousPagingState.executor.isLocalQuery()), Long.valueOf(liveSessions())});
        }
        ContinuousPagingSession continuousPagingSession;
        try {
            continuousPagingSession = new ContinuousPagingSession(selectors, groupMaker, resultMetadata, queryState, queryOptions, continuousPagingState, sessionKey);
        } catch (RuntimeException | Error e) {
            // canCreateSession already incremented the counter of the core it assigned to the
            // executor; without this decrement a failed construction would leak that slot.
            numSessions.decrementAndGet(continuousPagingState.executor.coreId());
            metrics.creationFailures.mark();
            throw e;
        }
        if (sessions.putIfAbsent(sessionKey, continuousPagingSession) == null) {
            return new ContinuousPagingSession.Builder(continuousPagingSession, selectors, groupMaker);
        }
        // Duplicate key: undo everything done for the new session.
        continuousPagingSession.release();
        numSessions.decrementAndGet(continuousPagingSession.coreId());
        metrics.creationFailures.mark();
        logger.error("Continuous paging session {} already exists", sessionKey);
        throw RequestValidations.invalidRequest("Invalid request, already executing continuous paging session %s", sessionKey);
    }

    /**
     * Reserves a session slot on a core. The candidate core is looked up again and the counter
     * update retried until the compare-and-set succeeds or no core has room left.
     *
     * @return {@code true} when a slot was reserved and the core id stored in the executor;
     *         {@code false} when no core is below its maximum
     */
    private static boolean canCreateSession(ContinuousPagingState continuousPagingState) {
        final int startingCore = TPC.bestTPCCore();
        while (true) {
            final Pair<Integer, Integer> candidate = getPreferredCore(startingCore, continuousPagingState);
            final int core = candidate.left.intValue();
            if (core == -1) {
                logger.debug("Failed to find a core with fewer sessions than maximum allowed, current distrib: {}. max distrib: {}", numSessions, continuousPagingState.config.getMaxSessionsCoreDistribution(numSessions.length()));
                return false;
            }

            final int observed = candidate.right.intValue();
            if (!numSessions.compareAndSet(core, observed, observed + 1)) {
                // Another thread changed the counter in the meantime: pick a core again.
                continue;
            }

            if (logger.isTraceEnabled()) {
                logger.trace("Assigning core {} for next session, current distrib: {}, max distrib: {}", new Object[]{candidate.left, numSessions, continuousPagingState.config.getMaxSessionsCoreDistribution(numSessions.length())});
            }
            continuousPagingState.executor.setCoreId(core);
            return true;
        }
    }

    /**
     * Picks the core for the next session. Core {@code i} is the starting candidate when it is
     * below its maximum; the scan then moves to any core with room and fewer sessions, and stops
     * early once the candidate has zero sessions.
     *
     * @param i the core to consider first
     * @return a pair (core id, session count observed for that core), or (-1, 0) when every core
     *         is at its maximum
     */
    private static Pair<Integer, Integer> getPreferredCore(int i, ContinuousPagingState continuousPagingState) {
        int bestCore = -1;
        int bestCount = 0;

        final int initialCount = numSessions.get(i);
        if (initialCount < continuousPagingState.config.getMaxSessionsPerCore(numSessions.length(), i)) {
            bestCore = i;
            bestCount = initialCount;
        }

        int core = 0;
        while (core < numSessions.length() && (bestCount != 0 || bestCore == -1)) {
            final int count = numSessions.get(core);
            final boolean hasRoom = count < continuousPagingState.config.getMaxSessionsPerCore(numSessions.length(), core);
            if (hasRoom && (count < bestCount || bestCore == -1)) {
                bestCore = core;
                bestCount = count;
            }
            core++;
        }

        return Pair.create(Integer.valueOf(bestCore), Integer.valueOf(bestCount));
    }

    /**
     * Looks up the session registered under the given key without removing it.
     *
     * @param sessionKey key identifying the session
     * @return the registered session, or {@code null} if none is registered under that key
     */
    private static ContinuousPagingSession getSession(SessionKey sessionKey) {
        final ContinuousPagingSession registered = sessions.get(sessionKey);
        return registered;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Unregisters the session stored under the given key and releases its per-core slot.
     *
     * @param sessionKey key identifying the session
     * @return the session that was removed, or {@code null} if nothing was registered under that key
     */
    public static ContinuousPagingSession removeSession(SessionKey sessionKey) {
        final ContinuousPagingSession removed = sessions.remove(sessionKey);
        if (removed == null) {
            return null;
        }
        // Give back the slot that was counted against the session's core.
        numSessions.decrementAndGet(removed.coreId());
        if (logger.isTraceEnabled()) {
            logger.trace("Removed continuous paging session {}, {} sessions still running", sessionKey, liveSessions());
        }
        return removed;
    }

    /**
     * Sums the per-core session counters.
     *
     * @return the total of all entries in {@code numSessions}; each entry is read individually, so the
     *         result is not an atomic snapshot
     */
    public static long liveSessions() {
        final int coreCount = numSessions.length();
        long total = 0L;
        int core = 0;
        while (core < coreCount) {
            total += numSessions.get(core);
            core++;
        }
        return total;
    }

    /**
     * Sums the pending page counts reported by every registered session.
     *
     * <p>The total is accumulated in a {@code long} so that a large number of pending pages across
     * sessions cannot wrap around in {@code int} arithmetic before being returned.
     *
     * @return the combined {@code pendingPages()} of all sessions currently in {@code sessions};
     *         0 when there are none
     */
    public static long pendingPages() {
        long total = 0L;
        for (ContinuousPagingSession session : sessions.values()) {
            total += session.pendingPages();
        }
        return total;
    }

    /**
     * Cancels the continuous paging session identified by the client's remote address and {@code i}.
     *
     * <p>The session is unregistered first via {@code removeSession}; if one was found, its
     * {@code cancel()} future is converted to a {@code Single} that is bounded by the session config's
     * {@code cancel_timeout_sec}.
     *
     * @param single source of the query state whose client state provides the remote address
     * @param i      second component of the {@code SessionKey}
     * @return a single emitting {@code true} once the session's cancellation completes, or
     *         {@code false} if no session was registered under the key
     */
    public static Single<Boolean> cancel(Single<QueryState> single, int i) {
        return single.flatMap(queryState -> {
            final SessionKey key = new SessionKey(queryState.getClientState().getRemoteAddress(), i);
            final ContinuousPagingSession session = removeSession(key);
            if (session != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Cancelling continuous paging session {}", key);
                }
                return TPCUtils.toSingle(session.cancel().thenCompose(ignored -> TPCUtils.completedFuture(true)))
                               .timeout(session.config().cancel_timeout_sec, TimeUnit.SECONDS);
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Cannot cancel continuous paging session {}: not found", key);
            }
            return Single.just(false);
        });
    }

    /**
     * Forwards a new {@code next_pages} value to a running continuous paging session.
     *
     * @param single source of the query state whose client state provides the remote address
     * @param i      second component of the {@code SessionKey}
     * @param i2     the requested {@code next_pages} value; must be strictly positive
     * @return a single emitting the result of the session's {@code updateBackpressure(i2)}, or
     *         {@code false} when the session is not registered, is not in the {@code RUNNING} state, or
     *         already has {@code numPagesRequested} equal to {@code Integer.MAX_VALUE}; the single
     *         fails with the exception built by {@code RequestValidations.invalidRequest} when
     *         {@code i2} is not positive
     */
    public static Single<Boolean> updateBackpressure(Single<QueryState> single, int i, int i2) {
        return single.map(queryState -> {
            SessionKey sessionKey = new SessionKey(queryState.getClientState().getRemoteAddress(), i);
            if (i2 <= 0) {
                // Hand the template and its arguments straight to invalidRequest (as the other call in
                // this class does) instead of pre-formatting, so the rendered key is never re-read as
                // a format string.
                throw RequestValidations.invalidRequest("Cannot update next_pages for continuous paging session %s, expected positive value but got %d", sessionKey, i2);
            }
            ContinuousPagingSession session = getSession(sessionKey);
            if (session == null || session.state.get() != ContinuousPagingSession.State.RUNNING) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Cannot update next_pages for continuous paging session {}: not found or no longer running", sessionKey);
                }
                return false;
            }
            if (session.numPagesRequested == Integer.MAX_VALUE) {
                // The page budget is already at its ceiling; there is nothing left to raise.
                logger.warn("Cannot update next_pages for continuous paging session {}, numPagesRequested already set to maximum", sessionKey);
                return false;
            }
            return Boolean.valueOf(session.updateBackpressure(i2));
        });
    }

    // Class initializer: sets up the static state shared by every method of this service.
    static {
        // NOTE(review): looks like the decompiled form of the compiler-generated assertion flag; it is
        // set to the negation of this class's desired assertion status.
        $assertionsDisabled = !ContinuousPagingService.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(ContinuousPagingService.class);
        metrics = new ContinuousPagingMetrics();
        // Registry of sessions, looked up and removed by SessionKey elsewhere in this class.
        sessions = new ConcurrentHashMap<>();
        // One counter per core (sized from TPC.getNumCores()); indexed by a session's coreId().
        numSessions = new AtomicIntegerArray(TPC.getNumCores());
    }
}
