package scassandra.org.apache.cassandra.streaming;

import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scassandra.com.google.common.util.concurrent.Futures;
import scassandra.com.google.common.util.concurrent.ListenableFuture;
import scassandra.com.google.common.util.concurrent.SettableFuture;
import scassandra.org.apache.cassandra.io.util.DataOutputStreamAndChannel;
import scassandra.org.apache.cassandra.streaming.messages.StreamInitMessage;
import scassandra.org.apache.cassandra.streaming.messages.StreamMessage;
import scassandra.org.apache.cassandra.utils.FBUtilities;
import scassandra.org.apache.cassandra.utils.JVMStabilityInspector;
import scassandra.org.apache.commons.cli.HelpFormatter;

/* loaded from: input_file:scassandra/org/apache/cassandra/streaming/ConnectionHandler.class */
public class ConnectionHandler {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ConnectionHandler.class);
    private final StreamSession session;
    private IncomingMessageHandler incoming;
    private OutgoingMessageHandler outgoing;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:scassandra/org/apache/cassandra/streaming/ConnectionHandler$IncomingMessageHandler.class */
    /**
     * Handler that reads stream messages from the socket and forwards each
     * one to the owning {@link StreamSession}.
     */
    public static class IncomingMessageHandler extends MessageHandler {
        IncomingMessageHandler(StreamSession streamSession) {
            super(streamSession);
        }

        /** @return the thread-name prefix used for this handler. */
        @Override // scassandra.org.apache.cassandra.streaming.ConnectionHandler.MessageHandler
        protected String name() {
            return "STREAM-IN";
        }

        /**
         * Reads messages until the handler is closed.
         *
         * <p>A {@link SocketException} only marks the handler as closed; any
         * other failure is passed to {@code JVMStabilityInspector} and then
         * reported to the session. In every case {@code signalCloseDone()}
         * runs exactly once on the way out.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                ReadableByteChannel readChannel = getReadChannel(this.socket);
                while (!isClosed()) {
                    StreamMessage message = StreamMessage.deserialize(readChannel, this.protocolVersion, this.session);
                    // deserialize may return null; there is nothing to deliver in that case.
                    if (message != null) {
                        ConnectionHandler.logger.debug("[Stream #{}] Received {}", this.session.planId(), message);
                        this.session.messageReceived(message);
                    }
                }
            } catch (SocketException e) {
                // Must precede the Throwable handler, otherwise it is unreachable:
                // a socket-level failure just closes this handler and is not
                // reported to the session as an error.
                close();
            } catch (Throwable th) {
                JVMStabilityInspector.inspectThrowable(th);
                this.session.onError(th);
            } finally {
                signalCloseDone();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:scassandra/org/apache/cassandra/streaming/ConnectionHandler$MessageHandler.class */
    /**
     * Base class for the incoming and outgoing handlers. Each handler is a
     * {@link Runnable} that is run on its own thread by {@link #start} and
     * tracks its close state through a single future.
     */
    public static abstract class MessageHandler implements Runnable {
        protected final StreamSession session;
        protected int protocolVersion;
        protected Socket socket;
        /** Null until {@link #close()} is first called; afterwards holds the future completed by {@link #signalCloseDone()}. */
        private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();

        protected MessageHandler(StreamSession streamSession) {
            this.session = streamSession;
        }

        /** @return the prefix used when naming this handler's thread. */
        protected abstract String name();

        /**
         * Builds the output wrapper for a socket.
         *
         * @param socket socket to write to
         * @return a wrapper over the socket's output stream and either the
         *         socket's own channel or, when the socket has no channel, a
         *         channel created over its output stream
         * @throws IOException if the output stream cannot be obtained
         */
        protected static DataOutputStreamAndChannel getWriteChannel(Socket socket) throws IOException {
            SocketChannel channel = socket.getChannel();
            // The fallback is a plain WritableByteChannel, not a SocketChannel, so it
            // is selected in an expression rather than assigned back to 'channel'.
            return new DataOutputStreamAndChannel(
                    socket.getOutputStream(),
                    channel == null ? Channels.newChannel(socket.getOutputStream()) : channel);
        }

        /**
         * @param socket socket to read from
         * @return the socket's channel, or a channel created over its input
         *         stream when the socket has no channel
         * @throws IOException if the input stream cannot be obtained
         */
        protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException {
            SocketChannel channel = socket.getChannel();
            return channel == null ? Channels.newChannel(socket.getInputStream()) : channel;
        }

        /**
         * Writes a stream-init message to the given socket, serialized with
         * this handler's current {@code protocolVersion}.
         *
         * @param socket socket to write the init message to
         * @param z      flag forwarded unchanged to {@code StreamInitMessage}
         * @throws IOException if the write channel cannot be obtained or the write fails
         */
        public void sendInitMessage(Socket socket, boolean z) throws IOException {
            DataOutputStreamAndChannel out = getWriteChannel(socket);
            StreamInitMessage initMessage = new StreamInitMessage(
                    FBUtilities.getBroadcastAddress(),
                    this.session.sessionIndex(),
                    this.session.planId(),
                    this.session.description(),
                    z);
            out.write(initMessage.createMessage(false, this.protocolVersion));
        }

        /**
         * Records the socket and protocol version, then runs this handler on a
         * new thread named {@code name() + "-" + session.peer}.
         *
         * @param socket socket this handler will use
         * @param i      protocol version used for (de)serialization
         */
        public void start(Socket socket, int i) {
            this.socket = socket;
            this.protocolVersion = i;
            new Thread(this, name() + "-" + this.session.peer).start();
        }

        /**
         * Requests that the handler stop. Only the first call installs a
         * future; later calls return that same future.
         *
         * @return a future that is completed by {@link #signalCloseDone()}
         */
        public ListenableFuture<?> close() {
            SettableFuture<?> create = SettableFuture.create();
            return this.closeFuture.compareAndSet(null, create) ? create : this.closeFuture.get();
        }

        /** @return {@code true} once {@link #close()} has been called. */
        public boolean isClosed() {
            return this.closeFuture.get() != null;
        }

        /**
         * Completes the close future and closes the socket.
         */
        protected void signalCloseDone() {
            // The handler may terminate (e.g. after an error) before anyone called
            // close(); install the future first so the dereference below cannot NPE.
            if (!isClosed()) {
                close();
            }
            this.closeFuture.get().set(null);
            try {
                this.socket.close();
            } catch (IOException ignored) {
                // Best effort: the handler is shutting down and there is nothing
                // further to do if closing the socket fails.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:scassandra/org/apache/cassandra/streaming/ConnectionHandler$OutgoingMessageHandler.class */
    /**
     * Handler that takes queued stream messages, highest priority first, and
     * serializes them to the socket.
     */
    public static class OutgoingMessageHandler extends MessageHandler {
        /** Pending messages, ordered so that the highest {@code getPriority()} is polled first. */
        private final PriorityBlockingQueue<StreamMessage> messageQueue;

        OutgoingMessageHandler(StreamSession streamSession) {
            super(streamSession);
            this.messageQueue = new PriorityBlockingQueue<>(64, new Comparator<StreamMessage>() { // from class: scassandra.org.apache.cassandra.streaming.ConnectionHandler.OutgoingMessageHandler.1
                @Override // java.util.Comparator
                public int compare(StreamMessage streamMessage, StreamMessage streamMessage2) {
                    // Descending by priority; Integer.compare avoids the overflow
                    // that plain subtraction can produce.
                    return Integer.compare(streamMessage2.getPriority(), streamMessage.getPriority());
                }
            });
        }

        /** @return the thread-name prefix used for this handler. */
        @Override // scassandra.org.apache.cassandra.streaming.ConnectionHandler.MessageHandler
        protected String name() {
            return "STREAM-OUT";
        }

        /**
         * Adds a message to the outgoing queue.
         *
         * @param streamMessage message to send
         */
        public void enqueue(StreamMessage streamMessage) {
            this.messageQueue.put(streamMessage);
        }

        /**
         * Sends queued messages until the handler is closed, then sends
         * whatever is still queued. Sending a {@code SESSION_FAILED} message
         * closes the handler. {@code signalCloseDone()} runs exactly once on
         * the way out.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                DataOutputStreamAndChannel writeChannel = getWriteChannel(this.socket);
                StreamMessage next;
                while (!isClosed()) {
                    // Bounded wait so the closed flag is re-checked at least once a second.
                    next = this.messageQueue.poll(1L, TimeUnit.SECONDS);
                    if (next != null) {
                        ConnectionHandler.logger.debug("[Stream #{}] Sending {}", this.session.planId(), next);
                        sendMessage(writeChannel, next);
                        if (next.type == StreamMessage.Type.SESSION_FAILED) {
                            close();
                        }
                    }
                }
                // Closed: send the messages that are still queued.
                while ((next = this.messageQueue.poll()) != null) {
                    sendMessage(writeChannel, next);
                }
            } catch (InterruptedException e) {
                // Must precede the Throwable handler, otherwise it is unreachable.
                throw new AssertionError(e);
            } catch (Throwable th) {
                this.session.onError(th);
            } finally {
                signalCloseDone();
            }
        }

        /**
         * Serializes one message to the channel. I/O failures are reported to
         * the session; a {@link SocketException} additionally closes the
         * handler and discards the remaining queue.
         */
        private void sendMessage(DataOutputStreamAndChannel dataOutputStreamAndChannel, StreamMessage streamMessage) {
            try {
                StreamMessage.serialize(streamMessage, dataOutputStreamAndChannel, this.protocolVersion, this.session);
            } catch (SocketException e) {
                this.session.onError(e);
                close();
                // The socket is unusable: drop what is still queued so the drain
                // loop in run() does not retry each message and report each failure.
                this.messageQueue.clear();
            } catch (IOException e2) {
                this.session.onError(e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a handler pair (one incoming, one outgoing) bound to the given session.
     *
     * @param streamSession session that both message handlers report to
     */
    public ConnectionHandler(StreamSession streamSession) {
        session = streamSession;
        incoming = new IncomingMessageHandler(streamSession);
        outgoing = new OutgoingMessageHandler(streamSession);
    }

    /**
     * Opens two connections through the session, one per direction. For each,
     * the matching handler is started on the new socket and then a stream-init
     * message is written to that socket.
     *
     * @throws IOException if creating a connection or sending an init message fails
     */
    public void initiate() throws IOException {
        // Protocol version handed to both handlers.
        final int version = 2;

        logger.debug("[Stream #{}] Sending stream init for incoming stream", this.session.planId());
        final Socket incomingSocket = this.session.createConnection();
        this.incoming.start(incomingSocket, version);
        this.incoming.sendInitMessage(incomingSocket, true);

        logger.debug("[Stream #{}] Sending stream init for outgoing stream", this.session.planId());
        final Socket outgoingSocket = this.session.createConnection();
        this.outgoing.start(outgoingSocket, version);
        this.outgoing.sendInitMessage(outgoingSocket, false);
    }

    /**
     * Starts one of the two handlers on an already-established socket.
     *
     * @param socket socket the chosen handler will use
     * @param z      {@code true} to start the outgoing handler, {@code false} for the incoming one
     * @param i      protocol version passed to the handler
     * @throws IOException declared for callers; nothing in this method throws it directly
     */
    public void initiateOnReceivingSide(Socket socket, boolean z, int i) throws IOException {
        final MessageHandler chosen = z ? this.outgoing : this.incoming;
        chosen.start(socket, i);
    }

    /**
     * Requests that both handlers close.
     *
     * @return a future combining the close futures of the incoming and
     *         outgoing handlers; a handler that is {@code null} contributes an
     *         already-completed future
     */
    public ListenableFuture<?> close() {
        logger.debug("[Stream #{}] Closing stream connection handler on {}", this.session.planId(), this.session.peer);

        ListenableFuture<?> incomingClosed;
        if (this.incoming == null) {
            incomingClosed = Futures.immediateFuture(null);
        } else {
            incomingClosed = this.incoming.close();
        }

        ListenableFuture<?> outgoingClosed;
        if (this.outgoing == null) {
            outgoingClosed = Futures.immediateFuture(null);
        } else {
            outgoingClosed = this.outgoing.close();
        }

        return Futures.allAsList(incomingClosed, outgoingClosed);
    }

    /**
     * Sends every message of the collection, in iteration order, via
     * {@link #sendMessage(StreamMessage)}.
     *
     * @param collection messages to send
     */
    public void sendMessages(Collection<? extends StreamMessage> collection) {
        for (StreamMessage message : collection) {
            sendMessage(message);
        }
    }

    /**
     * Queues a message on the outgoing handler.
     *
     * @param streamMessage message to send
     * @throws RuntimeException if the outgoing handler has already been closed
     */
    public void sendMessage(StreamMessage streamMessage) {
        final boolean stillOpen = !this.outgoing.isClosed();
        if (stillOpen) {
            this.outgoing.enqueue(streamMessage);
            return;
        }
        throw new RuntimeException("Outgoing stream handler has been closed");
    }

    /**
     * @return {@code true} when an outgoing handler exists and has not been closed
     */
    public boolean isOutgoingConnected() {
        return this.outgoing != null && !this.outgoing.isClosed();
    }
}
