package com.linkedin.venice.pushmonitor;

import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/pushmonitor/PushStatusCollector.class */
public class PushStatusCollector {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) PushStatusCollector.class);
    private final Consumer<String> pushCompletedHandler;
    private final BiConsumer<String, String> pushErrorHandler;
    private final PushStatusStoreReader pushStatusStoreReader;
    private final ReadWriteStoreRepository storeRepository;
    private final int daVinciPushStatusScanPeriodInSeconds;
    private final int daVinciPushStatusScanThreadNumber;
    private final boolean daVinciPushStatusScanEnabled;
    private final int daVinciPushStatusNoReportRetryMaxAttempts;
    private ScheduledExecutorService offlinePushCheckScheduler;
    private ExecutorService pushStatusStoreScanExecutor;
    private final Map<String, TopicPushStatus> topicToPushStatusMap = new VeniceConcurrentHashMap();
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final Map<String, Integer> topicToNoDaVinciStatusRetryCountMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/venice/pushmonitor/PushStatusCollector$TopicPushStatus.class */
    public static class TopicPushStatus {
        private final String topicName;
        private final int partitionCount;
        private ExecutionStatusWithDetails serverStatus;
        private ExecutionStatusWithDetails daVinciStatus;
        private boolean isMonitoring = true;

        public TopicPushStatus(String str, int i) {
            this.partitionCount = i;
            this.topicName = str;
        }

        public int getPartitionCount() {
            return this.partitionCount;
        }

        public void setMonitoring(boolean z) {
            this.isMonitoring = z;
        }

        public boolean isMonitoring() {
            return this.isMonitoring;
        }

        public void setServerStatus(ExecutionStatusWithDetails executionStatusWithDetails) {
            this.serverStatus = executionStatusWithDetails;
        }

        public ExecutionStatusWithDetails getServerStatus() {
            return this.serverStatus;
        }

        public void setDaVinciStatus(ExecutionStatusWithDetails executionStatusWithDetails) {
            this.daVinciStatus = executionStatusWithDetails;
        }

        public ExecutionStatusWithDetails getDaVinciStatus() {
            return this.daVinciStatus;
        }

        public String getTopicName() {
            return this.topicName;
        }
    }

    public PushStatusCollector(ReadWriteStoreRepository readWriteStoreRepository, PushStatusStoreReader pushStatusStoreReader, Consumer<String> consumer, BiConsumer<String, String> biConsumer, boolean z, int i, int i2, int i3) {
        this.storeRepository = readWriteStoreRepository;
        this.pushStatusStoreReader = pushStatusStoreReader;
        this.pushCompletedHandler = consumer;
        this.pushErrorHandler = biConsumer;
        this.daVinciPushStatusScanEnabled = z;
        this.daVinciPushStatusScanPeriodInSeconds = i;
        this.daVinciPushStatusScanThreadNumber = i2;
        this.daVinciPushStatusNoReportRetryMaxAttempts = i3;
    }

    public void start() {
        if (!this.daVinciPushStatusScanEnabled) {
            LOGGER.warn("Offline push monitoring Da Vinci push status is not enabled, will only check server push status.");
            return;
        }
        if (this.isStarted.compareAndSet(false, true)) {
            if (this.offlinePushCheckScheduler == null || this.offlinePushCheckScheduler.isShutdown()) {
                this.offlinePushCheckScheduler = Executors.newScheduledThreadPool(1);
                LOGGER.info("Created a new offline push check scheduler");
            }
            if (this.pushStatusStoreScanExecutor == null || this.pushStatusStoreScanExecutor.isShutdown()) {
                this.pushStatusStoreScanExecutor = Executors.newFixedThreadPool(this.daVinciPushStatusScanThreadNumber);
                LOGGER.info("Created a new push status store executor with {} threads", Integer.valueOf(this.daVinciPushStatusScanThreadNumber));
            }
            this.offlinePushCheckScheduler.scheduleAtFixedRate(this::scanDaVinciPushStatus, 0L, this.daVinciPushStatusScanPeriodInSeconds, TimeUnit.SECONDS);
            LOGGER.info("Offline push check scheduler started with {} seconds check interval", Integer.valueOf(this.daVinciPushStatusScanPeriodInSeconds));
        }
    }

    public void subscribeTopic(String str, int i) {
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(str);
        Store store = this.storeRepository.getStore(parseStoreFromKafkaTopicName);
        if (store == null) {
            LOGGER.warn("Store {} not found in store repository, will not monitor Da Vinci push status", parseStoreFromKafkaTopicName);
            return;
        }
        start();
        if (this.daVinciPushStatusScanEnabled && store.isDaVinciPushStatusStoreEnabled() && Version.parseVersionFromKafkaTopicName(str) > 1) {
            LOGGER.info("Will monitor Da Vinci push status for topic {}", str);
            this.topicToPushStatusMap.put(str, new TopicPushStatus(str, i));
        }
    }

    public void unsubscribeTopic(String str) {
        this.topicToPushStatusMap.remove(str);
        this.topicToNoDaVinciStatusRetryCountMap.remove(str);
    }

    private void scanDaVinciPushStatus() {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, TopicPushStatus> entry : this.topicToPushStatusMap.entrySet()) {
            String key = entry.getKey();
            TopicPushStatus value = entry.getValue();
            if (value.isMonitoring()) {
                if (value.getDaVinciStatus() == null || !value.getDaVinciStatus().getStatus().isTerminal()) {
                    arrayList.add(CompletableFuture.supplyAsync(() -> {
                        value.setDaVinciStatus(PushMonitorUtils.getDaVinciPushStatusAndDetails(this.pushStatusStoreReader, key, value.getPartitionCount(), Optional.empty()));
                        return value;
                    }, this.pushStatusStoreScanExecutor));
                } else {
                    arrayList.add(CompletableFuture.completedFuture(value));
                }
            }
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            try {
                TopicPushStatus topicPushStatus = (TopicPushStatus) ((CompletableFuture) it2.next()).get();
                ExecutionStatusWithDetails daVinciStatus = topicPushStatus.getDaVinciStatus();
                if (!daVinciStatus.isNoDaVinciStatusReport()) {
                    this.topicToNoDaVinciStatusRetryCountMap.remove(topicPushStatus.topicName);
                } else if (this.topicToNoDaVinciStatusRetryCountMap.compute(topicPushStatus.topicName, (str, num) -> {
                    if (num == null) {
                        return 1;
                    }
                    return Integer.valueOf(num.intValue() + 1);
                }).intValue() <= this.daVinciPushStatusNoReportRetryMaxAttempts) {
                    daVinciStatus = new ExecutionStatusWithDetails(ExecutionStatus.NOT_STARTED, daVinciStatus.getDetails(), true);
                    topicPushStatus.setDaVinciStatus(daVinciStatus);
                } else {
                    this.topicToNoDaVinciStatusRetryCountMap.remove(topicPushStatus.topicName);
                }
                ExecutionStatusWithDetails serverStatus = topicPushStatus.getServerStatus();
                if (serverStatus != null) {
                    LOGGER.info("Topic {} server push status: {}, Da Vinci push status: {}", topicPushStatus.getTopicName(), serverStatus.getStatus(), daVinciStatus.getStatus());
                    if (serverStatus.getStatus().equals(ExecutionStatus.COMPLETED) && daVinciStatus.getStatus().equals(ExecutionStatus.COMPLETED)) {
                        topicPushStatus.setMonitoring(false);
                        this.pushCompletedHandler.accept(topicPushStatus.getTopicName());
                    } else if (serverStatus.getStatus().equals(ExecutionStatus.ERROR) || daVinciStatus.getStatus().equals(ExecutionStatus.ERROR)) {
                        topicPushStatus.setMonitoring(false);
                        StringBuilder sb = new StringBuilder();
                        if (serverStatus.getStatus().equals(ExecutionStatus.ERROR)) {
                            sb.append("Server push error: ").append(serverStatus.getDetails()).append("\n");
                        }
                        if (daVinciStatus.getStatus().equals(ExecutionStatus.ERROR)) {
                            sb.append("Da Vinci push error: ").append(daVinciStatus.getDetails()).append("\n");
                        }
                        this.pushErrorHandler.accept(topicPushStatus.getTopicName(), sb.toString());
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Caught exception when getting future result of push status.", (Throwable) e);
            }
        }
    }

    public void handleServerPushStatusUpdate(String str, ExecutionStatus executionStatus, String str2) {
        TopicPushStatus computeIfPresent = this.topicToPushStatusMap.computeIfPresent(str, (str3, topicPushStatus) -> {
            topicPushStatus.setServerStatus(new ExecutionStatusWithDetails(executionStatus, str2, true));
            return topicPushStatus;
        });
        if (!this.daVinciPushStatusScanEnabled || computeIfPresent == null) {
            if (executionStatus.equals(ExecutionStatus.COMPLETED)) {
                this.pushCompletedHandler.accept(str);
            } else if (executionStatus.equals(ExecutionStatus.ERROR)) {
                this.pushErrorHandler.accept(str, str2);
            }
        }
    }

    public void clear() {
        if (this.isStarted.compareAndSet(true, false)) {
            this.offlinePushCheckScheduler.shutdown();
            try {
                if (!this.offlinePushCheckScheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.offlinePushCheckScheduler.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.pushStatusStoreScanExecutor.shutdown();
            try {
                if (!this.pushStatusStoreScanExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.pushStatusStoreScanExecutor.shutdownNow();
                }
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
            this.topicToPushStatusMap.clear();
            this.topicToNoDaVinciStatusRetryCountMap.clear();
        }
    }

    Map<String, TopicPushStatus> getTopicToPushStatusMap() {
        return this.topicToPushStatusMap;
    }
}
