package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.PrepareAckMessage;
import org.apache.cassandra.streaming.messages.PrepareSynAckMessage;
import org.apache.cassandra.streaming.messages.PrepareSynMessage;
import org.apache.cassandra.streaming.messages.ReceivedMessage;
import org.apache.cassandra.streaming.messages.SessionFailedMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MerkleTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/StreamSession.class */
public class StreamSession implements IEndpointStateChangeSubscriber {
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Logger logger;
    // Operation type handed to the constructor; returned by getStreamOperation().
    private final StreamOperation streamOperation;
    // Remote endpoint of this session, taken from the "to" address of the connection settings.
    public final InetAddressAndPort peer;
    // Connection settings supplied at construction; its connectTo address (when non-null) is used in log lines and SessionInfo.
    private final OutboundConnectionSettings template;
    // Index supplied at construction; exposed through sessionIndex().
    private final int index;
    // Assigned by init(); null before that, which is why planId() and streamOperation() null-check it.
    private StreamResultFuture streamResult;
    // Requests queued by addStreamRequest() and copied into the PrepareSynMessage.
    protected final Set<StreamRequest> requests;

    // Outgoing transfer tasks keyed by table id.
    @VisibleForTesting
    protected final ConcurrentHashMap<TableId, StreamTransferTask> transfers;
    // Incoming receive tasks keyed by table id; filled by prepareReceiving().
    private final Map<TableId, StreamReceiveTask> receivers;
    // Metrics object obtained from StreamingMetrics.get(peer).
    private final StreamingMetrics metrics;
    // Ranges handed to addTransferRanges(), grouped by keyspace name.
    final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace;
    // Sender used for every outbound stream message of this session.
    private final NettyStreamingMessageSender messageSender;
    // Channels registered through attach(), keyed by their channel id.
    private final ConcurrentMap<ChannelId, Channel> incomingChannels;
    // Flipped once by closeSession() so that the close logic runs a single time.
    private final AtomicBoolean isAborted;
    // Identifier passed to the constructor; forwarded when outgoing streams are created.
    private final UUID pendingRepair;
    // Preview kind passed to the constructor; isPreview() delegates to it.
    private final PreviewKind previewKind;
    // Current session state; starts as State.INITIALIZED.
    private volatile State state;
    // Becomes true once a CompleteMessage has been sent to the peer.
    private volatile boolean completeSent;
    // Decompiler-exposed assertion switch; initialised in the static block and consulted by the hand-expanded assertions.
    static final /* synthetic */ boolean $assertionsDisabled;

    /*
     * Decompiled form of the compiler-generated helper class StreamSession$1 (renamed by the
     * decompiler because "1" is not a valid source-level class name).
     *
     * It holds the lookup table that messageReceived() uses to switch on StreamMessage.Type:
     * the table is indexed by the enum constant's ordinal() and stores the case number
     * (1..9) used in that switch. Entries left at 0 fall through to the switch's default branch.
     */
    static /* synthetic */ class AnonymousClass1 {
        // One slot per StreamMessage.Type constant present at runtime.
        static final /* synthetic */ int[] $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type = new int[StreamMessage.Type.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unset instead of failing class initialisation.
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.STREAM_INIT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.PREPARE_SYN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.PREPARE_SYNACK.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.PREPARE_ACK.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.STREAM.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.RECEIVED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.COMPLETE.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.KEEP_ALIVE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$cassandra$streaming$messages$StreamMessage$Type[StreamMessage.Type.SESSION_FAILED.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /**
     * Lifecycle states of a {@link StreamSession}.
     */
    public enum State {
        /** Initial state assigned by the constructor. */
        INITIALIZED,
        /** Set by {@code onInitializationComplete()} and by {@code prepare()}. */
        PREPARING,
        /** Set by {@code startStreamingFiles()} before the outgoing stream messages are sent. */
        STREAMING,
        /**
         * Set when one half of the completion exchange has happened: either {@code complete()} was
         * called while in another state, or {@code maybeCompleted()} found no remaining tasks and
         * sent a CompleteMessage. {@code completePreview()} also sets it just before closing.
         */
        WAIT_COMPLETE,
        /** Final state passed to {@code closeSession()} on success; {@code isSuccess()} tests for it. */
        COMPLETE,
        /** Final state passed to {@code closeSession()} on failure; triggers the task abort. */
        FAILED
    }

    /**
     * Creates a session towards {@code inetAddressAndPort} by wrapping the address in a fresh
     * {@link OutboundConnectionSettings} and delegating to the settings-based constructor.
     *
     * @param streamOperation         operation this session belongs to
     * @param inetAddressAndPort      remote endpoint
     * @param streamConnectionFactory factory handed to the message sender
     * @param i                       session index
     * @param uuid                    pending-repair identifier (stored as-is)
     * @param previewKind             preview kind of the session
     */
    public StreamSession(StreamOperation streamOperation, InetAddressAndPort inetAddressAndPort, StreamConnectionFactory streamConnectionFactory, int i, UUID uuid, PreviewKind previewKind) {
        this(streamOperation,
             new OutboundConnectionSettings(inetAddressAndPort),
             streamConnectionFactory,
             i,
             uuid,
             previewKind);
    }

    /**
     * Creates a session using the given connection settings as template.
     * <p>
     * The session starts in {@link State#INITIALIZED} with empty request, transfer and receiver
     * collections. The peer is the {@code to} address of the settings.
     *
     * @param streamOperation            operation this session belongs to
     * @param outboundConnectionSettings connection settings; also kept as the session's template
     * @param streamConnectionFactory    factory handed to the message sender
     * @param i                          session index
     * @param uuid                       pending-repair identifier (stored as-is)
     * @param previewKind                preview kind of the session
     */
    public StreamSession(StreamOperation streamOperation, OutboundConnectionSettings outboundConnectionSettings, StreamConnectionFactory streamConnectionFactory, int i, UUID uuid, PreviewKind previewKind) {
        // Identity of the session.
        this.streamOperation = streamOperation;
        this.template = outboundConnectionSettings;
        this.peer = outboundConnectionSettings.to;
        this.index = i;

        // Lifecycle flags.
        this.state = State.INITIALIZED;
        this.completeSent = false;
        this.isAborted = new AtomicBoolean(false);

        // Book-keeping collections, all initially empty.
        this.requests = Sets.newConcurrentHashSet();
        this.transfers = new ConcurrentHashMap<>();
        this.receivers = new ConcurrentHashMap<>();
        this.incomingChannels = new ConcurrentHashMap<>();
        this.transferredRangesPerKeyspace = new HashMap<>();

        // The sender receives a reference to this session, so everything above is set first.
        // NOTE(review): 12 is presumably the messaging protocol version - confirm.
        this.messageSender = new NettyStreamingMessageSender(this, outboundConnectionSettings, streamConnectionFactory, 12, previewKind.isPreview());

        this.metrics = StreamingMetrics.get(this.peer);
        this.pendingRepair = uuid;
        this.previewKind = previewKind;

        logger.debug("Creating stream session to {}", outboundConnectionSettings);
    }

    /**
     * @return the plan id of the attached {@link StreamResultFuture}, or {@code null} when
     *         {@link #init(StreamResultFuture)} has not been called yet
     */
    public UUID planId() {
        return streamResult == null ? null : streamResult.planId;
    }

    /**
     * @return the index this session was constructed with
     */
    public int sessionIndex() {
        return index;
    }

    /**
     * Returns the operation recorded on the attached {@link StreamResultFuture}. This reads
     * from the result future, unlike {@link #getStreamOperation()}, which returns the value
     * given to the constructor.
     *
     * @return the result future's stream operation, or {@code null} when no result future is attached
     */
    public StreamOperation streamOperation() {
        return streamResult == null ? null : streamResult.streamOperation;
    }

    /**
     * @return the stream operation passed to the constructor
     */
    public StreamOperation getStreamOperation() {
        return streamOperation;
    }

    /**
     * @return the pending-repair identifier passed to the constructor (may be whatever the caller supplied)
     */
    public UUID getPendingRepair() {
        return pendingRepair;
    }

    /**
     * @return whatever {@code PreviewKind.isPreview()} reports for this session's preview kind
     */
    public boolean isPreview() {
        return previewKind.isPreview();
    }

    /**
     * @return the preview kind passed to the constructor
     */
    public PreviewKind getPreviewKind() {
        return previewKind;
    }

    /**
     * Looks up the receiver of the receive task registered for {@code tableId}.
     *
     * @param tableId table whose receive task is wanted
     * @return the receiver of that task
     * @throws AssertionError       when assertions are enabled and no task is registered for the table
     * @throws NullPointerException when assertions are disabled and no task is registered for the table
     */
    public StreamReceiver getAggregator(TableId tableId) {
        // Hand-expanded assertion: only checked when assertions are enabled.
        if (!$assertionsDisabled && !receivers.containsKey(tableId)) {
            throw new AssertionError("Missing tableId " + tableId);
        }
        return receivers.get(tableId).getReceiver();
    }

    /**
     * Attaches the result future to this session and reports the pair to the stream hook.
     *
     * @param streamResultFuture future that will receive this session's progress and completion
     */
    public void init(StreamResultFuture streamResultFuture) {
        streamResult = streamResultFuture;
        StreamHook.instance.reportStreamFuture(this, streamResultFuture);
    }

    /**
     * Registers an incoming channel with this session. If the message sender has no control
     * channel yet, the channel is also injected as the control channel.
     *
     * @param channel channel to register
     * @return {@code true} if no channel with the same id was registered before
     */
    public boolean attach(Channel channel) {
        if (!messageSender.hasControlChannel()) {
            messageSender.injectControlMessageChannel(channel);
        }
        Channel alreadyRegistered = incomingChannels.putIfAbsent(channel.id(), channel);
        return alreadyRegistered == null;
    }

    /**
     * Starts the session.
     * <p>
     * With neither requests nor transfers queued, the session is closed immediately as
     * {@link State#COMPLETE}. Otherwise the message sender is initialised and
     * {@link #onInitializationComplete()} is invoked; any exception is inspected by
     * {@link JVMStabilityInspector} and routed to {@link #onError(Throwable)}.
     */
    public void start() {
        boolean nothingToDo = requests.isEmpty() && transfers.isEmpty();
        if (nothingToDo) {
            logger.info("[Stream #{}] Session does not have any tasks.", planId());
            closeSession(State.COMPLETE);
            return;
        }

        try {
            UUID id = planId();
            String through = template.connectTo == null ? "" : " through " + template.connectTo;
            logger.info("[Stream #{}] Starting streaming to {}{}", id, peer, through);
            messageSender.initialize();
            onInitializationComplete();
        } catch (Exception failure) {
            JVMStabilityInspector.inspectThrowable(failure);
            onError(failure);
        }
    }

    /**
     * Queues a stream request for the given keyspace.
     * <p>
     * When assertions are enabled, each ranges argument must either consist solely of
     * "self" replicas or be a dummy list; otherwise an {@link AssertionError} carrying the
     * offending ranges is thrown.
     *
     * @param str               keyspace name
     * @param rangesAtEndpoint  first set of ranges (presumably the full replicas - confirm against StreamRequest)
     * @param rangesAtEndpoint2 second set of ranges (presumably the transient replicas - confirm against StreamRequest)
     * @param collection        table names forwarded to the request
     */
    public void addStreamRequest(String str, RangesAtEndpoint rangesAtEndpoint, RangesAtEndpoint rangesAtEndpoint2, Collection<String> collection) {
        if (!$assertionsDisabled) {
            boolean firstValid = Iterables.all(rangesAtEndpoint, replica -> replica.isSelf())
                                 || RangesAtEndpoint.isDummyList(rangesAtEndpoint);
            if (!firstValid) {
                throw new AssertionError(rangesAtEndpoint.toString());
            }

            boolean secondValid = Iterables.all(rangesAtEndpoint2, replica -> replica.isSelf())
                                  || RangesAtEndpoint.isDummyList(rangesAtEndpoint2);
            if (!secondValid) {
                throw new AssertionError(rangesAtEndpoint2.toString());
            }
        }

        StreamRequest request = new StreamRequest(str, rangesAtEndpoint, rangesAtEndpoint2, collection);
        requests.add(request);
    }

    /**
     * Queues the outgoing streams needed to transfer the given ranges of a keyspace and
     * records the ranges in {@code transferredRangesPerKeyspace}.
     *
     * @param str              keyspace name
     * @param rangesAtEndpoint ranges to transfer
     * @param collection       table names; an empty collection selects every table of the keyspace
     * @param z                when {@code true}, the selected tables are flushed first
     * @throws RuntimeException if the session is already COMPLETE or FAILED
     */
    public synchronized void addTransferRanges(String str, RangesAtEndpoint rangesAtEndpoint, Collection<String> collection, boolean z) {
        failIfFinished();

        Collection<ColumnFamilyStore> stores = getColumnFamilyStores(str, collection);
        if (z) {
            flushSSTables(stores);
        }

        List<OutgoingStream> streams = getOutgoingStreamsForRanges(rangesAtEndpoint.unwrap(), stores, pendingRepair, previewKind);
        addTransferStreams(streams);

        // Merge the ranges into whatever was already recorded for this keyspace.
        Set<Range<Token>> recorded = transferredRangesPerKeyspace.get(str);
        Set<Range<Token>> merged = recorded != null ? recorded : new HashSet<>();
        merged.addAll(rangesAtEndpoint.ranges());
        transferredRangesPerKeyspace.put(str, merged);
    }

    /**
     * Guards operations that must not run on a finished session.
     *
     * @throws RuntimeException if the current state is {@link State#COMPLETE} or {@link State#FAILED}
     */
    private void failIfFinished() {
        boolean finished = state() == State.COMPLETE || state() == State.FAILED;
        if (!finished) {
            return;
        }
        throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
    }

    /**
     * Resolves the column family stores of a keyspace.
     *
     * @param str        keyspace name
     * @param collection table names; when empty, all stores of the keyspace are returned
     * @return a set of the selected stores
     */
    private Collection<ColumnFamilyStore> getColumnFamilyStores(String str, Collection<String> collection) {
        Set<ColumnFamilyStore> stores = new HashSet<>();
        if (!collection.isEmpty()) {
            for (String tableName : collection) {
                stores.add(Keyspace.open(str).getColumnFamilyStore(tableName));
            }
            return stores;
        }
        // No explicit tables requested: take every store of the keyspace.
        stores.addAll(Keyspace.open(str).getColumnFamilyStores());
        return stores;
    }

    /**
     * Asks the stream manager of every given store to create outgoing streams for the ranges.
     * <p>
     * If anything is thrown part-way through, the streams collected so far are finished
     * before the throwable is propagated unchanged.
     *
     * @param rangesAtEndpoint ranges to stream
     * @param collection       stores to take the data from
     * @param uuid             pending-repair identifier forwarded to the stream managers
     * @param previewKind      preview kind forwarded to the stream managers
     * @return all created streams, in store iteration order
     */
    @VisibleForTesting
    public List<OutgoingStream> getOutgoingStreamsForRanges(RangesAtEndpoint rangesAtEndpoint, Collection<ColumnFamilyStore> collection, UUID uuid, PreviewKind previewKind) {
        List<OutgoingStream> streams = new ArrayList<>();
        try {
            for (ColumnFamilyStore store : collection) {
                streams.addAll(store.getStreamManager().createOutgoingStreams(this, rangesAtEndpoint, uuid, previewKind));
            }
            return streams;
        } catch (Throwable failure) {
            // Finish what was already created before giving up.
            streams.forEach(OutgoingStream::finish);
            throw failure;
        }
    }

    /**
     * Adds each outgoing stream to the transfer task of its table, creating the task on first use.
     *
     * @param collection streams to queue
     * @throws RuntimeException if the session is already COMPLETE or FAILED
     */
    public synchronized void addTransferStreams(Collection<OutgoingStream> collection) {
        failIfFinished();
        for (OutgoingStream stream : collection) {
            TableId tableId = stream.getTableId();
            StreamTransferTask task = transfers.get(tableId);
            if (task == null) {
                // Create a task, but defer to one registered concurrently if there is any.
                StreamTransferTask created = new StreamTransferTask(this, tableId);
                StreamTransferTask registered = transfers.putIfAbsent(tableId, created);
                task = registered != null ? registered : created;
            }
            task.addTransferStream(stream);
        }
    }

    /**
     * Moves the session into its final state and releases its network resources.
     * <p>
     * Only the first invocation has any effect: the {@code isAborted} flag is flipped with a
     * compare-and-set, and every later call just returns an already-completed future.
     *
     * @param state final state to record; callers in this class pass {@link State#COMPLETE}
     *              or {@link State#FAILED}
     * @return the future of the task-abort job when this call performed the close with
     *         {@link State#FAILED}; otherwise an immediately-completed future holding {@code null}
     */
    private synchronized Future closeSession(State state) {
        Future<?> abortFuture = null;
        if (this.isAborted.compareAndSet(false, true)) {
            state(state);
            if (state == State.FAILED) {
                // The abort is submitted to the nonPeriodicTasks executor instead of running on the calling thread.
                abortFuture = ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks);
            }
            // Stream.map() is a lazy intermediate operation: without a terminal operation the
            // lambda never ran and no channel was closed. forEach really invokes close().
            this.incomingChannels.values().forEach(channel -> channel.close());
            this.messageSender.close();
            this.streamResult.handleSessionComplete(this);
        }
        return abortFuture != null ? abortFuture : Futures.immediateFuture((Object) null);
    }

    /**
     * Aborts every receive task and then every transfer task. An {@link Exception} thrown by
     * any abort stops the remaining aborts and is logged as a warning instead of propagating.
     */
    private void abortTasks() {
        try {
            for (StreamReceiveTask receiveTask : receivers.values()) {
                receiveTask.abort();
            }
            for (StreamTransferTask transferTask : transfers.values()) {
                transferTask.abort();
            }
        } catch (Exception failure) {
            logger.warn("failed to abort some streaming tasks", failure);
        }
    }

    /**
     * Sets the session state without any validation of the transition.
     *
     * @param state new state
     */
    public void state(State state) {
        this.state = state;
    }

    /**
     * @return the current session state
     */
    public State state() {
        return state;
    }

    /**
     * @return the message sender created for this session in the constructor
     */
    public NettyStreamingMessageSender getMessageSender() {
        return messageSender;
    }

    /**
     * @return {@code true} exactly when the current state is {@link State#COMPLETE}
     */
    public boolean isSuccess() {
        return State.COMPLETE == state;
    }

    /**
     * Dispatches an incoming stream message to the matching handler of this session.
     * <p>
     * The switch is written directly on the {@code StreamMessage.Type} enum rather than on a
     * hand-maintained ordinal table with numeric case labels, so the dispatch cannot drift
     * out of step with the enum or with an unrelated constant used as a label.
     *
     * @param streamMessage message received from the peer
     * @throws AssertionError if the message type has no handler here
     */
    public void messageReceived(StreamMessage streamMessage) {
        switch (streamMessage.type) {
            case STREAM_INIT:
            case KEEP_ALIVE:
                // These types need no handling at the session level.
                return;
            case PREPARE_SYN:
                PrepareSynMessage prepareSynMessage = (PrepareSynMessage) streamMessage;
                prepare(prepareSynMessage.requests, prepareSynMessage.summaries);
                return;
            case PREPARE_SYNACK:
                prepareSynAck((PrepareSynAckMessage) streamMessage);
                return;
            case PREPARE_ACK:
                prepareAck((PrepareAckMessage) streamMessage);
                return;
            case STREAM:
                receive((IncomingStreamMessage) streamMessage);
                return;
            case RECEIVED:
                ReceivedMessage receivedMessage = (ReceivedMessage) streamMessage;
                received(receivedMessage.tableId, receivedMessage.sequenceNumber);
                return;
            case COMPLETE:
                complete();
                return;
            case SESSION_FAILED:
                sessionFailed();
                return;
            default:
                throw new AssertionError("unhandled StreamMessage type: " + streamMessage.getClass().getName());
        }
    }

    /**
     * Enters {@link State#PREPARING} and sends a {@link PrepareSynMessage} carrying all queued
     * requests and the summary of every transfer task. For REPAIR operations the total
     * outgoing size and file count are added to the repair metrics first.
     */
    public void onInitializationComplete() {
        state(State.PREPARING);

        PrepareSynMessage prepare = new PrepareSynMessage();
        prepare.requests.addAll(requests);

        long totalBytes = 0;
        long totalFiles = 0;
        for (StreamTransferTask task : transfers.values()) {
            totalBytes += task.getTotalSize();
            totalFiles += task.getTotalNumberOfFiles();
            prepare.summaries.add(task.getSummary());
        }

        if (StreamOperation.REPAIR == getStreamOperation()) {
            StreamingMetrics.totalOutgoingRepairBytes.inc(totalBytes);
            StreamingMetrics.totalOutgoingRepairSSTables.inc(totalFiles);
        }

        messageSender.sendMessage(prepare);
    }

    /**
     * Handles a failure of this session: logs it, notifies the peer with a
     * {@link SessionFailedMessage} when the sender is connected, and closes the session as
     * {@link State#FAILED}.
     *
     * @param th the failure
     * @return the future returned by closing the session
     */
    public Future onError(Throwable th) {
        logError(th);
        boolean canNotifyPeer = messageSender.connected();
        if (canNotifyPeer) {
            messageSender.sendMessage(new SessionFailedMessage());
        }
        return closeSession(State.FAILED);
    }

    /**
     * Logs a session failure at error level. A {@link SocketTimeoutException} gets a dedicated
     * message that mentions twice the configured keep-alive period; everything else gets the
     * generic message. The throwable is passed as the trailing logger argument in both cases.
     *
     * @param th the failure to log
     */
    private void logError(Throwable th) {
        UUID id = planId();
        String peerAddress = peer.getHostAddress(true);
        String through = template.connectTo == null ? "" : " through " + template.connectTo.getHostAddress(true);

        if (th instanceof SocketTimeoutException) {
            int waitedSecs = 2 * DatabaseDescriptor.getStreamingKeepAlivePeriod();
            logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? If not, maybe try increasing streaming_keep_alive_period_in_secs.", id, peerAddress, through, waitedSecs, th);
        } else {
            logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", id, peerAddress, through, th);
        }
    }

    /**
     * Enters {@link State#PREPARING} and hands the actual preparation to the
     * {@code nonPeriodicTasks} executor so that it does not run on the calling thread.
     *
     * @param collection  requests sent by the peer
     * @param collection2 summaries of what the peer is going to send
     */
    public void prepare(Collection<StreamRequest> collection, Collection<StreamSummary> collection2) {
        state(State.PREPARING);
        ScheduledExecutors.nonPeriodicTasks.execute(() -> prepareAsync(collection, collection2));
    }

    /**
     * Performs the preparation requested by the peer: queues transfers for every request
     * (flushing first), registers receive tasks for every summary, answers with a
     * {@link PrepareSynAckMessage}, and reports the session as prepared.
     * <p>
     * The reply carries this side's transfer summaries unless the peer is this node's own
     * broadcast address.
     *
     * @param collection  requests sent by the peer
     * @param collection2 summaries of what the peer is going to send
     */
    private void prepareAsync(Collection<StreamRequest> collection, Collection<StreamSummary> collection2) {
        for (StreamRequest request : collection) {
            RangesAtEndpoint allRanges = RangesAtEndpoint.concat(request.full, request.transientReplicas);
            addTransferRanges(request.keyspace, allRanges, request.columnFamilies, true);
        }

        for (StreamSummary summary : collection2) {
            prepareReceiving(summary);
        }

        PrepareSynAckMessage reply = new PrepareSynAckMessage();
        boolean peerIsSelf = peer.equals(FBUtilities.getBroadcastAddressAndPort());
        if (!peerIsSelf) {
            for (StreamTransferTask task : transfers.values()) {
                reply.summaries.add(task.getSummary());
            }
        }
        messageSender.sendMessage(reply);

        streamResult.handleSessionPrepared(this);
        maybeCompleted();
    }

    /**
     * Handles the peer's answer to the prepare request. When the answer lists summaries,
     * receive tasks are registered for them and a {@link PrepareAckMessage} is sent back.
     * Afterwards a preview session is completed; any other session starts streaming.
     *
     * @param prepareSynAckMessage the peer's answer
     */
    private void prepareSynAck(PrepareSynAckMessage prepareSynAckMessage) {
        boolean peerWillSend = !prepareSynAckMessage.summaries.isEmpty();
        if (peerWillSend) {
            for (StreamSummary summary : prepareSynAckMessage.summaries) {
                prepareReceiving(summary);
            }
            messageSender.sendMessage(new PrepareAckMessage());
        }

        if (!isPreview()) {
            startStreamingFiles(true);
            return;
        }
        completePreview();
    }

    /**
     * Handles the peer's acknowledgement of the prepare exchange: a preview session is
     * completed, any other session starts streaming its files.
     *
     * @param prepareAckMessage the acknowledgement (its content is not inspected)
     */
    private void prepareAck(PrepareAckMessage prepareAckMessage) {
        if (!isPreview()) {
            startStreamingFiles(true);
            return;
        }
        completePreview();
    }

    /**
     * Records a sent stream in the outgoing-bytes metrics and, if the transfer task of its
     * table is still registered, schedules a 12-hour timeout for the stream's sequence number.
     *
     * @param outgoingStreamMessage the message that was sent
     */
    public void streamSent(OutgoingStreamMessage outgoingStreamMessage) {
        long sentBytes = outgoingStreamMessage.stream.getSize();
        StreamingMetrics.totalOutgoingBytes.inc(sentBytes);
        metrics.outgoingBytes.inc(sentBytes);

        StreamTransferTask task = transfers.get(outgoingStreamMessage.header.tableId);
        if (task == null) {
            return;
        }
        task.scheduleTimeout(outgoingStreamMessage.header.sequenceNumber, 12L, TimeUnit.HOURS);
    }

    /**
     * Handles a received stream: updates the incoming-bytes metrics, acknowledges it with a
     * {@link ReceivedMessage}, reports it to the stream hook, and passes it to the receive
     * task of its table.
     *
     * @param incomingStreamMessage the received stream message
     * @throws RuntimeException if this is a preview session
     */
    public void receive(IncomingStreamMessage incomingStreamMessage) {
        if (isPreview()) {
            throw new RuntimeException("Cannot receive files for preview session");
        }

        long receivedBytes = incomingStreamMessage.stream.getSize();
        StreamingMetrics.totalIncomingBytes.inc(receivedBytes);
        metrics.incomingBytes.inc(receivedBytes);

        ReceivedMessage ack = new ReceivedMessage(incomingStreamMessage.header.tableId, incomingStreamMessage.header.sequenceNumber);
        messageSender.sendMessage(ack);

        StreamHook.instance.reportIncomingStream(incomingStreamMessage.header.tableId, incomingStreamMessage.stream, this, incomingStreamMessage.header.sequenceNumber);

        StreamReceiveTask task = receivers.get(incomingStreamMessage.header.tableId);
        task.received(incomingStreamMessage.stream);
    }

    /**
     * Forwards a progress update to the result future, tagged with this session's peer and index.
     *
     * @param str       name passed through to {@link ProgressInfo} (presumably the file name - confirm)
     * @param direction direction of the transfer
     * @param j         first numeric value passed to {@link ProgressInfo} (presumably bytes so far - confirm)
     * @param j2        second numeric value passed to {@link ProgressInfo} (presumably total bytes - confirm)
     */
    public void progress(String str, ProgressInfo.Direction direction, long j, long j2) {
        ProgressInfo update = new ProgressInfo(peer, index, str, direction, j, j2);
        streamResult.handleProgress(update);
    }

    /**
     * Marks the stream with sequence number {@code i} as completed on the transfer task of
     * {@code tableId}.
     *
     * @param tableId table of the transfer task
     * @param i       sequence number acknowledged by the peer
     * @throws NullPointerException if no transfer task is registered for the table
     */
    public void received(TableId tableId, int i) {
        StreamTransferTask task = transfers.get(tableId);
        task.complete(i);
    }

    /**
     * Handles a Complete message from the peer.
     * <p>
     * If this side is not yet in {@link State#WAIT_COMPLETE}, it only moves there. Otherwise a
     * {@link CompleteMessage} is sent (unless one was sent already) and the session is closed
     * as {@link State#COMPLETE}.
     */
    public synchronized void complete() {
        logger.debug("handling Complete message, state = {}, completeSent = {}", state, completeSent);

        if (state == State.WAIT_COMPLETE) {
            if (!completeSent) {
                messageSender.sendMessage(new CompleteMessage());
                completeSent = true;
            }
            closeSession(State.COMPLETE);
        } else {
            state(State.WAIT_COMPLETE);
        }
    }

    /**
     * Handles a failure notification from the peer: logs it and closes the session as
     * {@link State#FAILED}.
     */
    public synchronized void sessionFailed() {
        String peerName = peer.toString();
        logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), peerName);
        closeSession(State.FAILED);
    }

    /**
     * Builds a snapshot of this session: peer, index, the address actually connected to
     * (the template's {@code connectTo}, or the peer when that is {@code null}), the
     * summaries of all receive and transfer tasks, and the current state.
     *
     * @return a new {@link SessionInfo}
     */
    public SessionInfo getSessionInfo() {
        List<StreamSummary> receiving = new ArrayList<>();
        for (StreamReceiveTask receiveTask : receivers.values()) {
            receiving.add(receiveTask.getSummary());
        }

        List<StreamSummary> sending = new ArrayList<>();
        for (StreamTransferTask transferTask : transfers.values()) {
            sending.add(transferTask.getSummary());
        }

        InetAddressAndPort connecting = template.connectTo == null ? peer : template.connectTo;
        return new SessionInfo(peer, index, connecting, receiving, sending, state);
    }

    /**
     * Unregisters a finished receive task and checks whether the whole session is now done.
     *
     * @param streamReceiveTask the task that finished
     */
    public synchronized void taskCompleted(StreamReceiveTask streamReceiveTask) {
        TableId finishedTable = streamReceiveTask.tableId;
        receivers.remove(finishedTable);
        maybeCompleted();
    }

    /**
     * Unregisters a finished transfer task and checks whether the whole session is now done.
     *
     * @param streamTransferTask the task that finished
     */
    public synchronized void taskCompleted(StreamTransferTask streamTransferTask) {
        TableId finishedTable = streamTransferTask.tableId;
        transfers.remove(finishedTable);
        maybeCompleted();
    }

    /** {@link IEndpointStateChangeSubscriber} callback; this session takes no action on it. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onJoin(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
        // Intentionally empty.
    }

    /** {@link IEndpointStateChangeSubscriber} callback; this session takes no action on it. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void beforeChange(InetAddressAndPort inetAddressAndPort, EndpointState endpointState, ApplicationState applicationState, VersionedValue versionedValue) {
        // Intentionally empty.
    }

    /** {@link IEndpointStateChangeSubscriber} callback; this session takes no action on it. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onChange(InetAddressAndPort inetAddressAndPort, ApplicationState applicationState, VersionedValue versionedValue) {
        // Intentionally empty.
    }

    /** {@link IEndpointStateChangeSubscriber} callback; this session takes no action on it. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onAlive(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
        // Intentionally empty.
    }

    /** {@link IEndpointStateChangeSubscriber} callback; this session takes no action on it. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onDead(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
        // Intentionally empty.
    }

    /**
     * {@link IEndpointStateChangeSubscriber} callback for a removed endpoint: logs the failure
     * against this session's peer and closes the session as {@link State#FAILED}.
     * The {@code inetAddressAndPort} argument itself is not inspected.
     */
    @Override
    public void onRemove(InetAddressAndPort inetAddressAndPort) {
        String peerName = peer.toString();
        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peerName);
        closeSession(State.FAILED);
    }

    /**
     * {@link IEndpointStateChangeSubscriber} callback for a restarted endpoint: logs the
     * failure against this session's peer and closes the session as {@link State#FAILED}.
     * Neither argument is inspected.
     */
    @Override
    public void onRestart(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
        String peerName = peer.toString();
        logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), peerName);
        closeSession(State.FAILED);
    }

    /**
     * Finishes a preview session: moves through {@link State#WAIT_COMPLETE}, closes the
     * session as {@link State#COMPLETE}, and - whether or not closing succeeded - aborts all
     * receive and transfer tasks, since a preview never streams data.
     */
    private void completePreview() {
        try {
            state(State.WAIT_COMPLETE);
            closeSession(State.COMPLETE);
        } finally {
            for (StreamTask task : Iterables.<StreamTask>concat(receivers.values(), transfers.values())) {
                task.abort();
            }
        }
    }

    /**
     * Checks whether every receive and transfer task is gone and, if so, advances completion.
     * <p>
     * In {@link State#WAIT_COMPLETE} a {@link CompleteMessage} is sent (unless already sent)
     * and the session is closed as {@link State#COMPLETE}. In any other state a
     * {@link CompleteMessage} is sent and the session moves to {@link State#WAIT_COMPLETE}.
     *
     * @return {@code true} if no tasks remained, {@code false} otherwise
     */
    private boolean maybeCompleted() {
        if (!receivers.isEmpty() || !transfers.isEmpty()) {
            return false;
        }

        if (state == State.WAIT_COMPLETE) {
            if (!completeSent) {
                messageSender.sendMessage(new CompleteMessage());
                completeSent = true;
            }
            closeSession(State.COMPLETE);
        } else {
            messageSender.sendMessage(new CompleteMessage());
            completeSent = true;
            state(State.WAIT_COMPLETE);
        }
        return true;
    }

    /**
     * Triggers a flush on every given store and waits until all of the flushes have finished.
     *
     * @param iterable stores to flush
     */
    private void flushSSTables(Iterable<ColumnFamilyStore> iterable) {
        List<Future<?>> pendingFlushes = new ArrayList<>();
        for (ColumnFamilyStore store : iterable) {
            pendingFlushes.add(store.forceFlush());
        }
        FBUtilities.waitOnFutures(pendingFlushes);
    }

    /**
     * Registers a receive task for the table described by the summary. Summaries announcing
     * no files are ignored.
     *
     * @param streamSummary summary of what the peer will send for one table
     * @throws RuntimeException if the session is already COMPLETE or FAILED
     */
    @VisibleForTesting
    public synchronized void prepareReceiving(StreamSummary streamSummary) {
        failIfFinished();
        if (streamSummary.files <= 0) {
            return;
        }
        StreamReceiveTask task = new StreamReceiveTask(this, streamSummary.tableId, streamSummary.files, streamSummary.totalSize);
        receivers.put(streamSummary.tableId, task);
    }

    /**
     * Enters {@link State#STREAMING} and sends the file messages of every transfer task.
     * A task without any file message is completed right away. Finally the session checks
     * whether it has become complete.
     *
     * @param z when {@code true}, the result future is told that the session is prepared first
     */
    private void startStreamingFiles(boolean z) {
        if (z) {
            streamResult.handleSessionPrepared(this);
        }
        state(State.STREAMING);

        for (StreamTransferTask task : transfers.values()) {
            Collection<OutgoingStreamMessage> messages = task.getFileMessages();
            if (!messages.isEmpty()) {
                for (OutgoingStreamMessage message : messages) {
                    // Stamp the message header with this session before it goes out.
                    message.header.addSessionInfo(this);
                    messageSender.sendMessage(message);
                }
            } else {
                taskCompleted(task);
            }
        }

        maybeCompleted();
    }

    /**
     * @return the number of stream requests currently queued
     */
    @VisibleForTesting
    public int getNumRequests() {
        return requests.size();
    }

    /**
     * @return the number of keyspaces for which transfer ranges have been recorded
     *         (the size of {@code transferredRangesPerKeyspace}, not of {@code transfers})
     */
    @VisibleForTesting
    public int getNumTransfers() {
        return transferredRangesPerKeyspace.size();
    }

    // Static initialisation as emitted by the decompiler.
    static {
        // Mirrors the compiler-generated assertion switch: true when assertions are disabled for this class.
        $assertionsDisabled = !StreamSession.class.desiredAssertionStatus();
        // Class-wide SLF4J logger.
        logger = LoggerFactory.getLogger(StreamSession.class);
    }
}
