package io.cassandrareaper.service;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.cassandrareaper.AppContext;
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.RepairRun;
import io.cassandrareaper.core.RepairSegment;
import io.cassandrareaper.core.RepairUnit;
import io.cassandrareaper.core.Segment;
import io.cassandrareaper.jmx.EndpointSnitchInfoProxy;
import io.cassandrareaper.jmx.JmxProxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/cassandrareaper/service/RepairRunner.class */
public final class RepairRunner implements Runnable {
    private static final Logger LOG;
    private final AppContext context;
    private final UUID repairRunId;
    private final String clusterName;
    private JmxProxy jmxConnection;
    private final AtomicReferenceArray<UUID> currentlyRunningSegments;
    private final List<RingRange> parallelRanges;
    private final String metricNameForMillisSinceLastRepairPerKeyspace;
    private final String metricNameForMillisSinceLastRepair;
    private float repairProgress;
    private float segmentsDone;
    private float segmentsTotal;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RepairRunner(AppContext appContext, UUID uuid) throws ReaperException {
        LOG.debug("Creating RepairRunner for run with ID {}", uuid);
        this.context = appContext;
        this.repairRunId = uuid;
        Optional<RepairRun> repairRun = appContext.storage.getRepairRun(uuid);
        if (!$assertionsDisabled && !repairRun.isPresent()) {
            throw new AssertionError("No RepairRun with ID " + uuid + " found from storage");
        }
        Optional<Cluster> cluster = appContext.storage.getCluster(repairRun.get().getClusterName());
        if (!$assertionsDisabled && !cluster.isPresent()) {
            throw new AssertionError("No Cluster with name " + repairRun.get().getClusterName() + " found from storage");
        }
        RepairUnit repairUnit = appContext.storage.getRepairUnit(repairRun.get().getRepairUnitId());
        this.clusterName = cluster.get().getName();
        JmxProxy connectAny = this.context.jmxConnectionFactory.connectAny(cluster.get(), appContext.config.getJmxConnectionTimeoutInSeconds());
        int possibleParallelRepairsCount = repairUnit.getIncrementalRepair() ? 1 : getPossibleParallelRepairsCount(connectAny.getRangeToEndpointMap(repairUnit.getKeyspaceName()), connectAny.getEndpointToHostId());
        this.currentlyRunningSegments = new AtomicReferenceArray<>(possibleParallelRepairsCount);
        for (int i = 0; i < possibleParallelRepairsCount; i++) {
            this.currentlyRunningSegments.set(i, null);
        }
        this.parallelRanges = getParallelRanges(possibleParallelRepairsCount, Lists.newArrayList(Collections2.transform(appContext.storage.getRepairSegmentsForRun(uuid), repairSegment -> {
            return repairSegment.getTokenRange().getBaseRange();
        })));
        String clusterName = repairUnit.getClusterName();
        String keyspaceName = repairUnit.getKeyspaceName();
        String metricName = metricName("repairProgress", clusterName, keyspaceName, uuid);
        String metricName2 = metricName("repairProgress", clusterName, uuid);
        appContext.metricRegistry.register(metricName, () -> {
            return Float.valueOf(this.repairProgress);
        });
        appContext.metricRegistry.register(metricName2, () -> {
            return Float.valueOf(this.repairProgress);
        });
        this.metricNameForMillisSinceLastRepairPerKeyspace = metricName("millisSinceLastRepair", clusterName, keyspaceName, uuid);
        this.metricNameForMillisSinceLastRepair = metricName("millisSinceLastRepair", clusterName, uuid);
        String metricName3 = metricName("segmentsDone", clusterName, keyspaceName, uuid);
        String metricName4 = metricName("segmentsDone", clusterName, uuid);
        appContext.metricRegistry.register(metricName3, () -> {
            return Float.valueOf(this.segmentsDone);
        });
        appContext.metricRegistry.register(metricName4, () -> {
            return Integer.valueOf((int) this.segmentsDone);
        });
        String metricName5 = metricName("segmentsTotal", clusterName, keyspaceName, uuid);
        String metricName6 = metricName("segmentsTotal", clusterName, uuid);
        appContext.metricRegistry.register(metricName5, () -> {
            return Integer.valueOf((int) this.segmentsTotal);
        });
        appContext.metricRegistry.register(metricName6, () -> {
            return Float.valueOf(this.segmentsTotal);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public UUID getRepairRunId() {
        return this.repairRunId;
    }

    static int getPossibleParallelRepairsCount(Map<List<String>, List<String>> map, Map<String, String> map2) throws ReaperException {
        if (map.isEmpty()) {
            LOG.error("Repairing 0-sized cluster.");
            throw new ReaperException("Repairing 0-sized cluster.");
        }
        LOG.debug("Possible parallel repairs : {}", Integer.valueOf(Math.min(map.size() / map.values().iterator().next().size(), Math.max(1, map2.keySet().size() / map.values().iterator().next().size()))));
        return Math.min(map.size() / map.values().iterator().next().size(), Math.max(1, map2.keySet().size() / map.values().iterator().next().size()));
    }

    static List<RingRange> getParallelRanges(int i, List<RingRange> list) throws ReaperException {
        if (i == 0) {
            LOG.error("Can't repair anything with 0 threads");
            throw new ReaperException("Can't repair anything with 0 threads");
        }
        Collections.sort(list, RingRange.START_COMPARATOR);
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < i - 1; i2++) {
            newArrayList.add(new RingRange(list.get((i2 * list.size()) / i).getStart(), list.get(((i2 + 1) * list.size()) / i).getStart()));
        }
        newArrayList.add(new RingRange(list.get(((i - 1) * list.size()) / i).getStart(), list.get(0).getStart()));
        LOG.debug("Parallel ranges : {}", newArrayList);
        return newArrayList;
    }

    @Override // java.lang.Runnable
    public void run() {
        Optional<RepairRun> repairRun;
        Thread.currentThread().setName(this.clusterName + TMultiplexedProtocol.SEPARATOR + this.repairRunId);
        try {
            repairRun = this.context.storage.getRepairRun(this.repairRunId);
        } catch (ReaperException | InterruptedException | RuntimeException e) {
            LOG.error("RepairRun FAILURE, scheduling retry", e);
            this.context.repairManager.scheduleRetry(this);
        }
        if (!repairRun.isPresent() || repairRun.get().getRunState().isTerminated()) {
            LOG.warn("RepairRun \"{}\" does not exist. Killing RepairRunner for this run instance.", this.repairRunId);
            killAndCleanupRunner();
            return;
        }
        RepairRun.RunState runState = repairRun.get().getRunState();
        LOG.debug("run() called for repair run #{} with run state {}", this.repairRunId, runState);
        switch (runState) {
            case NOT_STARTED:
                start();
                break;
            case RUNNING:
                startNextSegment();
                updateClusterNodeList();
                break;
            case PAUSED:
                this.context.repairManager.scheduleRetry(this);
                break;
            default:
                throw new IllegalStateException("un-known/implemented state " + runState);
        }
        LOG.debug("run() exiting for repair run #{}", this.repairRunId);
    }

    private void start() throws ReaperException, InterruptedException {
        LOG.info("Repairs for repair run #{} starting", this.repairRunId);
        synchronized (this) {
            RepairRun repairRun = this.context.storage.getRepairRun(this.repairRunId).get();
            this.context.storage.updateRepairRun(repairRun.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(repairRun.getId()));
        }
        startNextSegment();
    }

    private void updateClusterNodeList() throws ReaperException {
        Set set = (Set) this.jmxConnection.getLiveNodes().stream().collect(Collectors.toSet());
        Optional<Cluster> cluster = this.context.storage.getCluster(this.clusterName);
        if (!cluster.isPresent()) {
            throw new ReaperException("Cluster " + this.clusterName + " couldn't be found in storage. This shouldn't be happening (╯°□°)╯︵ ┻━┻");
        }
        if (cluster.get().getSeedHosts().equals(set) || set.isEmpty()) {
            return;
        }
        LOG.info("Updating the seed list for cluster {} as topology changed since the last repair.", this.clusterName);
        this.context.storage.updateCluster(new Cluster(cluster.get().getName(), cluster.get().getPartitioner(), set));
    }

    private void endRepairRun() {
        LOG.info("Repairs for repair run #{} done", this.repairRunId);
        synchronized (this) {
            RepairRun repairRun = this.context.storage.getRepairRun(this.repairRunId).get();
            DateTime now = DateTime.now();
            this.context.storage.updateRepairRun(repairRun.with().runState(RepairRun.RunState.DONE).endTime(now).lastEvent("All done").build(repairRun.getId()));
            killAndCleanupRunner();
            this.context.metricRegistry.remove(this.metricNameForMillisSinceLastRepairPerKeyspace);
            this.context.metricRegistry.remove(this.metricNameForMillisSinceLastRepair);
            this.context.metricRegistry.register(this.metricNameForMillisSinceLastRepairPerKeyspace, () -> {
                return Long.valueOf(DateTime.now().getMillis() - now.toInstant().getMillis());
            });
            this.context.metricRegistry.register(this.metricNameForMillisSinceLastRepair, () -> {
                return Long.valueOf(DateTime.now().getMillis() - now.toInstant().getMillis());
            });
        }
    }

    private void confirmJmxConnectionIsOpen() throws ReaperException {
        if (this.jmxConnection == null || !this.jmxConnection.isConnectionAlive()) {
            LOG.debug("connecting JMX proxy for repair runner on run id: {}", this.repairRunId);
            this.jmxConnection = this.context.jmxConnectionFactory.connectAny(this.context.storage.getCluster(this.clusterName).get(), this.context.config.getJmxConnectionTimeoutInSeconds());
            LOG.debug("successfully reestablished JMX proxy for repair runner");
        }
    }

    private void startNextSegment() throws ReaperException, InterruptedException {
        boolean z = true;
        boolean z2 = false;
        confirmJmxConnectionIsOpen();
        boolean z3 = false;
        for (int i = 0; i < this.currentlyRunningSegments.length(); i++) {
            if (this.currentlyRunningSegments.get(i) != null) {
                z2 = true;
                RepairSegment repairSegment = this.context.storage.getRepairSegment(this.repairRunId, this.currentlyRunningSegments.get(i)).get();
                DateTime startTime = repairSegment.getStartTime();
                if (startTime != null && startTime.isBefore(DateTime.now().minusDays(1))) {
                    LOG.warn("Looks like segment #{} has been running more than a day. Start time: {}", repairSegment.getId(), repairSegment.getStartTime());
                } else if (startTime != null && startTime.isBefore(DateTime.now().minusHours(1))) {
                    LOG.info("Looks like segment #{} has been running more than an hour. Start time: {}", repairSegment.getId(), repairSegment.getStartTime());
                } else if (startTime != null && startTime.isBefore(DateTime.now().minusMinutes(2))) {
                    LOG.debug("Looks like segment #{} has been running more than two minutes. Start time: {}", repairSegment.getId(), repairSegment.getStartTime());
                }
            } else {
                LOG.info("Running segment for range {}", this.parallelRanges.get(i));
                Optional<RepairSegment> nextFreeSegmentInRange = this.context.storage.getNextFreeSegmentInRange(this.repairRunId, Optional.of(this.parallelRanges.get(i)));
                if (nextFreeSegmentInRange.isPresent()) {
                    LOG.info("Next segment to run : {}", nextFreeSegmentInRange.get().getId());
                    UUID id = nextFreeSegmentInRange.get().getId();
                    if (this.currentlyRunningSegments.compareAndSet(i, null, id)) {
                        LOG.debug("Did set segment id `{}` to slot {}", id, Integer.valueOf(i));
                        z = repairSegment(i, nextFreeSegmentInRange.get().getId(), nextFreeSegmentInRange.get().getTokenRange());
                        if (!z) {
                            break;
                        }
                        this.segmentsTotal = this.context.storage.getSegmentAmountForRepairRun(this.repairRunId);
                        z3 = true;
                    } else {
                        LOG.debug("Didn't set segment id `{}` to slot {} because it was busy", id, Integer.valueOf(i));
                    }
                } else {
                    LOG.debug("No repair segment available for range {}", this.parallelRanges.get(i));
                }
            }
        }
        if (z3 || z2) {
            this.segmentsDone = this.context.storage.getSegmentAmountForRepairRunWithState(this.repairRunId, RepairSegment.State.DONE);
        } else {
            this.segmentsDone = this.context.storage.getSegmentAmountForRepairRunWithState(this.repairRunId, RepairSegment.State.DONE);
            this.segmentsTotal = this.context.storage.getSegmentAmountForRepairRun(this.repairRunId);
            LOG.info("Repair amount done {}", Float.valueOf(this.segmentsDone));
            this.repairProgress = this.segmentsDone / this.segmentsTotal;
            if (this.segmentsDone == this.segmentsTotal) {
                endRepairRun();
                z = false;
            }
        }
        if (z) {
            this.context.repairManager.scheduleRetry(this);
        }
    }

    private boolean repairSegment(final int i, final UUID uuid, Segment segment) throws InterruptedException {
        List<String> filterPotentialCoordinatorsByDatacenters;
        RepairRun repairRun = this.context.storage.getRepairRun(this.repairRunId).get();
        UUID repairUnitId = repairRun.getRepairUnitId();
        double intensity = repairRun.getIntensity();
        RepairParallelism repairParallelism = repairRun.getRepairParallelism();
        this.repairProgress = this.context.storage.getSegmentAmountForRepairRunWithState(this.repairRunId, RepairSegment.State.DONE) / repairRun.getSegmentCount();
        RepairUnit repairUnit = this.context.storage.getRepairUnit(repairUnitId);
        String keyspaceName = repairUnit.getKeyspaceName();
        LOG.debug("preparing to repair segment {} on run with id {}", uuid, this.repairRunId);
        try {
            confirmJmxConnectionIsOpen();
            if (repairUnit.getIncrementalRepair()) {
                Thread.sleep(ThreadLocalRandom.current().nextInt(0, 11) * 1000);
                filterPotentialCoordinatorsByDatacenters = Arrays.asList(this.context.storage.getRepairSegment(this.repairRunId, uuid).get().getCoordinatorHost());
            } else {
                try {
                    filterPotentialCoordinatorsByDatacenters = filterPotentialCoordinatorsByDatacenters(repairUnit.getDatacenters(), this.jmxConnection.tokenRangeToEndpoint(keyspaceName, segment), this.jmxConnection);
                    if (filterPotentialCoordinatorsByDatacenters.isEmpty()) {
                        LOG.warn("Segment #{} is faulty, no potential coordinators for range: {}", uuid, segment.toString());
                        synchronized (this) {
                            this.context.storage.updateRepairRun(this.context.storage.getRepairRun(this.repairRunId).get().with().runState(RepairRun.RunState.ERROR).lastEvent(String.format("No coordinators for range %s", segment)).endTime(DateTime.now()).build(this.repairRunId));
                            killAndCleanupRunner();
                        }
                        return false;
                    }
                } catch (RuntimeException e) {
                    LOG.warn("Couldn't get token ranges from coordinator: #{}", (Throwable) e);
                    return true;
                }
            }
            try {
                Futures.addCallback(this.context.repairManager.submitSegment(new SegmentRunner(this.context, uuid, filterPotentialCoordinatorsByDatacenters, this.context.repairManager.getRepairTimeoutMillis(), intensity, repairParallelism, this.clusterName, repairUnit, this)), new FutureCallback<Object>() { // from class: io.cassandrareaper.service.RepairRunner.1
                    @Override // com.google.common.util.concurrent.FutureCallback
                    public void onSuccess(Object obj) {
                        RepairRunner.this.currentlyRunningSegments.set(i, null);
                        RepairRunner.this.handleResult(uuid);
                    }

                    @Override // com.google.common.util.concurrent.FutureCallback
                    public void onFailure(Throwable th) {
                        RepairRunner.this.currentlyRunningSegments.set(i, null);
                        RepairRunner.LOG.error("Executing SegmentRunner failed", th);
                    }
                });
                return true;
            } catch (ReaperException e2) {
                LOG.error("Executing SegmentRunner failed", (Throwable) e2);
                return true;
            }
        } catch (ReaperException e3) {
            LOG.warn("Failed to reestablish JMX connection in runner {}, retrying", this.repairRunId, e3);
            this.currentlyRunningSegments.set(i, null);
            return true;
        }
    }

    private static List<String> filterPotentialCoordinatorsByDatacenters(Collection<String> collection, List<String> list, JmxProxy jmxProxy) {
        List<String> list2 = (List) list.stream().map(str -> {
            return getNodeDatacenterPair(str, jmxProxy);
        }).filter(pair -> {
            return collection.contains(pair.getRight()) || collection.isEmpty();
        }).map(pair2 -> {
            return (String) pair2.getLeft();
        }).collect(Collectors.toList());
        LOG.debug("[filterPotentialCoordinatorsByDatacenters] coordinators filtered by dc {}. Before : {} / After : {}", collection, list, list2);
        return list2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Pair<String, String> getNodeDatacenterPair(String str, JmxProxy jmxProxy) {
        Pair<String, String> of = Pair.of(str, EndpointSnitchInfoProxy.create(jmxProxy).getDataCenter(str));
        LOG.debug("[getNodeDatacenterPair] node/datacenter association {}", of);
        return of;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleResult(UUID uuid) {
        Optional<RepairSegment> repairSegment = this.context.storage.getRepairSegment(this.repairRunId, uuid);
        if (!repairSegment.isPresent()) {
            LOG.warn("In repair run #{}, triggerRepair on segment {} ended, but run is missing", this.repairRunId, uuid);
            return;
        }
        RepairSegment.State state = repairSegment.get().getState();
        LOG.debug("In repair run #{}, triggerRepair on segment {} ended with state {}", this.repairRunId, uuid, state);
        switch (state) {
            case NOT_STARTED:
            case DONE:
                return;
            default:
                String str = "handleResult called with a segment state (" + state + ") that it should not have after segmentRunner has tried a repair";
                LOG.error(str);
                throw new AssertionError(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updateLastEvent(String str) {
        synchronized (this) {
            RepairRun repairRun = this.context.storage.getRepairRun(this.repairRunId).get();
            if (repairRun.getRunState().isTerminated()) {
                LOG.warn("Will not update lastEvent of run that has already terminated. The message was: \"{}\"", str);
            } else {
                this.context.storage.updateRepairRun(repairRun.with().lastEvent(str).build(this.repairRunId));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void killAndCleanupRunner() {
        this.context.repairManager.removeRunner(this);
        Thread.currentThread().interrupt();
    }

    private String metricName(String str, String str2, String str3, UUID uuid) {
        return MetricRegistry.name((Class<?>) RepairRunner.class, str, str2.replaceAll("[^A-Za-z0-9]", ""), str3, uuid.toString().replaceAll("-", ""));
    }

    private String metricName(String str, String str2, UUID uuid) {
        return MetricRegistry.name((Class<?>) RepairRunner.class, str, str2.replaceAll("[^A-Za-z0-9]", ""), uuid.toString().replaceAll("-", ""));
    }

    static {
        $assertionsDisabled = !RepairRunner.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) RepairRunner.class);
    }
}
