package com.linkedin.venice.controller.lingeringjob;

import com.linkedin.venice.authorization.IdentityParser;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.status.BatchJobHeartbeatConfigs;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatKey;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.status.protocol.PushJobStatusRecordKey;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.commons.lang.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/controller/lingeringjob/HeartbeatBasedLingeringStoreVersionChecker.class */
public class HeartbeatBasedLingeringStoreVersionChecker implements LingeringStoreVersionChecker {
    private static final Logger LOGGER = LogManager.getLogger(HeartbeatBasedLingeringStoreVersionChecker.class);
    private final Duration heartbeatTimeout;
    private final Duration initialHeartbeatBufferTime;
    private final DefaultLingeringStoreVersionChecker defaultLingeringStoreVersionChecker;
    private final HeartbeatBasedCheckerStats heartbeatBasedCheckerStats;

    public HeartbeatBasedLingeringStoreVersionChecker(@Nonnull Duration duration, @Nonnull Duration duration2, @Nonnull DefaultLingeringStoreVersionChecker defaultLingeringStoreVersionChecker, @Nonnull HeartbeatBasedCheckerStats heartbeatBasedCheckerStats) {
        Validate.notNull(duration);
        Validate.notNull(duration2);
        Validate.notNull(defaultLingeringStoreVersionChecker);
        Validate.notNull(heartbeatBasedCheckerStats);
        this.heartbeatTimeout = duration;
        this.initialHeartbeatBufferTime = duration2;
        this.defaultLingeringStoreVersionChecker = defaultLingeringStoreVersionChecker;
        this.heartbeatBasedCheckerStats = heartbeatBasedCheckerStats;
        LOGGER.info("Instance is created with [initialHeartbeatBufferTime={}] and [heartbeatTimeout={}]", duration2, duration);
    }

    @Override // com.linkedin.venice.controller.lingeringjob.LingeringStoreVersionChecker
    public boolean isStoreVersionLingering(Store store, Version version, Time time, Admin admin, Optional<X509Certificate> optional, IdentityParser identityParser) {
        if (isBatchJobHeartbeatEnabled(store, version, admin, optional, identityParser)) {
            LOGGER.info("Batch job heartbeat is enabled for store {} with version {}", store.getName(), Integer.valueOf(version.getNumber()));
            return !isBatchJobAlive(store, version, time, admin);
        }
        LOGGER.info("Batch job heartbeat is not enabled for store {} with version {}. Fall back to the default behavior.", store.getName(), Integer.valueOf(version.getNumber()));
        return this.defaultLingeringStoreVersionChecker.isStoreVersionLingering(store, version, time, admin, optional, identityParser);
    }

    private boolean isBatchJobAlive(Store store, Version version, Time time, Admin admin) {
        BatchJobHeartbeatKey batchJobHeartbeatKey = new BatchJobHeartbeatKey();
        batchJobHeartbeatKey.storeName = store.getName();
        batchJobHeartbeatKey.storeVersion = version.getNumber();
        try {
            BatchJobHeartbeatValue batchJobHeartbeatValue = admin.getBatchJobHeartbeatValue(batchJobHeartbeatKey);
            if (batchJobHeartbeatValue == null) {
                long milliseconds = time.getMilliseconds() - version.getCreatedTime();
                if (milliseconds < this.initialHeartbeatBufferTime.toMillis()) {
                    LOGGER.info("No heartbeat found for store {} with version {}. However, still assume it is alive, because this store version was created {} ms ago and this duration is shorter than the buffer time {} ms.", store.getName(), Integer.valueOf(version.getNumber()), Long.valueOf(milliseconds), Long.valueOf(this.initialHeartbeatBufferTime.toMillis()));
                    return true;
                }
                LOGGER.info("No heartbeat found for store {} with version {} with created time {} ms", store.getName(), Integer.valueOf(version.getNumber()), Long.valueOf(version.getCreatedTime()));
                return false;
            }
            long milliseconds2 = time.getMilliseconds() - batchJobHeartbeatValue.timestamp;
            if (milliseconds2 > this.heartbeatTimeout.toMillis()) {
                LOGGER.info("Heartbeat timed out for store {} with version {}. Timeout threshold is {} ms and time since last heartbeat is: {} ms", store.getName(), Integer.valueOf(version.getNumber()), Long.valueOf(this.heartbeatTimeout.toMillis()), Long.valueOf(milliseconds2));
                this.heartbeatBasedCheckerStats.recordTimeoutHeartbeatCheck();
                return false;
            }
            LOGGER.info("Heartbeat detected for store {} with version {} and time since last heartbeat is: {} ms and the timeout threshold is {} ms", store.getName(), Integer.valueOf(version.getNumber()), Long.valueOf(milliseconds2), Long.valueOf(this.heartbeatTimeout.toMillis()));
            this.heartbeatBasedCheckerStats.recordNoTimeoutHeartbeatCheck();
            return true;
        } catch (Exception e) {
            LOGGER.warn("Got exception when getting heartbeat value. Store {} with version {}", store.getName(), Integer.valueOf(version.getNumber()), e);
            this.heartbeatBasedCheckerStats.recordCheckJobHasHeartbeatFailed();
            return true;
        }
    }

    private boolean isBatchJobHeartbeatEnabled(Store store, Version version, Admin admin, Optional<X509Certificate> optional, IdentityParser identityParser) {
        if (!canRequesterAccessHeartbeatStore(admin, optional)) {
            LOGGER.warn("Assume the batch job heartbeat is not enabled since it does not have write access to the heartbeat store. Requested store {} with version {} for requester: {}", store.getName(), Integer.valueOf(version.getNumber()), optional.isPresent() ? identityParser.parseIdentityFromCert(optional.get()) : "unknown (no cert)");
            this.heartbeatBasedCheckerStats.recordCheckJobHasHeartbeatFailed();
            return false;
        }
        PushJobStatusRecordKey pushJobStatusRecordKey = new PushJobStatusRecordKey();
        pushJobStatusRecordKey.storeName = store.getName();
        pushJobStatusRecordKey.versionNumber = version.getNumber();
        try {
            PushJobDetails pushJobDetails = admin.getPushJobDetails(pushJobStatusRecordKey);
            if (pushJobDetails == null) {
                LOGGER.warn("Found no push job details for store {} version {}", store.getName(), Integer.valueOf(version.getNumber()));
                return false;
            }
            Map map = pushJobDetails.pushJobConfigs;
            if (map == null) {
                LOGGER.warn("Null push job configs in the push job details event. Store {} and its version {}", store.getName(), Integer.valueOf(version.getNumber()));
                this.heartbeatBasedCheckerStats.recordCheckJobHasHeartbeatFailed();
                return false;
            }
            LOGGER.info("For store {} with version {}, found pushJobConfigs: {}", store.getName(), Integer.valueOf(version.getNumber()), map);
            Optional valueFromCharSequenceMapWithStringKey = Utils.getValueFromCharSequenceMapWithStringKey(map, BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName());
            if (valueFromCharSequenceMapWithStringKey.isPresent()) {
                return Boolean.parseBoolean(((CharSequence) valueFromCharSequenceMapWithStringKey.get()).toString());
            }
            return false;
        } catch (Exception e) {
            LOGGER.error("Cannot determine if batch job heartbeat is enabled or not with exception. Assume it is not enabled. Store {} and its version {}", store.getName(), Integer.valueOf(version.getNumber()), e);
            this.heartbeatBasedCheckerStats.recordCheckJobHasHeartbeatFailed();
            return false;
        }
    }

    private boolean canRequesterAccessHeartbeatStore(Admin admin, Optional<X509Certificate> optional) {
        if (!optional.isPresent()) {
            LOGGER.warn("No requester cert is provided. Hence assume the requester has no write permission to the heartbeat store");
            return false;
        }
        try {
            return admin.hasWritePermissionToBatchJobHeartbeatStore(optional.get(), VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE.getPrefix());
        } catch (Exception e) {
            LOGGER.warn("Cannot check access permission. Assume no access permission.", e);
            return false;
        }
    }
}
