package com.linkedin.venice.router.throttle;

import com.linkedin.venice.exceptions.QuotaExceededException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.ZkRoutersClusterManager;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.RoutersClusterConfig;
import com.linkedin.venice.meta.RoutersClusterManager;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ReadOnlyPartitionStatus;
import com.linkedin.venice.router.VeniceRouterConfig;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.throttle.EventThrottler;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/router/throttle/ReadRequestThrottler.class */
public class ReadRequestThrottler implements RouterThrottler, RoutersClusterManager.RouterCountChangedListener, RoutingDataRepository.RoutingDataChangedListener, StoreDataChangedListener, RoutersClusterManager.RouterClusterConfigChangedListener {
    public static final long DEFAULT_STORE_QUOTA_TIME_WINDOW = TimeUnit.SECONDS.toMillis(10);
    public static final long DEFAULT_STORAGE_NODE_QUOTA_TIME_WINDOW = TimeUnit.SECONDS.toMillis(30);
    private static final Logger LOGGER = LogManager.getLogger(ReadRequestThrottler.class);
    private final ZkRoutersClusterManager zkRoutersManager;
    private final ReadOnlyStoreRepository storeRepository;
    private final RoutingDataRepository routingDataRepository;
    private final long maxRouterReadCapacity;
    private int lastRouterCount;
    private long idealTotalQuotaPerRouter;
    private final AtomicReference<ConcurrentMap<String, StoreReadThrottler>> storesThrottlers;
    private final AggRouterHttpRequestStats stats;
    private final double perStorageNodeReadQuotaBuffer;
    private final double perStoreRouterQuotaBuffer;
    private final long storeQuotaCheckTimeWindow;
    private final long storageNodeQuotaCheckTimeWindow;
    private final boolean perStorageNodeThrottlerEnabled;

    public ReadRequestThrottler(ZkRoutersClusterManager zkRoutersClusterManager, ReadOnlyStoreRepository readOnlyStoreRepository, RoutingDataRepository routingDataRepository, AggRouterHttpRequestStats aggRouterHttpRequestStats, VeniceRouterConfig veniceRouterConfig) {
        this(zkRoutersClusterManager, readOnlyStoreRepository, routingDataRepository, veniceRouterConfig.getMaxReadCapacityCu(), aggRouterHttpRequestStats, veniceRouterConfig.getPerStorageNodeReadQuotaBuffer(), veniceRouterConfig.getPerStoreRouterQuotaBuffer(), DEFAULT_STORE_QUOTA_TIME_WINDOW, DEFAULT_STORAGE_NODE_QUOTA_TIME_WINDOW, veniceRouterConfig.isPerRouterStorageNodeThrottlerEnabled());
    }

    public ReadRequestThrottler(ZkRoutersClusterManager zkRoutersClusterManager, ReadOnlyStoreRepository readOnlyStoreRepository, RoutingDataRepository routingDataRepository, long j, AggRouterHttpRequestStats aggRouterHttpRequestStats, double d, double d2, long j2, long j3, boolean z) {
        this.zkRoutersManager = zkRoutersClusterManager;
        this.storeRepository = readOnlyStoreRepository;
        this.routingDataRepository = routingDataRepository;
        this.storeQuotaCheckTimeWindow = j2;
        this.storageNodeQuotaCheckTimeWindow = j3;
        this.zkRoutersManager.subscribeRouterCountChangedEvent(this);
        this.storeRepository.registerStoreDataChangedListener(this);
        this.stats = aggRouterHttpRequestStats;
        this.maxRouterReadCapacity = j;
        this.perStorageNodeReadQuotaBuffer = d;
        this.perStorageNodeThrottlerEnabled = z;
        this.lastRouterCount = zkRoutersClusterManager.getExpectedRoutersCount();
        this.perStoreRouterQuotaBuffer = d2;
        this.idealTotalQuotaPerRouter = calculateIdealTotalQuotaPerRouter();
        this.storesThrottlers = new AtomicReference<>(buildAllStoreReadThrottlers());
    }

    @Override // com.linkedin.venice.router.throttle.RouterThrottler
    public void mayThrottleRead(String str, double d, String str2) throws QuotaExceededException {
        if (this.zkRoutersManager.isThrottlingEnabled()) {
            StoreReadThrottler storeReadThrottler = this.storesThrottlers.get().get(str);
            if (storeReadThrottler == null) {
                throw new VeniceException("Could not find the throttler for store: " + str);
            }
            storeReadThrottler.mayThrottleRead(d, this.perStorageNodeThrottlerEnabled ? str2 : null);
        }
    }

    @Override // com.linkedin.venice.router.throttle.RouterThrottler
    public int getReadCapacity() {
        return 1;
    }

    protected long calculateStoreQuotaPerRouter(long j) {
        int liveRoutersCount = this.zkRoutersManager.getLiveRoutersCount();
        if (liveRoutersCount <= 0) {
            liveRoutersCount = this.lastRouterCount;
        } else {
            this.lastRouterCount = liveRoutersCount;
        }
        if (liveRoutersCount <= 0) {
            LOGGER.error("Could not find any live router to serve traffic.");
        }
        long max = liveRoutersCount > 0 ? Math.max(j / liveRoutersCount, 5L) : 0L;
        if (!this.zkRoutersManager.isMaxCapacityProtectionEnabled() || this.idealTotalQuotaPerRouter <= this.maxRouterReadCapacity) {
            return max * (1 + ((long) this.perStoreRouterQuotaBuffer));
        }
        LOGGER.warn("The ideal total quota per router: {} has exceeded the router's max capacity: {}, will reduce quotas for all store in proportion.", Long.valueOf(this.idealTotalQuotaPerRouter), Long.valueOf(this.maxRouterReadCapacity));
        return (max * this.maxRouterReadCapacity) / this.idealTotalQuotaPerRouter;
    }

    protected final long calculateIdealTotalQuotaPerRouter() {
        long j = 0;
        int liveRoutersCount = this.zkRoutersManager.getLiveRoutersCount();
        if (liveRoutersCount != 0) {
            j = this.storeRepository.getTotalStoreReadQuota() / liveRoutersCount;
        }
        if (this.zkRoutersManager.isMaxCapacityProtectionEnabled()) {
            this.stats.recordTotalQuota(Math.min(j, this.maxRouterReadCapacity));
        } else {
            this.stats.recordTotalQuota(j);
        }
        return j;
    }

    protected StoreReadThrottler getStoreReadThrottler(String str) {
        return this.storesThrottlers.get().get(str);
    }

    private StoreReadThrottler buildStoreReadThrottler(String str, int i, long j) {
        Optional empty;
        String composeKafkaTopic = Version.composeKafkaTopic(str, i);
        if (this.perStorageNodeThrottlerEnabled && this.routingDataRepository.containsKafkaTopic(composeKafkaTopic)) {
            empty = Optional.of(this.routingDataRepository.getPartitionAssignments(composeKafkaTopic));
            this.routingDataRepository.subscribeRoutingDataChange(Version.composeKafkaTopic(str, i), this);
        } else {
            empty = Optional.empty();
            LOGGER.warn("Unable to find routing data for topic: {}, it might be caused by the delay of the routing data. Only create per store level throttler.", composeKafkaTopic);
        }
        this.stats.recordQuota(str, j);
        return new StoreReadThrottler(str, j, EventThrottler.REJECT_STRATEGY, empty, this.perStorageNodeReadQuotaBuffer, this.storeQuotaCheckTimeWindow, this.storageNodeQuotaCheckTimeWindow);
    }

    private ConcurrentMap<String, StoreReadThrottler> buildAllStoreReadThrottlers() {
        List<Store> allStores = this.storeRepository.getAllStores();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        for (Store store : allStores) {
            if (!storeHasNoValidVersion(store)) {
                concurrentHashMap.put(store.getName(), buildStoreReadThrottler(store.getName(), store.getCurrentVersion(), calculateStoreQuotaPerRouter(store.getReadQuotaInCU())));
            }
        }
        return concurrentHashMap;
    }

    public void handleRouterCountChanged(int i) {
        LOGGER.info("Number of router has been changed. Delete all of store throttlers.");
        resetAllThrottlers();
        LOGGER.info("All throttlers were reset");
    }

    public void onExternalViewChange(PartitionAssignment partitionAssignment) {
        if (this.perStorageNodeThrottlerEnabled) {
            String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(partitionAssignment.getTopic());
            synchronized (this.storesThrottlers) {
                StoreReadThrottler storeReadThrottler = this.storesThrottlers.get().get(parseStoreFromKafkaTopicName);
                if (storeReadThrottler == null) {
                    LOGGER.error("Could not found throttler for store: {}", parseStoreFromKafkaTopicName);
                } else {
                    storeReadThrottler.updateStorageNodesThrottlers(partitionAssignment);
                }
            }
        }
    }

    public void onCustomizedViewChange(PartitionAssignment partitionAssignment) {
    }

    public void onPartitionStatusChange(String str, ReadOnlyPartitionStatus readOnlyPartitionStatus) {
    }

    public void onRoutingDataDeleted(String str) {
    }

    public void handleStoreCreated(Store store) {
        if (storeHasNoValidVersion(store)) {
            return;
        }
        updateStoreThrottler(() -> {
            long calculateStoreQuotaPerRouter = calculateStoreQuotaPerRouter(store.getReadQuotaInCU());
            LOGGER.info("Store: {} is created. Add a throttler with quota: {} for this store.", store.getName(), Long.valueOf(calculateStoreQuotaPerRouter));
            this.storesThrottlers.get().put(store.getName(), buildStoreReadThrottler(store.getName(), store.getCurrentVersion(), calculateStoreQuotaPerRouter));
        });
    }

    private void updateStoreThrottler(Runnable runnable) {
        synchronized (this.storesThrottlers) {
            long j = this.idealTotalQuotaPerRouter;
            this.idealTotalQuotaPerRouter = calculateIdealTotalQuotaPerRouter();
            runnable.run();
            if ((j > this.maxRouterReadCapacity || this.idealTotalQuotaPerRouter > this.maxRouterReadCapacity) && j != this.idealTotalQuotaPerRouter) {
                LOGGER.info("Old router's quota and/or new router's quota exceeds the router's max capacity, update throttlers for all stores.");
                this.storesThrottlers.set(buildAllStoreReadThrottlers());
            }
        }
    }

    public void handleStoreDeleted(String str) {
        updateStoreThrottler(() -> {
            LOGGER.info("Store: {} has been deleted. Remove the throttler for this store.", str);
            StoreReadThrottler remove = this.storesThrottlers.get().remove(str);
            if (remove == null) {
                return;
            }
            this.stats.recordQuota(str, 0.0d);
            remove.clearStorageNodesThrottlers();
            this.routingDataRepository.unSubscribeRoutingDataChange(Version.composeKafkaTopic(str, remove.getCurrentVersion()), this);
        });
    }

    public void handleStoreChanged(Store store) {
        if (storeHasNoValidVersion(store)) {
            return;
        }
        updateStoreThrottler(() -> {
            StoreReadThrottler storeReadThrottler = this.storesThrottlers.get().get(store.getName());
            if (storeReadThrottler == null) {
                LOGGER.warn("Throttler have not been created for store: {}. Router might miss the creation event.", store.getName());
                handleStoreCreated(store);
                return;
            }
            long calculateStoreQuotaPerRouter = calculateStoreQuotaPerRouter(store.getReadQuotaInCU());
            if (calculateStoreQuotaPerRouter != this.storesThrottlers.get().get(store.getName()).getQuota()) {
                LOGGER.info("Read quota has been changed for store: {} - oldQuota: {}, newQuota: {}. Updating the store read throttler.", store.getName(), Long.valueOf(storeReadThrottler.getQuota()), Long.valueOf(calculateStoreQuotaPerRouter));
                this.storesThrottlers.get().put(store.getName(), buildStoreReadThrottler(store.getName(), store.getCurrentVersion(), calculateStoreQuotaPerRouter));
            }
            if (store.getCurrentVersion() == storeReadThrottler.getCurrentVersion() || !this.perStorageNodeThrottlerEnabled) {
                return;
            }
            LOGGER.info("Current version has been changed for store: {} - oldVersion: {}, currentVersion: {}. Updating the storage node's throttlers only.", store.getName(), Integer.valueOf(storeReadThrottler.getCurrentVersion()), Integer.valueOf(store.getCurrentVersion()));
            this.routingDataRepository.unSubscribeRoutingDataChange(Version.composeKafkaTopic(store.getName(), storeReadThrottler.getCurrentVersion()), this);
            storeReadThrottler.clearStorageNodesThrottlers();
            if (!this.routingDataRepository.containsKafkaTopic(Version.composeKafkaTopic(store.getName(), store.getCurrentVersion()))) {
                LOGGER.warn("Partition assignment not found for store: {} version: {}", store.getName(), Integer.valueOf(store.getCurrentVersion()));
            } else {
                storeReadThrottler.updateStorageNodesThrottlers(this.routingDataRepository.getPartitionAssignments(Version.composeKafkaTopic(store.getName(), store.getCurrentVersion())));
                this.routingDataRepository.subscribeRoutingDataChange(Version.composeKafkaTopic(store.getName(), store.getCurrentVersion()), this);
            }
        });
    }

    private boolean storeHasNoValidVersion(Store store) {
        return store.getCurrentVersion() == 0;
    }

    public void handleRouterClusterConfigChanged(RoutersClusterConfig routersClusterConfig) {
        LOGGER.info("Router cluster config has been changed, reset all throttlers.");
        resetAllThrottlers();
        LOGGER.info("All throttlers were reset");
    }

    private void resetAllThrottlers() {
        synchronized (this.storesThrottlers) {
            long calculateIdealTotalQuotaPerRouter = calculateIdealTotalQuotaPerRouter();
            if (this.idealTotalQuotaPerRouter != calculateIdealTotalQuotaPerRouter) {
                this.idealTotalQuotaPerRouter = calculateIdealTotalQuotaPerRouter;
                this.storesThrottlers.set(buildAllStoreReadThrottlers());
            }
        }
    }

    protected void restoreAllThrottlers() {
        synchronized (this.storesThrottlers) {
            this.storesThrottlers.set(buildAllStoreReadThrottlers());
        }
    }
}
