package org.apache.cassandra.net;

import com.beust.jcommander.Parameters;
import com.clearspring.analytics.stream.frequency.CountMinSketch;
import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import com.datastax.dse.byos.shade.com.google.common.util.concurrent.Uninterruptibles;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import javax.net.ssl.SSLHandshakeException;
import net.jpountz.lz4.LZ4BlockOutputStream;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
import org.apache.cassandra.auth.IInternodeAuthenticator;
import org.apache.cassandra.concurrent.ParkedThreadsMonitor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.CoalescingStrategies;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscGrowableArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/net/OutboundTcpConnection.class */
public class OutboundTcpConnection extends FastThreadLocalThread implements ParkedThreadsMonitor.MonitorableThread {
    /** Class logger; assigned in the static initializer. */
    private static final Logger logger;
    /** Created in the static initializer via {@code NoSpamLogger.getLogger(logger, 10, SECONDS)}. */
    private static final NoSpamLogger nospamLogger;
    /** Common prefix of the system property names declared below. */
    private static final String PREFIX = "cassandra.";
    /** System property controlling {@link #INTRADC_TCP_NODELAY}. */
    private static final String INTRADC_TCP_NODELAY_PROPERTY = "cassandra.otc_intradc_tcp_nodelay";
    /** TCP_NODELAY setting applied in {@code connect()} for local-DC and gossip sockets; defaults to {@code true}. */
    private static final boolean INTRADC_TCP_NODELAY;
    /** System property controlling {@link #BUFFER_SIZE}. */
    private static final String BUFFER_SIZE_PROPERTY = "cassandra.otc_buffer_size";
    /** Size passed to the buffered output stream created in {@code connect()}; defaults to 65536. */
    private static final int BUFFER_SIZE;
    /** Maximum number of messages pulled from the backlog per coalescing round (see {@code run()}). */
    public static final int MAX_COALESCED_MESSAGES = 128;
    /** Backlog limit for SMALL connections; read from {@code dse.outbound.connection.small.queue.max} (default 32768). */
    private static final int SOFT_MAX_QUEUE_SIZE_SMALL;
    /** Backlog limit for LARGE connections; read from {@code dse.outbound.connection.large.queue.max} (default 4096). */
    private static final int SOFT_MAX_QUEUE_SIZE_LARGE;
    /** Set by {@code closeSocket(boolean)}; {@code run()} exits its loop once this is true. */
    private volatile boolean isStopped;
    /** PARKED/WORKING flag maintained by {@code park()} and {@code unpark()}. */
    private volatile ParkedThreadsMonitor.MonitorableThread.ThreadState state;
    /** Thread executing {@code run()}; captured on entry to {@code run()} and used as the target of {@code unpark()}. */
    private Thread thread;
    /** Milliseconds to sleep between failed connection attempts in {@code connect()}. */
    private static final int OPEN_RETRY_DELAY = 100;
    /** Socket read timeout (ms) applied while waiting for the peer's handshake version. */
    public static final int WAIT_FOR_VERSION_MAX_TIME = 5000;
    /** Seed of the xxhash32 checksum used by the LZ4 block stream created in {@code connect()}. */
    static final int LZ4_HASH_SEED = -1756908916;
    /** Queue of messages waiting to be written; offered to by {@code enqueue()}, consumed by {@code run()}. */
    private final MessagePassingQueue<QueuedMessage> backlog;
    /** Backlog size at or above which {@code enqueue()} rejects droppable messages. */
    private final int maxBackLogSize;
    /** Counter tracking the number of messages in {@link #backlog}; adjusted separately from the queue itself. */
    private final AtomicInteger numBacklogMessages;
    /** Pool that owns this connection; supplies the endpoint and new sockets. */
    private final OutboundTcpConnectionPool poolReference;
    /** Strategy used by {@code run()} to pull batches of messages out of the backlog. */
    private final CoalescingStrategies.CoalescingStrategy cs;
    /** Output stream of the current socket; {@code null} while disconnected. */
    private DataOutputStreamPlus out;
    /** Current socket; {@code null} while disconnected. */
    private Socket socket;
    /** Number of messages serialized by {@code writeConnected()}. */
    private volatile long completed;
    /** Number of messages dropped (timed out, discarded on connect failure, or left unprocessed in a batch). */
    private final AtomicLong dropped;
    /** Number of messages of the current batch that {@code run()} has not yet processed. */
    private volatile int currentMsgBufferCount;
    /** True for the gossip connection; gossip sockets use the intra-DC TCP_NODELAY setting. */
    private final boolean isGossip;
    /** Messaging version currently assumed for the peer; refreshed on every connect attempt. */
    private volatile MessagingVersion targetVersion;
    /** Serializer matching {@link #targetVersion}; created at the end of a successful {@code connect()}. */
    private volatile Message.Serializer messageSerializer;
    /** Decompiler-exposed assertion flag; true when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Thrown by {@code connect()} when the internode authenticator rejects the peer.
     * Decompiler note: the class was originally private; access was widened by the decompiler.
     */
    public static class InternodeAuthFailed extends Exception {
        /** Only the enclosing class may create instances. */
        private InternodeAuthFailed() {
            super();
        }
    }

    /**
     * A message waiting in the backlog together with the approximate time at which it was queued.
     * Decompiler note: the class was originally package-private; access was widened by the decompiler.
     */
    public static class QueuedMessage implements CoalescingStrategies.Coalescable {
        final Message message;
        final long timestampNanos;

        /**
         * Wraps {@code message} and stamps it with the current approximate nano time.
         *
         * @param message the message to queue
         */
        QueuedMessage(Message message) {
            this.timestampNanos = ApproximateTime.nanoTime();
            this.message = message;
        }

        /** @return {@code true}: a first-attempt message is re-queued once if writing it fails with an I/O error */
        boolean shouldRetry() {
            return true;
        }

        /** @return the approximate nano time captured when this entry was created */
        @Override // org.apache.cassandra.utils.CoalescingStrategies.Coalescable
        public long timestampNanos() {
            return timestampNanos;
        }
    }

    /**
     * A message that has already failed once and was put back on the backlog; it is never retried again.
     * Decompiler note: the class was originally private; access was widened by the decompiler.
     */
    public static class RetriedQueuedMessage extends QueuedMessage {
        /**
         * Re-wraps the message of {@code failed}; the superclass constructor takes a fresh timestamp.
         *
         * @param failed the entry whose write attempt failed
         */
        RetriedQueuedMessage(final QueuedMessage failed) {
            super(failed.message);
        }

        /** @return {@code false}: a second failure is final */
        @Override // org.apache.cassandra.net.OutboundTcpConnection.QueuedMessage
        boolean shouldRetry() {
            return false;
        }
    }

    /**
     * Maps a connection kind to the backlog size above which droppable messages are rejected.
     *
     * @param kind the connection kind
     * @return {@code Integer.MAX_VALUE} for GOSSIP, otherwise the configured SMALL or LARGE soft limit
     * @throws IllegalStateException for any other kind
     */
    private static int getMaxBackLogSize(Message.Kind kind) {
        final int limit;
        switch (kind) {
            case GOSSIP:
                // Gossip is effectively unbounded.
                limit = Integer.MAX_VALUE;
                break;
            case SMALL:
                limit = SOFT_MAX_QUEUE_SIZE_SMALL;
                break;
            case LARGE:
                limit = SOFT_MAX_QUEUE_SIZE_LARGE;
                break;
            default:
                throw new IllegalStateException("Unsupported message kind: " + kind);
        }
        return limit;
    }

    /**
     * Builds the configured coalescing strategy for a connection.
     *
     * @param str                   display name handed to the strategy (callers pass the peer address or "dummy")
     * @param outboundTcpConnection the connection the strategy is created for; the static initializer passes {@code null}
     * @return the strategy created by {@code CoalescingStrategies.newCoalescingStrategy}
     */
    private static CoalescingStrategies.CoalescingStrategy newCoalescingStrategy(String str, OutboundTcpConnection outboundTcpConnection) {
        final String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy();
        final int window = DatabaseDescriptor.getOtcCoalescingWindow();
        return CoalescingStrategies.newCoalescingStrategy(strategyName, window, outboundTcpConnection, logger, str);
    }

    /**
     * Creates a connection of the given kind for the pool's endpoint.
     * The kind determines the thread-name suffix, whether this is the gossip connection,
     * and the soft backlog limit.
     *
     * @param outboundTcpConnectionPool the owning pool
     * @param kind                      the connection kind
     */
    public OutboundTcpConnection(OutboundTcpConnectionPool outboundTcpConnectionPool, Message.Kind kind) {
        this(
            outboundTcpConnectionPool,
            kind.toString(),
            Message.Kind.GOSSIP == kind,
            getMaxBackLogSize(kind)
        );
    }

    /**
     * Creates a connection with explicit settings; exposed for tests.
     * The thread is named {@code MessagingService-Outgoing-<endpoint>-<name>}.
     *
     * @param outboundTcpConnectionPool the owning pool; supplies the endpoint
     * @param name                      suffix of the thread name
     * @param isGossip                  whether this is the gossip connection
     * @param maxBackLogSize            backlog size at or above which droppable messages are rejected
     */
    @VisibleForTesting
    OutboundTcpConnection(OutboundTcpConnectionPool outboundTcpConnectionPool, String name, boolean isGossip, int maxBackLogSize) {
        // The separator is the plain literal "-"; the decompiler had mapped it onto an unrelated JCommander constant.
        super("MessagingService-Outgoing-" + outboundTcpConnectionPool.endPoint() + "-" + name);
        this.isStopped = false;
        this.state = ParkedThreadsMonitor.MonitorableThread.ThreadState.WORKING;
        // Initial capacity 4096, growing up to 2^30 entries.
        this.backlog = new MpscGrowableArrayQueue<>(4096, 1 << 30);
        this.numBacklogMessages = new AtomicInteger(0);
        this.dropped = new AtomicLong();
        this.currentMsgBufferCount = 0;
        this.poolReference = outboundTcpConnectionPool;
        this.isGossip = isGossip;
        this.maxBackLogSize = maxBackLogSize;
        this.cs = newCoalescingStrategy(outboundTcpConnectionPool.endPoint().getHostAddress(), this);
        // Start from the last known version of the peer, or our own current version if none is recorded.
        this.targetVersion = MessagingService.instance().getVersion(outboundTcpConnectionPool.endPoint()).orElse(MessagingService.current_version);
    }

    /**
     * Resolves the datacenter of {@code inetAddress} through the endpoint snitch and checks whether it is local.
     *
     * @param inetAddress the peer address
     * @return the result of {@link #isLocalDC(String)} for the peer's datacenter
     */
    private static boolean isLocalDC(InetAddress inetAddress) {
        final String datacenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(inetAddress);
        return isLocalDC(datacenter);
    }

    /**
     * Checks whether a datacenter name counts as local.
     *
     * @param str the datacenter name
     * @return true if it equals the configured local datacenter or the snitch reports it as the default DC
     */
    private static boolean isLocalDC(String str) {
        if (str.equals(DatabaseDescriptor.getLocalDataCenter())) {
            return true;
        }
        // The snitch is only consulted when the name differs from the local datacenter.
        return DatabaseDescriptor.getEndpointSnitch().isDefaultDC(str);
    }

    /**
     * Offers a message to the backlog of this connection.
     * Droppable messages are rejected once the backlog counter has reached the soft limit;
     * non-droppable messages bypass that check.
     *
     * @param message the message to send
     * @return true if the message was queued, false if it was rejected
     */
    public boolean enqueue(Message message) {
        if (message.isDroppable()) {
            if (this.numBacklogMessages.get() >= this.maxBackLogSize) {
                nospamLogger.warn("Rejecting droppable message because too many messages are already pending.", new Object[0]);
                return false;
            }
        }

        final boolean accepted = this.backlog.relaxedOffer(new QueuedMessage(message));
        if (!accepted) {
            // With assertions enabled a refused offer is treated as a programming error.
            if (!$assertionsDisabled) {
                throw new AssertionError(String.format("Dropped a message that should not have been dropped: %s, kind: %s", message.toString(), message.kind()));
            }
            return false;
        }

        // Count the message only after it is actually in the queue.
        this.numBacklogMessages.incrementAndGet();
        return true;
    }

    /**
     * Discards everything in the backlog and queues a close sentinel so the connection thread disconnects.
     * Decompiler note: the method was originally package-private; access was widened by the decompiler.
     *
     * @param z new value of the stop flag; when true, {@code run()} exits after handling the sentinel
     */
    public void closeSocket(boolean z) {
        logger.debug("Enqueuing socket close for {} with backlog size: {}", this.poolReference.endPoint(), this.numBacklogMessages);
        this.isStopped = z;
        // Throw away all pending messages and bring the counter back in line with the queue.
        final int discarded = this.backlog.drain(ignored -> {
        });
        this.numBacklogMessages.addAndGet(-discarded);
        enqueue(Message.CLOSE_SENTINEL);
    }

    /**
     * Queues a close sentinel behind the messages already in the backlog, without discarding them
     * and without touching the stop flag.
     * Decompiler note: the method was originally package-private; access was widened by the decompiler.
     */
    public void softCloseSocket() {
        enqueue(Message.CLOSE_SENTINEL);
    }

    /**
     * @return the state last recorded by {@code park()} or {@code unpark()} (initially WORKING)
     */
    public ParkedThreadsMonitor.MonitorableThread.ThreadState getThreadState() {
        return this.state;
    }

    /**
     * Marks this connection as PARKED and then parks the calling thread.
     * The state is published before parking so that {@code shouldUnpark()} can observe it.
     */
    public void park() {
        this.state = ParkedThreadsMonitor.MonitorableThread.ThreadState.PARKED;
        // LockSupport.park() blocks the *calling* thread, not necessarily this Thread object.
        LockSupport.park();
    }

    /**
     * Marks this connection as WORKING and unparks the thread captured by {@code run()}.
     * With assertions enabled, calling this before {@code run()} has started fails with an AssertionError.
     */
    @Override // org.apache.cassandra.concurrent.ParkedThreadsMonitor.MonitorableThread
    public void unpark() {
        final Thread target = this.thread;
        if (!$assertionsDisabled && target == null) {
            throw new AssertionError();
        }
        // Publish the new state first, then release the parked thread.
        this.state = ParkedThreadsMonitor.MonitorableThread.ThreadState.WORKING;
        LockSupport.unpark(target);
    }

    /**
     * Tells the monitor whether this connection needs waking up.
     *
     * @param j unused by this implementation
     * @return true only if the connection is PARKED and the backlog is not empty
     */
    @Override // org.apache.cassandra.concurrent.ParkedThreadsMonitor.MonitorableThread
    public boolean shouldUnpark(long j) {
        if (this.state != ParkedThreadsMonitor.MonitorableThread.ThreadState.PARKED) {
            return false;
        }
        return !this.backlog.isEmpty();
    }

    /**
     * Main loop of the connection thread.
     *
     * <p>Repeatedly pulls a batch of at most {@link #MAX_COALESCED_MESSAGES} messages out of the backlog
     * through the coalescing strategy and handles each one in turn:
     * <ul>
     *   <li>a close sentinel disconnects the socket, and ends the thread if the stop flag is set;</li>
     *   <li>a timed-out message is counted as dropped;</li>
     *   <li>otherwise the message is written, connecting first if there is no socket;</li>
     *   <li>if connecting fails, the whole backlog is discarded and the rest of the batch is dropped.</li>
     * </ul>
     * The thread registers itself with the parked-threads monitor on entry and deregisters when the loop ends.
     */
    public void run() {
        if (this.thread == null) {
            this.thread = Thread.currentThread();
        }
        ParkedThreadsMonitor.instance.get().addThreadToMonitor(this);
        final ArrayList<QueuedMessage> drainedMessages = new ArrayList<>(MAX_COALESCED_MESSAGES);

        outer:
        while (!this.isStopped) {
            try {
                this.cs.coalesce(this.backlog, drainedMessages, MAX_COALESCED_MESSAGES);
                int count = drainedMessages.size();
                this.currentMsgBufferCount = count;
                // The batch has left the queue, so take it off the backlog counter.
                this.numBacklogMessages.addAndGet(-count);

                for (QueuedMessage qm : drainedMessages) {
                    try {
                        final Message<?> message = qm.message;
                        if (message == Message.CLOSE_SENTINEL) {
                            logger.trace("Disconnecting because CLOSE_SENTINEL detected");
                            disconnect();
                            if (this.isStopped) {
                                break outer;
                            }
                            // The sentinel is not counted as processed; move on to the next entry.
                            continue;
                        }

                        if (message.isTimedOut(ApproximateTime.currentTimeMillis())) {
                            this.dropped.incrementAndGet();
                        } else if (this.socket != null || connect()) {
                            // Flush only when this is the last message of the batch and nothing else is waiting.
                            writeConnected(qm, count == 1 && this.backlog.isEmpty());
                        } else {
                            // Could not connect: discard everything that is still queued and abandon this batch.
                            final int cleared = this.backlog.drain(ignored -> {
                            });
                            this.dropped.addAndGet(cleared);
                            this.numBacklogMessages.addAndGet(-cleared);
                            break;
                        }
                    } catch (InternodeAuthFailed e) {
                        logger.warn("Internode auth failed connecting to {}", this.poolReference.endPoint());
                        MessagingService.instance().destroyConnectionPool(this.poolReference.endPoint());
                    } catch (Exception e) {
                        JVMStabilityInspector.inspectThrowable(e);
                        logger.error("error processing a message intended for {}", this.poolReference.endPoint(), e);
                    }
                    count--;
                    this.currentMsgBufferCount = count;
                }

                // Whatever is left of the batch was not processed and counts as dropped.
                this.dropped.addAndGet(this.currentMsgBufferCount);
                drainedMessages.clear();
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        }
        ParkedThreadsMonitor.instance.get().removeThreadToMonitor(this);
    }

    /**
     * @return the backlog counter plus the number of messages of the current batch not yet processed
     */
    public int getPendingMessages() {
        final int queued = this.numBacklogMessages.get();
        final int inFlight = this.currentMsgBufferCount;
        return queued + inFlight;
    }

    /**
     * @return the number of messages serialized by {@code writeConnected()} so far
     */
    // NOTE(review): the method name carries a triple 's' typo; it is kept because callers outside this file may use it.
    public long getCompletedMesssages() {
        return this.completed;
    }

    /**
     * @return the current value of the dropped-message counter
     */
    public long getDroppedMessages() {
        return this.dropped.get();
    }

    /**
     * Decides from the configured internode compression mode whether traffic to a peer is compressed.
     *
     * @param inetAddress the peer address
     * @return false for {@code none}, true for {@code all}, and for {@code dc} true only if the peer is not in the local DC
     */
    private static boolean shouldCompressConnection(InetAddress inetAddress) {
        final boolean compress;
        switch (DatabaseDescriptor.internodeCompression()) {
            case none:
                compress = false;
                break;
            case all:
                compress = true;
                break;
            case dc:
                // Only cross-datacenter traffic is compressed in this mode.
                compress = !isLocalDC(inetAddress);
                break;
            default:
                throw new AssertionError("internode-compression " + DatabaseDescriptor.internodeCompression());
        }
        return compress;
    }

    /**
     * Decides from the configured internode compression mode whether traffic to a datacenter is compressed.
     *
     * @param str the datacenter name of the peer
     * @return false for {@code none}, true for {@code all}, and for {@code dc} true only if the datacenter is not local
     */
    public static boolean shouldCompressConnection(String str) {
        final boolean compress;
        switch (DatabaseDescriptor.internodeCompression()) {
            case none:
                compress = false;
                break;
            case all:
                compress = true;
                break;
            case dc:
                // Only cross-datacenter traffic is compressed in this mode.
                compress = !isLocalDC(str);
                break;
            default:
                throw new AssertionError("internode-compression " + DatabaseDescriptor.internodeCompression());
        }
        return compress;
    }

    /**
     * Serializes one message onto the current output stream.
     *
     * <p>On any failure the socket is disconnected. If the failure is (or is caused by) an
     * {@link IOException} and the message has not been retried before, it is put back on the
     * backlog once; other failures are logged as errors and the message is lost.
     *
     * @param queuedMessage the message to write
     * @param flush         whether to flush the stream after writing; ECHO gossip messages are always flushed
     */
    private void writeConnected(QueuedMessage queuedMessage, boolean flush) {
        try {
            final long serializedSize = this.messageSerializer.serializedSize(queuedMessage.message);
            // The size is written as an int, so it must fit into one.
            if (!$assertionsDisabled && serializedSize > Integer.MAX_VALUE) {
                throw new AssertionError("Invalid message, too large: " + serializedSize);
            }
            final int size = (int) serializedSize;
            Tracing.instance.onMessageSend(queuedMessage.message, size);
            this.messageSerializer.writeSerializedSize(size, this.out);
            this.messageSerializer.serialize(queuedMessage.message, this.out);
            this.completed++;
            if (flush || queuedMessage.message.verb() == Verbs.GOSSIP.ECHO) {
                this.out.flush();
            }
        } catch (Throwable th) {
            JVMStabilityInspector.inspectThrowable(th);
            disconnect();
            final boolean ioFailure = th instanceof IOException || th.getCause() instanceof IOException;
            if (!ioFailure) {
                logger.error("error writing to {}", this.poolReference.endPoint(), th);
                return;
            }
            logger.debug("Error writing to {}", this.poolReference.endPoint(), th);
            if (queuedMessage.shouldRetry()) {
                final boolean requeued = this.backlog.relaxedOffer(new RetriedQueuedMessage(queuedMessage));
                if (!$assertionsDisabled && !requeued) {
                    throw new AssertionError();
                }
                if (requeued) {
                    // Same rule as enqueue(): count a message only once it is really in the queue.
                    this.numBacklogMessages.incrementAndGet();
                } else {
                    // The queue refused the retry, so the message is lost; record it as dropped.
                    this.dropped.incrementAndGet();
                }
            }
        }
    }

    /**
     * @return true if there currently is a socket and it reports itself as connected
     */
    public boolean isSocketOpen() {
        // Read the field once: disconnect() on the connection thread may null it between the check and the call.
        final Socket current = this.socket;
        return current != null && current.isConnected();
    }

    /**
     * Flushes and closes the current socket, if any, and clears the socket and stream fields.
     * I/O errors during flush or close are logged and otherwise ignored. Does nothing when not connected.
     */
    private void disconnect() {
        if (this.socket == null) {
            return;
        }

        // Best effort: push out whatever is still buffered before closing.
        if (this.out != null) {
            try {
                this.out.flush();
            } catch (IOException flushFailure) {
                if (logger.isTraceEnabled()) {
                    logger.trace("exception flushing output stream before closing connection to " + this.poolReference.endPoint(), flushFailure);
                }
            }
        }

        try {
            this.socket.close();
            logger.debug("Socket to {} closed", this.poolReference.endPoint());
        } catch (IOException closeFailure) {
            logger.debug("Exception closing connection to {}", this.poolReference.endPoint(), closeFailure);
        }

        this.out = null;
        this.socket = null;
    }

    /**
     * Opens a socket to the pool's endpoint and performs the messaging handshake.
     *
     * <p>Attempts are repeated until one succeeds or the RPC timeout has elapsed. Each attempt:
     * opens and configures a socket, sends the protocol magic and header, reads the peer's
     * handshake version, records it, and then sends our version, broadcast address and
     * (for DSE peers) the base timestamp. If compression applies, the output stream is
     * replaced by an LZ4 block stream. Any attempt that does not end in success leaves the
     * connection disconnected.
     *
     * @return true if the connection is established; false on timeout, on an SSL handshake
     *         error, or when the peer's version is lower than the one we tried
     * @throws InternodeAuthFailed if the internode authenticator rejects the endpoint
     */
    private boolean connect() throws InternodeAuthFailed {
        final InetAddress endPoint = this.poolReference.endPoint();
        final IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator();
        if (!authenticator.authenticate(endPoint, OutboundTcpConnectionPool.portFor(endPoint))) {
            throw new InternodeAuthFailed();
        }
        logger.debug("Attempting to connect to {}", endPoint);
        final long start = System.nanoTime();
        final long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
        while (System.nanoTime() - start < timeout) {
            this.targetVersion = MessagingService.instance().getVersion(endPoint).orElse(MessagingService.current_version);
            boolean success = false;
            try {
                this.socket = this.poolReference.newSocket();
                this.socket.setKeepAlive(true);
                if (isLocalDC(endPoint) || this.isGossip) {
                    this.socket.setTcpNoDelay(INTRADC_TCP_NODELAY);
                } else {
                    this.socket.setTcpNoDelay(DatabaseDescriptor.getInterDCTcpNoDelay());
                }
                if (DatabaseDescriptor.getInternodeSendBufferSize() > 0) {
                    try {
                        this.socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize());
                    } catch (SocketException e) {
                        logger.warn("Failed to set send buffer size on internode socket.", e);
                    }
                }

                // The socket may have no channel; fall back to wrapping its output stream.
                final SocketChannel channel = this.socket.getChannel();
                this.out = new BufferedDataOutputStreamPlus(channel != null ? channel : Channels.newChannel(this.socket.getOutputStream()), BUFFER_SIZE);

                final ProtocolVersion protocolVersion = this.targetVersion.protocolVersion();
                final boolean compress = shouldCompressConnection(this.poolReference.endPoint());
                this.out.writeInt(MessagingService.PROTOCOL_MAGIC);
                this.out.writeInt(protocolVersion.makeProtocolHeader(compress, false));
                this.out.flush();

                final ProtocolVersion handshakeVersion = handshakeVersion(this.socket, new DataInputStream(this.socket.getInputStream()));
                if (handshakeVersion == null) {
                    // The peer returned no version: disconnect and try again.
                    logger.trace("Target max version is {}; no version information yet, will retry", handshakeVersion);
                    disconnect();
                    continue;
                }
                MessagingService.instance().setVersion(endPoint, MessagingVersion.from(handshakeVersion));

                if (protocolVersion.compareTo(handshakeVersion) > 0) {
                    // We spoke a newer version than the peer supports; the caller reconnects with the recorded one.
                    logger.trace("Target max version is {}; will reconnect with that version", handshakeVersion);
                    try {
                        if (DatabaseDescriptor.getSeeds().contains(endPoint)) {
                            logger.warn("Seed gossip version is {}; will not connect with that version", handshakeVersion);
                        }
                    } catch (Throwable th) {
                        // A failing seed lookup must not prevent the disconnect below.
                        JVMStabilityInspector.inspectThrowable(th);
                        logger.warn("Configuration error prevented outbound connection: {}", th.getLocalizedMessage());
                    }
                    return false;
                }

                if (protocolVersion.compareTo(handshakeVersion) < 0 && this.targetVersion != MessagingService.current_version) {
                    logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done", handshakeVersion, protocolVersion);
                    softCloseSocket();
                }

                final long baseTimestamp = ApproximateTime.currentTimeMillis();
                this.messageSerializer = Message.createSerializer(this.targetVersion, baseTimestamp);
                this.out.writeInt(MessagingService.current_version.protocolVersion().handshakeVersion);
                CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), this.out);
                if (this.targetVersion.isDSE()) {
                    MessageParameters.serializer().serialize(MessageParameters.builder().putLong("BASE_TIMESTAMP", baseTimestamp).build(), this.out);
                }
                if (compress) {
                    // Flush the uncompressed handshake before switching the stream over to LZ4.
                    this.out.flush();
                    logger.trace("Upgrading OutputStream to {} to be compressed", endPoint);
                    this.out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(this.socket.getOutputStream(), 16384, LZ4Factory.fastestInstance().fastCompressor(), XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum(), true));
                }
                logger.debug("Done connecting to {}", endPoint);
                success = true;
                return true;
            } catch (ConnectException e) {
                disconnect();
                nospamLogger.debug(String.format("Unable to connect to %s (%s)", this.poolReference.endPoint(), e.toString()), new Object[0]);
                Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS);
            } catch (SSLHandshakeException e) {
                // Not retried: give up on this connect() call immediately.
                logger.error("SSL handshake error for outbound connection to " + this.socket, e);
                disconnect();
                return false;
            } catch (IOException e) {
                disconnect();
                logger.debug("unable to connect to " + this.poolReference.endPoint(), e);
                Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS);
            } finally {
                // Every exit other than success must leave no half-open socket behind.
                if (!success) {
                    disconnect();
                }
            }
        }
        return false;
    }

    /**
     * Reads the peer's handshake version from the socket, waiting at most
     * {@link #WAIT_FOR_VERSION_MAX_TIME} milliseconds. The socket's previous read timeout is
     * restored afterwards, whether or not the read succeeded.
     *
     * @param socket          the socket whose read timeout is temporarily changed
     * @param dataInputStream stream over the socket's input
     * @return the result of {@code ProtocolVersion.fromHandshakeVersion} for the int read from the stream
     * @throws IOException if reading or changing the timeout fails
     */
    private ProtocolVersion handshakeVersion(Socket socket, DataInputStream dataInputStream) throws IOException {
        final int previousTimeout = socket.getSoTimeout();
        try {
            socket.setSoTimeout(WAIT_FOR_VERSION_MAX_TIME);
            return ProtocolVersion.fromHandshakeVersion(dataInputStream.readInt());
        } finally {
            socket.setSoTimeout(previousTimeout);
        }
    }

    /**
     * Exposes the backlog queue itself (not a copy); intended for tests.
     *
     * @return the queue of pending messages
     */
    @VisibleForTesting
    MessagePassingQueue<QueuedMessage> backlog() {
        return this.backlog;
    }

    /**
     * Drains up to {@code i} messages from the backlog into {@code consumer} and lowers the
     * backlog counter by the number actually drained; intended for tests.
     *
     * @param consumer receives each drained message
     * @param i        maximum number of messages to drain
     */
    @VisibleForTesting
    void drain(MessagePassingQueue.Consumer<QueuedMessage> consumer, int i) {
        final int removed = this.backlog.drain(consumer, i);
        this.numBacklogMessages.addAndGet(-removed);
    }

    /*
     * Class initialization: sets up loggers and tunables, then validates the coalescing configuration.
     * Fails class loading if a queue limit is not positive or the coalescing window is negative.
     */
    static {
        $assertionsDisabled = !OutboundTcpConnection.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
        nospamLogger = NoSpamLogger.getLogger(logger, 10L, TimeUnit.SECONDS);
        INTRADC_TCP_NODELAY = Boolean.parseBoolean(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true"));
        BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 65536).intValue();
        SOFT_MAX_QUEUE_SIZE_SMALL = Integer.parseInt(System.getProperty("dse.outbound.connection.small.queue.max", "32768"));
        SOFT_MAX_QUEUE_SIZE_LARGE = Integer.parseInt(System.getProperty("dse.outbound.connection.large.queue.max", "4096"));
        if (SOFT_MAX_QUEUE_SIZE_SMALL < 1) {
            throw new IllegalStateException("SOFT_MAX_QUEUE_SIZE_SMALL should be positive!");
        }
        if (SOFT_MAX_QUEUE_SIZE_LARGE < 1) {
            throw new IllegalStateException("SOFT_MAX_QUEUE_SIZE_LARGE should be positive!");
        }

        final String strategy = DatabaseDescriptor.getOtcCoalescingStrategy();
        switch (strategy) {
            case "TIMEHORIZON":
                // Nothing is logged for this strategy.
                break;
            case "MOVINGAVERAGE":
            case "FIXED":
            case "DISABLED":
                logger.info("OutboundTcpConnection using coalescing strategy {}", strategy);
                break;
            default:
                // Any other name: build a throwaway instance now so a bad setting surfaces at class load.
                newCoalescingStrategy("dummy", null);
                break;
        }

        final int coalescingWindow = DatabaseDescriptor.getOtcCoalescingWindow();
        if (coalescingWindow != 200) {
            logger.info("OutboundTcpConnection coalescing window set to {}μs", coalescingWindow);
        }
        // Only negative windows are rejected; zero is accepted despite the wording of the message.
        if (coalescingWindow < 0) {
            throw new ExceptionInInitializerError("Value provided for coalescing window must be greater than 0: " + coalescingWindow);
        }
    }
}
