package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.venice.exceptions.VeniceIngestionTaskKilledException;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.HybridStoreQuotaStatus;
import com.linkedin.venice.utils.LatencyUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.util.component.AbstractLifeCycle;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/IngestionNotificationDispatcher.class */
public class IngestionNotificationDispatcher {
    public static long PROGRESS_REPORT_INTERVAL = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
    public static long QUOTA_REPORT_INTERVAL = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
    private final Logger LOGGER;
    private final Queue<VeniceNotifier> notifiers;
    private final String topic;
    private final BooleanSupplier isCurrentVersion;
    private long lastProgressReportTime = 0;
    private final Map<Integer, NotificationRecord> lastQuotaStatusReported = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/IngestionNotificationDispatcher$NotificationRecord.class */
    public static class NotificationRecord {
        private final long timeStampInMs;
        private final HybridStoreQuotaStatus status;

        public NotificationRecord(long j, HybridStoreQuotaStatus hybridStoreQuotaStatus) {
            this.timeStampInMs = j;
            this.status = hybridStoreQuotaStatus;
        }

        public HybridStoreQuotaStatus getStatus() {
            return this.status;
        }

        public long getTimeStampInMs() {
            return this.timeStampInMs;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/IngestionNotificationDispatcher$NotifierFunction.class */
    public interface NotifierFunction {
        void apply(VeniceNotifier veniceNotifier);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/IngestionNotificationDispatcher$PreNotificationCheck.class */
    public interface PreNotificationCheck {
        boolean apply();
    }

    public IngestionNotificationDispatcher(Queue<VeniceNotifier> queue, String str, BooleanSupplier booleanSupplier) {
        this.LOGGER = LogManager.getLogger(IngestionNotificationDispatcher.class.getSimpleName() + " for [ Topic: " + str + " ] ");
        this.notifiers = queue;
        this.topic = str;
        this.isCurrentVersion = booleanSupplier;
    }

    private void report(PartitionConsumptionState partitionConsumptionState, String str, NotifierFunction notifierFunction, PreNotificationCheck preNotificationCheck) {
        if (partitionConsumptionState == null) {
            this.LOGGER.info("Partition has been unsubscribed, no need to report {}", str);
            return;
        }
        if (preNotificationCheck.apply()) {
            for (VeniceNotifier veniceNotifier : this.notifiers) {
                try {
                    notifierFunction.apply(veniceNotifier);
                } catch (Exception e) {
                    this.LOGGER.error("Error reporting status to notifier {}", veniceNotifier.getClass(), e);
                }
            }
            this.LOGGER.info("Reported {} to {} notifiers for PartitionConsumptionState: {}", str, Integer.valueOf(this.notifiers.size()), partitionConsumptionState);
        }
    }

    void report(PartitionConsumptionState partitionConsumptionState, ExecutionStatus executionStatus, NotifierFunction notifierFunction, PreNotificationCheck preNotificationCheck) {
        if (!executionStatus.isTaskStatus()) {
            throw new IllegalArgumentException("The " + IngestionNotificationDispatcher.class.getSimpleName() + " can only be used to report task status.");
        }
        report(partitionConsumptionState, executionStatus.name(), notifierFunction, preNotificationCheck);
    }

    void report(PartitionConsumptionState partitionConsumptionState, ExecutionStatus executionStatus, NotifierFunction notifierFunction) {
        report(partitionConsumptionState, executionStatus.name(), notifierFunction, () -> {
            return true;
        });
    }

    void report(PartitionConsumptionState partitionConsumptionState, HybridStoreQuotaStatus hybridStoreQuotaStatus, NotifierFunction notifierFunction, PreNotificationCheck preNotificationCheck) {
        report(partitionConsumptionState, hybridStoreQuotaStatus.name(), notifierFunction, preNotificationCheck);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportStarted(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, ExecutionStatus.STARTED, veniceNotifier -> {
            veniceNotifier.started(this.topic, partitionConsumptionState.getUserPartition());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportRestarted(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, ExecutionStatus.STARTED, veniceNotifier -> {
            veniceNotifier.restarted(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportCatchUpVersionTopicOffsetLag(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, ExecutionStatus.CATCH_UP_BASE_TOPIC_OFFSET_LAG, veniceNotifier -> {
            veniceNotifier.catchUpVersionTopicOffsetLag(this.topic, partitionConsumptionState.getUserPartition());
            partitionConsumptionState.releaseLatch();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportCompleted(PartitionConsumptionState partitionConsumptionState, boolean z) {
        report(partitionConsumptionState, ExecutionStatus.COMPLETED, veniceNotifier -> {
            veniceNotifier.completed(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), partitionConsumptionState.getLeaderFollowerState().toString());
            partitionConsumptionState.releaseLatch();
            partitionConsumptionState.completionReported();
        }, () -> {
            if (partitionConsumptionState.isErrorReported()) {
                this.LOGGER.error("Processing completed WITH ERRORS for Partition: {}, Last Offset: {}", Integer.valueOf(partitionConsumptionState.getUserPartition()), Long.valueOf(partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset()));
                return false;
            }
            if (!z && !partitionConsumptionState.isComplete()) {
                this.LOGGER.error("Unexpected! Received a request to report completion but the PartitionConsumptionState says it is incomplete: {}", partitionConsumptionState);
                return false;
            }
            if (!partitionConsumptionState.isCompletionReported()) {
                return true;
            }
            this.LOGGER.info("Received a request to report completion, but it has already been reported. Skipping.");
            return false;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportQuotaNotViolated(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED, veniceNotifier -> {
            veniceNotifier.quotaNotViolated(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
        }, () -> {
            return checkQuotaStatusReported(partitionConsumptionState.getUserPartition(), HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportQuotaViolated(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, HybridStoreQuotaStatus.QUOTA_VIOLATED, veniceNotifier -> {
            veniceNotifier.quotaViolated(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
        }, () -> {
            return checkQuotaStatusReported(partitionConsumptionState.getUserPartition(), HybridStoreQuotaStatus.QUOTA_VIOLATED);
        });
    }

    private boolean checkQuotaStatusReported(int i, HybridStoreQuotaStatus hybridStoreQuotaStatus) {
        if (!this.lastQuotaStatusReported.containsKey(Integer.valueOf(i))) {
            this.lastQuotaStatusReported.put(Integer.valueOf(i), new NotificationRecord(System.currentTimeMillis(), hybridStoreQuotaStatus));
            return true;
        }
        NotificationRecord notificationRecord = this.lastQuotaStatusReported.get(Integer.valueOf(i));
        if (notificationRecord.getStatus() == hybridStoreQuotaStatus && LatencyUtils.getLatencyInMS(notificationRecord.getTimeStampInMs()) < QUOTA_REPORT_INTERVAL) {
            return false;
        }
        this.lastQuotaStatusReported.put(Integer.valueOf(i), new NotificationRecord(System.currentTimeMillis(), hybridStoreQuotaStatus));
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportProgress(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, ExecutionStatus.PROGRESS, veniceNotifier -> {
            veniceNotifier.progress(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
        }, () -> {
            if (System.currentTimeMillis() - this.lastProgressReportTime < PROGRESS_REPORT_INTERVAL) {
                return false;
            }
            if (this.isCurrentVersion.getAsBoolean() || !(!partitionConsumptionState.isStarted() || partitionConsumptionState.isEndOfPushReceived() || partitionConsumptionState.isErrorReported())) {
                this.lastProgressReportTime = System.currentTimeMillis();
                return true;
            }
            this.LOGGER.debug("Can not report progress for topic '{}', because it has not been started or has already been terminated. partitionConsumptionState: {}", this.topic, partitionConsumptionState);
            return false;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportEndOfPushReceived(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, ExecutionStatus.END_OF_PUSH_RECEIVED, veniceNotifier -> {
            veniceNotifier.endOfPushReceived(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportStartOfIncrementalPushReceived(PartitionConsumptionState partitionConsumptionState, String str) {
        report(partitionConsumptionState, ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED, veniceNotifier -> {
            veniceNotifier.startOfIncrementalPushReceived(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), str);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportEndOfIncrementalPushReceived(PartitionConsumptionState partitionConsumptionState, String str) {
        report(partitionConsumptionState, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED, veniceNotifier -> {
            veniceNotifier.endOfIncrementalPushReceived(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), str);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportTopicSwitchReceived(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, ExecutionStatus.TOPIC_SWITCH_RECEIVED, veniceNotifier -> {
            veniceNotifier.topicSwitchReceived(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportDataRecoveryCompleted(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, ExecutionStatus.DATA_RECOVERY_COMPLETED, veniceNotifier -> {
            veniceNotifier.dataRecoveryCompleted(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getOffsetRecord().getLocalVersionTopicOffset(), "");
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportError(int i, String str, Exception exc) {
        for (VeniceNotifier veniceNotifier : this.notifiers) {
            try {
                veniceNotifier.error(this.topic, i, str, exc);
            } catch (Exception e) {
                this.LOGGER.error("Error reporting status to notifier {}", veniceNotifier.getClass(), e);
            }
        }
        this.LOGGER.info("Reported {} to {} notifiers for partition: {}", ExecutionStatus.ERROR, Integer.valueOf(this.notifiers.size()), Integer.valueOf(i));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportError(Collection<PartitionConsumptionState> collection, String str, Exception exc) {
        for (PartitionConsumptionState partitionConsumptionState : collection) {
            report(partitionConsumptionState, ExecutionStatus.ERROR, veniceNotifier -> {
                veniceNotifier.error(this.topic, partitionConsumptionState.getUserPartition(), str, exc);
                partitionConsumptionState.errorReported();
            }, () -> {
                StringBuilder sb = new StringBuilder();
                sb.append("Partition: ").append(partitionConsumptionState.getUserPartition()).append(" has already been ");
                boolean z = true;
                if (partitionConsumptionState.isComplete()) {
                    sb.append("marked as completed so an error will not be reported.");
                    z = false;
                }
                if (partitionConsumptionState.isErrorReported()) {
                    sb.append("reported as an error before so it will not be reported again.");
                    z = false;
                }
                if (!z) {
                    this.LOGGER.info(sb.toString());
                }
                return z;
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportKilled(Collection<PartitionConsumptionState> collection, VeniceIngestionTaskKilledException veniceIngestionTaskKilledException) {
        for (PartitionConsumptionState partitionConsumptionState : collection) {
            report(partitionConsumptionState, ExecutionStatus.ERROR, veniceNotifier -> {
                veniceNotifier.error(this.topic, partitionConsumptionState.getUserPartition(), veniceIngestionTaskKilledException.getMessage(), veniceIngestionTaskKilledException);
                partitionConsumptionState.errorReported();
            }, () -> {
                if (partitionConsumptionState.isErrorReported()) {
                    this.LOGGER.warn("Partition: {} has been reported as error before.", Integer.valueOf(partitionConsumptionState.getUserPartition()));
                    return false;
                }
                if (!partitionConsumptionState.isCompletionReported()) {
                    return true;
                }
                this.LOGGER.warn("Partition: {} has been marked as completed, so an error will not be reported...", Integer.valueOf(partitionConsumptionState.getUserPartition()));
                return false;
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportStopped(PartitionConsumptionState partitionConsumptionState) {
        report(partitionConsumptionState, AbstractLifeCycle.STOPPED, veniceNotifier -> {
            veniceNotifier.stopped(this.topic, partitionConsumptionState.getUserPartition(), partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
        }, () -> {
            return true;
        });
    }
}
