package org.apache.cassandra.transport;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.apache.cassandra.concurrent.LocalAwareExecutorService;
import org.apache.cassandra.concurrent.SharedExecutorPool;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.ExceptionHandlers;
import org.apache.cassandra.transport.Flusher;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;

/* loaded from: input_file:org/apache/cassandra/transport/Dispatcher.class */
public class Dispatcher {
    private static final LocalAwareExecutorService requestExecutor;
    private static final ConcurrentMap<EventLoop, Flusher> flusherLookup;
    private final boolean useLegacyFlusher;
    static final AttributeKey<Consumer<EventMessage>> EVENT_DISPATCHER;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/cassandra/transport/Dispatcher$FlushItemConverter.class */
    public interface FlushItemConverter {
        Flusher.FlushItem<?> toFlushItem(Channel channel, Message.Request request, Message.Response response);
    }

    public Dispatcher(boolean z) {
        this.useLegacyFlusher = z;
    }

    public void dispatch(Channel channel, Message.Request request, FlushItemConverter flushItemConverter) {
        requestExecutor.submit(() -> {
            processRequest(channel, request, flushItemConverter);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Message.Response processRequest(ServerConnection serverConnection, Message.Request request) {
        long nanoTime = System.nanoTime();
        if (serverConnection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4)) {
            ClientWarn.instance.captureWarnings();
        }
        QueryState validateNewMessage = serverConnection.validateNewMessage(request.type, serverConnection.getVersion());
        Message.logger.trace("Received: {}, v={}", request, serverConnection.getVersion());
        serverConnection.requests.inc();
        Message.Response execute = request.execute(validateNewMessage, nanoTime);
        execute.setStreamId(request.getStreamId());
        execute.setWarnings(ClientWarn.instance.getWarnings());
        execute.attach(serverConnection);
        serverConnection.applyStateTransition(request.type, execute.type);
        return execute;
    }

    void processRequest(Channel channel, Message.Request request, FlushItemConverter flushItemConverter) {
        Flusher.FlushItem<?> flushItem;
        try {
            try {
            } catch (Throwable th) {
                JVMStabilityInspector.inspectThrowable(th);
                ErrorMessage fromException = ErrorMessage.fromException(th, new ExceptionHandlers.UnexpectedChannelExceptionHandler(channel, true));
                fromException.setStreamId(request.getStreamId());
                flushItem = flushItemConverter.toFlushItem(channel, request, fromException);
                ClientWarn.instance.resetWarnings();
            }
            if (!$assertionsDisabled && !(request.connection() instanceof ServerConnection)) {
                throw new AssertionError();
            }
            ServerConnection serverConnection = (ServerConnection) request.connection();
            Message.Response processRequest = processRequest(serverConnection, request);
            flushItem = flushItemConverter.toFlushItem(channel, request, processRequest);
            Message.logger.trace("Responding: {}, v={}", processRequest, serverConnection.getVersion());
            ClientWarn.instance.resetWarnings();
            flush(flushItem);
        } catch (Throwable th2) {
            ClientWarn.instance.resetWarnings();
            throw th2;
        }
    }

    private void flush(Flusher.FlushItem<?> flushItem) {
        EventLoop eventLoop = flushItem.channel.eventLoop();
        Flusher flusher = flusherLookup.get(eventLoop);
        if (flusher == null) {
            Flusher legacy = this.useLegacyFlusher ? Flusher.legacy(eventLoop) : Flusher.immediate(eventLoop);
            flusher = legacy;
            Flusher putIfAbsent = flusherLookup.putIfAbsent(eventLoop, legacy);
            if (putIfAbsent != null) {
                flusher = putIfAbsent;
            }
        }
        flusher.enqueue(flushItem);
        flusher.start();
    }

    public static void shutdown() {
        if (requestExecutor != null) {
            requestExecutor.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Consumer<EventMessage> eventDispatcher(Channel channel, ProtocolVersion protocolVersion, FrameEncoder.PayloadAllocator payloadAllocator) {
        return eventMessage -> {
            flush(new Flusher.FlushItem.Framed(channel, eventMessage.encode(protocolVersion), null, payloadAllocator, flushItem -> {
                ((Envelope) flushItem.response).release();
            }));
        };
    }

    static {
        $assertionsDisabled = !Dispatcher.class.desiredAssertionStatus();
        requestExecutor = SharedExecutorPool.SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), DatabaseDescriptor::setNativeTransportMaxThreads, "transport", "Native-Transport-Requests");
        flusherLookup = new ConcurrentHashMap();
        EVENT_DISPATCHER = AttributeKey.valueOf("EVTDISP");
    }
}
