package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.io.EOFException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.PrepareAckMessage;
import org.apache.cassandra.streaming.messages.PrepareSynAckMessage;
import org.apache.cassandra.streaming.messages.PrepareSynMessage;
import org.apache.cassandra.streaming.messages.ReceivedMessage;
import org.apache.cassandra.streaming.messages.SessionFailedMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/StreamSession.class */
public class StreamSession implements IEndpointStateChangeSubscriber {
    private static final Logger logger;
    public static volatile MessageStateSink sink;
    private final StreamOperation streamOperation;
    public final InetAddressAndPort peer;
    private final OutboundConnectionSettings template;
    private final int index;
    private StreamResultFuture streamResult;
    private final StreamingMetrics metrics;
    private final boolean isFollower;
    private final NettyStreamingMessageSender messageSender;
    private Future<?> closeFuture;
    private final UUID pendingRepair;
    private final PreviewKind previewKind;
    static final /* synthetic */ boolean $assertionsDisabled;
    protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();

    @VisibleForTesting
    protected final ConcurrentHashMap<TableId, StreamTransferTask> transfers = new ConcurrentHashMap<>();
    private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap();
    final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap();
    private final ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap();
    private boolean maybeCompleted = false;
    private volatile State state = State.INITIALIZED;

    @VisibleForTesting
    /* loaded from: input_file:org/apache/cassandra/streaming/StreamSession$MessageStateSink.class */
    public interface MessageStateSink {
        public static final MessageStateSink NONE = new MessageStateSink() { // from class: org.apache.cassandra.streaming.StreamSession.MessageStateSink.1
            @Override // org.apache.cassandra.streaming.StreamSession.MessageStateSink
            public void recordState(InetAddressAndPort inetAddressAndPort, State state) {
            }

            @Override // org.apache.cassandra.streaming.StreamSession.MessageStateSink
            public void recordMessage(InetAddressAndPort inetAddressAndPort, StreamMessage.Type type) {
            }

            @Override // org.apache.cassandra.streaming.StreamSession.MessageStateSink
            public void onClose(InetAddressAndPort inetAddressAndPort) {
            }
        };

        void recordState(InetAddressAndPort inetAddressAndPort, State state);

        void recordMessage(InetAddressAndPort inetAddressAndPort, StreamMessage.Type type);

        void onClose(InetAddressAndPort inetAddressAndPort);
    }

    /* loaded from: input_file:org/apache/cassandra/streaming/StreamSession$State.class */
    public enum State {
        INITIALIZED(false),
        PREPARING(false),
        STREAMING(false),
        WAIT_COMPLETE(false),
        COMPLETE(true),
        FAILED(true),
        ABORTED(true);

        private final boolean finalState;

        State(boolean z) {
            this.finalState = z;
        }

        public boolean isFinalState() {
            return this.finalState;
        }
    }

    public StreamSession(StreamOperation streamOperation, InetAddressAndPort inetAddressAndPort, StreamConnectionFactory streamConnectionFactory, boolean z, int i, UUID uuid, PreviewKind previewKind) {
        this.streamOperation = streamOperation;
        this.peer = inetAddressAndPort;
        this.template = new OutboundConnectionSettings(inetAddressAndPort);
        this.isFollower = z;
        this.index = i;
        this.messageSender = new NettyStreamingMessageSender(this, this.template, streamConnectionFactory, 12, previewKind.isPreview());
        this.metrics = StreamingMetrics.get(inetAddressAndPort);
        this.pendingRepair = uuid;
        this.previewKind = previewKind;
        logger.debug("Creating stream session to {} as {}", this.template, z ? "follower" : "initiator");
    }

    public boolean isFollower() {
        return this.isFollower;
    }

    public UUID planId() {
        if (this.streamResult == null) {
            return null;
        }
        return this.streamResult.planId;
    }

    public int sessionIndex() {
        return this.index;
    }

    public StreamOperation streamOperation() {
        if (this.streamResult == null) {
            return null;
        }
        return this.streamResult.streamOperation;
    }

    public StreamOperation getStreamOperation() {
        return this.streamOperation;
    }

    public UUID getPendingRepair() {
        return this.pendingRepair;
    }

    public boolean isPreview() {
        return this.previewKind.isPreview();
    }

    public PreviewKind getPreviewKind() {
        return this.previewKind;
    }

    public StreamReceiver getAggregator(TableId tableId) {
        if ($assertionsDisabled || this.receivers.containsKey(tableId)) {
            return this.receivers.get(tableId).getReceiver();
        }
        throw new AssertionError("Missing tableId " + tableId);
    }

    public void init(StreamResultFuture streamResultFuture) {
        this.streamResult = streamResultFuture;
        StreamHook.instance.reportStreamFuture(this, streamResultFuture);
    }

    public synchronized boolean attachInbound(Channel channel, boolean z) {
        failIfFinished();
        if (!this.messageSender.hasControlChannel() && z) {
            this.messageSender.injectControlMessageChannel(channel);
        }
        channel.closeFuture().addListener(future -> {
            onChannelClose(channel);
        });
        return this.channels.putIfAbsent(channel.id(), channel) == null;
    }

    public synchronized boolean attachOutbound(Channel channel) {
        failIfFinished();
        channel.closeFuture().addListener(future -> {
            onChannelClose(channel);
        });
        return this.channels.putIfAbsent(channel.id(), channel) == null;
    }

    private void onChannelClose(Channel channel) {
        if (this.channels.remove(channel.id()) == null || !this.channels.isEmpty()) {
            return;
        }
        this.messageSender.close();
    }

    public void start() {
        if (this.requests.isEmpty() && this.transfers.isEmpty()) {
            logger.info("[Stream #{}] Session does not have any tasks.", planId());
            closeSession(State.COMPLETE);
            return;
        }
        try {
            Logger logger2 = logger;
            Object[] objArr = new Object[3];
            objArr[0] = planId();
            objArr[1] = this.peer;
            objArr[2] = this.template.connectTo == null ? "" : " through " + this.template.connectTo;
            logger2.info("[Stream #{}] Starting streaming to {}{}", objArr);
            this.messageSender.initialize();
            onInitializationComplete();
        } catch (Exception e) {
            JVMStabilityInspector.inspectThrowable(e);
            onError(e);
        }
    }

    public void addStreamRequest(String str, RangesAtEndpoint rangesAtEndpoint, RangesAtEndpoint rangesAtEndpoint2, Collection<String> collection) {
        if (!$assertionsDisabled && !Iterables.all(rangesAtEndpoint, (v0) -> {
            return v0.isSelf();
        }) && !RangesAtEndpoint.isDummyList(rangesAtEndpoint)) {
            throw new AssertionError(rangesAtEndpoint.toString());
        }
        if (!$assertionsDisabled && !Iterables.all(rangesAtEndpoint2, (v0) -> {
            return v0.isSelf();
        }) && !RangesAtEndpoint.isDummyList(rangesAtEndpoint2)) {
            throw new AssertionError(rangesAtEndpoint2.toString());
        }
        this.requests.add(new StreamRequest(str, rangesAtEndpoint, rangesAtEndpoint2, collection));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addTransferRanges(String str, RangesAtEndpoint rangesAtEndpoint, Collection<String> collection, boolean z) {
        failIfFinished();
        Collection<ColumnFamilyStore> columnFamilyStores = getColumnFamilyStores(str, collection);
        if (z) {
            flushSSTables(columnFamilyStores);
        }
        addTransferStreams(getOutgoingStreamsForRanges(rangesAtEndpoint.unwrap(), columnFamilyStores, this.pendingRepair, this.previewKind));
        Set<Range<Token>> set = this.transferredRangesPerKeyspace.get(str);
        if (set == null) {
            set = new HashSet();
        }
        set.addAll(rangesAtEndpoint.ranges());
        this.transferredRangesPerKeyspace.put(str, set);
    }

    private void failIfFinished() {
        if (state().isFinalState()) {
            throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
        }
    }

    private Collection<ColumnFamilyStore> getColumnFamilyStores(String str, Collection<String> collection) {
        HashSet hashSet = new HashSet();
        if (collection.isEmpty()) {
            hashSet.addAll(Keyspace.open(str).getColumnFamilyStores());
        } else {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                hashSet.add(Keyspace.open(str).getColumnFamilyStore(it.next()));
            }
        }
        return hashSet;
    }

    @VisibleForTesting
    public List<OutgoingStream> getOutgoingStreamsForRanges(RangesAtEndpoint rangesAtEndpoint, Collection<ColumnFamilyStore> collection, UUID uuid, PreviewKind previewKind) {
        ArrayList arrayList = new ArrayList();
        try {
            Iterator<ColumnFamilyStore> it = collection.iterator();
            while (it.hasNext()) {
                arrayList.addAll(it.next().getStreamManager().createOutgoingStreams(this, rangesAtEndpoint, uuid, previewKind));
            }
            return arrayList;
        } catch (Throwable th) {
            arrayList.forEach((v0) -> {
                v0.finish();
            });
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addTransferStreams(Collection<OutgoingStream> collection) {
        failIfFinished();
        for (OutgoingStream outgoingStream : collection) {
            TableId tableId = outgoingStream.getTableId();
            StreamTransferTask streamTransferTask = this.transfers.get(tableId);
            if (streamTransferTask == null) {
                StreamTransferTask streamTransferTask2 = new StreamTransferTask(this, tableId);
                streamTransferTask = this.transfers.putIfAbsent(tableId, streamTransferTask2);
                if (streamTransferTask == null) {
                    streamTransferTask = streamTransferTask2;
                }
            }
            streamTransferTask.addTransferStream(outgoingStream);
        }
    }

    private synchronized Future<?> closeSession(State state) {
        if (this.closeFuture != null) {
            return this.closeFuture;
        }
        state(state);
        ArrayList arrayList = new ArrayList();
        if (state == State.FAILED || state == State.ABORTED) {
            arrayList.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
        }
        if (!this.isFollower || this.state != State.COMPLETE) {
            logger.debug("[Stream #{}] Will close attached channels {}", planId(), this.channels);
            this.channels.values().forEach(channel -> {
                arrayList.add(channel.close());
            });
        }
        sink.onClose(this.peer);
        this.streamResult.handleSessionComplete(this);
        this.closeFuture = FBUtilities.allOf(arrayList);
        return this.closeFuture;
    }

    private void abortTasks() {
        try {
            this.receivers.values().forEach((v0) -> {
                v0.abort();
            });
            this.transfers.values().forEach((v0) -> {
                v0.abort();
            });
        } catch (Exception e) {
            logger.warn("[Stream #{}] failed to abort some streaming tasks", planId(), e);
        }
    }

    public void state(State state) {
        if (logger.isTraceEnabled()) {
            logger.trace("[Stream #{}] Changing session state from {} to {}", new Object[]{planId(), this.state, state});
        }
        sink.recordState(this.peer, state);
        this.state = state;
    }

    public State state() {
        return this.state;
    }

    public NettyStreamingMessageSender getMessageSender() {
        return this.messageSender;
    }

    public boolean isSuccess() {
        return this.state == State.COMPLETE;
    }

    public synchronized void messageReceived(StreamMessage streamMessage) {
        if (streamMessage.type != StreamMessage.Type.KEEP_ALIVE) {
            failIfFinished();
        }
        sink.recordMessage(this.peer, streamMessage.type);
        switch (streamMessage.type) {
            case STREAM_INIT:
            case KEEP_ALIVE:
                return;
            case PREPARE_SYN:
                PrepareSynMessage prepareSynMessage = (PrepareSynMessage) streamMessage;
                prepare(prepareSynMessage.requests, prepareSynMessage.summaries);
                return;
            case PREPARE_SYNACK:
                prepareSynAck((PrepareSynAckMessage) streamMessage);
                return;
            case PREPARE_ACK:
                prepareAck((PrepareAckMessage) streamMessage);
                return;
            case STREAM:
                receive((IncomingStreamMessage) streamMessage);
                return;
            case RECEIVED:
                ReceivedMessage receivedMessage = (ReceivedMessage) streamMessage;
                received(receivedMessage.tableId, receivedMessage.sequenceNumber);
                return;
            case COMPLETE:
                complete();
                return;
            case SESSION_FAILED:
                sessionFailed();
                return;
            default:
                throw new AssertionError("unhandled StreamMessage type: " + streamMessage.getClass().getName());
        }
    }

    public void onInitializationComplete() {
        state(State.PREPARING);
        PrepareSynMessage prepareSynMessage = new PrepareSynMessage();
        prepareSynMessage.requests.addAll(this.requests);
        Iterator<StreamTransferTask> it = this.transfers.values().iterator();
        while (it.hasNext()) {
            prepareSynMessage.summaries.add(it.next().getSummary());
        }
        this.messageSender.sendMessage(prepareSynMessage);
    }

    public synchronized Future<?> onError(Throwable th) {
        if (th instanceof EOFException) {
            if (this.state.finalState) {
                logger.debug("[Stream #{}] Socket closed after session completed with state {}", planId(), this.state);
                return null;
            }
            logger.error("[Stream #{}] Socket closed before session completion, peer {} is probably down.", new Object[]{planId(), this.peer.getHostAddressAndPort(), th});
            return closeSession(State.FAILED);
        }
        logError(th);
        if (this.messageSender.connected()) {
            state(State.FAILED);
            this.messageSender.sendMessage(new SessionFailedMessage());
        }
        return closeSession(State.FAILED);
    }

    private void logError(Throwable th) {
        if (!(th instanceof SocketTimeoutException)) {
            Logger logger2 = logger;
            Object[] objArr = new Object[4];
            objArr[0] = planId();
            objArr[1] = this.peer.getHostAddressAndPort();
            objArr[2] = this.template.connectTo == null ? "" : " through " + this.template.connectTo.getHostAddressAndPort();
            objArr[3] = th;
            logger2.error("[Stream #{}] Streaming error occurred on session with peer {}{}", objArr);
            return;
        }
        Logger logger3 = logger;
        Object[] objArr2 = new Object[5];
        objArr2[0] = planId();
        objArr2[1] = this.peer.getHostAddressAndPort();
        objArr2[2] = this.template.connectTo == null ? "" : " through " + this.template.connectTo.getHostAddressAndPort();
        objArr2[3] = Integer.valueOf(2 * DatabaseDescriptor.getStreamingKeepAlivePeriod());
        objArr2[4] = th;
        logger3.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? If not, maybe try increasing streaming_keep_alive_period_in_secs.", objArr2);
    }

    public void prepare(Collection<StreamRequest> collection, Collection<StreamSummary> collection2) {
        state(State.PREPARING);
        ScheduledExecutors.nonPeriodicTasks.execute(() -> {
            try {
                prepareAsync(collection, collection2);
            } catch (Exception e) {
                onError(e);
            }
        });
    }

    private void prepareAsync(Collection<StreamRequest> collection, Collection<StreamSummary> collection2) {
        for (StreamRequest streamRequest : collection) {
            addTransferRanges(streamRequest.keyspace, RangesAtEndpoint.concat(streamRequest.full, streamRequest.transientReplicas), streamRequest.columnFamilies, true);
        }
        Iterator<StreamSummary> it = collection2.iterator();
        while (it.hasNext()) {
            prepareReceiving(it.next());
        }
        PrepareSynAckMessage prepareSynAckMessage = new PrepareSynAckMessage();
        if (!this.peer.equals(FBUtilities.getBroadcastAddressAndPort())) {
            Iterator<StreamTransferTask> it2 = this.transfers.values().iterator();
            while (it2.hasNext()) {
                prepareSynAckMessage.summaries.add(it2.next().getSummary());
            }
        }
        this.messageSender.sendMessage(prepareSynAckMessage);
        this.streamResult.handleSessionPrepared(this);
        if (isPreview()) {
            completePreview();
        } else {
            maybeCompleted();
        }
    }

    private void prepareSynAck(PrepareSynAckMessage prepareSynAckMessage) {
        if (!prepareSynAckMessage.summaries.isEmpty()) {
            Iterator<StreamSummary> it = prepareSynAckMessage.summaries.iterator();
            while (it.hasNext()) {
                prepareReceiving(it.next());
            }
            if (!isPreview()) {
                this.messageSender.sendMessage(new PrepareAckMessage());
            }
        }
        if (isPreview()) {
            completePreview();
        } else {
            startStreamingFiles(true);
        }
    }

    private void prepareAck(PrepareAckMessage prepareAckMessage) {
        if (isPreview()) {
            throw new RuntimeException(String.format("[Stream #%s] Cannot receive PrepareAckMessage for preview session", planId()));
        }
        startStreamingFiles(true);
    }

    public void streamSent(OutgoingStreamMessage outgoingStreamMessage) {
        long estimatedSize = outgoingStreamMessage.stream.getEstimatedSize();
        StreamingMetrics.totalOutgoingBytes.inc(estimatedSize);
        this.metrics.outgoingBytes.inc(estimatedSize);
        if (StreamOperation.REPAIR == getStreamOperation()) {
            StreamingMetrics.totalOutgoingRepairBytes.inc(estimatedSize);
            StreamingMetrics.totalOutgoingRepairSSTables.inc(outgoingStreamMessage.stream.getNumFiles());
        }
        StreamTransferTask streamTransferTask = this.transfers.get(outgoingStreamMessage.header.tableId);
        if (streamTransferTask != null) {
            streamTransferTask.scheduleTimeout(outgoingStreamMessage.header.sequenceNumber, 12L, TimeUnit.HOURS);
        }
    }

    public void receive(IncomingStreamMessage incomingStreamMessage) {
        if (isPreview()) {
            throw new RuntimeException(String.format("[Stream #%s] Cannot receive files for preview session", planId()));
        }
        long size = incomingStreamMessage.stream.getSize();
        StreamingMetrics.totalIncomingBytes.inc(size);
        this.metrics.incomingBytes.inc(size);
        this.messageSender.sendMessage(new ReceivedMessage(incomingStreamMessage.header.tableId, incomingStreamMessage.header.sequenceNumber));
        StreamHook.instance.reportIncomingStream(incomingStreamMessage.header.tableId, incomingStreamMessage.stream, this, incomingStreamMessage.header.sequenceNumber);
        long nanoTime = System.nanoTime();
        try {
            this.receivers.get(incomingStreamMessage.header.tableId).received(incomingStreamMessage.stream);
            long nanoTime2 = System.nanoTime() - nanoTime;
            this.metrics.incomingProcessTime.update(nanoTime2, TimeUnit.NANOSECONDS);
            long millis = TimeUnit.NANOSECONDS.toMillis(nanoTime2);
            int internodeStreamingTcpUserTimeoutInMS = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
            if (internodeStreamingTcpUserTimeoutInMS <= 0 || millis <= internodeStreamingTcpUserTimeoutInMS) {
                return;
            }
            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1L, TimeUnit.MINUTES, "The time taken ({} ms) for processing the incoming stream message ({}) exceeded internode streaming TCP user timeout ({} ms).\nThe streaming connection might be closed due to tcp user timeout.\nTry to increase the internode_streaming_tcp_user_timeout_in_ms or set it to 0 to use system defaults.", Long.valueOf(millis), incomingStreamMessage, Integer.valueOf(internodeStreamingTcpUserTimeoutInMS));
        } catch (Throwable th) {
            long nanoTime3 = System.nanoTime() - nanoTime;
            this.metrics.incomingProcessTime.update(nanoTime3, TimeUnit.NANOSECONDS);
            long millis2 = TimeUnit.NANOSECONDS.toMillis(nanoTime3);
            int internodeStreamingTcpUserTimeoutInMS2 = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
            if (internodeStreamingTcpUserTimeoutInMS2 > 0 && millis2 > internodeStreamingTcpUserTimeoutInMS2) {
                NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1L, TimeUnit.MINUTES, "The time taken ({} ms) for processing the incoming stream message ({}) exceeded internode streaming TCP user timeout ({} ms).\nThe streaming connection might be closed due to tcp user timeout.\nTry to increase the internode_streaming_tcp_user_timeout_in_ms or set it to 0 to use system defaults.", Long.valueOf(millis2), incomingStreamMessage, Integer.valueOf(internodeStreamingTcpUserTimeoutInMS2));
            }
            throw th;
        }
    }

    public void progress(String str, ProgressInfo.Direction direction, long j, long j2) {
        this.streamResult.handleProgress(new ProgressInfo(this.peer, this.index, str, direction, j, j2));
    }

    public void received(TableId tableId, int i) {
        this.transfers.get(tableId).complete(i);
    }

    public synchronized void complete() {
        logger.debug("[Stream #{}] handling Complete message, state = {}", planId(), this.state);
        if (this.isFollower) {
            throw new IllegalStateException(String.format("[Stream #%s] Complete message can be only received by the initiator!", planId()));
        }
        if (this.state == State.WAIT_COMPLETE) {
            closeSession(State.COMPLETE);
        } else {
            state(State.WAIT_COMPLETE);
        }
    }

    private synchronized boolean maybeCompleted() {
        if (!this.receivers.isEmpty() || !this.transfers.isEmpty()) {
            return false;
        }
        if (this.maybeCompleted) {
            return true;
        }
        this.maybeCompleted = true;
        if (this.isFollower) {
            this.messageSender.sendMessage(new CompleteMessage());
            closeSession(State.COMPLETE);
            return true;
        }
        if (this.state == State.WAIT_COMPLETE) {
            closeSession(State.COMPLETE);
            return true;
        }
        state(State.WAIT_COMPLETE);
        return true;
    }

    public synchronized void sessionFailed() {
        logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), this.peer.toString());
        closeSession(State.FAILED);
    }

    public SessionInfo getSessionInfo() {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<StreamReceiveTask> it = this.receivers.values().iterator();
        while (it.hasNext()) {
            newArrayList.add(it.next().getSummary());
        }
        ArrayList newArrayList2 = Lists.newArrayList();
        Iterator<StreamTransferTask> it2 = this.transfers.values().iterator();
        while (it2.hasNext()) {
            newArrayList2.add(it2.next().getSummary());
        }
        return new SessionInfo(this.peer, this.index, this.template.connectTo == null ? this.peer : this.template.connectTo, newArrayList, newArrayList2, this.state);
    }

    public synchronized void taskCompleted(StreamReceiveTask streamReceiveTask) {
        this.receivers.remove(streamReceiveTask.tableId);
        maybeCompleted();
    }

    public synchronized void taskCompleted(StreamTransferTask streamTransferTask) {
        this.transfers.remove(streamTransferTask.tableId);
        maybeCompleted();
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onRemove(InetAddressAndPort inetAddressAndPort) {
        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), this.peer.toString());
        closeSession(State.FAILED);
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onRestart(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
        logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), this.peer.toString());
        closeSession(State.FAILED);
    }

    private void completePreview() {
        try {
            state(State.WAIT_COMPLETE);
            closeSession(State.COMPLETE);
        } finally {
            Iterator it = Iterables.concat(this.receivers.values(), this.transfers.values()).iterator();
            while (it.hasNext()) {
                ((StreamTask) it.next()).abort();
            }
        }
    }

    private void flushSSTables(Iterable<ColumnFamilyStore> iterable) {
        ArrayList arrayList = new ArrayList();
        Iterator<ColumnFamilyStore> it = iterable.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().forceFlush());
        }
        FBUtilities.waitOnFutures(arrayList);
    }

    @VisibleForTesting
    public synchronized void prepareReceiving(StreamSummary streamSummary) {
        failIfFinished();
        if (streamSummary.files > 0) {
            this.receivers.put(streamSummary.tableId, new StreamReceiveTask(this, streamSummary.tableId, streamSummary.files, streamSummary.totalSize));
        }
    }

    private void startStreamingFiles(boolean z) {
        if (z) {
            this.streamResult.handleSessionPrepared(this);
        }
        state(State.STREAMING);
        for (StreamTransferTask streamTransferTask : this.transfers.values()) {
            Collection<OutgoingStreamMessage> fileMessages = streamTransferTask.getFileMessages();
            if (fileMessages.isEmpty()) {
                taskCompleted(streamTransferTask);
            } else {
                for (OutgoingStreamMessage outgoingStreamMessage : fileMessages) {
                    outgoingStreamMessage.header.addSessionInfo(this);
                    this.messageSender.sendMessage(outgoingStreamMessage);
                }
            }
        }
        maybeCompleted();
    }

    @VisibleForTesting
    public int getNumRequests() {
        return this.requests.size();
    }

    @VisibleForTesting
    public int getNumTransfers() {
        return this.transferredRangesPerKeyspace.size();
    }

    public synchronized void abort() {
        if (this.state.isFinalState()) {
            logger.debug("[Stream #{}] Stream session with peer {} is already in a final state on abort.", planId(), this.peer);
            return;
        }
        logger.info("[Stream #{}] Aborting stream session with peer {}...", planId(), this.peer);
        if (getMessageSender().connected()) {
            getMessageSender().sendMessage(new SessionFailedMessage());
        }
        try {
            closeSession(State.ABORTED);
        } catch (Exception e) {
            logger.error("[Stream #{}] Error aborting stream session with peer {}", planId(), this.peer);
        }
    }

    static {
        $assertionsDisabled = !StreamSession.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(StreamSession.class);
        sink = MessageStateSink.NONE;
    }
}
