package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.utils.StoragePartitionDiskUsage;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.class */
/**
 * Tracks per-partition disk usage for a single store version and enforces the store's
 * hybrid disk quota by pausing and resuming ingestion of individual partitions.
 *
 * <p>The manager is a {@link StoreDataChangedListener}: when the store it was created for
 * changes, it refreshes the cached quota settings and re-evaluates every partition.
 *
 * <p>Quota refresh and quota enforcement are serialized through
 * {@code hybridStoreDiskQuotaLock}. The plain getters and {@link #getDiskQuotaUsage()} read
 * the cached quota fields without taking that lock.
 */
public class StorageUtilizationManager implements StoreDataChangedListener {
    private static final Logger LOGGER = LogManager.getLogger(StorageUtilizationManager.class);
    private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter();

    /**
     * Store quota value that this class treats as "no quota configured".
     * NOTE(review): presumably mirrors a constant declared on {@code Store}; confirm and
     * reference that constant instead of duplicating the literal.
     */
    private static final long UNLIMITED_STORAGE_QUOTA = -1L;

    private final Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap;
    private final AbstractStorageEngine storageEngine;
    private final Function<Integer, StoragePartitionDiskUsage> storagePartitionDiskUsageFunctionConstructor;
    private final String versionTopic;
    private final String storeName;
    private final int storeVersion;
    private final int subPartitionCount;
    private final Map<Integer, StoragePartitionDiskUsage> partitionConsumptionSizeMap;
    private final Set<Integer> pausedPartitions;
    private final boolean isHybridQuotaEnabledInServer;
    private final boolean isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled;
    private final StatusReportAdapter statusReportAdapter;
    private final TopicPartitionConsumerFunction pausePartition;
    private final TopicPartitionConsumerFunction resumePartition;
    private boolean versionIsOnline;
    private long storeQuotaInBytes;
    private long diskQuotaPerPartition;
    private boolean isHybridQuotaEnabledInStoreConfig;
    private final Lock hybridStoreDiskQuotaLock = new ReentrantLock();

    /**
     * Creates a manager for one version topic of {@code store}.
     *
     * @param storageEngine storage engine handed to every {@link StoragePartitionDiskUsage} created here
     * @param store store whose name, quota settings and version status seed the initial state
     * @param versionTopic version topic name; the store version number is parsed from it
     * @param subPartitionCount number of sub-partitions the store quota is divided by; must be positive
     * @param partitionConsumptionStateMap consumption state per partition; kept by reference, not copied
     * @param isHybridQuotaEnabledInServer server-side switch; quota is enforced only when both this and
     *        the store-level flag are true
     * @param isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled when true,
     *        {@link #getDiskQuotaUsage()} scales the quota by the share of partitions tracked here
     * @param statusReportAdapter receives quota-violated / quota-not-violated / completed reports
     * @param pausePartition callback invoked with (topic, partition) to pause ingestion
     * @param resumePartition callback invoked with (topic, partition) to resume ingestion
     * @throws IllegalArgumentException if {@code subPartitionCount} is not positive
     */
    public StorageUtilizationManager(AbstractStorageEngine storageEngine, Store store, String versionTopic, int subPartitionCount, Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap, boolean isHybridQuotaEnabledInServer, boolean isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled, StatusReportAdapter statusReportAdapter, TopicPartitionConsumerFunction pausePartition, TopicPartitionConsumerFunction resumePartition) {
        // Validate before touching any state: the count is used as a divisor in setStoreQuota.
        if (subPartitionCount <= 0) {
            throw new IllegalArgumentException("subPartitionCount must be positive!");
        }
        this.partitionConsumptionStateMap = partitionConsumptionStateMap;
        this.storageEngine = storageEngine;
        this.storagePartitionDiskUsageFunctionConstructor = partition -> new StoragePartitionDiskUsage(partition, storageEngine);
        this.storeName = store.getName();
        this.versionTopic = versionTopic;
        this.subPartitionCount = subPartitionCount;
        this.partitionConsumptionSizeMap = new VeniceConcurrentHashMap<>();
        this.pausedPartitions = VeniceConcurrentHashMap.newKeySet();
        this.storeVersion = Version.parseVersionFromKafkaTopicName(versionTopic);
        this.isHybridQuotaEnabledInServer = isHybridQuotaEnabledInServer;
        this.isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled = isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled;
        this.statusReportAdapter = statusReportAdapter;
        this.pausePartition = pausePartition;
        this.resumePartition = resumePartition;
        setStoreQuota(store);
        Optional<Version> version = store.getVersion(this.storeVersion);
        this.versionIsOnline = version.isPresent() && isVersionOnline(version.get());
    }

    /** Store-created notification; intentionally a no-op. */
    public void handleStoreCreated(Store store) {
    }

    /** Store-deleted notification; intentionally a no-op. */
    public void handleStoreDeleted(String storeName) {
    }

    /** Quota is enforced only when both the store config and the server config enable it. */
    private boolean isHybridQuotaEnabled() {
        return this.isHybridQuotaEnabledInStoreConfig && this.isHybridQuotaEnabledInServer;
    }

    /** Returns true when the store's quota flag or quota size differs from the cached values. */
    private boolean isHybridStoreDiskQuotaUpdated(Store store) {
        return this.isHybridQuotaEnabledInStoreConfig != store.isHybridStoreDiskQuotaEnabled()
                || this.storeQuotaInBytes != store.getStorageQuotaInByte();
    }

    /** Reports "quota not violated" for every partition in the consumption state map. */
    private void reportStoreQuotaNotViolated() {
        for (PartitionConsumptionState state : this.partitionConsumptionStateMap.values()) {
            this.statusReportAdapter.reportQuotaNotViolated(state);
        }
    }

    /** Caches the store's quota settings and derives the per-partition quota from them. */
    private void setStoreQuota(Store store) {
        this.storeQuotaInBytes = store.getStorageQuotaInByte();
        this.diskQuotaPerPartition = this.storeQuotaInBytes / this.subPartitionCount;
        this.isHybridQuotaEnabledInStoreConfig = store.isHybridStoreDiskQuotaEnabled();
    }

    /**
     * Reacts to a change of the managed store: refreshes the cached online status and quota
     * settings, then either clears quota violations (quota enforcement was just turned off)
     * or re-checks every partition (quota settings changed). Changes to other stores, and
     * changes where this version no longer exists in the store, are ignored.
     *
     * @param store the updated store
     */
    public void handleStoreChanged(Store store) {
        if (!store.getName().equals(this.storeName)) {
            return;
        }
        Optional<Version> version = store.getVersion(this.storeVersion);
        if (!version.isPresent()) {
            LOGGER.debug("Version: {}  doesn't exist in the store: {}", this.storeVersion, this.storeName);
            return;
        }
        this.versionIsOnline = isVersionOnline(version.get());

        // NOTE(review): this resume happens before the quota lock is taken, so it can interleave
        // with a concurrent enforcePartitionQuota call; confirm that is intended.
        if (this.storeQuotaInBytes != store.getStorageQuotaInByte() || !store.isHybridStoreDiskQuotaEnabled()) {
            LOGGER.info("Store: {} changed, updated quota from {} to {} and store quota is {}enabled, so we reset the store quota and resume all partitions.", this.storeName, this.storeQuotaInBytes, store.getStorageQuotaInByte(), store.isHybridStoreDiskQuotaEnabled() ? "" : "not ");
            resumeAllPartitions();
        }

        try (AutoCloseableLock ignored = AutoCloseableLock.of(this.hybridStoreDiskQuotaLock)) {
            // Capture the "before" view, apply the new settings, then compare against the "after" view.
            boolean quotaSettingsChanged = isHybridStoreDiskQuotaUpdated(store);
            boolean wasQuotaEnabled = isHybridQuotaEnabled();
            setStoreQuota(store);
            if (wasQuotaEnabled && !isHybridQuotaEnabled()) {
                reportStoreQuotaNotViolated();
            } else if (quotaSettingsChanged) {
                checkAllPartitionsQuota();
            }
        }
    }

    /**
     * Starts tracking disk usage for {@code partition}, replacing any tracker already registered for it.
     *
     * @param partition partition id
     */
    public void initPartition(int partition) {
        this.partitionConsumptionSizeMap.put(partition, this.storagePartitionDiskUsageFunctionConstructor.apply(partition));
    }

    /**
     * Stops tracking disk usage for {@code partition}.
     *
     * @param partition partition id
     */
    public void removePartition(int partition) {
        this.partitionConsumptionSizeMap.remove(partition);
    }

    /**
     * Re-evaluates the quota for every partition in the consumption state map without adding
     * any usage. Does nothing when hybrid quota enforcement is disabled.
     */
    public void checkAllPartitionsQuota() {
        try (AutoCloseableLock ignored = AutoCloseableLock.of(this.hybridStoreDiskQuotaLock)) {
            if (!isHybridQuotaEnabled()) {
                return;
            }
            for (Map.Entry<Integer, PartitionConsumptionState> entry : this.partitionConsumptionStateMap.entrySet()) {
                enforcePartitionQuota(entry.getKey(), entry.getValue(), 0L);
            }
        }
    }

    /**
     * Adds {@code additionalUsage} to the tracked usage of {@code partition} and pauses or
     * resumes the partition depending on whether its quota is exceeded. Does nothing when
     * hybrid quota enforcement is disabled.
     *
     * @param partition partition id
     * @param additionalUsage amount added to the partition's tracked usage before the check
     * @throws NullPointerException if quota enforcement is enabled and no consumption state
     *         is registered for {@code partition}
     */
    public void enforcePartitionQuota(int partition, long additionalUsage) {
        try (AutoCloseableLock ignored = AutoCloseableLock.of(this.hybridStoreDiskQuotaLock)) {
            if (isHybridQuotaEnabled()) {
                enforcePartitionQuota(partition, this.partitionConsumptionStateMap.get(partition), additionalUsage);
            }
        }
    }

    /**
     * Core quota check for one partition; callers in this class hold the quota lock.
     * Creates the usage tracker on demand, adds {@code additionalUsage}, and then:
     * within quota, reports "not violated" and resumes the partition if this manager paused it;
     * over quota, reports "violated", reports completion for an online version that has not
     * reported it yet, and pauses the partition.
     */
    private void enforcePartitionQuota(int partition, PartitionConsumptionState pcs, long additionalUsage) {
        Objects.requireNonNull(pcs, "The PartitionConsumptionState param cannot be null.");
        StoragePartitionDiskUsage diskUsage = this.partitionConsumptionSizeMap.computeIfAbsent(partition, this.storagePartitionDiskUsageFunctionConstructor);
        diskUsage.add(additionalUsage);
        String consumingTopic = getConsumingTopic(pcs);

        if (!isStorageQuotaExceeded(diskUsage)) {
            this.statusReportAdapter.reportQuotaNotViolated(pcs);
            if (isPartitionPausedIngestion(partition)) {
                resumePartition(partition, consumingTopic);
                LOGGER.info("Quota available for store {} partition {}, resumed this partition.", this.storeName, partition);
            }
            return;
        }

        this.statusReportAdapter.reportQuotaViolated(pcs);
        if (isVersionOnline() && !pcs.isCompletionReported()) {
            this.statusReportAdapter.reportCompleted(pcs);
        }
        pausePartition(partition, consumingTopic);
        // The filter keeps repeated pauses of the same topic-partition from flooding the log.
        String redundancyKey = consumingTopic + "_" + partition + "_quota_exceeded";
        if (!REDUNDANT_LOGGING_FILTER.isRedundantException(redundancyKey)) {
            LOGGER.info("Quota exceeded for store {} partition {}, paused this partition. {}", this.storeName, partition, this.versionTopic);
        }
    }

    /** A partition is over quota when its usage reaches the per-partition quota and the store quota is not unlimited. */
    private boolean isStorageQuotaExceeded(StoragePartitionDiskUsage diskUsage) {
        return diskUsage.getUsage() >= this.diskQuotaPerPartition && this.storeQuotaInBytes != UNLIMITED_STORAGE_QUOTA;
    }

    /** Invokes the pause callback, then records the partition as paused. */
    private void pausePartition(int partition, String consumingTopic) {
        this.pausePartition.execute(consumingTopic, partition);
        this.pausedPartitions.add(partition);
    }

    /** Invokes the resume callback, then clears the partition's paused marker. */
    private void resumePartition(int partition, String consumingTopic) {
        this.resumePartition.execute(consumingTopic, partition);
        this.pausedPartitions.remove(partition);
    }

    /** Resumes every partition in the consumption state map, whether or not it is currently marked paused. */
    private void resumeAllPartitions() {
        this.partitionConsumptionStateMap.forEach((partition, pcs) -> resumePartition(partition, getConsumingTopic(pcs)));
    }

    private boolean isVersionOnline(Version version) {
        return version.getStatus().equals(VersionStatus.ONLINE);
    }

    /**
     * Picks the topic to pause/resume for a partition: the leader topic from the offset record
     * when the partition is in LEADER state and a leader topic is set, otherwise the version topic.
     */
    private String getConsumingTopic(PartitionConsumptionState pcs) {
        if (pcs.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER)) {
            OffsetRecord offsetRecord = pcs.getOffsetRecord();
            if (offsetRecord.getLeaderTopic() != null) {
                return offsetRecord.getLeaderTopic();
            }
        }
        return this.versionTopic;
    }

    /** Returns true when this manager currently has {@code partition} marked as paused. */
    protected boolean isPartitionPausedIngestion(int partition) {
        return this.pausedPartitions.contains(partition);
    }

    /** Returns true when at least one partition is currently marked as paused. */
    public boolean hasPausedPartitionIngestion() {
        return !this.pausedPartitions.isEmpty();
    }

    protected long getStoreQuotaInBytes() {
        return this.storeQuotaInBytes;
    }

    protected long getPartitionQuotaInBytes() {
        return this.diskQuotaPerPartition;
    }

    protected boolean isVersionOnline() {
        return this.versionIsOnline;
    }

    /**
     * Computes the fraction of the quota used by all tracked partitions.
     *
     * <p>When assignment-based calculation is enabled, the quota is first scaled by
     * {@code trackedPartitions / subPartitionCount}.
     *
     * @return {@code -1} when the store quota is unlimited; otherwise total tracked usage
     *         divided by the effective quota. If the effective quota is zero the result is
     *         {@code 0} when nothing is used and {@link Double#POSITIVE_INFINITY} otherwise.
     */
    public double getDiskQuotaUsage() {
        long quota = this.storeQuotaInBytes;
        if (quota == UNLIMITED_STORAGE_QUOTA) {
            return -1.0d;
        }
        if (this.isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled) {
            // subPartitionCount is validated as positive in the constructor, so this cannot divide by zero.
            quota = (quota * this.partitionConsumptionSizeMap.size()) / this.subPartitionCount;
        }
        long usage = 0;
        for (StoragePartitionDiskUsage diskUsage : this.partitionConsumptionSizeMap.values()) {
            usage += diskUsage.getUsage();
        }
        if (quota == 0) {
            // Zero effective quota (quota of 0, or no partitions tracked): avoid dividing by zero.
            return usage == 0 ? 0.0d : Double.POSITIVE_INFINITY;
        }
        // Divide in floating point; long / long would truncate every ratio below 1 to 0.
        return (double) usage / quota;
    }

    /**
     * Re-synchronizes the tracked usage of the state's partition with the database, if that
     * partition is being tracked.
     *
     * @param partitionConsumptionState state identifying the partition that was flushed
     */
    public void notifyFlushToDisk(PartitionConsumptionState partitionConsumptionState) {
        StoragePartitionDiskUsage diskUsage = this.partitionConsumptionSizeMap.get(partitionConsumptionState.getPartition());
        if (diskUsage != null) {
            diskUsage.syncWithDB();
        }
    }
}
