package com.linkedin.venice.pushmonitor;

import com.linkedin.venice.controller.HelixAdminClient;
import com.linkedin.venice.controller.VeniceControllerConfig;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.helix.ResourceAssignment;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreCleaner;
import com.linkedin.venice.meta.UncompletedPartition;
import com.linkedin.venice.meta.UncompletedReplica;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/pushmonitor/AbstractPushMonitor.class */
public abstract class AbstractPushMonitor implements PushMonitor, PartitionStatusListener, RoutingDataRepository.RoutingDataChangedListener {
    public static final int MAX_PUSH_TO_KEEP = 5;
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) AbstractPushMonitor.class);
    private final OfflinePushAccessor offlinePushAccessor;
    private final String clusterName;
    private final ReadWriteStoreRepository metadataRepository;
    private final RoutingDataRepository routingDataRepository;
    private final StoreCleaner storeCleaner;
    private final AggPushHealthStats aggPushHealthStats;
    private RealTimeTopicSwitcher realTimeTopicSwitcher;
    private final ClusterLockManager clusterLockManager;
    private final String aggregateRealTimeSourceKafkaUrl;
    private final List<String> activeActiveRealTimeSourceKafkaURLs;
    private final HelixAdminClient helixAdminClient;
    private final boolean disableErrorLeaderReplica;
    private final long offlineJobResourceAssignmentWaitTimeInMilliseconds;
    private final PushStatusCollector pushStatusCollector;
    private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled;
    private final Map<String, OfflinePushStatus> topicToPushMap = new VeniceConcurrentHashMap();
    private final EventThrottler helixClientThrottler = new EventThrottler(10, "push_monitor_helix_client_throttler", false, EventThrottler.BLOCK_STRATEGY);

    public AbstractPushMonitor(String str, OfflinePushAccessor offlinePushAccessor, StoreCleaner storeCleaner, ReadWriteStoreRepository readWriteStoreRepository, RoutingDataRepository routingDataRepository, AggPushHealthStats aggPushHealthStats, RealTimeTopicSwitcher realTimeTopicSwitcher, ClusterLockManager clusterLockManager, String str2, List<String> list, HelixAdminClient helixAdminClient, VeniceControllerConfig veniceControllerConfig, PushStatusStoreReader pushStatusStoreReader) {
        this.clusterName = str;
        this.offlinePushAccessor = offlinePushAccessor;
        this.storeCleaner = storeCleaner;
        this.metadataRepository = readWriteStoreRepository;
        this.routingDataRepository = routingDataRepository;
        this.aggPushHealthStats = aggPushHealthStats;
        this.realTimeTopicSwitcher = realTimeTopicSwitcher;
        this.clusterLockManager = clusterLockManager;
        this.aggregateRealTimeSourceKafkaUrl = str2;
        this.activeActiveRealTimeSourceKafkaURLs = list;
        this.helixAdminClient = helixAdminClient;
        this.disableErrorLeaderReplica = veniceControllerConfig.isErrorLeaderReplicaFailOverEnabled();
        this.offlineJobResourceAssignmentWaitTimeInMilliseconds = veniceControllerConfig.getOffLineJobWaitTimeInMilliseconds();
        this.pushStatusCollector = new PushStatusCollector(readWriteStoreRepository, pushStatusStoreReader, str3 -> {
            handleCompletedPush(getOfflinePush(str3));
        }, (str4, str5) -> {
            handleErrorPush(getOfflinePush(str4), str5);
        }, veniceControllerConfig.isOfflinePushMonitorDaVinciPushStatusEnabled(), veniceControllerConfig.getOfflinePushMonitorDaVinciPushStatusScanIntervalInSeconds(), veniceControllerConfig.getOfflinePushMonitorDaVinciPushStatusScanThreadNumber(), veniceControllerConfig.getOfflinePushMonitorDaVinciPushStatusScanNoDaVinciStatusReportRetryMaxAttempt());
        this.isOfflinePushMonitorDaVinciPushStatusEnabled = veniceControllerConfig.isOfflinePushMonitorDaVinciPushStatusEnabled();
        this.pushStatusCollector.start();
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public void loadAllPushes() {
        AutoCloseableLock createClusterWriteLock = this.clusterLockManager.createClusterWriteLock();
        try {
            loadAllPushes(this.offlinePushAccessor.loadOfflinePushStatusesAndPartitionStatuses());
            if (createClusterWriteLock != null) {
                createClusterWriteLock.close();
            }
        } catch (Throwable th) {
            if (createClusterWriteLock != null) {
                try {
                    createClusterWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void loadAllPushes(List<OfflinePushStatus> list) {
        this.pushStatusCollector.start();
        AutoCloseableLock createClusterWriteLock = this.clusterLockManager.createClusterWriteLock();
        try {
            LOGGER.info("Load all pushes started for cluster {}'s {}", this.clusterName, getClass().getSimpleName());
            ArrayList<OfflinePushStatus> arrayList = new ArrayList();
            for (OfflinePushStatus offlinePushStatus : list) {
                try {
                    this.routingDataRepository.subscribeRoutingDataChange(offlinePushStatus.getKafkaTopic(), this);
                    arrayList.add(this.offlinePushAccessor.getOfflinePushStatusAndItsPartitionStatuses(offlinePushStatus.getKafkaTopic()));
                } catch (Exception e) {
                    LOGGER.error("Could not load offline push for {}", offlinePushStatus.getKafkaTopic(), e);
                }
            }
            for (OfflinePushStatus offlinePushStatus2 : arrayList) {
                try {
                    this.topicToPushMap.put(offlinePushStatus2.getKafkaTopic(), offlinePushStatus2);
                    getOfflinePushAccessor().subscribePartitionStatusChange(offlinePushStatus2, this);
                    if (!offlinePushStatus2.getCurrentStatus().isTerminal()) {
                        String kafkaTopic = offlinePushStatus2.getKafkaTopic();
                        if (this.routingDataRepository.containsKafkaTopic(kafkaTopic)) {
                            this.pushStatusCollector.subscribeTopic(kafkaTopic, offlinePushStatus2.getNumberOfPartition());
                            Pair<ExecutionStatus, Optional<String>> checkPushStatus = checkPushStatus(offlinePushStatus2, this.routingDataRepository.getPartitionAssignments(kafkaTopic), null);
                            if (checkPushStatus.getFirst().isTerminal()) {
                                LOGGER.info("Found a offline pushes could be terminated: {} status: {}", offlinePushStatus2.getKafkaTopic(), checkPushStatus.getFirst());
                                handleOfflinePushUpdate(offlinePushStatus2, checkPushStatus.getFirst(), checkPushStatus.getSecond());
                            } else {
                                checkWhetherToStartBufferReplayForHybrid(offlinePushStatus2);
                            }
                        } else {
                            LOGGER.info("Found legacy offline push: {}", offlinePushStatus2.getKafkaTopic());
                        }
                    }
                } catch (Exception e2) {
                    LOGGER.error("Could not load offline push for {}", offlinePushStatus2.getKafkaTopic(), e2);
                }
            }
            HashMap hashMap = new HashMap();
            this.topicToPushMap.keySet().forEach(str -> {
                ((List) hashMap.computeIfAbsent(Version.parseStoreFromKafkaTopicName(str), str -> {
                    return new ArrayList();
                })).add(Integer.valueOf(Version.parseVersionFromKafkaTopicName(str)));
            });
            hashMap.forEach(this::retireOldErrorPushes);
            hashMap.keySet().forEach(str2 -> {
                OfflinePushStatus offlinePushStatus3;
                Integer storeCurrentVersion = getStoreCurrentVersion(str2);
                if (storeCurrentVersion == null || (offlinePushStatus3 = this.topicToPushMap.get(Version.composeKafkaTopic(str2, storeCurrentVersion.intValue()))) == null) {
                    return;
                }
                long successfulPushDurationInSecs = offlinePushStatus3.getSuccessfulPushDurationInSecs();
                if (successfulPushDurationInSecs >= 0) {
                    this.aggPushHealthStats.recordSuccessfulPushGauge(str2, successfulPushDurationInSecs);
                }
            });
            LOGGER.info("Load all pushes finished for cluster {}'s {}", this.clusterName, getClass().getSimpleName());
            if (createClusterWriteLock != null) {
                createClusterWriteLock.close();
            }
        } catch (Throwable th) {
            if (createClusterWriteLock != null) {
                try {
                    createClusterWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public void startMonitorOfflinePush(String str, int i, int i2, OfflinePushStrategy offlinePushStrategy) {
        AutoCloseableLock createStoreWriteLock = this.clusterLockManager.createStoreWriteLock(Version.parseStoreFromKafkaTopicName(str));
        try {
            if (this.topicToPushMap.containsKey(str)) {
                if (!getPushStatus(str).equals(ExecutionStatus.ERROR)) {
                    throw new VeniceException("Push status has already been created for topic:" + str + " in cluster:" + this.clusterName);
                }
                LOGGER.info("The previous push status for topic: {} is 'ERROR', and the new push will clean up the previous 'ERROR' push status", str);
                cleanupPushStatus(getOfflinePush(str), true);
            }
            OfflinePushStatus offlinePushStatus = new OfflinePushStatus(str, i, i2, offlinePushStrategy);
            this.offlinePushAccessor.createOfflinePushStatusAndItsPartitionStatuses(offlinePushStatus);
            this.topicToPushMap.put(str, offlinePushStatus);
            this.offlinePushAccessor.subscribePartitionStatusChange(offlinePushStatus, this);
            this.routingDataRepository.subscribeRoutingDataChange(str, this);
            this.pushStatusCollector.subscribeTopic(str, i);
            LOGGER.info("Started monitoring push on topic:{}", str);
            if (createStoreWriteLock != null) {
                createStoreWriteLock.close();
            }
        } catch (Throwable th) {
            if (createStoreWriteLock != null) {
                try {
                    createStoreWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public void stopMonitorOfflinePush(String str, boolean z, boolean z2) {
        LOGGER.info("Stopping monitoring push on topic:{}", str);
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(str);
        AutoCloseableLock createStoreWriteLock = this.clusterLockManager.createStoreWriteLock(parseStoreFromKafkaTopicName);
        try {
            if (!this.topicToPushMap.containsKey(str)) {
                LOGGER.warn("Push status does not exist for topic:{} in cluster:{}", str, this.clusterName);
                if (createStoreWriteLock != null) {
                    createStoreWriteLock.close();
                    return;
                }
                return;
            }
            OfflinePushStatus offlinePush = getOfflinePush(str);
            this.offlinePushAccessor.unsubscribePartitionsStatusChange(offlinePush, this);
            this.routingDataRepository.unSubscribeRoutingDataChange(str, this);
            if (!offlinePush.getCurrentStatus().equals(ExecutionStatus.ERROR) || z2) {
                cleanupPushStatus(offlinePush, z);
            } else {
                retireOldErrorPushes(parseStoreFromKafkaTopicName);
            }
            this.pushStatusCollector.unsubscribeTopic(str);
            LOGGER.info("Stopped monitoring push on topic: {}", str);
            if (createStoreWriteLock != null) {
                createStoreWriteLock.close();
            }
        } catch (Throwable th) {
            if (createStoreWriteLock != null) {
                try {
                    createStoreWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public void stopAllMonitoring() {
        LOGGER.info("Stopping monitoring push for all topics.");
        try {
            AutoCloseableLock createClusterWriteLock = this.clusterLockManager.createClusterWriteLock();
            try {
                Iterator<Map.Entry<String, OfflinePushStatus>> it2 = this.topicToPushMap.entrySet().iterator();
                while (it2.hasNext()) {
                    stopMonitorOfflinePush(it2.next().getKey(), false, false);
                }
                LOGGER.info("Successfully stopped monitoring push for all topics.");
                this.pushStatusCollector.clear();
                if (createClusterWriteLock != null) {
                    createClusterWriteLock.close();
                }
            } finally {
            }
        } catch (Exception e) {
            LOGGER.error("Error when stopping monitoring push for all topics", (Throwable) e);
        }
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public void cleanupStoreStatus(String str) {
        AutoCloseableLock createStoreWriteLock = this.clusterLockManager.createStoreWriteLock(str);
        try {
            ((List) this.topicToPushMap.keySet().stream().filter(str2 -> {
                return Version.parseStoreFromKafkaTopicName(str2).equals(str);
            }).collect(Collectors.toList())).forEach(str3 -> {
                cleanupPushStatus(getOfflinePush(str3), true);
            });
            if (createStoreWriteLock != null) {
                createStoreWriteLock.close();
            }
        } catch (Throwable th) {
            if (createStoreWriteLock != null) {
                try {
                    createStoreWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public OfflinePushStatus getOfflinePushOrThrow(String str) {
        if (this.topicToPushMap.containsKey(str)) {
            return this.topicToPushMap.get(str);
        }
        throw new VeniceException("Can not find offline push status for topic:" + str);
    }

    protected OfflinePushStatus getOfflinePush(String str) {
        return this.topicToPushMap.get(str);
    }

    public ExecutionStatus getPushStatus(String str) {
        return getPushStatusAndDetails(str).getFirst();
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public Pair<ExecutionStatus, String> getIncrementalPushStatusAndDetails(String str, String str2, HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository) {
        OfflinePushStatus offlinePush = getOfflinePush(str);
        return offlinePush == null ? new Pair<>(ExecutionStatus.NOT_CREATED, "Offline job hasn't been created yet.") : new Pair<>(checkIncrementalPushStatus(offlinePush.getIncrementalPushStatus(getRoutingDataRepository().getPartitionAssignments(str), str2), helixCustomizedViewOfflinePushRepository.getCompletedStatusReplicas(str, offlinePush.getNumberOfPartition()), str, str2, offlinePush.getNumberOfPartition(), offlinePush.getReplicationFactor()), null);
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public Pair<ExecutionStatus, String> getIncrementalPushStatusFromPushStatusStore(String str, String str2, HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository, PushStatusStoreReader pushStatusStoreReader) {
        OfflinePushStatus offlinePush = getOfflinePush(str);
        return offlinePush == null ? new Pair<>(ExecutionStatus.NOT_CREATED, "Offline job hasn't been created yet.") : getIncrementalPushStatusFromPushStatusStore(str, str2, helixCustomizedViewOfflinePushRepository, pushStatusStoreReader, offlinePush.getNumberOfPartition(), offlinePush.getReplicationFactor());
    }

    public Pair<ExecutionStatus, String> getIncrementalPushStatusFromPushStatusStore(String str, String str2, HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository, PushStatusStoreReader pushStatusStoreReader, int i, int i2) {
        return new Pair<>(checkIncrementalPushStatus(pushStatusStoreReader.getPartitionStatuses(Version.parseStoreFromKafkaTopicName(str), Version.parseVersionFromVersionTopicName(str), str2, i), helixCustomizedViewOfflinePushRepository.getCompletedStatusReplicas(str, i), str, str2, i, i2), null);
    }

    private ExecutionStatus checkIncrementalPushStatus(Map<Integer, Map<CharSequence, Integer>> map, Map<Integer, Integer> map2, String str, String str2, int i, int i2) {
        if (map == null || map.isEmpty()) {
            return ExecutionStatus.NOT_CREATED;
        }
        int i3 = 0;
        boolean z = false;
        for (int i4 = 0; i4 < i; i4++) {
            Map<CharSequence, Integer> map3 = map.get(Integer.valueOf(i4));
            if (map3 != null && !map3.isEmpty()) {
                int i5 = 0;
                for (Map.Entry<CharSequence, Integer> entry : map3.entrySet()) {
                    if (!ExecutionStatus.isIncrementalPushStatus(entry.getValue().intValue())) {
                        return ExecutionStatus.ERROR;
                    }
                    z = true;
                    if (entry.getValue().intValue() == ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED.getValue()) {
                        i5++;
                    }
                }
                int max = Math.max(1, Math.max(i2 - 1, map2.getOrDefault(Integer.valueOf(i4), 0).intValue()));
                if (i5 >= max) {
                    i3++;
                } else {
                    LOGGER.info("For partitionId {} need {} replicas to acknowledge the delivery of EOIP but got only {}. kafkaTopic:{} incrementalPushVersion:{}", Integer.valueOf(i4), Integer.valueOf(max), Integer.valueOf(i5), str, str2);
                }
            }
        }
        if (i3 == i) {
            return ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED;
        }
        LOGGER.info("Only {} out of {} partitions are sufficiently replicated. kafkaTopic:{} incrementalPushVersion:{}", Integer.valueOf(i3), Integer.valueOf(i), str, str2);
        return z ? ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED : ExecutionStatus.NOT_CREATED;
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public Set<String> getOngoingIncrementalPushVersions(String str) {
        OfflinePushStatus offlinePush = getOfflinePush(str);
        String str2 = null;
        if (offlinePush != null) {
            str2 = offlinePush.getLatestIncrementalPushVersion(getRoutingDataRepository().getPartitionAssignments(str));
        }
        return (str2 == null || str2.isEmpty()) ? Collections.emptySet() : Collections.singleton(str2);
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public Set<String> getOngoingIncrementalPushVersions(String str, PushStatusStoreReader pushStatusStoreReader) {
        return (Set) pushStatusStoreReader.getSupposedlyOngoingIncrementalPushVersions(Version.parseStoreFromKafkaTopicName(str), Version.parseVersionFromVersionTopicName(str)).keySet().stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toSet());
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public Pair<ExecutionStatus, String> getPushStatusAndDetails(String str) {
        OfflinePushStatus offlinePush = getOfflinePush(str);
        if (offlinePush == null) {
            return new Pair<>(ExecutionStatus.NOT_CREATED, "Offline job hasn't been created yet.");
        }
        ExecutionStatus currentStatus = offlinePush.getCurrentStatus();
        if (currentStatus.equals(ExecutionStatus.NOT_STARTED) || currentStatus.equals(ExecutionStatus.STARTED)) {
            ResourceAssignment resourceAssignment = this.routingDataRepository.getResourceAssignment();
            OfflinePushStrategy strategy = offlinePush.getStrategy();
            int replicationFactor = offlinePush.getReplicationFactor();
            Optional<String> hasEnoughNodesToStartPush = PushStatusDecider.getDecider(strategy).hasEnoughNodesToStartPush(str, replicationFactor, resourceAssignment, Optional.empty());
            if (hasEnoughNodesToStartPush.isPresent()) {
                long durationInSec = getDurationInSec(offlinePush);
                if (durationInSec >= TimeUnit.MILLISECONDS.toSeconds(this.offlineJobResourceAssignmentWaitTimeInMilliseconds)) {
                    recordPushPreparationDuration(str, durationInSec);
                    String str2 = "After waiting for " + durationInSec + " seconds, resource assignment for: " + str + " timed out, strategy=" + strategy.toString() + ", replicationFactor=" + replicationFactor + ", reason=" + hasEnoughNodesToStartPush.get();
                    handleOfflinePushUpdate(offlinePush, ExecutionStatus.ERROR, Optional.of(str2));
                    return new Pair<>(ExecutionStatus.ERROR, str2);
                }
                LOGGER.info("After waiting for " + durationInSec + " seconds, resource assignment for: " + str + " is still not complete, strategy=" + strategy.toString() + ", replicationFactor=" + replicationFactor + ", reason=" + hasEnoughNodesToStartPush.get());
            } else {
                Optional<String> optionalStatusDetails = offlinePush.getOptionalStatusDetails();
                if (optionalStatusDetails.isPresent() && Objects.equals(optionalStatusDetails.get(), OfflinePushStatus.HELIX_RESOURCE_NOT_CREATED)) {
                    refreshAndUpdatePushStatus(str, ExecutionStatus.STARTED, Optional.of(OfflinePushStatus.HELIX_ASSIGNMENT_COMPLETED));
                    recordPushPreparationDuration(str, getDurationInSec(offlinePush));
                }
            }
        }
        return new Pair<>(currentStatus, offlinePush.getStatusDetails());
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public List<String> getTopicsOfOngoingOfflinePushes() {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll((Collection) this.topicToPushMap.values().stream().filter(offlinePushStatus -> {
            return !offlinePushStatus.getCurrentStatus().isTerminal();
        }).map((v0) -> {
            return v0.getKafkaTopic();
        }).collect(Collectors.toList()));
        return arrayList;
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public List<UncompletedPartition> getUncompletedPartitions(String str) {
        OfflinePushStatus offlinePush = getOfflinePush(str);
        if (offlinePush == null || offlinePush.getCurrentStatus().equals(ExecutionStatus.COMPLETED)) {
            return Collections.emptyList();
        }
        PushStatusDecider decider = PushStatusDecider.getDecider(offlinePush.getStrategy());
        ArrayList arrayList = new ArrayList();
        for (PartitionStatus partitionStatus : offlinePush.getPartitionStatuses()) {
            int i = 0;
            ArrayList arrayList2 = new ArrayList();
            for (ReplicaStatus replicaStatus : partitionStatus.getReplicaStatuses()) {
                ExecutionStatus replicaCurrentStatus = PushStatusDecider.getReplicaCurrentStatus(replicaStatus.getStatusHistory());
                if (replicaCurrentStatus.equals(ExecutionStatus.COMPLETED)) {
                    i++;
                } else {
                    arrayList2.add(new UncompletedReplica(replicaStatus.getInstanceId(), replicaCurrentStatus, replicaStatus.getCurrentProgress(), replicaStatus.getIncrementalPushVersion()));
                }
            }
            if (!decider.hasEnoughReplicasForOnePartition(i, offlinePush.getReplicationFactor())) {
                arrayList.add(new UncompletedPartition(partitionStatus.getPartitionId(), arrayList2));
            }
        }
        return arrayList;
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public void markOfflinePushAsError(String str, String str2) {
        OfflinePushStatus offlinePush = getOfflinePush(str);
        if (offlinePush == null) {
            LOGGER.warn("Could not find offline push status for topic: {}. Ignore the request of marking status as ERROR.", str);
        } else {
            handleOfflinePushUpdate(offlinePush, ExecutionStatus.ERROR, Optional.of(str2));
        }
    }

    private void cleanupPushStatus(OfflinePushStatus offlinePushStatus, boolean z) {
        try {
            AutoCloseableLock createStoreWriteLock = this.clusterLockManager.createStoreWriteLock(Version.parseStoreFromKafkaTopicName(offlinePushStatus.getKafkaTopic()));
            try {
                this.topicToPushMap.remove(offlinePushStatus.getKafkaTopic());
                if (z) {
                    this.offlinePushAccessor.deleteOfflinePushStatusAndItsPartitionStatuses(offlinePushStatus.getKafkaTopic());
                }
                if (createStoreWriteLock != null) {
                    createStoreWriteLock.close();
                }
            } finally {
            }
        } catch (Exception e) {
            LOGGER.warn("Could not delete legacy push status: {}", offlinePushStatus.getKafkaTopic(), e);
        }
    }

    protected void retireOldErrorPushes(String str) {
        retireOldErrorPushes(str, (List) this.topicToPushMap.keySet().stream().filter(str2 -> {
            return Version.parseStoreFromKafkaTopicName(str2).equals(str);
        }).map(Version::parseVersionFromKafkaTopicName).collect(Collectors.toList()));
    }

    private void retireOldErrorPushes(String str, List<Integer> list) {
        for (OfflinePushStatus offlinePushStatus : (List) list.stream().sorted().map(num -> {
            return getOfflinePush(Version.composeKafkaTopic(str, num.intValue()));
        }).filter(offlinePushStatus2 -> {
            return offlinePushStatus2.getCurrentStatus().equals(ExecutionStatus.ERROR);
        }).collect(Collectors.toList())) {
            if (list.size() <= 5) {
                return;
            }
            list.remove(Integer.valueOf(Version.parseVersionFromKafkaTopicName(offlinePushStatus.getKafkaTopic())));
            cleanupPushStatus(offlinePushStatus, true);
        }
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public boolean wouldJobFail(String str, PartitionAssignment partitionAssignment) {
        AutoCloseableLock createStoreReadLock = this.clusterLockManager.createStoreReadLock(Version.parseStoreFromKafkaTopicName(str));
        try {
            if (!this.topicToPushMap.containsKey(str)) {
                if (createStoreReadLock != null) {
                    createStoreReadLock.close();
                }
                return false;
            }
            OfflinePushStatus offlinePush = getOfflinePush(str);
            boolean equals = PushStatusDecider.getDecider(offlinePush.getStrategy()).checkPushStatusAndDetails(offlinePush, partitionAssignment).getFirst().equals(ExecutionStatus.ERROR);
            if (createStoreReadLock != null) {
                createStoreReadLock.close();
            }
            return equals;
        } catch (Throwable th) {
            if (createStoreReadLock != null) {
                try {
                    createStoreReadLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected abstract Pair<ExecutionStatus, Optional<String>> checkPushStatus(OfflinePushStatus offlinePushStatus, PartitionAssignment partitionAssignment, DisableReplicaCallback disableReplicaCallback);

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public abstract List<Instance> getReadyToServeInstances(PartitionAssignment partitionAssignment, int i);

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public void refreshAndUpdatePushStatus(String str, ExecutionStatus executionStatus, Optional<String> optional) {
        OfflinePushStatus offlinePushOrThrow = getOfflinePushOrThrow(str);
        if (offlinePushOrThrow.validatePushStatusTransition(executionStatus)) {
            updatePushStatus(offlinePushOrThrow, executionStatus, optional);
        } else {
            LOGGER.info("refreshedPushStatus does not allow transitioning to {}, because it is currently in: {} status. Will skip updating the status.", executionStatus, offlinePushOrThrow.getCurrentStatus());
        }
    }

    protected void updatePushStatus(OfflinePushStatus offlinePushStatus, ExecutionStatus executionStatus, Optional<String> optional) {
        String kafkaTopic = offlinePushStatus.getKafkaTopic();
        AutoCloseableLock createStoreWriteLock = this.clusterLockManager.createStoreWriteLock(Version.parseStoreFromKafkaTopicName(kafkaTopic));
        try {
            OfflinePushStatus offlinePushOrThrow = getOfflinePushOrThrow(kafkaTopic);
            if (!Objects.equals(offlinePushOrThrow, offlinePushStatus)) {
                LOGGER.warn("For topic {}, the actual current push status is different from the expected current push status. [actual current status = {}], [expected push status = {}]", kafkaTopic, offlinePushOrThrow, offlinePushStatus);
            }
            if (!offlinePushOrThrow.validatePushStatusTransition(executionStatus)) {
                LOGGER.warn("Skip updating push execution status for topic {} due to invalid transition from {} to {}", kafkaTopic, offlinePushOrThrow.getCurrentStatus(), executionStatus);
                if (createStoreWriteLock != null) {
                    createStoreWriteLock.close();
                    return;
                }
                return;
            }
            OfflinePushStatus clonePushStatus = offlinePushStatus.clonePushStatus();
            clonePushStatus.updateStatus(executionStatus, optional);
            this.offlinePushAccessor.updateOfflinePushStatus(clonePushStatus);
            this.topicToPushMap.put(kafkaTopic, clonePushStatus);
            if (createStoreWriteLock != null) {
                createStoreWriteLock.close();
            }
        } catch (Throwable th) {
            if (createStoreWriteLock != null) {
                try {
                    createStoreWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected long getDurationInSec(OfflinePushStatus offlinePushStatus) {
        return (System.currentTimeMillis() / 1000) - offlinePushStatus.getStartTimeSec();
    }

    protected OfflinePushAccessor getOfflinePushAccessor() {
        return this.offlinePushAccessor;
    }

    protected ReadWriteStoreRepository getReadWriteStoreRepository() {
        return this.metadataRepository;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RoutingDataRepository getRoutingDataRepository() {
        return this.routingDataRepository;
    }

    @Override // com.linkedin.venice.pushmonitor.PartitionStatusListener, com.linkedin.venice.meta.RoutingDataRepository.RoutingDataChangedListener
    public void onPartitionStatusChange(String str, ReadOnlyPartitionStatus readOnlyPartitionStatus) {
        AutoCloseableLock createStoreWriteLock = this.clusterLockManager.createStoreWriteLock(Version.parseStoreFromKafkaTopicName(str));
        try {
            OfflinePushStatus offlinePush = getOfflinePush(str);
            if (offlinePush == null) {
                LOGGER.error("Can not find Offline push for topic:{}, ignore the partition status change notification.", str);
                if (createStoreWriteLock != null) {
                    createStoreWriteLock.close();
                    return;
                }
                return;
            }
            OfflinePushStatus clonePushStatus = offlinePush.clonePushStatus();
            clonePushStatus.setPartitionStatus(readOnlyPartitionStatus);
            this.topicToPushMap.put(clonePushStatus.getKafkaTopic(), clonePushStatus);
            onPartitionStatusChange(clonePushStatus);
            if (createStoreWriteLock != null) {
                createStoreWriteLock.close();
            }
        } catch (Throwable th) {
            if (createStoreWriteLock != null) {
                try {
                    createStoreWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onPartitionStatusChange(OfflinePushStatus offlinePushStatus) {
        checkWhetherToStartBufferReplayForHybrid(offlinePushStatus);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DisableReplicaCallback getDisableReplicaCallback(final String str) {
        if (this.disableErrorLeaderReplica) {
            return new DisableReplicaCallback() { // from class: com.linkedin.venice.pushmonitor.AbstractPushMonitor.1
                private final Map<String, Set<Integer>> disabledReplicaMap = new HashMap();

                @Override // com.linkedin.venice.pushmonitor.DisableReplicaCallback
                public void disableReplica(String str2, int i) {
                    AbstractPushMonitor.LOGGER.warn("Disabling errored out leader replica of {} partition: {} on host {}", str, Integer.valueOf(i), str2);
                    AbstractPushMonitor.this.helixAdminClient.enablePartition(false, AbstractPushMonitor.this.clusterName, str2, str, Collections.singletonList(HelixUtils.getPartitionName(str, i)));
                    this.disabledReplicaMap.computeIfAbsent(str2, str3 -> {
                        return new HashSet();
                    }).add(Integer.valueOf(i));
                }

                @Override // com.linkedin.venice.pushmonitor.DisableReplicaCallback
                public boolean isReplicaDisabled(String str2, int i) {
                    Map<String, Set<Integer>> map = this.disabledReplicaMap;
                    String str3 = str;
                    return map.computeIfAbsent(str2, str4 -> {
                        AbstractPushMonitor.this.helixClientThrottler.maybeThrottle(1.0d);
                        Map<String, List<String>> disabledPartitionsMap = AbstractPushMonitor.this.helixAdminClient.getDisabledPartitionsMap(AbstractPushMonitor.this.clusterName, str2);
                        return disabledPartitionsMap.containsKey(str3) ? (Set) disabledPartitionsMap.get(str3).stream().map(HelixUtils::getPartitionId).collect(Collectors.toSet()) : Collections.emptySet();
                    }).contains(Integer.valueOf(i));
                }
            };
        }
        return null;
    }

    @Override // com.linkedin.venice.meta.RoutingDataRepository.RoutingDataChangedListener
    public void onExternalViewChange(PartitionAssignment partitionAssignment) {
        LOGGER.info("Received the routing data changed notification for topic: {}", partitionAssignment.getTopic());
        AutoCloseableLock createStoreWriteLock = this.clusterLockManager.createStoreWriteLock(Version.parseStoreFromKafkaTopicName(partitionAssignment.getTopic()));
        try {
            String topic = partitionAssignment.getTopic();
            OfflinePushStatus offlinePush = getOfflinePush(topic);
            if (offlinePush != null) {
                ExecutionStatus currentStatus = offlinePush.getCurrentStatus();
                if (currentStatus.equals(ExecutionStatus.COMPLETED) || currentStatus.equals(ExecutionStatus.ERROR)) {
                    LOGGER.warn("Skip updating push status: {} since it is already in: {}", topic, currentStatus);
                    if (createStoreWriteLock != null) {
                        createStoreWriteLock.close();
                        return;
                    }
                    return;
                }
                Pair<ExecutionStatus, Optional<String>> checkPushStatus = checkPushStatus(offlinePush, partitionAssignment, getDisableReplicaCallback(topic));
                if (!checkPushStatus.getFirst().equals(offlinePush.getCurrentStatus())) {
                    if (checkPushStatus.getFirst().isTerminal()) {
                        LOGGER.info("Offline push status will be changed to {} for topic: {} from status: {}", checkPushStatus.toString(), topic, offlinePush.getCurrentStatus());
                        handleOfflinePushUpdate(offlinePush, checkPushStatus.getFirst(), checkPushStatus.getSecond());
                    } else if (checkPushStatus.getFirst().equals(ExecutionStatus.END_OF_PUSH_RECEIVED)) {
                        checkWhetherToStartBufferReplayForHybrid(offlinePush);
                    }
                }
            } else {
                LOGGER.info("Can not find a running offline push for topic:{}, ignore the routing data changed notification.", partitionAssignment.getTopic());
            }
            if (createStoreWriteLock != null) {
                createStoreWriteLock.close();
            }
        } catch (Throwable th) {
            if (createStoreWriteLock != null) {
                try {
                    createStoreWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.linkedin.venice.meta.RoutingDataRepository.RoutingDataChangedListener
    public void onCustomizedViewChange(PartitionAssignment partitionAssignment) {
    }

    @Override // com.linkedin.venice.meta.RoutingDataRepository.RoutingDataChangedListener
    public void onRoutingDataDeleted(String str) {
        if (this.routingDataRepository.doesResourcesExistInIdealState(str)) {
            LOGGER.warn("Resource is remaining in the ideal state. Ignore the deletion in the external view.");
            return;
        }
        OfflinePushStatus offlinePush = getOfflinePush(str);
        if (offlinePush == null || !offlinePush.getCurrentStatus().equals(ExecutionStatus.STARTED)) {
            return;
        }
        String str2 = "Helix resource for Topic:" + str + " is deleted, stopping the running push.";
        LOGGER.info(str2);
        handleOfflinePushUpdate(offlinePush, ExecutionStatus.ERROR, Optional.of(str2));
    }

    protected void checkWhetherToStartBufferReplayForHybrid(OfflinePushStatus offlinePushStatus) {
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(offlinePushStatus.getKafkaTopic());
        Store store = getReadWriteStoreRepository().getStore(parseStoreFromKafkaTopicName);
        if (store == null) {
            LOGGER.info("Got a null store from metadataRepository for store name: '{}'. Will attempt a refresh().", parseStoreFromKafkaTopicName);
            store = getReadWriteStoreRepository().refreshOneStore(parseStoreFromKafkaTopicName);
            if (store == null) {
                throw new IllegalStateException("checkHybridPushStatus could not find a store named '" + parseStoreFromKafkaTopicName + "' in the metadataRepository, even after refresh()!");
            }
            LOGGER.info("metadataRepository.refresh() allowed us to retrieve store: '{}'!", parseStoreFromKafkaTopicName);
        }
        if (store.isHybrid()) {
            Optional<Version> version = store.getVersion(Version.parseVersionFromKafkaTopicName(offlinePushStatus.getKafkaTopic()));
            if (!offlinePushStatus.isReadyToStartBufferReplay(version.isPresent() && version.get().getDataRecoveryVersionConfig() != null)) {
                if (offlinePushStatus.getCurrentStatus().isTerminal()) {
                    return;
                }
                LOGGER.info("{} is not ready to start buffer replay. Current state: {}", offlinePushStatus.getKafkaTopic(), offlinePushStatus.getCurrentStatus().toString());
                return;
            }
            LOGGER.info("{} is ready to start buffer replay.", offlinePushStatus.getKafkaTopic());
            try {
                getRealTimeTopicSwitcher().switchToRealTimeTopic(Version.composeRealTimeTopic(parseStoreFromKafkaTopicName), offlinePushStatus.getKafkaTopic(), store, this.aggregateRealTimeSourceKafkaUrl, this.activeActiveRealTimeSourceKafkaURLs);
                updatePushStatus(offlinePushStatus, ExecutionStatus.END_OF_PUSH_RECEIVED, Optional.of("kicked off buffer replay"));
                LOGGER.info("Successfully {} for offlinePushStatus: {}", "kicked off buffer replay", offlinePushStatus);
            } catch (Exception e) {
                handleOfflinePushUpdate(offlinePushStatus, ExecutionStatus.ERROR, Optional.of("Failed to kick off the buffer replay"));
                LOGGER.error("{} for offlinePushStatus: {}", "Failed to kick off the buffer replay", offlinePushStatus, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleOfflinePushUpdate(OfflinePushStatus offlinePushStatus, ExecutionStatus executionStatus, Optional<String> optional) {
        if (executionStatus.equals(ExecutionStatus.COMPLETED)) {
            this.pushStatusCollector.handleServerPushStatusUpdate(offlinePushStatus.getKafkaTopic(), ExecutionStatus.COMPLETED, null);
            return;
        }
        if (executionStatus.equals(ExecutionStatus.ERROR)) {
            String str = "STATUS DETAILS ABSENT.";
            if (optional.isPresent()) {
                str = optional.get();
            } else {
                LOGGER.error("Status details should be provided in order to terminateOfflinePush, but they are missing.", (Throwable) new VeniceException("Exception not thrown, for stacktrace logging purposes."));
            }
            this.pushStatusCollector.handleServerPushStatusUpdate(offlinePushStatus.getKafkaTopic(), ExecutionStatus.ERROR, str);
        }
    }

    protected void handleCompletedPush(OfflinePushStatus offlinePushStatus) {
        this.routingDataRepository.unSubscribeRoutingDataChange(offlinePushStatus.getKafkaTopic(), this);
        LOGGER.info("Updating offline push status, push for: {} old status: {}, new status: {}", offlinePushStatus.getKafkaTopic(), offlinePushStatus.getCurrentStatus(), ExecutionStatus.COMPLETED);
        String kafkaTopic = offlinePushStatus.getKafkaTopic();
        long durationInSec = getDurationInSec(offlinePushStatus);
        offlinePushStatus.setSuccessfulPushDurationInSecs(durationInSec);
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(kafkaTopic);
        int parseVersionFromKafkaTopicName = Version.parseVersionFromKafkaTopicName(kafkaTopic);
        updateStoreVersionStatus(parseStoreFromKafkaTopicName, parseVersionFromKafkaTopicName, VersionStatus.ONLINE);
        updatePushStatus(offlinePushStatus, ExecutionStatus.COMPLETED, Optional.empty());
        this.aggPushHealthStats.recordSuccessfulPush(parseStoreFromKafkaTopicName, durationInSec);
        this.aggPushHealthStats.recordSuccessfulPushGauge(parseStoreFromKafkaTopicName, durationInSec);
        try {
            this.storeCleaner.topicCleanupWhenPushComplete(this.clusterName, parseStoreFromKafkaTopicName, parseVersionFromKafkaTopicName);
        } catch (Exception e) {
            LOGGER.warn("Couldn't perform topic cleanup when push job completed for topic: {} in cluster: {}", kafkaTopic, this.clusterName, e);
        }
        try {
            this.storeCleaner.retireOldStoreVersions(this.clusterName, parseStoreFromKafkaTopicName, false, -1);
        } catch (Exception e2) {
            LOGGER.warn("Could not retire the old versions for store: {} in cluster: {}", parseStoreFromKafkaTopicName, this.clusterName, e2);
        }
        LOGGER.info("Offline push for topic: {} is completed.", offlinePushStatus.getKafkaTopic());
    }

    protected void handleErrorPush(OfflinePushStatus offlinePushStatus, String str) {
        this.routingDataRepository.unSubscribeRoutingDataChange(offlinePushStatus.getKafkaTopic(), this);
        LOGGER.info("Updating offline push status, push for: {} is now {}, new status: {}, statusDetails: {}", offlinePushStatus.getKafkaTopic(), offlinePushStatus.getCurrentStatus(), ExecutionStatus.ERROR, str);
        updatePushStatus(offlinePushStatus, ExecutionStatus.ERROR, Optional.of(str));
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(offlinePushStatus.getKafkaTopic());
        int parseVersionFromKafkaTopicName = Version.parseVersionFromKafkaTopicName(offlinePushStatus.getKafkaTopic());
        try {
            updateStoreVersionStatus(parseStoreFromKafkaTopicName, parseVersionFromKafkaTopicName, VersionStatus.ERROR);
            this.aggPushHealthStats.recordFailedPush(parseStoreFromKafkaTopicName, getDurationInSec(offlinePushStatus));
            this.storeCleaner.deleteOneStoreVersion(this.clusterName, parseStoreFromKafkaTopicName, parseVersionFromKafkaTopicName);
        } catch (Exception e) {
            LOGGER.warn("Could not delete error version: {} for store: {} in cluster: {}", Integer.valueOf(parseVersionFromKafkaTopicName), parseStoreFromKafkaTopicName, this.clusterName, e);
        }
        LOGGER.info("Offline push for topic: {} fails.", offlinePushStatus.getKafkaTopic());
    }

    private void updateStoreVersionStatus(String str, int i, VersionStatus versionStatus) {
        VersionStatus versionStatus2 = versionStatus;
        AutoCloseableLock createStoreWriteLock = this.clusterLockManager.createStoreWriteLock(str);
        try {
            Store store = this.metadataRepository.getStore(str);
            if (store == null) {
                throw new VeniceNoStoreException(str);
            }
            if (!store.isEnableWrites() && versionStatus.equals(VersionStatus.ONLINE)) {
                versionStatus2 = VersionStatus.PUSHED;
            }
            store.updateVersionStatus(i, versionStatus2);
            LOGGER.info("Updated store: {} version: {} to status: {}", store.getName(), Integer.valueOf(i), versionStatus2.toString());
            if (versionStatus2.equals(VersionStatus.ONLINE)) {
                if (i <= store.getCurrentVersion()) {
                    LOGGER.info("Current version for store {}: {} is newer than the given version: {}. The current version will not be changed.", store.getName(), Integer.valueOf(store.getCurrentVersion()), Integer.valueOf(i));
                } else {
                    if (!store.getVersion(i).isPresent()) {
                        throw new VeniceException(String.format("No version present for store %s version %d!  Aborting version swap!", str, Integer.valueOf(i)));
                    }
                    if (store.getVersion(i).get().isVersionSwapDeferred()) {
                        LOGGER.info("Version swap is deferred for store {} on version {}. Skipping version swap.", store.getName(), Integer.valueOf(i));
                    } else {
                        int currentVersion = store.getCurrentVersion();
                        store.setCurrentVersion(i);
                        this.realTimeTopicSwitcher.transmitVersionSwapMessage(store, currentVersion, i);
                    }
                }
            }
            this.metadataRepository.updateStore(store);
            if (createStoreWriteLock != null) {
                createStoreWriteLock.close();
            }
        } catch (Throwable th) {
            if (createStoreWriteLock != null) {
                try {
                    createStoreWriteLock.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Integer getStoreCurrentVersion(String str) {
        Store store = this.metadataRepository.getStore(str);
        if (store == null) {
            return null;
        }
        return Integer.valueOf(store.getCurrentVersion());
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public void recordPushPreparationDuration(String str, long j) {
        this.aggPushHealthStats.recordPushPrepartionDuration(Version.parseStoreFromKafkaTopicName(str), j);
    }

    public void setRealTimeTopicSwitcher(RealTimeTopicSwitcher realTimeTopicSwitcher) {
        this.realTimeTopicSwitcher = realTimeTopicSwitcher;
    }

    public RealTimeTopicSwitcher getRealTimeTopicSwitcher() {
        return this.realTimeTopicSwitcher;
    }

    @Override // com.linkedin.venice.pushmonitor.PushMonitor
    public boolean isOfflinePushMonitorDaVinciPushStatusEnabled() {
        return this.isOfflinePushMonitorDaVinciPushStatusEnabled;
    }
}
