package org.apache.cassandra.streaming.async;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.net.AsyncChannelPromise;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamingMessageSender;
import org.apache.cassandra.streaming.messages.KeepAliveMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cassandra-all-4.0.1.jar:org/apache/cassandra/streaming/async/NettyStreamingMessageSender.class */
public class NettyStreamingMessageSender implements StreamingMessageSender {
    // All static finals below are blank here and assigned in the static initializer at the bottom of the class.
    private static final Logger logger;
    // Default parallelism: assigned from FBUtilities.getAvailableProcessors().
    private static final int DEFAULT_MAX_PARALLEL_TRANSFERS;
    // Maximum pool size of each sender's fileTransferExecutor; read from the system property
    // "cassandra.streaming.session.parallelTransfers", falling back to DEFAULT_MAX_PARALLEL_TRANSFERS.
    private static final int MAX_PARALLEL_TRANSFERS;
    // Timeout (five minutes, expressed in milliseconds) used by FileStreamTask.onError when waiting on the session.
    private static final long DEFAULT_CLOSE_WAIT_IN_MILLIS;
    // Static, fair semaphore shared by every sender instance; a FileStreamTask holds one permit while it streams.
    private static final Semaphore fileTransferSemaphore;
    // The stream session this sender belongs to; errors are reported through session.onError(...).
    private final StreamSession session;
    // When true, sendMessage refuses OutgoingStreamMessage instances.
    private final boolean isPreview;
    // Version passed to the connection factory and to StreamMessage serialization.
    private final int streamingVersion;
    // Connection settings handed to the factory whenever a channel is created.
    private final OutboundConnectionSettings template;
    // Creates the outbound channels (see createChannel).
    private final StreamConnectionFactory factory;
    // Set by setClosed() and close(); checked before sending and by the background tasks.
    private volatile boolean closed;
    // Channel used for non-file (control) messages; created lazily or injected.
    private volatile Channel controlMessageChannel;
    // Runs FileStreamTask instances submitted by sendMessage; shut down in close().
    private final ThreadPoolExecutor fileTransferExecutor;

    // Per-channel flag: set to TRUE by FileStreamTask while it is serializing, checked by KeepAliveTask.
    @VisibleForTesting
    static final AttributeKey<Boolean> TRANSFERRING_FILE_ATTR;
    // Decompiler rendering of the compiler-generated assertion switch; read by sendControlMessage.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Futures of the scheduled keep-alive tasks, cancelled and cleared in close().
    private final Collection<ScheduledFuture<?>> channelKeepAlives = new LinkedBlockingQueue();
    // Channel associated with each thread that has run a FileStreamTask (or had one injected).
    private final ConcurrentMap<Thread, Channel> threadToChannelMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cassandra-all-4.0.1.jar:org/apache/cassandra/streaming/async/NettyStreamingMessageSender$FileStreamTask.class */
    public class FileStreamTask implements Runnable {
        private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
        private final StreamMessage msg;

        FileStreamTask(OutgoingStreamMessage outgoingStreamMessage) {
            this.msg = outgoingStreamMessage;
        }

        FileStreamTask(StreamMessage streamMessage) {
            this.msg = streamMessage;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            if (acquirePermit(3)) {
                try {
                    try {
                        try {
                            Channel orCreateChannel = getOrCreateChannel();
                            if (!orCreateChannel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).compareAndSet(false, true)) {
                                throw new IllegalStateException("channel's transferring state is currently set to true. refusing to start new stream");
                            }
                            try {
                                AsyncStreamingOutputPlus asyncStreamingOutputPlus = new AsyncStreamingOutputPlus(orCreateChannel);
                                Throwable th = null;
                                try {
                                    try {
                                        StreamMessage.serialize(this.msg, asyncStreamingOutputPlus, NettyStreamingMessageSender.this.streamingVersion, NettyStreamingMessageSender.this.session);
                                        if (asyncStreamingOutputPlus != null) {
                                            if (0 != 0) {
                                                try {
                                                    asyncStreamingOutputPlus.close();
                                                } catch (Throwable th2) {
                                                    th.addSuppressed(th2);
                                                }
                                            } else {
                                                asyncStreamingOutputPlus.close();
                                            }
                                        }
                                        orCreateChannel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
                                        NettyStreamingMessageSender.fileTransferSemaphore.release();
                                    } catch (Throwable th3) {
                                        th = th3;
                                        throw th3;
                                    }
                                } catch (Throwable th4) {
                                    if (asyncStreamingOutputPlus != null) {
                                        if (th != null) {
                                            try {
                                                asyncStreamingOutputPlus.close();
                                            } catch (Throwable th5) {
                                                th.addSuppressed(th5);
                                            }
                                        } else {
                                            asyncStreamingOutputPlus.close();
                                        }
                                    }
                                    throw th4;
                                }
                            } catch (Throwable th6) {
                                orCreateChannel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
                                throw th6;
                            }
                        } catch (Throwable th7) {
                            NettyStreamingMessageSender.fileTransferSemaphore.release();
                            throw th7;
                        }
                    } catch (Exception e) {
                        NettyStreamingMessageSender.this.session.onError(e);
                        NettyStreamingMessageSender.fileTransferSemaphore.release();
                    }
                } catch (Throwable th8) {
                    if (NettyStreamingMessageSender.this.closed && (Throwables.getRootCause(th8) instanceof ClosedByInterruptException) && NettyStreamingMessageSender.this.fileTransferExecutor.isShutdown()) {
                        NettyStreamingMessageSender.logger.debug("{} Streaming channel was closed due to the executor pool being shutdown", NettyStreamingMessageSender.createLogTag(NettyStreamingMessageSender.this.session, null));
                    } else {
                        JVMStabilityInspector.inspectThrowable(th8);
                        if (!NettyStreamingMessageSender.this.session.state().isFinalState()) {
                            NettyStreamingMessageSender.this.session.onError(th8);
                        }
                    }
                    NettyStreamingMessageSender.fileTransferSemaphore.release();
                }
            }
        }

        boolean acquirePermit(int i) {
            long nanos = TimeUnit.MINUTES.toNanos(i);
            long nanoTime = System.nanoTime();
            while (!NettyStreamingMessageSender.this.closed) {
                if (NettyStreamingMessageSender.fileTransferSemaphore.tryAcquire(1L, TimeUnit.SECONDS)) {
                    return true;
                }
                long nanoTime2 = System.nanoTime();
                if (nanoTime2 - nanoTime > nanos) {
                    nanoTime = nanoTime2;
                    OutgoingStreamMessage outgoingStreamMessage = (OutgoingStreamMessage) this.msg;
                    if (NettyStreamingMessageSender.logger.isInfoEnabled()) {
                        NettyStreamingMessageSender.logger.info("{} waiting to acquire a permit to begin streaming {}. This message logs every {} minutes", NettyStreamingMessageSender.createLogTag(NettyStreamingMessageSender.this.session, null), outgoingStreamMessage.getName(), Integer.valueOf(i));
                    }
                }
            }
            return false;
        }

        private Channel getOrCreateChannel() {
            Thread currentThread = Thread.currentThread();
            try {
                Channel channel = (Channel) NettyStreamingMessageSender.this.threadToChannelMap.get(currentThread);
                if (channel != null) {
                    return channel;
                }
                Channel createChannel = NettyStreamingMessageSender.this.createChannel(false);
                NettyStreamingMessageSender.this.threadToChannelMap.put(currentThread, createChannel);
                return createChannel;
            } catch (Exception e) {
                throw new IOError(e);
            }
        }

        private void onError(Throwable th) {
            try {
                NettyStreamingMessageSender.this.session.onError(th).get(NettyStreamingMessageSender.DEFAULT_CLOSE_WAIT_IN_MILLIS, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
            }
        }

        void injectChannel(Channel channel) {
            Thread currentThread = Thread.currentThread();
            if (NettyStreamingMessageSender.this.threadToChannelMap.get(currentThread) != null) {
                throw new IllegalStateException("previous channel already set");
            }
            NettyStreamingMessageSender.this.threadToChannelMap.put(currentThread, channel);
        }

        void unsetChannel() {
            NettyStreamingMessageSender.this.threadToChannelMap.remove(Thread.currentThread());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cassandra-all-4.0.1.jar:org/apache/cassandra/streaming/async/NettyStreamingMessageSender$KeepAliveTask.class */
    /**
     * Periodic task that sends a {@link KeepAliveMessage} over a channel while that channel is idle.
     *
     * <p>The {@code future} field is assigned by whoever schedules the task and is used by the task to
     * cancel itself.
     */
    public class KeepAliveTask implements Runnable {
        private final Channel channel;
        private final StreamSession session;
        ScheduledFuture<?> future;

        KeepAliveTask(Channel channel, StreamSession streamSession) {
            this.session = streamSession;
            this.channel = channel;
        }

        /**
         * Cancels itself when the channel is no longer open or the sender is closed, does nothing while a
         * file transfer is marked as in progress on the channel, and otherwise sends a keep-alive message.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (!channel.isOpen() || NettyStreamingMessageSender.this.closed) {
                future.cancel(false);
                return;
            }

            Boolean transferring = channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).get();
            if (transferring.booleanValue()) {
                // A file transfer is in progress on this channel; skip this round.
                return;
            }

            try {
                if (NettyStreamingMessageSender.logger.isTraceEnabled()) {
                    String tag = NettyStreamingMessageSender.createLogTag(session, channel);
                    NettyStreamingMessageSender.logger.trace("{} Sending keep-alive to {}.", tag, session.peer);
                }
                KeepAliveMessage keepAlive = new KeepAliveMessage();
                NettyStreamingMessageSender.this.sendControlMessage(channel, keepAlive, this::keepAliveListener);
            } catch (IOException e) {
                future.cancel(false);
            }
        }

        /** Logs, at debug level, a keep-alive send that neither succeeded nor was cancelled. */
        private void keepAliveListener(Future<? super Void> future) {
            boolean failed = !future.isSuccess() && !future.isCancelled();
            if (failed && NettyStreamingMessageSender.logger.isDebugEnabled()) {
                NettyStreamingMessageSender.logger.debug("{} Could not send keep-alive message (perhaps stream session is finished?).", NettyStreamingMessageSender.createLogTag(session, channel), future.cause());
            }
        }
    }

    public NettyStreamingMessageSender(StreamSession streamSession, OutboundConnectionSettings outboundConnectionSettings, StreamConnectionFactory streamConnectionFactory, int i, boolean z) {
        this.session = streamSession;
        this.streamingVersion = i;
        this.template = outboundConnectionSettings;
        this.factory = streamConnectionFactory;
        this.isPreview = z;
        this.fileTransferExecutor = new DebuggableThreadPoolExecutor(1, MAX_PARALLEL_TRANSFERS, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory("NettyStreaming-Outbound-" + streamSession.peer.toString().replace(':', '.')));
        this.fileTransferExecutor.allowCoreThreadTimeOut(true);
    }

    /** Builds a {@link StreamInitMessage} from the session's identifying data and sends it. */
    @Override // org.apache.cassandra.streaming.StreamingMessageSender
    public void initialize() {
        StreamInitMessage init = new StreamInitMessage(
                FBUtilities.getBroadcastAddressAndPort(),
                session.sessionIndex(),
                session.planId(),
                session.streamOperation(),
                session.getPendingRepair(),
                session.getPreviewKind());
        sendMessage(init);
    }

    /** @return {@code true} if a control message channel has been created or injected */
    public boolean hasControlChannel() {
        return null != controlMessageChannel;
    }

    /**
     * Installs {@code channel} as the control message channel, marks it as not transferring a file,
     * and schedules its keep-alive task.
     */
    public void injectControlMessageChannel(Channel channel) {
        controlMessageChannel = channel;
        channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
        scheduleKeepAliveTask(channel);
    }

    /**
     * Creates the control message channel and schedules its keep-alive task, unless one already exists.
     *
     * @throws IOException if the channel cannot be created
     */
    private void setupControlMessageChannel() throws IOException {
        if (controlMessageChannel != null) {
            return;
        }
        controlMessageChannel = createChannel(true);
        scheduleKeepAliveTask(controlMessageChannel);
    }

    /**
     * Schedules a {@link KeepAliveTask} for {@code channel} on the channel's event loop, with no initial
     * delay and a period of {@code DatabaseDescriptor.getStreamingKeepAlivePeriod()} seconds, and records
     * the resulting future so that {@code close()} can cancel it.
     */
    private void scheduleKeepAliveTask(Channel channel) {
        int periodInSeconds = DatabaseDescriptor.getStreamingKeepAlivePeriod();
        if (logger.isDebugEnabled()) {
            String tag = createLogTag(session, channel);
            logger.debug("{} Scheduling keep-alive task with {}s period.", tag, Integer.valueOf(periodInSeconds));
        }

        KeepAliveTask task = new KeepAliveTask(channel, session);
        io.netty.util.concurrent.ScheduledFuture<?> handle =
                channel.eventLoop().scheduleAtFixedRate(task, 0L, periodInSeconds, TimeUnit.SECONDS);
        channelKeepAlives.add(handle);
        // NOTE(review): the task is scheduled with zero delay before its 'future' field is assigned;
        // confirm the first run cannot observe a null future.
        task.future = handle;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a new outbound channel through the connection factory and attaches it to the session.
     *
     * @param z when {@code true}, a {@link StreamingInboundHandler} named "stream" is appended to the
     *          channel's pipeline
     * @return the new channel, with its transferring-file attribute initialised to {@code FALSE}
     * @throws IOException if the connection cannot be created
     */
    public Channel createChannel(boolean z) throws IOException {
        Channel channel = factory.createConnection(template, streamingVersion);
        session.attachOutbound(channel);

        if (z) {
            StreamingInboundHandler inbound = new StreamingInboundHandler(template.to, streamingVersion, session);
            channel.pipeline().addLast("stream", inbound);
        }

        channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
        logger.debug("Creating channel id {} local {} remote {}", channel.id(), channel.localAddress(), channel.remoteAddress());
        return channel;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the bracketed prefix used in this class's log messages.
     *
     * @param streamSession session whose plan id is included as {@code " #<planId>"}; may be {@code null}
     * @param channel       channel whose id is included as {@code " channel: <id>"}; may be {@code null}
     * @return a tag of the form {@code [Stream #<planId> channel: <id>]}, with absent parts omitted
     */
    public static String createLogTag(StreamSession streamSession, Channel channel) {
        String planPart = (streamSession == null) ? "" : " #" + streamSession.planId();
        String channelPart = (channel == null) ? "" : " channel: " + channel.id();
        return "[Stream" + planPart + channelPart + ']';
    }

    /**
     * Sends a stream message.
     *
     * <p>{@link OutgoingStreamMessage}s are handed to the file transfer executor as a
     * {@link FileStreamTask}; every other message is written to the control channel, which is created on
     * demand. A failure on the control path closes this sender and is reported to the session.
     *
     * @throws RuntimeException if the sender is closed, or if a file message is sent in a preview session
     */
    @Override // org.apache.cassandra.streaming.StreamingMessageSender
    public void sendMessage(StreamMessage streamMessage) {
        if (closed) {
            throw new RuntimeException("stream has been closed, cannot send " + streamMessage);
        }

        if (!(streamMessage instanceof OutgoingStreamMessage)) {
            // Control path.
            try {
                setupControlMessageChannel();
                sendControlMessage(controlMessageChannel, streamMessage, future -> onControlMessageComplete(future, streamMessage));
            } catch (Exception e) {
                close();
                session.onError(e);
            }
            return;
        }

        // File path.
        if (isPreview) {
            throw new RuntimeException("Cannot send stream data messages for preview streaming sessions");
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{} Sending {}", createLogTag(session, null), streamMessage);
        }
        FileStreamTask task = new FileStreamTask((OutgoingStreamMessage) streamMessage);
        fileTransferExecutor.submit(task);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Serializes a control message into a freshly allocated direct buffer and writes it to the channel.
     *
     * <p>If serialization (or the size assertion) fails, the buffer is released before the failure
     * propagates; once the buffer has been handed to {@code writeAndFlush} it is no longer touched here.
     *
     * @param channel               channel to write to
     * @param streamMessage         message to serialize
     * @param genericFutureListener listener registered on the write
     * @throws IOException           if serialization fails
     * @throws IllegalStateException if the computed serialized size exceeds 1 GiB
     */
    public void sendControlMessage(Channel channel, StreamMessage streamMessage, GenericFutureListener genericFutureListener) throws IOException {
        if (logger.isDebugEnabled()) {
            logger.debug("{} Sending {}", createLogTag(this.session, channel), streamMessage);
        }
        long serializedSize = StreamMessage.serializedSize(streamMessage, this.streamingVersion);
        // 1073741824 == 1 << 30; anything larger also would not fit the int-sized buffer below.
        if (serializedSize > 1073741824) {
            throw new IllegalStateException(String.format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s", createLogTag(this.session, channel), Long.valueOf(serializedSize), streamMessage.type));
        }
        int size = (int) serializedSize;
        ByteBuf directBuffer = channel.alloc().directBuffer(size, size);
        try {
            ByteBuffer nioBuffer = directBuffer.nioBuffer(0, size);
            StreamMessage.serialize(streamMessage, new DataOutputBufferFixed(nioBuffer), this.streamingVersion, this.session);
            if (!$assertionsDisabled && nioBuffer.position() != nioBuffer.limit()) {
                throw new AssertionError();
            }
            directBuffer.writerIndex(nioBuffer.position());
        } catch (Throwable t) {
            // The buffer never reached the channel, so nobody else will release it.
            directBuffer.release();
            throw t;
        }
        AsyncChannelPromise.writeAndFlush(channel, directBuffer, (GenericFutureListener<? extends Future<? super Void>>) genericFutureListener);
    }

    /**
     * Completion callback for control message writes.
     *
     * @param future        the completed write; must be a {@link ChannelFuture}
     * @param streamMessage the message that was written
     * @return {@code null} if the write had no failure cause; otherwise the result of reporting the
     *         cause to the session
     */
    java.util.concurrent.Future onControlMessageComplete(Future<?> future, StreamMessage streamMessage) {
        ChannelFuture write = (ChannelFuture) future;
        Throwable failure = future.cause();
        if (failure != null) {
            String tag = createLogTag(session, write.channel());
            logger.error("{} failed to send a stream message/data to peer {}: msg = {}", tag, template.to, streamMessage, failure);
            return session.onError(failure);
        }
        return null;
    }

    /** Marks this sender as closed without performing any of the cleanup done by {@code close()}. */
    public void setClosed() {
        closed = true;
    }

    /** Replaces the control message channel; unlike the inject variant, nothing else is set up. */
    void setControlMessageChannel(Channel channel) {
        controlMessageChannel = channel;
    }

    /** @return the number of permits currently available on the shared file transfer semaphore */
    int semaphoreAvailablePermits() {
        int available = fileTransferSemaphore.availablePermits();
        return available;
    }

    /**
     * @return {@code false} once this sender is closed; otherwise {@code true} when no control channel
     *         exists yet or the existing control channel is open
     */
    @Override // org.apache.cassandra.streaming.StreamingMessageSender
    public boolean connected() {
        if (closed) {
            return false;
        }
        if (controlMessageChannel == null) {
            return true;
        }
        return controlMessageChannel.isOpen();
    }

    /**
     * Closes this sender: cancels and forgets the keep-alive tasks, clears the thread-to-channel map,
     * and shuts down the file transfer executor. Does nothing if already closed.
     */
    @Override // org.apache.cassandra.streaming.StreamingMessageSender
    public void close() {
        if (closed) {
            return;
        }
        closed = true;

        if (logger.isDebugEnabled()) {
            logger.debug("{} Closing stream connection channels on {}", createLogTag(session, null), template.to);
        }

        for (ScheduledFuture<?> keepAlive : channelKeepAlives) {
            keepAlive.cancel(false);
        }
        channelKeepAlives.clear();

        threadToChannelMap.clear();
        fileTransferExecutor.shutdownNow();
    }

    // Assigns the class's blank static finals. Order matters: MAX_PARALLEL_TRANSFERS and the semaphore
    // both read DEFAULT_MAX_PARALLEL_TRANSFERS, so it must be assigned first.
    static {
        // Decompiler rendering of the compiler-generated assertion switch.
        $assertionsDisabled = !NettyStreamingMessageSender.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger((Class<?>) NettyStreamingMessageSender.class);
        DEFAULT_MAX_PARALLEL_TRANSFERS = FBUtilities.getAvailableProcessors();
        // Overridable through the "cassandra.streaming.session.parallelTransfers" system property.
        MAX_PARALLEL_TRANSFERS = Integer.parseInt(System.getProperty("cassandra.streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS)));
        DEFAULT_CLOSE_WAIT_IN_MILLIS = TimeUnit.MINUTES.toMillis(5L);
        // Fair semaphore sized by the default, not by the system-property override.
        fileTransferSemaphore = new Semaphore(DEFAULT_MAX_PARALLEL_TRANSFERS, true);
        TRANSFERRING_FILE_ATTR = AttributeKey.valueOf("transferringFile");
    }
}
