package org.apache.cassandra.net;

import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import com.datastax.dse.byos.shade.com.google.common.collect.HashMultimap;
import com.datastax.dse.byos.shade.com.google.common.collect.Lists;
import com.datastax.dse.byos.shade.com.google.common.collect.Multimap;
import com.datastax.dse.byos.shade.com.google.common.collect.Multimaps;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.management.ObjectName;
import javax.net.ssl.SSLHandshakeException;
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.TPC;
import org.apache.cassandra.concurrent.TPCEventLoop;
import org.apache.cassandra.concurrent.TPCUtils;
import org.apache.cassandra.concurrent.TracingAwareExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.metrics.DroppedMessageMetrics;
import org.apache.cassandra.metrics.MessagingMetrics;
import org.apache.cassandra.net.DroppedMessages;
import org.apache.cassandra.net.OneWayRequest;
import org.apache.cassandra.net.Request;
import org.apache.cassandra.net.interceptors.Interceptor;
import org.apache.cassandra.net.interceptors.Interceptors;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.SetsFactory;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.apache.cassandra.utils.time.ApolloTime;
import org.jctools.maps.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/net/MessagingService.class */
public final class MessagingService implements MessagingServiceMBean {
    private static final Logger logger;
    private static final NoSpamLogger noSpamLogger;
    public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
    public static final MessagingVersion current_version;
    public static final int PROTOCOL_MAGIC = -900387334;
    private static final AtomicInteger idGen;
    private final Interceptors messageInterceptors;
    public final MessagingMetrics metrics;
    public static final long STARTUP_TIME;
    private final ExpiringMap<Integer, CallbackInfo<?>> callbacks;

    @VisibleForTesting
    final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers;
    private final List<SocketThread> socketThreads;
    private final SimpleCondition listenGate;
    private final DroppedMessages droppedMessages;
    private final List<ILatencySubscriber> subscribers;
    private final ConcurrentMap<InetAddress, MessagingVersion> versions;
    private final BackPressureStrategy backPressure;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/net/MessagingService$MSHandle.class */
    public static class MSHandle {
        public static final MessagingService instance = new MessagingService(false);

        private MSHandle() {
        }
    }

    /* loaded from: input_file:org/apache/cassandra/net/MessagingService$MSTestHandle.class */
    private static class MSTestHandle {
        public static final MessagingService instance = new MessagingService(true);

        private MSTestHandle() {
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/cassandra/net/MessagingService$SocketThread.class */
    public static class SocketThread extends Thread {
        private final ServerSocket server;

        @VisibleForTesting
        public final Multimap<InetAddress, Closeable> connections;

        SocketThread(ServerSocket serverSocket, String str) {
            super(str);
            this.connections = Multimaps.synchronizedMultimap(HashMultimap.create());
            this.server = serverSocket;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            FastThreadLocalThread incomingTcpConnection;
            while (!this.server.isClosed()) {
                Socket socket = null;
                try {
                    socket = this.server.accept();
                    if (authenticate(socket)) {
                        socket.setKeepAlive(true);
                        socket.setSoTimeout(10000);
                        DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
                        MessagingService.validateMagic(dataInputStream.readInt());
                        int readInt = dataInputStream.readInt();
                        boolean z = MessagingService.getBits(readInt, 3, 1) == 1;
                        ProtocolVersion fromProtocolHeader = ProtocolVersion.fromProtocolHeader(readInt);
                        MessagingService.logger.trace("Connection version {} from {}", fromProtocolHeader, socket.getInetAddress());
                        socket.setSoTimeout(0);
                        if (z) {
                            incomingTcpConnection = new IncomingStreamingConnection(fromProtocolHeader, socket, this.connections);
                        } else {
                            incomingTcpConnection = new IncomingTcpConnection(fromProtocolHeader, MessagingService.getBits(readInt, 2, 1) == 1, socket, this.connections);
                        }
                        FastThreadLocalThread fastThreadLocalThread = incomingTcpConnection;
                        fastThreadLocalThread.start();
                        this.connections.put(socket.getInetAddress(), (Closeable) fastThreadLocalThread);
                    } else {
                        MessagingService.logger.trace("remote failed to authenticate");
                        socket.close();
                    }
                } catch (AsynchronousCloseException e) {
                    MessagingService.logger.trace("Asynchronous close seen by server thread");
                } catch (ClosedChannelException e2) {
                    MessagingService.logger.trace("MessagingService server thread already closed");
                } catch (SSLHandshakeException e3) {
                    MessagingService.logger.error("SSL handshake error for inbound connection from " + socket, e3);
                    FileUtils.closeQuietly((Closeable) socket);
                } catch (Throwable th) {
                    MessagingService.logger.trace("Error reading the socket {}", socket, th);
                    FileUtils.closeQuietly((Closeable) socket);
                }
            }
            MessagingService.logger.info("MessagingService has terminated the accept() thread");
        }

        void close() throws IOException {
            MessagingService.logger.trace("Closing accept() thread");
            try {
                this.server.close();
            } catch (IOException e) {
                MessagingService.handleIOExceptionOnClose(e);
            }
            synchronized (this.connections) {
                Iterator it2 = Lists.newArrayList(this.connections.values()).iterator();
                while (it2.hasNext()) {
                    ((Closeable) it2.next()).close();
                }
            }
        }

        private boolean authenticate(Socket socket) {
            return DatabaseDescriptor.getInternodeAuthenticator().authenticate(socket.getInetAddress(), socket.getPort());
        }
    }

    public static MessagingService instance() {
        return MSHandle.instance;
    }

    static MessagingService test() {
        return MSTestHandle.instance;
    }

    private MessagingService(boolean z) {
        this.messageInterceptors = new Interceptors();
        this.metrics = new MessagingMetrics();
        this.connectionManagers = new NonBlockingHashMap();
        this.socketThreads = Lists.newArrayList();
        this.droppedMessages = new DroppedMessages();
        this.subscribers = new CopyOnWriteArrayList();
        this.versions = new NonBlockingHashMap();
        this.backPressure = DatabaseDescriptor.getBackPressureStrategy();
        this.listenGate = new SimpleCondition();
        this.versions.put(FBUtilities.getBroadcastAddress(), current_version);
        if (!z) {
            this.droppedMessages.scheduleLogging();
        }
        this.callbacks = new ExpiringMap<>(DatabaseDescriptor.getMinRpcTimeout(), expiringObject -> {
            CallbackInfo callbackInfo = (CallbackInfo) expiringObject.getValue();
            MessageCallback<Q> messageCallback = callbackInfo.callback;
            InetAddress inetAddress = callbackInfo.target;
            addLatency(callbackInfo.verb, inetAddress, expiringObject.timeoutMillis());
            ConnectionMetrics.totalTimeouts.mark();
            OutboundTcpConnectionPool join = getConnectionPool(callbackInfo.target).join();
            if (join != null) {
                join.incrementTimeout();
            }
            updateBackPressureOnReceive(inetAddress, callbackInfo.verb, true).join();
            callbackInfo.responseExecutor.execute(() -> {
                messageCallback.onTimeout(inetAddress);
            }, ExecutorLocals.create());
        });
        if (!z) {
            try {
                ManagementFactory.getPlatformMBeanServer().registerMBean(this, new ObjectName(MBEAN_NAME));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.messageInterceptors.load();
    }

    public void addInterceptor(Interceptor interceptor) {
        this.messageInterceptors.add(interceptor);
    }

    public void removeInterceptor(Interceptor interceptor) {
        this.messageInterceptors.remove(interceptor);
    }

    public void clearInterceptors() {
        this.messageInterceptors.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static int newMessageId() {
        return idGen.incrementAndGet();
    }

    public <Q> void send(Request<?, Q> request, MessageCallback<Q> messageCallback) {
        if (request.isLocal()) {
            deliverLocally(request, messageCallback);
            return;
        }
        registerCallback(request, messageCallback);
        ClientWarn.instance.storeForRequest(request.id());
        updateBackPressureOnSend(request);
        sendRequest(request, messageCallback);
    }

    public <Q> void send(Request.Dispatcher<?, Q> dispatcher, MessageCallback<Q> messageCallback) {
        if (!$assertionsDisabled && (messageCallback == null || dispatcher.verb().isOneWay())) {
            throw new AssertionError();
        }
        Iterator<? extends Request<?, Q>> it2 = dispatcher.remoteRequests().iterator();
        while (it2.hasNext()) {
            send(it2.next(), messageCallback);
        }
        if (dispatcher.hasLocalRequest()) {
            deliverLocally(dispatcher.localRequest(), messageCallback);
        }
    }

    private <P, Q> void deliverLocally(Request<P, Q> request, MessageCallback<Q> messageCallback) {
        Consumer consumer = response -> {
            request.responseExecutor().execute(() -> {
                deliverLocalResponse(request, response, messageCallback);
            }, ExecutorLocals.create());
        };
        Consumer consumer2 = this.messageInterceptors.isEmpty() ? consumer : response2 -> {
            this.messageInterceptors.intercept(response2, consumer, null);
        };
        Runnable runnable = () -> {
            request.responseExecutor().execute(() -> {
                Tracing.trace("Discarding partial local response (timed out)");
                instance().incrementDroppedMessages(request);
                messageCallback.onTimeout(FBUtilities.getBroadcastAddress());
            }, ExecutorLocals.create());
        };
        deliverLocallyInternal(request, request2 -> {
            if (request2.isTimedOut(ApolloTime.millisTime())) {
                runnable.run();
            } else {
                request2.execute(consumer2, runnable);
            }
        }, messageCallback);
    }

    private <P, Q> void deliverLocalResponse(Request<P, Q> request, Response<Q> response, MessageCallback<Q> messageCallback) {
        addLatency(request.verb(), request.to(), request.lifetimeMillis());
        response.deliverTo(messageCallback);
    }

    private <P, Q> void registerCallback(Request<P, Q> request, MessageCallback<Q> messageCallback) {
        long operationStartMillis = request.operationStartMillis();
        long timeoutMillis = request.timeoutMillis();
        TracingAwareExecutor responseExecutor = request.responseExecutor();
        for (Request.Forward forward : request.forwards()) {
            registerCallback(forward.id, forward.to, request.verb(), messageCallback, operationStartMillis, timeoutMillis, responseExecutor);
        }
        registerCallback(request.id(), request.to(), request.verb(), messageCallback, operationStartMillis, timeoutMillis, responseExecutor);
    }

    private <Q> void registerCallback(int i, InetAddress inetAddress, Verb<?, Q> verb, MessageCallback<Q> messageCallback, long j, long j2, TracingAwareExecutor tracingAwareExecutor) {
        CallbackInfo<?> put = this.callbacks.put(Integer.valueOf(i), new CallbackInfo<>(inetAddress, messageCallback, verb, tracingAwareExecutor, j), j2 + DatabaseDescriptor.getEndpointSnitch().getCrossDcRttLatency(inetAddress));
        if (!$assertionsDisabled && put != null) {
            throw new AssertionError(String.format("Callback already exists for id %d! (%s)", Integer.valueOf(i), put));
        }
    }

    public <Q> CompletableFuture<Q> sendSingleTarget(Request<?, Q> request) {
        SingleTargetCallback singleTargetCallback = new SingleTargetCallback();
        send(request, singleTargetCallback);
        return singleTargetCallback;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void forward(ForwardRequest<?, ?> forwardRequest) {
        sendInternal(forwardRequest);
    }

    public void send(OneWayRequest<?> oneWayRequest) {
        if (oneWayRequest.isLocal()) {
            deliverLocallyOneWay(oneWayRequest);
        } else {
            sendRequest(oneWayRequest, null);
        }
    }

    public void send(OneWayRequest.Dispatcher<?> dispatcher) {
        Iterator<OneWayRequest<?>> it2 = dispatcher.remoteRequests().iterator();
        while (it2.hasNext()) {
            sendRequest(it2.next(), null);
        }
        if (dispatcher.hasLocalRequest()) {
            deliverLocallyOneWay(dispatcher.localRequest());
        }
    }

    private void deliverLocallyOneWay(OneWayRequest<?> oneWayRequest) {
        deliverLocallyInternal(oneWayRequest, oneWayRequest2 -> {
            oneWayRequest2.execute(oneWayRequest2.verb().EMPTY_RESPONSE_CONSUMER, () -> {
            });
        }, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reply(Response<?> response) {
        Tracing.trace("Enqueuing {} response to {}", response.verb(), response.from());
        sendResponse(response);
    }

    private <P, Q> void sendRequest(Request<P, Q> request, MessageCallback<Q> messageCallback) {
        this.messageInterceptors.interceptRequest(request, (v1) -> {
            sendInternal(v1);
        }, messageCallback);
    }

    private <Q> void sendResponse(Response<Q> response) {
        this.messageInterceptors.intercept(response, (v1) -> {
            sendInternal(v1);
        }, null);
    }

    private CompletableFuture<Void> sendInternal(Message<?> message) {
        if (logger.isTraceEnabled()) {
            logger.trace("Sending {}", message);
        }
        return getConnection(message).thenAccept(outboundTcpConnection -> {
            CallbackInfo<?> registeredCallback;
            if (outboundTcpConnection.enqueue(message)) {
                return;
            }
            noSpamLogger.debug("Failed to send message to {}, the outbound queue rejected it.", message.to());
            if (!message.isRequest() || (registeredCallback = getRegisteredCallback(message.id(), true, message.to())) == null) {
                return;
            }
            registeredCallback.callback.onFailure(((Request) message).respondWithFailure(RequestFailureReason.UNKNOWN));
        });
    }

    private <P, Q, M extends Request<P, Q>> void deliverLocallyInternal(M m, Consumer<M> consumer, MessageCallback<Q> messageCallback) {
        try {
            this.messageInterceptors.interceptRequest(m, request -> {
                request.requestExecutor().execute(() -> {
                    consumer.accept(request);
                }, ExecutorLocals.create());
            }, messageCallback);
        } catch (Throwable th) {
            logger.error("{} while locally processing {} request.", th.getClass().getCanonicalName(), m.verb());
            logger.trace("Stacktrace: ", th);
            if (m.verb().isOneWay()) {
                return;
            }
            instance().incrementDroppedMessages(m);
            messageCallback.onFailure(m.respondWithFailure(RequestFailureReason.UNKNOWN));
        }
    }

    <Q> CompletableFuture<Void> updateBackPressureOnSend(Request<?, Q> request) {
        return (request.verb().supportsBackPressure() && DatabaseDescriptor.backPressureEnabled()) ? getConnectionPool(request.to()).thenAccept(outboundTcpConnectionPool -> {
            if (outboundTcpConnectionPool != null) {
                outboundTcpConnectionPool.getBackPressureState().onRequestSent(request);
            }
        }) : CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> updateBackPressureOnReceive(InetAddress inetAddress, Verb<?, ?> verb, boolean z) {
        return (verb.supportsBackPressure() && DatabaseDescriptor.backPressureEnabled()) ? getConnectionPool(inetAddress).thenAccept(outboundTcpConnectionPool -> {
            if (outboundTcpConnectionPool != null) {
                BackPressureState backPressureState = outboundTcpConnectionPool.getBackPressureState();
                if (z) {
                    backPressureState.onResponseTimeout();
                } else {
                    backPressureState.onResponseReceived();
                }
            }
        }) : CompletableFuture.completedFuture(null);
    }

    public CompletableFuture<Void> applyBackPressure(Iterable<InetAddress> iterable, long j) {
        if (DatabaseDescriptor.backPressureEnabled()) {
            Set newSet = SetsFactory.newSet();
            CompletableFuture<Void> completableFuture = null;
            for (InetAddress inetAddress : iterable) {
                if (!inetAddress.equals(FBUtilities.getBroadcastAddress())) {
                    CompletableFuture<Void> thenAccept = getConnectionPool(inetAddress).thenAccept(outboundTcpConnectionPool -> {
                        if (outboundTcpConnectionPool != null) {
                            newSet.add(outboundTcpConnectionPool.getBackPressureState());
                        }
                    });
                    completableFuture = completableFuture == null ? thenAccept : completableFuture.thenAcceptBoth((CompletionStage) thenAccept, (r1, r2) -> {
                    });
                }
            }
            if (completableFuture != null) {
                return completableFuture.thenCompose(r10 -> {
                    return this.backPressure.apply(newSet, j, TimeUnit.NANOSECONDS);
                });
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addLatency(Verb<?, ?> verb, InetAddress inetAddress, long j) {
        Iterator<ILatencySubscriber> it2 = this.subscribers.iterator();
        while (it2.hasNext()) {
            it2.next().receiveTiming(verb, inetAddress, j);
        }
    }

    public CompletableFuture<Void> convict(InetAddress inetAddress) {
        return getConnectionPool(inetAddress).thenAccept(outboundTcpConnectionPool -> {
            if (outboundTcpConnectionPool == null) {
                logger.debug("Not resetting pool for {} because internode authenticator said not to connect", inetAddress);
            } else {
                logger.debug("Resetting pool for " + inetAddress);
                outboundTcpConnectionPool.reset();
            }
        });
    }

    public void listen() {
        this.callbacks.reset();
        listen(FBUtilities.getLocalAddress());
        if (DatabaseDescriptor.shouldListenOnBroadcastAddress() && !FBUtilities.getLocalAddress().equals(FBUtilities.getBroadcastAddress())) {
            listen(FBUtilities.getBroadcastAddress());
        }
        this.listenGate.signalAll();
    }

    private void listen(InetAddress inetAddress) throws ConfigurationException {
        Iterator<ServerSocket> it2 = getServerSockets(inetAddress).iterator();
        while (it2.hasNext()) {
            SocketThread socketThread = new SocketThread(it2.next(), "ACCEPT-" + inetAddress);
            socketThread.start();
            this.socketThreads.add(socketThread);
        }
    }

    private List<ServerSocket> getServerSockets(InetAddress inetAddress) throws ConfigurationException {
        ArrayList arrayList = new ArrayList(2);
        if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none) {
            try {
                arrayList.add(SSLFactory.getServerSocket(DatabaseDescriptor.getServerEncryptionOptions(), inetAddress, DatabaseDescriptor.getSSLStoragePort()));
                logger.info("Starting Encrypted Messaging Service on SSL port {}", Integer.valueOf(DatabaseDescriptor.getSSLStoragePort()));
            } catch (IOException e) {
                throw new ConfigurationException("Unable to create ssl socket", e);
            }
        }
        if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.all) {
            try {
                ServerSocket socket = ServerSocketChannel.open().socket();
                try {
                    socket.setReuseAddress(true);
                    InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, DatabaseDescriptor.getStoragePort());
                    try {
                        socket.bind(inetSocketAddress, 500);
                        String networkInterface = FBUtilities.getNetworkInterface(inetAddress);
                        Logger logger2 = logger;
                        Object[] objArr = new Object[3];
                        objArr[0] = inetAddress;
                        objArr[1] = Integer.valueOf(DatabaseDescriptor.getStoragePort());
                        objArr[2] = networkInterface == null ? "" : String.format(" (%s)", networkInterface);
                        logger2.info("Starting Messaging Service on {}:{}{}", objArr);
                        arrayList.add(socket);
                    } catch (BindException e2) {
                        FileUtils.closeQuietly((Closeable) socket);
                        if (e2.getMessage().contains("in use")) {
                            throw new ConfigurationException(inetSocketAddress + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
                        }
                        if (e2.getMessage().contains("Cannot assign requested address")) {
                            throw new ConfigurationException("Unable to bind to address " + inetSocketAddress + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
                        }
                        throw new RuntimeException(e2);
                    } catch (IOException e3) {
                        FileUtils.closeQuietly((Closeable) socket);
                        throw new RuntimeException(e3);
                    }
                } catch (SocketException e4) {
                    FileUtils.closeQuietly((Closeable) socket);
                    throw new ConfigurationException("Insufficient permissions to setReuseAddress", e4);
                }
            } catch (IOException e5) {
                throw new RuntimeException(e5);
            }
        }
        return arrayList;
    }

    public void waitUntilListening() {
        try {
            this.listenGate.await();
        } catch (InterruptedException e) {
            logger.trace("await interrupted");
        }
    }

    public boolean isListening() {
        return this.listenGate.isSignaled();
    }

    public void destroyConnectionPool(InetAddress inetAddress) {
        logger.trace("Destroy pool {}", inetAddress);
        OutboundTcpConnectionPool outboundTcpConnectionPool = this.connectionManagers.get(inetAddress);
        if (outboundTcpConnectionPool == null) {
            return;
        }
        outboundTcpConnectionPool.close();
        this.connectionManagers.remove(inetAddress);
    }

    public CompletableFuture<OutboundTcpConnectionPool> getConnectionPool(InetAddress inetAddress) {
        OutboundTcpConnectionPool outboundTcpConnectionPool = this.connectionManagers.get(inetAddress);
        return outboundTcpConnectionPool != null ? outboundTcpConnectionPool.isStarted() ? CompletableFuture.completedFuture(outboundTcpConnectionPool) : CompletableFuture.supplyAsync(() -> {
            outboundTcpConnectionPool.waitForStarted();
            return outboundTcpConnectionPool;
        }) : CompletableFuture.supplyAsync(() -> {
            if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(inetAddress, OutboundTcpConnectionPool.portFor(inetAddress))) {
                return null;
            }
            OutboundTcpConnectionPool outboundTcpConnectionPool2 = new OutboundTcpConnectionPool(inetAddress, SystemKeyspace.getPreferredIP(inetAddress), this.backPressure.newState(inetAddress));
            OutboundTcpConnectionPool putIfAbsent = this.connectionManagers.putIfAbsent(inetAddress, outboundTcpConnectionPool2);
            if (putIfAbsent != null) {
                outboundTcpConnectionPool2 = putIfAbsent;
            } else {
                outboundTcpConnectionPool2.start();
            }
            outboundTcpConnectionPool2.waitForStarted();
            return outboundTcpConnectionPool2;
        });
    }

    public boolean hasValidIncomingConnections(InetAddress inetAddress, int i) {
        long approximateNanoTime = ApolloTime.approximateNanoTime();
        Iterator<SocketThread> it2 = this.socketThreads.iterator();
        while (it2.hasNext()) {
            Collection<Closeable> collection = it2.next().connections.get(inetAddress);
            if (collection != null) {
                for (Closeable closeable : collection) {
                    if ((closeable instanceof IncomingTcpConnection) && (approximateNanoTime - ((IncomingTcpConnection) closeable).getConnectTime() > TimeUnit.SECONDS.toNanos(i) || TimeUnit.NANOSECONDS.toSeconds(approximateNanoTime - STARTUP_TIME) < i * 2)) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    public CompletableFuture<OutboundTcpConnection> getConnection(Message message) {
        return getConnectionPool(message.to()).thenApply(outboundTcpConnectionPool -> {
            if (outboundTcpConnectionPool == null) {
                return null;
            }
            return outboundTcpConnectionPool.getConnection(message);
        });
    }

    public void register(ILatencySubscriber iLatencySubscriber) {
        this.subscribers.add(iLatencySubscriber);
    }

    public void clearCallbacksUnsafe() {
        this.callbacks.reset();
    }

    public void shutdown() {
        logger.info("Waiting for messaging service to quiesce");
        if (!this.callbacks.shutdownBlocking(DatabaseDescriptor.getMinRpcTimeout() * 2)) {
            logger.warn("Failed to wait for messaging service callbacks shutdown");
        }
        try {
            Iterator<SocketThread> it2 = this.socketThreads.iterator();
            while (it2.hasNext()) {
                try {
                    it2.next().close();
                } catch (IOException e) {
                    handleIOExceptionOnClose(e);
                }
            }
        } catch (IOException e2) {
            throw new IOError(e2);
        }
    }

    public void receive(Message<?> message) {
        this.messageInterceptors.intercept(message, this::receiveInternal, this::reply);
    }

    private void receiveInternal(Message<?> message) {
        TraceState initializeFromMessage = Tracing.instance.initializeFromMessage(message);
        if (initializeFromMessage != null) {
            initializeFromMessage.trace("{} message received from {}", message.verb(), message.from());
        }
        ExecutorLocals create = ExecutorLocals.create(initializeFromMessage, ClientWarn.instance.getForMessage(message.id()));
        try {
            if (message.isRequest()) {
                receiveRequestInternal((Request) message, create);
            } else {
                receiveResponseInternal((Response) message, create);
            }
        } catch (Throwable th) {
            logger.error("{} while receiving {} from {}, caused by: {}", new Object[]{th.getClass().getCanonicalName(), message.verb(), message.from(), th.getMessage()});
            logger.trace("Stacktrace: ", th);
            replyWithError(message, RequestFailureReason.UNKNOWN);
        }
    }

    private void replyWithError(Message<?> message, RequestFailureReason requestFailureReason) {
        if (!message.isRequest() || message.verb().isOneWay()) {
            return;
        }
        instance().incrementDroppedMessages(message);
        reply(((Request) message).respondWithFailure(requestFailureReason));
    }

    private <P, Q> void receiveRequestInternal(Request<P, Q> request, ExecutorLocals executorLocals) {
        if (!$assertionsDisabled && TPCUtils.isTPCThread()) {
            throw new AssertionError();
        }
        TracingAwareExecutor requestExecutor = request.requestExecutor();
        int coreId = requestExecutor.coreId();
        if (request.verb.supportsBackPressure() && TPC.isValidCoreId(coreId)) {
            try {
                TPCEventLoop tPCEventLoop = TPC.eventLoopGroup().eventLoops().get(coreId);
                long operationStartMillis = request.operationStartMillis() + (request.timeoutMillis() / 4);
                CompletableFuture<RejectedExecutionException> completableFuture = new CompletableFuture<>();
                checkTPCBackpressure(tPCEventLoop, completableFuture, operationStartMillis);
                if (completableFuture.get() != null) {
                    logger.warn("Backpressure rejection while receiving {} from {}", request.verb(), request.from());
                    replyWithError(request, RequestFailureReason.UNKNOWN);
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                throw new IllegalStateException(th);
            }
        }
        requestExecutor.execute(MessageDeliveryTask.forRequest(request), executorLocals);
    }

    private <Q> void receiveResponseInternal(Response<Q> response, ExecutorLocals executorLocals) {
        CallbackInfo<Q> registeredCallback = getRegisteredCallback(response, false);
        if (registeredCallback != null) {
            registeredCallback.responseExecutor.execute(MessageDeliveryTask.forResponse(response), executorLocals);
        }
    }

    private void checkTPCBackpressure(TPCEventLoop tPCEventLoop, CompletableFuture<RejectedExecutionException> completableFuture, long j) {
        boolean shouldBackpressure = tPCEventLoop.shouldBackpressure(true);
        if (shouldBackpressure && ApolloTime.millisTime() < j - 100) {
            TPC.bestTPCTimer().onTimeout(() -> {
                checkTPCBackpressure(tPCEventLoop, completableFuture, j);
            }, 100L, TimeUnit.MILLISECONDS);
        } else if (shouldBackpressure) {
            completableFuture.complete(new RejectedExecutionException("Too many pending remote requests!"));
        } else {
            completableFuture.complete(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CallbackInfo<?> getRegisteredCallback(int i, boolean z, InetAddress inetAddress) {
        CallbackInfo<?> remove = z ? this.callbacks.remove(Integer.valueOf(i)) : this.callbacks.get(Integer.valueOf(i));
        if (remove != null) {
            return remove;
        }
        logger.trace("Callback already removed for message {} from {}, ignoring response", Integer.valueOf(i), inetAddress);
        Tracing.trace("Callback already removed for message {} from {}, ignoring response", Integer.valueOf(i), inetAddress);
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <Q> CallbackInfo<Q> getRegisteredCallback(Response<Q> response, boolean z) {
        return (CallbackInfo<Q>) getRegisteredCallback(response.id(), z, response.from());
    }

    public static void validateMagic(int i) throws IOException {
        if (i != -900387334) {
            throw new IOException("invalid protocol header");
        }
    }

    public static int getBits(int i, int i2, int i3) {
        return (i >>> ((i2 + 1) - i3)) & (((-1) << i3) ^ (-1));
    }

    public MessagingVersion setVersion(InetAddress inetAddress, MessagingVersion messagingVersion) {
        logger.trace("Setting version {} for {}", messagingVersion, inetAddress);
        MessagingVersion put = this.versions.put(inetAddress, messagingVersion);
        return put == null ? messagingVersion : put;
    }

    public void resetVersion(InetAddress inetAddress) {
        logger.trace("Resetting version for {}", inetAddress);
        this.versions.remove(inetAddress);
    }

    public Optional<MessagingVersion> getVersion(InetAddress inetAddress) {
        return Optional.ofNullable(this.versions.get(inetAddress));
    }

    public boolean versionAtLeast(InetAddress inetAddress, MessagingVersion messagingVersion) {
        MessagingVersion messagingVersion2 = this.versions.get(inetAddress);
        return messagingVersion2 != null && messagingVersion2.compareTo(messagingVersion) >= 0;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    @Deprecated
    public int getVersion(String str) throws UnknownHostException {
        return getVersion(InetAddress.getByName(str)).orElse(current_version).protocolVersion().handshakeVersion;
    }

    @VisibleForTesting
    public void incrementDroppedMessages(Message<?> message) {
        Verb<?, ?> verb = message.verb();
        if (!$assertionsDisabled && verb.isOneWay()) {
            throw new AssertionError("Shouldn't drop a one-way message");
        }
        if (message.isRequest()) {
            Object payload = message.payload();
            if (payload instanceof IMutation) {
                updateDroppedMutationCount((IMutation) payload);
            }
        }
        this.droppedMessages.onDroppedMessage(message);
    }

    private void updateDroppedMutationCount(IMutation iMutation) {
        if (!$assertionsDisabled && iMutation == null) {
            throw new AssertionError("Mutation should not be null when updating dropped mutations count");
        }
        Iterator<TableId> it2 = iMutation.getTableIds().iterator();
        while (it2.hasNext()) {
            ColumnFamilyStore columnFamilyStore = Keyspace.open(iMutation.getKeyspaceName()).getColumnFamilyStore(it2.next());
            if (columnFamilyStore != null) {
                columnFamilyStore.metric.droppedMutations.inc();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void handleIOExceptionOnClose(IOException iOException) throws IOException {
        if (FBUtilities.isMacOSX) {
            String message = iOException.getMessage();
            boolean z = -1;
            switch (message.hashCode()) {
                case -369062560:
                    if (message.equals("Unknown error: 316")) {
                        z = false;
                        break;
                    }
                    break;
                case 641576986:
                    if (message.equals("No such file or directory")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                case true:
                    return;
            }
        }
        throw iOException;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Integer> getLargeMessagePendingTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Integer.valueOf(entry.getValue().large().getPendingMessages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Long> getLargeMessageCompletedTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Long.valueOf(entry.getValue().large().getCompletedMesssages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Long> getLargeMessageDroppedTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Long.valueOf(entry.getValue().large().getDroppedMessages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Integer> getSmallMessagePendingTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Integer.valueOf(entry.getValue().small().getPendingMessages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Long> getSmallMessageCompletedTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Long.valueOf(entry.getValue().small().getCompletedMesssages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Long> getSmallMessageDroppedTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Long.valueOf(entry.getValue().small().getDroppedMessages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Integer> getGossipMessagePendingTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Integer.valueOf(entry.getValue().gossip().getPendingMessages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Long> getGossipMessageCompletedTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Long.valueOf(entry.getValue().gossip().getCompletedMesssages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Long> getGossipMessageDroppedTasks() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Long.valueOf(entry.getValue().gossip().getDroppedMessages()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Integer> getDroppedMessages() {
        return this.droppedMessages.getSnapshot();
    }

    public Map<DroppedMessages.Group, DroppedMessageMetrics> getDroppedMessagesWithAllMetrics() {
        return this.droppedMessages.getAllMetrics();
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public long getTotalTimeouts() {
        return ConnectionMetrics.totalTimeouts.getCount();
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Long> getTimeoutsPerHost() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Long.valueOf(entry.getValue().getTimeouts()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public Map<String, Double> getBackPressurePerHost() {
        HashMap hashMap = new HashMap(this.connectionManagers.size());
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : this.connectionManagers.entrySet()) {
            hashMap.put(entry.getKey().getHostAddress(), Double.valueOf(entry.getValue().getBackPressureState().getBackPressureRateLimit()));
        }
        return hashMap;
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public void setBackPressureEnabled(boolean z) {
        DatabaseDescriptor.setBackPressureEnabled(z);
    }

    @Override // org.apache.cassandra.net.MessagingServiceMBean
    public boolean isBackPressureEnabled() {
        return DatabaseDescriptor.backPressureEnabled();
    }

    public static IPartitioner globalPartitioner() {
        return StorageService.instance.getTokenMetadata().partitioner;
    }

    public static void validatePartitioner(Collection<? extends AbstractBounds<?>> collection) {
        Iterator<? extends AbstractBounds<?>> it2 = collection.iterator();
        while (it2.hasNext()) {
            validatePartitioner(it2.next());
        }
    }

    public static void validatePartitioner(AbstractBounds<?> abstractBounds) {
        if (globalPartitioner() != abstractBounds.left.getPartitioner()) {
            throw new AssertionError(String.format("Partitioner in bounds serialization. Expected %s, was %s.", globalPartitioner().getClass().getName(), abstractBounds.left.getPartitioner().getClass().getName()));
        }
    }

    @VisibleForTesting
    public List<SocketThread> getSocketThreads() {
        return this.socketThreads;
    }

    static {
        $assertionsDisabled = !MessagingService.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(MessagingService.class);
        noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES);
        current_version = MessagingVersion.DSE_603;
        idGen = new AtomicInteger(0);
        STARTUP_TIME = ApolloTime.approximateNanoTime();
    }
}
