package com.linkedin.venice.listener;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoHelixResourceException;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.listener.request.RouterRequest;
import com.linkedin.venice.listener.response.HttpShortcutResponse;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.ReadOnlyPartitionStatus;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.routerapi.ReplicaState;
import com.linkedin.venice.stats.AggServerQuotaUsageStats;
import com.linkedin.venice.throttle.TokenBucket;
import com.linkedin.venice.utils.ExpiringSet;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@ChannelHandler.Sharable
/* loaded from: input_file:com/linkedin/venice/listener/ReadQuotaEnforcementHandler.class */
public class ReadQuotaEnforcementHandler extends SimpleChannelInboundHandler<RouterRequest> implements RoutingDataRepository.RoutingDataChangedListener, StoreDataChangedListener {
    private static final Logger LOGGER = LogManager.getLogger(ReadQuotaEnforcementHandler.class);
    private final ConcurrentMap<String, TokenBucket> storeVersionBuckets;
    private final TokenBucket storageNodeBucket;
    private final ReadOnlyStoreRepository storeRepository;
    private HelixCustomizedViewOfflinePushRepository customizedViewRepository;
    private final String thisNodeId;
    private final AggServerQuotaUsageStats stats;
    private final Clock clock;
    private boolean enforcing;
    private final ExpiringSet<String> noBucketStores;
    private volatile boolean initializedVolatile;
    private boolean initialized;
    private final int enforcementIntervalSeconds = 10;
    private final int enforcementCapacityMultiple = 5;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.venice.listener.ReadQuotaEnforcementHandler$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/venice/listener/ReadQuotaEnforcementHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$read$RequestType = new int[RequestType.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$venice$read$RequestType[RequestType.SINGLE_GET.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$venice$read$RequestType[RequestType.MULTI_GET.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$linkedin$venice$read$RequestType[RequestType.COMPUTE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public ReadQuotaEnforcementHandler(long j, ReadOnlyStoreRepository readOnlyStoreRepository, CompletableFuture<HelixCustomizedViewOfflinePushRepository> completableFuture, String str, AggServerQuotaUsageStats aggServerQuotaUsageStats) {
        this(j, readOnlyStoreRepository, completableFuture, str, aggServerQuotaUsageStats, Clock.systemUTC());
    }

    public ReadQuotaEnforcementHandler(long j, ReadOnlyStoreRepository readOnlyStoreRepository, CompletableFuture<HelixCustomizedViewOfflinePushRepository> completableFuture, String str, AggServerQuotaUsageStats aggServerQuotaUsageStats, Clock clock) {
        this.storeVersionBuckets = new VeniceConcurrentHashMap();
        this.enforcing = true;
        this.noBucketStores = new ExpiringSet<>(30L, TimeUnit.SECONDS);
        this.initializedVolatile = false;
        this.initialized = false;
        this.enforcementIntervalSeconds = 10;
        this.enforcementCapacityMultiple = 5;
        this.clock = clock;
        this.storageNodeBucket = tokenBucketfromRcuPerSecond(j, 1.0d);
        this.storeRepository = readOnlyStoreRepository;
        this.thisNodeId = str;
        this.stats = aggServerQuotaUsageStats;
        completableFuture.thenAccept(helixCustomizedViewOfflinePushRepository -> {
            LOGGER.info("Initializing ReadQuotaEnforcementHandler with completed RoutingDataRepository");
            this.customizedViewRepository = helixCustomizedViewOfflinePushRepository;
            init();
        });
    }

    public final void init() {
        this.storeRepository.registerStoreDataChangedListener(this);
        if (this.customizedViewRepository.getResourceAssignment() == null) {
            LOGGER.error("Null resource assignment from HelixCustomizedViewOfflinePushRepository in ReadQuotaEnforcementHandler");
        } else {
            Iterator it = this.customizedViewRepository.getResourceAssignment().getAssignedResources().iterator();
            while (it.hasNext()) {
                onExternalViewChange(this.customizedViewRepository.getPartitionAssignments((String) it.next()));
            }
        }
        this.initializedVolatile = true;
    }

    public boolean isInitialized() {
        if (this.initialized) {
            return true;
        }
        if (!this.initializedVolatile) {
            return false;
        }
        this.initialized = true;
        return true;
    }

    public void channelRead0(ChannelHandlerContext channelHandlerContext, RouterRequest routerRequest) {
        if (!isInitialized()) {
            ReferenceCountUtil.retain(routerRequest);
            channelHandlerContext.fireChannelRead(routerRequest);
            return;
        }
        int rcu = getRcu(routerRequest);
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(routerRequest.getResourceName());
        if (!this.storeVersionBuckets.containsKey(routerRequest.getResourceName()) || routerRequest.isRetryRequest()) {
            if (this.enforcing && !this.noBucketStores.contains(routerRequest.getResourceName())) {
                LOGGER.warn("Request for resource: {} but no TokenBucket for that resource. Not yet enforcing quota", routerRequest.getResourceName());
                this.noBucketStores.add(routerRequest.getResourceName());
            }
        } else if (!this.storeVersionBuckets.get(routerRequest.getResourceName()).tryConsume(rcu)) {
            this.stats.recordRejected(parseStoreFromKafkaTopicName, rcu);
            if (this.enforcing) {
                channelHandlerContext.writeAndFlush(new HttpShortcutResponse("Total quota for store " + parseStoreFromKafkaTopicName + " is " + this.storeRepository.getStore(parseStoreFromKafkaTopicName).getReadQuotaInCU() + " RCU per second. Storage Node " + this.thisNodeId + " is allocated " + this.storeVersionBuckets.get(routerRequest.getResourceName()).getAmortizedRefillPerSecond() + " RCU per second which has been exceeded.", HttpResponseStatus.TOO_MANY_REQUESTS));
                return;
            }
        }
        if (!this.storageNodeBucket.tryConsume(rcu)) {
            this.stats.recordRejected(parseStoreFromKafkaTopicName, rcu);
            if (this.enforcing) {
                channelHandlerContext.writeAndFlush(new HttpShortcutResponse("Server over capacity", HttpResponseStatus.SERVICE_UNAVAILABLE));
                return;
            }
        }
        ReferenceCountUtil.retain(routerRequest);
        channelHandlerContext.fireChannelRead(routerRequest);
        this.stats.recordAllowed(parseStoreFromKafkaTopicName, rcu);
        this.stats.recordReadQuotaUsage(parseStoreFromKafkaTopicName, this.storageNodeBucket.getStaleUsageRatio());
    }

    public static int getRcu(RouterRequest routerRequest) {
        switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$read$RequestType[routerRequest.getRequestType().ordinal()]) {
            case 1:
                return 1;
            case 2:
                return routerRequest.getKeyCount();
            case 3:
                return routerRequest.getKeyCount();
            default:
                LOGGER.error("Unknown request type: {}, request for resource: {}", routerRequest.getRequestType(), routerRequest.getResourceName());
                return Integer.MAX_VALUE;
        }
    }

    private TokenBucket tokenBucketfromRcuPerSecond(long j, double d) {
        long j2 = j * 10;
        return new TokenBucket((long) Math.ceil(j2 * 5 * d), (long) Math.ceil(j2 * d), 10L, TimeUnit.SECONDS, this.clock);
    }

    public void onExternalViewChange(PartitionAssignment partitionAssignment) {
        updateQuota(partitionAssignment);
    }

    public void onCustomizedViewChange(PartitionAssignment partitionAssignment) {
        updateQuota(partitionAssignment);
    }

    private void updateQuota(PartitionAssignment partitionAssignment) {
        String topic = partitionAssignment.getTopic();
        if (partitionAssignment.getAllPartitions().isEmpty()) {
            LOGGER.warn("QuotaEnforcementHandler updated with an empty partition map for topic: {}. Skipping update process", topic);
            return;
        }
        double nodeResponsibilityForQuota = getNodeResponsibilityForQuota(this.customizedViewRepository, partitionAssignment, this.thisNodeId);
        if (nodeResponsibilityForQuota <= 0.0d) {
            LOGGER.warn("Routing data changed on quota enforcement handler with 0 replicas assigned to this node, removing quota for resource: {}", topic);
            this.storeVersionBuckets.remove(topic);
        } else {
            this.storeVersionBuckets.put(topic, tokenBucketfromRcuPerSecond(this.storeRepository.getStore(Version.parseStoreFromKafkaTopicName(partitionAssignment.getTopic())).getReadQuotaInCU(), nodeResponsibilityForQuota));
        }
    }

    public void onPartitionStatusChange(String str, ReadOnlyPartitionStatus readOnlyPartitionStatus) {
    }

    protected static double getNodeResponsibilityForQuota(HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository, PartitionAssignment partitionAssignment, String str) {
        double d = 0.0d;
        for (Partition partition : partitionAssignment.getAllPartitions()) {
            ArrayList arrayList = new ArrayList();
            for (ReplicaState replicaState : helixCustomizedViewOfflinePushRepository.getReplicaStates(partitionAssignment.getTopic(), partition.getId())) {
                if (replicaState.getVenicePushStatus().equals(ExecutionStatus.COMPLETED.name())) {
                    arrayList.add(replicaState.getParticipantId());
                }
            }
            long size = arrayList.size();
            long j = 0;
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                if (((String) it.next()).equals(str)) {
                    j++;
                }
            }
            if (size > 0) {
                d += j / size;
            }
        }
        return d / partitionAssignment.getAllPartitions().size();
    }

    public void onRoutingDataDeleted(String str) {
        this.storeVersionBuckets.remove(str);
    }

    public void handleStoreCreated(Store store) {
        handleStoreChanged(store);
    }

    public void handleStoreDeleted(String str) {
        removeTopics(getStoreTopics(str));
    }

    public void handleStoreChanged(Store store) {
        Set<String> storeTopics = getStoreTopics(store.getName());
        List<String> list = (List) store.getVersions().stream().map(version -> {
            return version.kafkaTopicName();
        }).collect(Collectors.toList());
        for (String str : list) {
            if (store.getCurrentVersion() == Version.parseVersionFromKafkaTopicName(str)) {
                this.customizedViewRepository.subscribeRoutingDataChange(str, this);
                try {
                    onExternalViewChange(this.customizedViewRepository.getPartitionAssignments(str));
                } catch (VeniceNoHelixResourceException e) {
                    Optional version2 = store.getVersion(Version.parseVersionFromKafkaTopicName(str));
                    if (version2.isPresent() && ((Version) version2.get()).getStatus().equals(VersionStatus.ONLINE)) {
                        if (isOldestVersion(Version.parseVersionFromKafkaTopicName(str), list)) {
                            continue;
                        } else if (!isLatestVersion(Version.parseVersionFromKafkaTopicName(str), list)) {
                            throw new VeniceException("Metadata for store " + store.getName() + " shows that version " + ((Version) version2.get()).getNumber() + " is online but couldn't find the resource in external view:", e);
                        }
                    }
                }
                storeTopics.remove(str);
            }
        }
        removeTopics(storeTopics);
    }

    private Set<String> getStoreTopics(String str) {
        return (Set) this.storeVersionBuckets.keySet().stream().filter(str2 -> {
            return Version.parseStoreFromKafkaTopicName(str2).equals(str);
        }).collect(Collectors.toSet());
    }

    private void removeTopics(Set<String> set) {
        for (String str : set) {
            this.customizedViewRepository.unSubscribeRoutingDataChange(str, this);
            this.storeVersionBuckets.remove(str);
        }
    }

    private boolean isOldestVersion(int i, List<String> list) {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            if (Version.parseVersionFromKafkaTopicName(it.next()) < i) {
                return false;
            }
        }
        return true;
    }

    private boolean isLatestVersion(int i, List<String> list) {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            if (Version.parseVersionFromKafkaTopicName(it.next()) > i) {
                return false;
            }
        }
        return true;
    }

    protected Set<String> listTopics() {
        return this.storeVersionBuckets.keySet();
    }

    public TokenBucket getBucketForStore(String str) {
        if (str.equals("total")) {
            return this.storageNodeBucket;
        }
        return this.storeVersionBuckets.get(Version.composeKafkaTopic(str, this.storeRepository.getStore(str).getCurrentVersion()));
    }

    public void disableEnforcement() {
        this.enforcing = false;
    }
}
