package org.apache.cassandra.net;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.cassandra.net.Crc;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/net/AbstractMessageHandler.class */
public abstract class AbstractMessageHandler extends ChannelInboundHandlerAdapter implements FrameDecoder.FrameProcessor {
    private static final Logger logger;
    private static final NoSpamLogger noSpamLogger;
    protected final FrameDecoder decoder;
    protected final Channel channel;
    protected final int largeThreshold;
    protected LargeMessage<?> largeMessage;
    protected final long queueCapacity;
    private static final AtomicLongFieldUpdater<AbstractMessageHandler> queueSizeUpdater;
    protected final ResourceLimits.Limit endpointReserveCapacity;
    protected final WaitQueue endpointWaitQueue;
    protected final ResourceLimits.Limit globalReserveCapacity;
    protected final WaitQueue globalWaitQueue;
    protected final OnHandlerClosed onClosed;
    protected long corruptFramesRecovered;
    protected long corruptFramesUnrecovered;
    protected long receivedCount;
    protected long receivedBytes;
    protected long throttledCount;
    protected long throttledNanos;
    private boolean isClosed;
    static final /* synthetic */ boolean $assertionsDisabled;
    volatile long queueSize = 0;
    private WaitQueue.Ticket ticket = null;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/net/AbstractMessageHandler$LargeMessage.class */
    public abstract class LargeMessage<H> {
        protected final int size;
        protected final H header;
        protected final List<ShareableBytes> buffers;
        protected int received;
        protected final long expiresAtNanos;
        protected boolean isExpired;
        protected boolean isCorrupt;

        /* JADX INFO: Access modifiers changed from: protected */
        public LargeMessage(int i, H h, long j, boolean z) {
            this.buffers = new ArrayList();
            this.size = i;
            this.header = h;
            this.expiresAtNanos = j;
            this.isExpired = z;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public LargeMessage(AbstractMessageHandler abstractMessageHandler, int i, H h, long j, ShareableBytes shareableBytes) {
            this(i, (Object) h, j, false);
            this.buffers.add(shareableBytes);
        }

        public boolean supply(FrameDecoder.Frame frame) {
            if (frame instanceof FrameDecoder.IntactFrame) {
                onIntactFrame((FrameDecoder.IntactFrame) frame);
            } else {
                onCorruptFrame();
            }
            this.received += frame.frameSize;
            if (this.size == this.received) {
                onComplete();
            }
            return this.size == this.received;
        }

        private void onIntactFrame(FrameDecoder.IntactFrame intactFrame) {
            boolean isAfter = MonotonicClock.approxTime.isAfter(this.expiresAtNanos);
            if (!this.isExpired && !this.isCorrupt) {
                if (!isAfter) {
                    this.buffers.add(intactFrame.contents.sliceAndConsume(intactFrame.frameSize).share());
                    return;
                }
                releaseBuffersAndCapacity();
            }
            intactFrame.consume();
            this.isExpired |= isAfter;
        }

        private void onCorruptFrame() {
            if (!this.isExpired && !this.isCorrupt) {
                releaseBuffersAndCapacity();
            }
            this.isCorrupt = true;
            this.isExpired |= MonotonicClock.approxTime.isAfter(this.expiresAtNanos);
        }

        protected abstract void onComplete();

        protected abstract void abort();

        /* JADX INFO: Access modifiers changed from: protected */
        public void releaseBuffers() {
            this.buffers.forEach((v0) -> {
                v0.release();
            });
            this.buffers.clear();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void releaseBuffersAndCapacity() {
            releaseBuffers();
            AbstractMessageHandler.this.releaseCapacity(this.size);
        }
    }

    /* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/net/AbstractMessageHandler$OnHandlerClosed.class */
    public interface OnHandlerClosed {
        void call(AbstractMessageHandler abstractMessageHandler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/net/AbstractMessageHandler$UpToOneMessageFrameProcessor.class */
    public class UpToOneMessageFrameProcessor implements FrameDecoder.FrameProcessor {
        private final ResourceLimits.Limit endpointReserve;
        private final ResourceLimits.Limit globalReserve;
        boolean isActive;
        boolean firstFrame;

        private UpToOneMessageFrameProcessor(ResourceLimits.Limit limit, ResourceLimits.Limit limit2) {
            this.isActive = true;
            this.firstFrame = true;
            this.endpointReserve = limit;
            this.globalReserve = limit2;
        }

        @Override // org.apache.cassandra.net.FrameDecoder.FrameProcessor
        public boolean process(FrameDecoder.Frame frame) throws IOException {
            if (!this.firstFrame) {
                return processSubsequentFrame(frame);
            }
            if (!(frame instanceof FrameDecoder.IntactFrame)) {
                throw new IllegalStateException("First backlog frame must be intact");
            }
            this.firstFrame = false;
            return processFirstFrame((FrameDecoder.IntactFrame) frame);
        }

        private boolean processFirstFrame(FrameDecoder.IntactFrame intactFrame) throws IOException {
            if (intactFrame.isSelfContained) {
                this.isActive = AbstractMessageHandler.this.processOneContainedMessage(intactFrame.contents, this.endpointReserve, this.globalReserve);
                return false;
            }
            this.isActive = AbstractMessageHandler.this.processFirstFrameOfLargeMessage(intactFrame, this.endpointReserve, this.globalReserve);
            return this.isActive;
        }

        private boolean processSubsequentFrame(FrameDecoder.Frame frame) throws IOException {
            if (frame instanceof FrameDecoder.IntactFrame) {
                AbstractMessageHandler.this.processSubsequentFrameOfLargeMessage(frame);
            } else {
                AbstractMessageHandler.this.processCorruptFrame((FrameDecoder.CorruptFrame) frame);
            }
            return AbstractMessageHandler.this.largeMessage != null;
        }
    }

    /* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/net/AbstractMessageHandler$WaitQueue.class */
    public static final class WaitQueue {
        private static final int NOT_RUNNING = 0;
        private static final int RUNNING = 1;
        private static final int RUN_AGAIN = 2;
        private volatile int scheduled;
        private static final AtomicIntegerFieldUpdater<WaitQueue> scheduledUpdater = AtomicIntegerFieldUpdater.newUpdater(WaitQueue.class, "scheduled");
        private final Kind kind;
        private final ResourceLimits.Limit reserveCapacity;
        private final ManyToOneConcurrentLinkedQueue<Ticket> queue = new ManyToOneConcurrentLinkedQueue<>();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/net/AbstractMessageHandler$WaitQueue$Kind.class */
        public enum Kind {
            ENDPOINT,
            GLOBAL
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/net/AbstractMessageHandler$WaitQueue$ReactivateHandlers.class */
        public class ReactivateHandlers implements Runnable {
            List<Ticket> tickets;
            long capacity;

            private ReactivateHandlers() {
                this.tickets = new ArrayList();
                this.capacity = 0L;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void add(Ticket ticket, boolean z) {
                this.tickets.add(ticket);
                if (z) {
                    this.capacity += ticket.bytesRequested;
                }
            }

            @Override // java.lang.Runnable
            public void run() {
                ResourceLimits.Basic basic = new ResourceLimits.Basic(this.capacity);
                try {
                    Iterator<Ticket> it2 = this.tickets.iterator();
                    while (it2.hasNext()) {
                        it2.next().reactivateHandler(basic);
                    }
                } finally {
                    long remaining = basic.remaining();
                    if (remaining > 0) {
                        WaitQueue.this.reserveCapacity.release(remaining);
                        WaitQueue.this.signal();
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/net/AbstractMessageHandler$WaitQueue$Ticket.class */
        public static final class Ticket {
            private static final int WAITING = 0;
            private static final int CALLED = 1;
            private static final int INVALIDATED = 2;
            private volatile int state;
            private static final AtomicIntegerFieldUpdater<Ticket> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(Ticket.class, "state");
            private final WaitQueue waitQueue;
            private final AbstractMessageHandler handler;
            private final int bytesRequested;
            private final long reigsteredAtNanos;
            private final long expiresAtNanos;

            private Ticket(WaitQueue waitQueue, AbstractMessageHandler abstractMessageHandler, int i, long j, long j2) {
                this.waitQueue = waitQueue;
                this.handler = abstractMessageHandler;
                this.bytesRequested = i;
                this.reigsteredAtNanos = j;
                this.expiresAtNanos = j2;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void reactivateHandler(ResourceLimits.Limit limit) {
                long now = MonotonicClock.approxTime.now() - this.reigsteredAtNanos;
                try {
                    if (this.waitQueue.kind == Kind.ENDPOINT) {
                        this.handler.onEndpointReserveCapacityRegained(limit, now);
                    } else {
                        this.handler.onGlobalReserveCapacityRegained(limit, now);
                    }
                } catch (Throwable th) {
                    AbstractMessageHandler.logger.error("{} exception caught while reactivating a handler", this.handler.id(), th);
                }
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean isWaiting() {
                return this.state == 0;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean isLive(long j) {
                return !MonotonicClock.approxTime.isAfter(j, this.expiresAtNanos);
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void invalidate() {
                this.state = 2;
                this.waitQueue.signal();
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean call() {
                return stateUpdater.compareAndSet(this, 0, 1);
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean reset() {
                return stateUpdater.compareAndSet(this, 1, 0);
            }
        }

        private WaitQueue(Kind kind, ResourceLimits.Limit limit) {
            this.kind = kind;
            this.reserveCapacity = limit;
        }

        public static WaitQueue endpoint(ResourceLimits.Limit limit) {
            return new WaitQueue(Kind.ENDPOINT, limit);
        }

        public static WaitQueue global(ResourceLimits.Limit limit) {
            return new WaitQueue(Kind.GLOBAL, limit);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Ticket register(AbstractMessageHandler abstractMessageHandler, int i, long j, long j2) {
            Ticket ticket = new Ticket(abstractMessageHandler, i, j, j2);
            Ticket relaxedPeekLastAndOffer = this.queue.relaxedPeekLastAndOffer(ticket);
            if (null == relaxedPeekLastAndOffer || !relaxedPeekLastAndOffer.isWaiting()) {
                signal();
            }
            return ticket;
        }

        @VisibleForTesting
        public void signal() {
            if (this.queue.relaxedIsEmpty() || 0 != scheduledUpdater.getAndUpdate(this, i -> {
                return Math.min(2, i + 1);
            })) {
                return;
            }
            do {
                schedule();
            } while (2 == scheduledUpdater.getAndDecrement(this));
        }

        private void schedule() {
            IdentityHashMap identityHashMap = null;
            long now = MonotonicClock.approxTime.now();
            while (true) {
                Ticket peek = this.queue.peek();
                if (peek != null) {
                    if (peek.call()) {
                        boolean isLive = peek.isLive(now);
                        if (isLive && !this.reserveCapacity.tryAllocate(peek.bytesRequested)) {
                            if (peek.reset()) {
                                break;
                            } else {
                                this.queue.remove();
                            }
                        } else {
                            if (null == identityHashMap) {
                                identityHashMap = new IdentityHashMap();
                            }
                            this.queue.remove();
                            ((ReactivateHandlers) identityHashMap.computeIfAbsent(peek.handler.eventLoop(), eventLoop -> {
                                return new ReactivateHandlers();
                            })).add(peek, isLive);
                        }
                    } else {
                        this.queue.remove();
                    }
                } else {
                    break;
                }
            }
            if (null != identityHashMap) {
                identityHashMap.forEach((v0, v1) -> {
                    v0.execute(v1);
                });
            }
        }
    }

    public AbstractMessageHandler(FrameDecoder frameDecoder, Channel channel, int i, long j, ResourceLimits.Limit limit, ResourceLimits.Limit limit2, WaitQueue waitQueue, WaitQueue waitQueue2, OnHandlerClosed onHandlerClosed) {
        this.decoder = frameDecoder;
        this.channel = channel;
        this.largeThreshold = i;
        this.queueCapacity = j;
        this.endpointReserveCapacity = limit;
        this.endpointWaitQueue = waitQueue;
        this.globalReserveCapacity = limit2;
        this.globalWaitQueue = waitQueue2;
        this.onClosed = onHandlerClosed;
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        throw new IllegalStateException("InboundMessageHandler doesn't expect channelRead() to be invoked");
    }

    @Override // io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) {
        this.decoder.activate(this);
    }

    @Override // org.apache.cassandra.net.FrameDecoder.FrameProcessor
    public boolean process(FrameDecoder.Frame frame) throws IOException {
        if (frame instanceof FrameDecoder.IntactFrame) {
            return processIntactFrame((FrameDecoder.IntactFrame) frame, this.endpointReserveCapacity, this.globalReserveCapacity);
        }
        processCorruptFrame((FrameDecoder.CorruptFrame) frame);
        return true;
    }

    private boolean processIntactFrame(FrameDecoder.IntactFrame intactFrame, ResourceLimits.Limit limit, ResourceLimits.Limit limit2) throws IOException {
        return intactFrame.isSelfContained ? processFrameOfContainedMessages(intactFrame.contents, limit, limit2) : null == this.largeMessage ? processFirstFrameOfLargeMessage(intactFrame, limit, limit2) : processSubsequentFrameOfLargeMessage(intactFrame);
    }

    private boolean processFrameOfContainedMessages(ShareableBytes shareableBytes, ResourceLimits.Limit limit, ResourceLimits.Limit limit2) throws IOException {
        while (shareableBytes.hasRemaining()) {
            if (!processOneContainedMessage(shareableBytes, limit, limit2)) {
                return false;
            }
        }
        return true;
    }

    protected abstract boolean processOneContainedMessage(ShareableBytes shareableBytes, ResourceLimits.Limit limit, ResourceLimits.Limit limit2) throws IOException;

    protected abstract boolean processFirstFrameOfLargeMessage(FrameDecoder.IntactFrame intactFrame, ResourceLimits.Limit limit, ResourceLimits.Limit limit2) throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean processSubsequentFrameOfLargeMessage(FrameDecoder.Frame frame) {
        this.receivedBytes += frame.frameSize;
        if (!this.largeMessage.supply(frame)) {
            return true;
        }
        this.receivedCount++;
        this.largeMessage = null;
        return true;
    }

    protected abstract void processCorruptFrame(FrameDecoder.CorruptFrame corruptFrame) throws Crc.InvalidCrc;

    /* JADX INFO: Access modifiers changed from: private */
    public void onEndpointReserveCapacityRegained(ResourceLimits.Limit limit, long j) {
        onReserveCapacityRegained(limit, this.globalReserveCapacity, j);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onGlobalReserveCapacityRegained(ResourceLimits.Limit limit, long j) {
        onReserveCapacityRegained(this.endpointReserveCapacity, limit, j);
    }

    private void onReserveCapacityRegained(ResourceLimits.Limit limit, ResourceLimits.Limit limit2, long j) {
        if (this.isClosed) {
            return;
        }
        if (!$assertionsDisabled && !this.channel.eventLoop().inEventLoop()) {
            throw new AssertionError();
        }
        this.ticket = null;
        this.throttledNanos += j;
        try {
            if (processUpToOneMessage(limit, limit2)) {
                this.decoder.reactivate();
            }
        } catch (Throwable th) {
            fatalExceptionCaught(th);
        }
    }

    protected abstract void fatalExceptionCaught(Throwable th);

    private boolean processUpToOneMessage(ResourceLimits.Limit limit, ResourceLimits.Limit limit2) throws IOException {
        UpToOneMessageFrameProcessor upToOneMessageFrameProcessor = new UpToOneMessageFrameProcessor(limit, limit2);
        this.decoder.processBacklog(upToOneMessageFrameProcessor);
        return upToOneMessageFrameProcessor.isActive;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean acquireCapacity(ResourceLimits.Limit limit, ResourceLimits.Limit limit2, int i, long j, long j2) {
        ResourceLimits.Outcome acquireCapacity = acquireCapacity(limit, limit2, i);
        if (acquireCapacity == ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT) {
            this.ticket = this.endpointWaitQueue.register(this, i, j, j2);
        } else if (acquireCapacity == ResourceLimits.Outcome.INSUFFICIENT_GLOBAL) {
            this.ticket = this.globalWaitQueue.register(this, i, j, j2);
        }
        if (acquireCapacity != ResourceLimits.Outcome.SUCCESS) {
            this.throttledCount++;
        }
        return acquireCapacity == ResourceLimits.Outcome.SUCCESS;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ResourceLimits.Outcome acquireCapacity(ResourceLimits.Limit limit, ResourceLimits.Limit limit2, int i) {
        long j = this.queueSize;
        if (j + i <= this.queueCapacity) {
            queueSizeUpdater.addAndGet(this, i);
            return ResourceLimits.Outcome.SUCCESS;
        }
        long min = Math.min((j + i) - this.queueCapacity, i);
        if (!limit2.tryAllocate(min)) {
            return ResourceLimits.Outcome.INSUFFICIENT_GLOBAL;
        }
        if (!limit.tryAllocate(min)) {
            limit2.release(min);
            this.globalWaitQueue.signal();
            return ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT;
        }
        long max = Math.max(0L, Math.min(queueSizeUpdater.addAndGet(this, i) - this.queueCapacity, i));
        if (max != min) {
            long j2 = min - max;
            limit.release(j2);
            limit2.release(j2);
            this.endpointWaitQueue.signal();
            this.globalWaitQueue.signal();
        }
        return ResourceLimits.Outcome.SUCCESS;
    }

    public void releaseCapacity(int i) {
        long andAdd = queueSizeUpdater.getAndAdd(this, -i);
        if (andAdd > this.queueCapacity) {
            long min = Math.min(andAdd - this.queueCapacity, i);
            this.endpointReserveCapacity.release(min);
            this.globalReserveCapacity.release(min);
            this.endpointWaitQueue.signal();
            this.globalWaitQueue.signal();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    public void releaseProcessedCapacity(int i, Message.Header header) {
        releaseCapacity(i);
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        this.isClosed = true;
        if (null != this.largeMessage) {
            this.largeMessage.abort();
        }
        if (null != this.ticket) {
            this.ticket.invalidate();
        }
        this.onClosed.call(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public EventLoop eventLoop() {
        return this.channel.eventLoop();
    }

    protected abstract String id();

    static {
        $assertionsDisabled = !AbstractMessageHandler.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger((Class<?>) AbstractMessageHandler.class);
        noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
        queueSizeUpdater = AtomicLongFieldUpdater.newUpdater(AbstractMessageHandler.class, "queueSize");
    }
}
