package com.datastax.bdp.node.transport;

import com.datastax.bdp.node.transport.Message;
import com.datastax.bdp.node.transport.internal.FailedProcessorException;
import com.datastax.bdp.node.transport.internal.HandshakeProcessor;
import com.datastax.bdp.node.transport.internal.SystemMessageTypes;
import com.datastax.bdp.node.transport.internal.UnsupportedMessageException;
import com.datastax.dse.byos.shade.com.google.common.base.Preconditions;
import com.datastax.dse.byos.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.cassandra.transport.CBUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/bdp/node/transport/MessageServer.class */
public abstract class MessageServer {
    /** Acceptor thread count supplied at construction; not read by this class itself. */
    protected final int acceptorThreads;
    /** Worker thread count supplied at construction; the request executor is sized to a quarter of it (minimum one). */
    protected final int workerThreads;
    /** SSL settings; when present, an SSL handler is placed first in every child channel pipeline. */
    protected final Optional<SSLOptions> sslOptions;
    /** Server name, used in the shutdown log message. */
    protected final String name;
    /** Processors keyed by message type; {@link #bind()} adds the handshake processor to it. */
    private final Map<MessageType, ServerProcessor> processors;
    /** Pool on which request handlers run processors and send responses. */
    private final ExecutorService workExecutor;
    /** Codec supplying the per-channel handlers and the protocol version given to the handshake processor. */
    private final MessageCodec codec;
    /** Bound server channels plus currently active client channels; closed together by {@link #shutdown()}. */
    protected final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /** Event executor groups that {@link #shutdown()} stops; nothing in this class adds to the list. */
    protected final List<EventExecutorGroup> groups = new CopyOnWriteArrayList<>();
    /** Logger named after the concrete runtime class. */
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /* loaded from: input_file:com/datastax/bdp/node/transport/MessageServer$Builder.class */
    /**
     * Fluent builder for {@link LocalMessageServer} and {@link RemoteMessageServer} instances.
     *
     * <p>Defaults: acceptor threads equal the processor count reported by {@code FBUtilities},
     * worker threads are eight times that, SSL is absent, and no processors are registered.
     * No default codec is set.
     */
    public static class Builder {
        private int acceptorThreads = FBUtilities.getAvailableProcessors();
        private int workerThreads = FBUtilities.getAvailableProcessors() * 8;
        private Optional<SSLOptions> sslOptions = Optional.empty();
        private MessageCodec codec;
        private final Map<MessageType, ServerProcessor> processors = new HashMap<>();

        private Builder() {
        }

        /**
         * Overrides the acceptor thread count.
         *
         * @param i number of acceptor threads
         * @return this builder
         */
        public Builder withAcceptorThreads(int i) {
            this.acceptorThreads = i;
            return this;
        }

        /**
         * Overrides the worker thread count.
         *
         * @param i number of worker threads
         * @return this builder
         */
        public Builder withWorkerThreads(int i) {
            this.workerThreads = i;
            return this;
        }

        /**
         * Sets the SSL options handed to the server.
         *
         * @param optional SSL options, or an empty optional for no SSL
         * @return this builder
         */
        public Builder withSSLOptions(Optional<SSLOptions> optional) {
            this.sslOptions = optional;
            return this;
        }

        /**
         * Sets the codec handed to the server.
         *
         * @param messageCodec codec to use
         * @return this builder
         */
        public Builder withMessageCodec(MessageCodec messageCodec) {
            this.codec = messageCodec;
            return this;
        }

        /**
         * Registers a processor for a message type.
         *
         * @param messageType type the processor handles
         * @param serverProcessor processor to register
         * @return this builder
         * @throws IllegalArgumentException if a processor is already registered for the type
         */
        public Builder withProcessor(MessageType messageType, ServerProcessor serverProcessor) {
            ServerProcessor previous = this.processors.putIfAbsent(messageType, serverProcessor);
            if (previous != null) {
                throw new IllegalArgumentException("Type already exists: " + messageType);
            }
            return this;
        }

        /**
         * Builds a remote server from the current settings.
         *
         * @param str server name; must not be blank
         * @param iterable addresses passed through to the remote server
         * @param i port number; must be positive
         * @return a new remote server
         * @throws IllegalStateException if the port is not positive or the name is blank
         */
        public RemoteMessageServer buildRemote(String str, Iterable<InetAddress> iterable, int i) {
            Preconditions.checkState(i > 0, "Invalid port number: " + i);
            Preconditions.checkState(StringUtils.isNotBlank(str), "Name cannot be empty or null");
            return new RemoteMessageServer(this.acceptorThreads, this.workerThreads, this.sslOptions, this.processors, this.codec, str, iterable, i);
        }

        /**
         * Builds a local server from the current settings.
         *
         * @param str server name; must not be blank
         * @return a new local server
         * @throws IllegalStateException if the name is blank
         */
        public LocalMessageServer buildLocal(String str) {
            Preconditions.checkState(StringUtils.isNotBlank(str), "Name cannot be empty or null");
            return new LocalMessageServer(this.acceptorThreads, this.workerThreads, this.sslOptions, this.processors, this.codec, str);
        }
    }

    /* loaded from: input_file:com/datastax/bdp/node/transport/MessageServer$ChannelStateHandler.class */
    /**
     * Keeps the enclosing server's channel group in step with client connections:
     * a channel joins the group when it becomes active and leaves it when it goes inactive.
     *
     * <p>NOTE(review): neither callback forwards its event (no {@code super} call or
     * {@code ctx.fire...}), so handlers added after this one do not receive
     * channelActive/channelInactive through it - confirm this is intended.
     */
    private class ChannelStateHandler extends ChannelDuplexHandler {
        private ChannelStateHandler() {
        }

        /** Adds the newly active channel to the server's channel group. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel activated = ctx.channel();
            MessageServer.this.channels.add(activated);
        }

        /** Removes the now inactive channel from the server's channel group. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel deactivated = ctx.channel();
            MessageServer.this.channels.remove(deactivated);
        }
    }

    /* loaded from: input_file:com/datastax/bdp/node/transport/MessageServer$MessageRequestHandler.class */
    private class MessageRequestHandler extends ChannelInboundHandlerAdapter {
        private MessageRequestHandler() {
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            if (!(obj instanceof Message)) {
                throw new IllegalStateException("Unknown message: " + obj.getClass().getCanonicalName());
            }
            MessageServer.this.logger.trace("Received request: {}", Long.valueOf(((Message) obj).getId()));
            MessageServer.this.workExecutor.execute(() -> {
                Message message;
                ServerProcessor serverProcessor = null;
                Message message2 = (Message) obj;
                MessageServer.this.logger.trace("Handling request message {}", Long.valueOf(message2.getId()));
                if (message2.getFlags().contains(Message.Flag.UNSUPPORTED_MESSAGE)) {
                    UnsupportedMessageException unsupportedMessageException = (UnsupportedMessageException) message2.getBody();
                    message = message2;
                    MessageServer.this.logger.error(unsupportedMessageException.getMessage(), unsupportedMessageException);
                } else {
                    MessageType type = message2.getType();
                    Object body = message2.getBody();
                    serverProcessor = (ServerProcessor) MessageServer.this.processors.get(type);
                    if (serverProcessor != null) {
                        try {
                            message = serverProcessor.process(new RequestContext(message2.getId(), channelHandlerContext.channel()), body);
                            message.trySetVersion(message2.getVersion());
                        } catch (Exception e) {
                            message = new Message(EnumSet.of(Message.Flag.FAILED_PROCESSOR), message2.getId(), SystemMessageTypes.FAILED_PROCESSOR, new FailedProcessorException(e.getClass(), e.getMessage()));
                            message.trySetVersion((byte) -1);
                            MessageServer.this.logger.error("Failed to process request: " + message2, e);
                        }
                    } else {
                        UnsupportedMessageException unsupportedMessageException2 = new UnsupportedMessageException("Cannot find processor for message type: " + type);
                        message = new Message(EnumSet.of(Message.Flag.UNSUPPORTED_MESSAGE), message2.getId(), SystemMessageTypes.UNSUPPORTED_MESSAGE, unsupportedMessageException2);
                        message.trySetVersion((byte) -1);
                        MessageServer.this.logger.error(unsupportedMessageException2.getMessage());
                    }
                }
                sendResponse(channelHandlerContext, message, serverProcessor);
            });
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            MessageServer.this.logger.error(th.getMessage(), th);
        }

        private void sendResponse(ChannelHandlerContext channelHandlerContext, Message message, ServerProcessor serverProcessor) {
            MessageServer.this.logger.trace("Sending response to request {} ", Long.valueOf(message.getId()));
            ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(message);
            writeAndFlush.awaitUninterruptibly2();
            MessageServer.this.logger.trace("Sending response to request {} completed", Long.valueOf(message.getId()));
            if (serverProcessor != null) {
                serverProcessor.onComplete(message);
            }
            if (writeAndFlush.isSuccess()) {
                return;
            }
            Throwable cause = writeAndFlush.cause();
            if (cause instanceof ClosedChannelException) {
                MessageServer.this.logger.error("Cannot send response to {} due to closed channel!", channelHandlerContext.channel().remoteAddress());
            } else if (cause != null) {
                MessageServer.this.logger.error(writeAndFlush.cause().getMessage(), writeAndFlush.cause());
            } else {
                MessageServer.this.logger.error("Cannot send response to {} due to unknown error!", channelHandlerContext.channel().remoteAddress());
            }
        }
    }

    /**
     * Creates a builder pre-populated with the default settings.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder newBuilder() {
        Builder builder = new Builder();
        return builder;
    }

    public MessageServer(int i, int i2, Optional<SSLOptions> optional, Map<MessageType, ServerProcessor> map, MessageCodec messageCodec, String str) {
        this.acceptorThreads = i;
        this.workerThreads = i2;
        this.sslOptions = optional;
        this.processors = new ConcurrentHashMap(map);
        this.codec = messageCodec;
        this.name = str;
        this.workExecutor = Executors.newFixedThreadPool(Math.max(1, i2 / 4), new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName() + " query worker - %s").build());
    }

    /**
     * Configures the bootstrap supplied by {@link #bootstrap()}, binds it through
     * {@link #doBind(ServerBootstrap)} and waits for every bind to complete.
     *
     * <p>Each accepted channel gets, in order: the SSL handler (only when SSL options are
     * present), the channel state handler, the codec's handlers and the request handler.
     * The bound server channels are added to {@link #channels}.
     *
     * @throws InterruptedException if interrupted while waiting for a bind to complete
     */
    public void bind() throws InterruptedException {
        ServerBootstrap bootstrap = bootstrap();
        bootstrap.childOption(ChannelOption.ALLOCATOR, CBUtil.allocator);
        bootstrap.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                MessageServer.this.sslOptions.ifPresent(sSLOptions -> pipeline.addFirst(sSLOptions.createServerSslHandler()));
                pipeline.addLast(new ChannelStateHandler());
                pipeline.addLast(MessageServer.this.codec.newPipeline());
                pipeline.addLast(new MessageRequestHandler());
            }
        });
        // Register the handshake processor before any socket is bound, so a client that
        // connects as soon as the port opens can complete its handshake.
        this.processors.put(SystemMessageTypes.HANDSHAKE, new HandshakeProcessor(this.codec.getCurrentVersion()));
        for (ChannelFuture bindFuture : doBind(bootstrap)) {
            this.channels.add(bindFuture.sync().channel());
        }
    }

    /**
     * Stops the server: closes every channel in {@link #channels} and waits for that to finish,
     * gracefully shuts down each event executor group in {@link #groups} one after another,
     * then shuts down the work executor and logs completion.
     */
    public void shutdown() {
        this.channels.close().awaitUninterruptibly();
        for (EventExecutorGroup group : this.groups) {
            group.shutdownGracefully().syncUninterruptibly();
        }
        // shutdown() only stops new submissions; it does not wait for tasks already running.
        this.workExecutor.shutdown();
        this.logger.info("{} message server finished shutting down.", this.name);
    }

    /**
     * Supplies the server bootstrap that {@link #bind()} configures (allocator child option
     * and child channel initializer) before passing it to {@link #doBind(ServerBootstrap)}.
     *
     * @return the bootstrap to configure and bind
     */
    protected abstract ServerBootstrap bootstrap();

    /**
     * Starts binding the configured bootstrap. {@link #bind()} waits on every returned future
     * and adds each resulting channel to the server's channel group.
     *
     * @param serverBootstrap bootstrap already configured by {@link #bind()}
     * @return one future per bind operation that was started
     */
    protected abstract Collection<ChannelFuture> doBind(ServerBootstrap serverBootstrap);
}
