package io.cassandrareaper.service;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import io.cassandrareaper.AppContext;
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Node;
import io.cassandrareaper.core.StreamSession;
import io.cassandrareaper.jmx.StreamsProxy;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.openmbean.CompositeData;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.streaming.management.StreamStateCompositeData;
import org.apache.cassandra.streaming.management.StreamSummaryCompositeData;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cassandrareaper/service/StreamService.class */
public final class StreamService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) StreamService.class);
    private final AppContext context;

    private StreamService(AppContext appContext) {
        this.context = appContext;
    }

    public static StreamService create(AppContext appContext) {
        return new StreamService(appContext);
    }

    public List<StreamSession> listStreams(Node node) throws ReaperException {
        try {
            LOG.debug("Pulling streams for node {}", node);
            return pullStreamInfo(node);
        } catch (ReaperException e) {
            LOG.info("Pulling streams failed: {}", e.getMessage());
            throw new ReaperException(e);
        }
    }

    private List<StreamSession> pullStreamInfo(Node node) throws ReaperException {
        try {
            Set<CompositeData> listStreams = StreamsProxy.create(this.context.jmxConnectionFactory.connect(node, this.context.config.getJmxConnectionTimeoutInSeconds())).listStreams();
            return listStreams.isEmpty() ? ImmutableList.of() : (List) parse(listStreams).stream().map(streamState -> {
                return StreamSessionFactory.fromStreamState(node.getHostname(), streamState);
            }).collect(Collectors.toList());
        } catch (InterruptedException e) {
            throw new ReaperException(e);
        }
    }

    private Set<StreamState> parse(Set<CompositeData> set) throws ReaperException {
        HashSet newHashSet = Sets.newHashSet();
        for (CompositeData compositeData : set) {
            try {
                newHashSet.add(StreamStateCompositeData.fromCompositeData(compositeData));
            } catch (AssertionError e) {
                try {
                    newHashSet.add(parseStreamStatePre2_1(compositeData));
                } catch (ReaperException e2) {
                    LOG.warn("Parsing StreamState message for v 2.0.17 failed");
                    throw new ReaperException("Could not parse composite data");
                }
            }
        }
        return newHashSet;
    }

    private StreamState parseStreamStatePre2_1(CompositeData compositeData) throws ReaperException {
        return new StreamState(UUID.fromString((String) compositeData.get("planId")), (String) compositeData.get("description"), (Set) Arrays.stream((CompositeData[]) compositeData.get(TraceKeyspace.SESSIONS)).map(this::parseSessionInfoPre2_1).collect(Collectors.toSet()));
    }

    private SessionInfo parseSessionInfoPre2_1(CompositeData compositeData) {
        try {
            SessionInfo sessionInfo = new SessionInfo(InetAddress.getByName((String) compositeData.get("peer")), Integer.MIN_VALUE, InetAddress.getByName((String) compositeData.get("connecting")), (Set) Arrays.stream((CompositeData[]) compositeData.get("receivingSummaries")).map(StreamSummaryCompositeData::fromCompositeData).collect(Collectors.toSet()), (Set) Arrays.stream((CompositeData[]) compositeData.get("sendingSummaries")).map(StreamSummaryCompositeData::fromCompositeData).collect(Collectors.toSet()), StreamSession.State.valueOf((String) compositeData.get("state")));
            Stream map = Arrays.stream((CompositeData[]) compositeData.get("receivingFiles")).map(this::parseProgressInfoPre2_1);
            Objects.requireNonNull(sessionInfo);
            map.forEach(sessionInfo::updateProgress);
            Stream map2 = Arrays.stream((CompositeData[]) compositeData.get("sendingFiles")).map(this::parseProgressInfoPre2_1);
            Objects.requireNonNull(sessionInfo);
            map2.forEach(sessionInfo::updateProgress);
            return sessionInfo;
        } catch (UnknownHostException e) {
            throw new IllegalStateException(e);
        }
    }

    private ProgressInfo parseProgressInfoPre2_1(CompositeData compositeData) {
        InetAddress inetAddress = null;
        try {
            inetAddress = InetAddress.getByName((String) compositeData.get("peer"));
        } catch (UnknownHostException e) {
            LOG.warn("Could not resolve host when parsing ProgressInfo {}", compositeData.toString());
        }
        Preconditions.checkNotNull(inetAddress);
        return new ProgressInfo(inetAddress, Integer.MIN_VALUE, (String) compositeData.get("fileName"), ProgressInfo.Direction.valueOf((String) compositeData.get("direction")), ((Long) compositeData.get("currentBytes")).longValue(), ((Long) compositeData.get("totalBytes")).longValue());
    }
}
