package com.linkedin.venice.controller;

import com.linkedin.venice.controller.stats.ErrorPartitionStats;
import com.linkedin.venice.exceptions.VeniceNoHelixResourceException;
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.helix.HelixState;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.pushmonitor.OfflinePushStatus;
import com.linkedin.venice.pushmonitor.PartitionStatus;
import com.linkedin.venice.pushmonitor.PushMonitor;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/controller/ErrorPartitionResetTask.class */
/**
 * Periodic task that looks at the current version of every non-system store, finds partitions that
 * have replicas in the error state and asks Helix (through {@link HelixAdminClient}) to reset them.
 *
 * <p>For every resource (version topic) the task remembers how many times each partition has been
 * reset. Once a partition has been reset {@code errorPartitionAutoResetLimit} times and is still in
 * error, it is reported as unrecoverable and no further resets are attempted for it. Tracker entries
 * of resources that were not visited during a cycle are dropped at the end of that cycle.
 *
 * <p>The tracker collections are plain {@code HashMap}/{@code HashSet} instances and are only touched
 * from {@link #run()}; only {@link #close()} is written to be called from another thread.
 */
public class ErrorPartitionResetTask implements Runnable, Closeable {
    private static final String TASK_ID_FORMAT = ErrorPartitionResetTask.class.getSimpleName() + " [cluster: %s] ";
    private final String taskId;
    private final Logger logger;
    private final String clusterName;
    private final HelixAdminClient helixAdminClient;
    private final ReadOnlyStoreRepository readOnlyStoreRepository;
    private final HelixExternalViewRepository routingDataRepository;
    private final PushMonitor pushMonitor;
    private final int errorPartitionAutoResetLimit;
    private final long processingCycleDelayMs;
    private final ErrorPartitionStats errorPartitionStats;
    /** Resource name -> (partition id -> number of resets issued so far). */
    private final Map<String, Map<Integer, Integer>> errorPartitionResetTracker = new HashMap<>();
    private final AtomicBoolean isRunning = new AtomicBoolean();
    /** Scratch set used within one cycle: tracked resources that were not visited in that cycle. */
    private final Set<String> irrelevantResources = new HashSet<>();

    /**
     * @param str cluster name; used for the task id, the logger name, the stats and the Helix reset calls
     * @param helixAdminClient client used to issue the partition resets
     * @param readOnlyStoreRepository source of the stores to inspect on every cycle
     * @param helixExternalViewRepository source of the partition assignment of each resource
     * @param pushMonitor used to look up the offline push status when a partition is reported as unrecoverable
     * @param metricsRepository repository the {@link ErrorPartitionStats} are created with
     * @param i maximum number of automatic resets per partition
     * @param j delay, in milliseconds, slept before every processing cycle
     */
    public ErrorPartitionResetTask(String str, HelixAdminClient helixAdminClient, ReadOnlyStoreRepository readOnlyStoreRepository, HelixExternalViewRepository helixExternalViewRepository, PushMonitor pushMonitor, MetricsRepository metricsRepository, int i, long j) {
        this.taskId = String.format(TASK_ID_FORMAT, str);
        this.logger = LogManager.getLogger(this.taskId);
        this.clusterName = str;
        this.helixAdminClient = helixAdminClient;
        this.readOnlyStoreRepository = readOnlyStoreRepository;
        this.routingDataRepository = helixExternalViewRepository;
        this.pushMonitor = pushMonitor;
        this.errorPartitionAutoResetLimit = i;
        this.processingCycleDelayMs = j;
        this.errorPartitionStats = new ErrorPartitionStats(metricsRepository, str);
    }

    /**
     * Runs processing cycles until {@link #close()} is called. Each cycle sleeps for the configured
     * delay, processes every non-system store, prunes tracker entries of resources that were not
     * visited and records the time the cycle took. Any exception thrown by a cycle is logged and
     * counted, and the loop carries on with the next cycle.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.logger.info("Running {}", this.taskId);
        this.isRunning.set(true);
        while (this.isRunning.get()) {
            try {
                Utils.sleep(this.processingCycleDelayMs);
                // close() may have been called while sleeping; do not start another pass of resets.
                if (!this.isRunning.get()) {
                    break;
                }
                long cycleStartMs = System.currentTimeMillis();
                // Start by assuming every tracked resource is stale; resources that are visited
                // below take themselves out of this set.
                this.irrelevantResources.addAll(this.errorPartitionResetTracker.keySet());
                this.readOnlyStoreRepository.getAllStores()
                    .stream()
                    .filter(store -> !store.isSystemStore())
                    .forEach(this::resetApplicableErrorPartitions);
                for (String staleResource : this.irrelevantResources) {
                    this.errorPartitionResetTracker.remove(staleResource);
                }
                this.irrelevantResources.clear();
                this.errorPartitionStats.recordErrorPartitionProcessingTime(System.currentTimeMillis() - cycleStartMs);
            } catch (Exception e) {
                this.logger.error("Unexpected exception while running {}", this.taskId, e);
                this.errorPartitionStats.recordErrorPartitionResetAttemptErrored();
            }
        }
        this.logger.info("Stopped {}", this.taskId);
    }

    /**
     * Processes the current version of {@code store}: resets error replicas that are still below the
     * reset limit, reports partitions that reached the limit, and clears the tracker entry of
     * partitions that have recovered. Failures while processing the resource are logged and counted
     * but never propagated.
     *
     * @param store the store whose current version should be inspected
     */
    private void resetApplicableErrorPartitions(Store store) {
        int currentVersion = store.getCurrentVersion();
        // NOTE(review): 0 presumably denotes "no current version" -- confirm against Store.
        if (currentVersion == 0 || !store.getVersion(currentVersion).isPresent()) {
            return;
        }
        String kafkaTopicName = store.getVersion(currentVersion).get().kafkaTopicName();
        // This resource is still in use, so its tracker entry must survive the end-of-cycle pruning.
        this.irrelevantResources.remove(kafkaTopicName);
        try {
            PartitionAssignment partitionAssignment = this.routingDataRepository.getPartitionAssignments(kafkaTopicName);
            // computeIfAbsent stores the new map in the tracker, so updates below are visible there.
            Map<Integer, Integer> resetCounts =
                this.errorPartitionResetTracker.computeIfAbsent(kafkaTopicName, resource -> new HashMap<>());
            // Node id -> names of the partitions to reset on that node.
            Map<String, List<String>> partitionsToResetByNode = new HashMap<>();
            // Looked up lazily, at most once per resource, and only when something is unrecoverable.
            OfflinePushStatus offlinePushStatus = null;
            for (Partition partition : partitionAssignment.getAllPartitions()) {
                int partitionId = partition.getId();
                List<Instance> errorInstances = partition.getErrorInstances();
                if (errorInstances.isEmpty()) {
                    if (checkPartitionRecovered(partition, resetCounts)) {
                        resetCounts.remove(partitionId);
                        this.errorPartitionStats.recordErrorPartitionRecoveredFromReset();
                    }
                    continue;
                }
                int resetCount = resetCounts.getOrDefault(partitionId, 0);
                if (resetCount > this.errorPartitionAutoResetLimit) {
                    // Already reported as unrecoverable in an earlier cycle.
                    continue;
                }
                if (resetCount == this.errorPartitionAutoResetLimit) {
                    // Bump the count past the limit so the partition is reported only once.
                    resetCounts.put(partitionId, this.errorPartitionAutoResetLimit + 1);
                    this.errorPartitionStats.recordErrorPartitionUnrecoverableFromReset();
                    if (offlinePushStatus == null) {
                        offlinePushStatus = this.pushMonitor.getOfflinePushOrThrow(kafkaTopicName);
                    }
                    logUnrecoverablePartition(kafkaTopicName, partitionId, resetCount, offlinePushStatus);
                    continue;
                }
                // A single error replica is always reset. Several error replicas are only reset when
                // at least (replication factor - 1) replicas of the partition are not in error.
                boolean resettable = errorInstances.size() <= 1
                    || partition.getNumOfTotalInstances() - errorInstances.size() >= store.getReplicationFactor() - 1;
                if (resettable) {
                    resetCounts.put(partitionId, resetCount + 1);
                    String partitionName = HelixUtils.getPartitionName(kafkaTopicName, partitionId);
                    for (Instance errorInstance : errorInstances) {
                        partitionsToResetByNode
                            .computeIfAbsent(errorInstance.getNodeId(), nodeId -> new ArrayList<>())
                            .add(partitionName);
                    }
                }
            }
            partitionsToResetByNode.forEach((nodeId, partitionNames) -> {
                this.helixAdminClient.resetPartition(this.clusterName, nodeId, kafkaTopicName, partitionNames);
                this.errorPartitionStats.recordErrorPartitionResetAttempt(partitionNames.size());
            });
        } catch (VeniceNoHelixResourceException e) {
            this.logger.error("Resource: {} is missing unexpectedly", kafkaTopicName, e);
            this.errorPartitionStats.recordErrorPartitionResetAttemptErrored();
        } catch (Exception e2) {
            this.logger.error("Unexpected exception while processing partitions for resource: {} for error partition reset", kafkaTopicName, e2);
            this.errorPartitionStats.recordErrorPartitionResetAttemptErrored();
        }
    }

    /**
     * Logs a partition that is still in error after the maximum number of resets, followed by one
     * line per replica status known to the push monitor (or a single line when none is available).
     *
     * @param kafkaTopicName resource the partition belongs to
     * @param partitionId id of the unrecoverable partition
     * @param resetCount number of resets that were issued for the partition
     * @param offlinePushStatus push status of the resource, used to look up the replica statuses
     */
    private void logUnrecoverablePartition(String kafkaTopicName, int partitionId, int resetCount, OfflinePushStatus offlinePushStatus) {
        this.logger.warn("Error partition unrecoverable from reset. Resource: {}, partition: {}, reset count: {}", kafkaTopicName, partitionId, resetCount);
        PartitionStatus partitionStatus = offlinePushStatus.getPartitionStatus(partitionId);
        if (partitionStatus == null) {
            this.logger.warn("Hosts unavailable to retrieve.");
            return;
        }
        partitionStatus.getReplicaStatuses().forEach(replicaStatus ->
            this.logger.warn("Host: {}, Offline push status: {}, Details: {}.", replicaStatus.getInstanceId(), replicaStatus.getCurrentStatus().name(), replicaStatus.getIncrementalPushVersion()));
    }

    /**
     * Tells whether a partition that was previously reset now looks healthy.
     *
     * @param partition the partition to inspect
     * @param map reset counts of the partition's resource, keyed by partition id
     * @return {@code true} when the partition has an entry in {@code map} and the keys of
     *         {@code partition.getAllInstances()} are either exactly {@code ONLINE_STATE}, or exactly
     *         {@code LEADER_STATE} and {@code STANDBY_STATE}
     */
    private boolean checkPartitionRecovered(Partition partition, Map<Integer, Integer> map) {
        if (!map.containsKey(partition.getId())) {
            return false;
        }
        Set<String> states = partition.getAllInstances().keySet();
        boolean onlineOnly = states.size() == 1 && states.contains(HelixState.ONLINE_STATE);
        boolean leaderAndStandbyOnly = states.size() == 2
            && states.contains(HelixState.LEADER_STATE)
            && states.contains(HelixState.STANDBY_STATE);
        return onlineOnly || leaderAndStandbyOnly;
    }

    /**
     * Asks the loop in {@link #run()} to stop. The loop notices the request once its current sleep
     * or processing cycle has finished.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.isRunning.set(false);
    }
}
