package org.apache.cassandra.transport;

import com.google.common.base.Strings;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.Version;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.net.AbstractMessageHandler;
import org.apache.cassandra.net.BufferPoolAllocator;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.FrameDecoderCrc;
import org.apache.cassandra.net.FrameDecoderLZ4;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.net.FrameEncoderCrc;
import org.apache.cassandra.net.FrameEncoderLZ4;
import org.apache.cassandra.net.GlobalBufferPoolAllocator;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.transport.CQLMessageHandler;
import org.apache.cassandra.transport.ClientResourceLimits;
import org.apache.cassandra.transport.Connection;
import org.apache.cassandra.transport.Envelope;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.PreV5Handlers;
import org.apache.cassandra.transport.messages.StartupMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/transport/PipelineConfigurator.class */
public class PipelineConfigurator {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) PipelineConfigurator.class);
    private static final boolean DEBUG = Boolean.getBoolean("cassandra.unsafe_verbose_debug_client_protocol");
    private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
    private static final String CONNECTION_LIMIT_HANDLER = "connectionLimitHandler";
    private static final String IDLE_STATE_HANDLER = "idleStateHandler";
    private static final String INITIAL_HANDLER = "initialHandler";
    private static final String EXCEPTION_HANDLER = "exceptionHandler";
    private static final String DEBUG_HANDLER = "debugHandler";
    private static final String SSL_HANDLER = "ssl";
    private static final String ENVELOPE_DECODER = "envelopeDecoder";
    private static final String ENVELOPE_ENCODER = "envelopeEncoder";
    private static final String MESSAGE_DECOMPRESSOR = "decompressor";
    private static final String MESSAGE_COMPRESSOR = "compressor";
    private static final String MESSAGE_DECODER = "messageDecoder";
    private static final String MESSAGE_ENCODER = "messageEncoder";
    private static final String LEGACY_MESSAGE_PROCESSOR = "legacyCqlProcessor";
    private static final String FRAME_DECODER = "frameDecoder";
    private static final String FRAME_ENCODER = "frameEncoder";
    private static final String MESSAGE_PROCESSOR = "cqlProcessor";
    private final boolean epoll;
    private final boolean keepAlive;
    private final EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy;
    private final Dispatcher dispatcher;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cassandra-all-4.0-beta4.jar:org/apache/cassandra/transport/PipelineConfigurator$EncryptionConfig.class */
    public interface EncryptionConfig {
        void applyTo(Channel channel) throws Exception;
    }

    public PipelineConfigurator(boolean z, boolean z2, boolean z3, EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy) {
        this.epoll = z;
        this.keepAlive = z2;
        this.tlsEncryptionPolicy = tlsEncryptionPolicy;
        this.dispatcher = dispatcher(z3);
    }

    public ChannelFuture initializeChannel(EventLoopGroup eventLoopGroup, InetSocketAddress inetSocketAddress, Connection.Factory factory) {
        ServerBootstrap childOption = new ServerBootstrap().channel(this.epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_LINGER, 0).childOption(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.keepAlive)).childOption(ChannelOption.ALLOCATOR, CBUtil.allocator).childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8192, 32768));
        if (eventLoopGroup != null) {
            childOption = childOption.group(eventLoopGroup);
        }
        childOption.childHandler(initializer(factory));
        logger.info("Using Netty Version: {}", Version.identify().entrySet());
        logger.info("Starting listening for CQL clients on {} ({})...", inetSocketAddress, this.tlsEncryptionPolicy.description());
        return childOption.bind(inetSocketAddress);
    }

    protected ChannelInitializer<Channel> initializer(final Connection.Factory factory) {
        final EncryptionConfig encryptionConfig = encryptionConfig();
        return new ChannelInitializer<Channel>() { // from class: org.apache.cassandra.transport.PipelineConfigurator.1
            @Override // io.netty.channel.ChannelInitializer
            protected void initChannel(Channel channel) throws Exception {
                PipelineConfigurator.this.configureInitialPipeline(channel, factory);
                encryptionConfig.applyTo(channel);
            }
        };
    }

    protected EncryptionConfig encryptionConfig() {
        EncryptionOptions nativeProtocolEncryptionOptions = DatabaseDescriptor.getNativeProtocolEncryptionOptions();
        switch (this.tlsEncryptionPolicy) {
            case UNENCRYPTED:
                return channel -> {
                };
            case OPTIONAL:
                logger.debug("Enabling optionally encrypted CQL connections between client and server");
                return channel2 -> {
                    final SslContext orCreateSslContext = SSLFactory.getOrCreateSslContext(nativeProtocolEncryptionOptions, nativeProtocolEncryptionOptions.require_client_auth, SSLFactory.SocketType.SERVER);
                    channel2.pipeline().addFirst("ssl", new ByteToMessageDecoder() { // from class: org.apache.cassandra.transport.PipelineConfigurator.2
                        @Override // io.netty.handler.codec.ByteToMessageDecoder
                        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
                            if (byteBuf.readableBytes() < 5) {
                                return;
                            }
                            if (!SslHandler.isEncrypted(byteBuf)) {
                                channelHandlerContext.pipeline().remove("ssl");
                            } else {
                                channelHandlerContext.pipeline().replace("ssl", "ssl", orCreateSslContext.newHandler(channel2.alloc()));
                            }
                        }
                    });
                };
            case ENCRYPTED:
                logger.debug("Enabling encrypted CQL connections between client and server");
                return channel3 -> {
                    channel3.pipeline().addFirst("ssl", SSLFactory.getOrCreateSslContext(nativeProtocolEncryptionOptions, nativeProtocolEncryptionOptions.require_client_auth, SSLFactory.SocketType.SERVER).newHandler(channel3.alloc()));
                };
            default:
                throw new IllegalStateException("Unrecognized TLS encryption policy: " + this.tlsEncryptionPolicy);
        }
    }

    public void configureInitialPipeline(final Channel channel, Connection.Factory factory) {
        ChannelPipeline pipeline = channel.pipeline();
        if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0 || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0) {
            pipeline.addFirst(CONNECTION_LIMIT_HANDLER, connectionLimitHandler);
        }
        final long nativeTransportIdleTimeout = DatabaseDescriptor.nativeTransportIdleTimeout();
        if (nativeTransportIdleTimeout > 0) {
            pipeline.addLast(IDLE_STATE_HANDLER, new IdleStateHandler(false, 0L, 0L, nativeTransportIdleTimeout, TimeUnit.MILLISECONDS) { // from class: org.apache.cassandra.transport.PipelineConfigurator.3
                @Override // io.netty.handler.timeout.IdleStateHandler
                protected void channelIdle(ChannelHandlerContext channelHandlerContext, IdleStateEvent idleStateEvent) {
                    PipelineConfigurator.logger.info("Closing client connection {} after timeout of {}ms", channel.remoteAddress(), Long.valueOf(nativeTransportIdleTimeout));
                    channelHandlerContext.close();
                }
            });
        }
        if (DEBUG) {
            pipeline.addLast(DEBUG_HANDLER, new LoggingHandler(LogLevel.INFO));
        }
        pipeline.addLast(ENVELOPE_ENCODER, Envelope.Encoder.instance);
        pipeline.addLast(INITIAL_HANDLER, new InitialConnectionHandler(new Envelope.Decoder(), factory, this));
        pipeline.addLast(EXCEPTION_HANDLER, PreV5Handlers.ExceptionHandler.instance);
        onInitialPipelineReady(pipeline);
    }

    public void configureModernPipeline(ChannelHandlerContext channelHandlerContext, ClientResourceLimits.Allocator allocator, ProtocolVersion protocolVersion, Map<String, String> map) {
        GlobalBufferPoolAllocator globalBufferPoolAllocator = GlobalBufferPoolAllocator.instance;
        channelHandlerContext.channel().config().setOption(ChannelOption.ALLOCATOR, globalBufferPoolAllocator);
        String str = map.get(StartupMessage.COMPRESSION);
        FrameDecoder frameDecoder = frameDecoder(str, globalBufferPoolAllocator);
        FrameEncoder frameEncoder = frameEncoder(str);
        FrameEncoder.PayloadAllocator allocator2 = frameEncoder.allocator();
        ChannelInboundHandlerAdapter postV5Handler = ExceptionHandlers.postV5Handler(allocator2, protocolVersion);
        Message.Decoder<Message.Request> messageDecoder = messageDecoder();
        Envelope.Decoder decoder = new Envelope.Decoder();
        ChannelPipeline pipeline = channelHandlerContext.channel().pipeline();
        ChannelHandlerContext firstContext = pipeline.firstContext();
        firstContext.getClass();
        CQLMessageHandler.ErrorHandler errorHandler = firstContext::fireExceptionCaught;
        int nativeTransportReceiveQueueCapacityInBytes = DatabaseDescriptor.getNativeTransportReceiveQueueCapacityInBytes();
        ClientResourceLimits.ResourceProvider resourceProvider = resourceProvider(allocator);
        AbstractMessageHandler.OnHandlerClosed onHandlerClosed = abstractMessageHandler -> {
            resourceProvider.release();
        };
        boolean equals = "1".equals(map.get(StartupMessage.THROW_ON_OVERLOAD));
        CQLMessageHandler cQLMessageHandler = new CQLMessageHandler(channelHandlerContext.channel(), frameDecoder, decoder, messageDecoder, messageConsumer(), allocator2, nativeTransportReceiveQueueCapacityInBytes, resourceProvider, onHandlerClosed, errorHandler, equals);
        pipeline.remove(ENVELOPE_ENCODER);
        pipeline.addBefore(INITIAL_HANDLER, FRAME_DECODER, frameDecoder);
        pipeline.addBefore(INITIAL_HANDLER, FRAME_ENCODER, frameEncoder);
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_PROCESSOR, cQLMessageHandler);
        pipeline.replace(EXCEPTION_HANDLER, EXCEPTION_HANDLER, postV5Handler);
        pipeline.remove(INITIAL_HANDLER);
        onNegotiationComplete(pipeline);
    }

    protected void onInitialPipelineReady(ChannelPipeline channelPipeline) {
    }

    protected void onNegotiationComplete(ChannelPipeline channelPipeline) {
    }

    protected ClientResourceLimits.ResourceProvider resourceProvider(ClientResourceLimits.Allocator allocator) {
        return new ClientResourceLimits.ResourceProvider.Default(allocator);
    }

    protected Dispatcher dispatcher(boolean z) {
        return new Dispatcher(z);
    }

    protected CQLMessageHandler.MessageConsumer<Message.Request> messageConsumer() {
        Dispatcher dispatcher = this.dispatcher;
        dispatcher.getClass();
        return dispatcher::dispatch;
    }

    protected Message.Decoder<Message.Request> messageDecoder() {
        return Message.requestDecoder();
    }

    protected FrameDecoder frameDecoder(String str, BufferPoolAllocator bufferPoolAllocator) {
        if (null == str) {
            return FrameDecoderCrc.create(bufferPoolAllocator);
        }
        if (str.equalsIgnoreCase("LZ4")) {
            return FrameDecoderLZ4.fast(bufferPoolAllocator);
        }
        throw new ProtocolException("Unsupported compression type: " + str);
    }

    protected FrameEncoder frameEncoder(String str) {
        if (Strings.isNullOrEmpty(str)) {
            return FrameEncoderCrc.instance;
        }
        if (str.equalsIgnoreCase("LZ4")) {
            return FrameEncoderLZ4.fastInstance;
        }
        throw new ProtocolException("Unsupported compression type: " + str);
    }

    public void configureLegacyPipeline(ChannelHandlerContext channelHandlerContext, ClientResourceLimits.Allocator allocator) {
        ChannelPipeline pipeline = channelHandlerContext.channel().pipeline();
        pipeline.addBefore(ENVELOPE_ENCODER, ENVELOPE_DECODER, new Envelope.Decoder());
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_DECOMPRESSOR, Envelope.Decompressor.instance);
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_COMPRESSOR, Envelope.Compressor.instance);
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_DECODER, PreV5Handlers.ProtocolDecoder.instance);
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_ENCODER, PreV5Handlers.ProtocolEncoder.instance);
        pipeline.addBefore(INITIAL_HANDLER, LEGACY_MESSAGE_PROCESSOR, new PreV5Handlers.LegacyDispatchHandler(this.dispatcher, allocator));
        pipeline.remove(INITIAL_HANDLER);
        onNegotiationComplete(pipeline);
    }
}
