package com.linkedin.venice.listener;

import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.listener.request.RouterRequest;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.routerapi.ReplicaState;
import com.linkedin.venice.stats.AggServerQuotaUsageStats;
import com.linkedin.venice.utils.Utils;
import io.netty.channel.ChannelHandlerContext;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.class */
public class ReadQuotaEnforcementHandlerTest {
    long nodeCapacity;
    String thisNodeId;
    Clock clock;
    long currentTime;
    ReadOnlyStoreRepository storeRepository;
    HelixCustomizedViewOfflinePushRepository customizedViewRepository;
    ReadQuotaEnforcementHandler quotaEnforcer;
    AggServerQuotaUsageStats stats;

    @BeforeMethod
    public void setUp() {
        this.nodeCapacity = 10L;
        this.thisNodeId = "node1";
        this.clock = (Clock) Mockito.mock(Clock.class);
        this.currentTime = 0L;
        ((Clock) Mockito.doReturn(Long.valueOf(this.currentTime)).when(this.clock)).millis();
        this.storeRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
        this.customizedViewRepository = (HelixCustomizedViewOfflinePushRepository) Mockito.mock(HelixCustomizedViewOfflinePushRepository.class);
        this.stats = (AggServerQuotaUsageStats) Mockito.mock(AggServerQuotaUsageStats.class);
        this.quotaEnforcer = new ReadQuotaEnforcementHandler(this.nodeCapacity, this.storeRepository, CompletableFuture.completedFuture(this.customizedViewRepository), this.thisNodeId, this.stats, this.clock);
    }

    @Test
    public void testQuotaEnforcementHandlerAtNodeLevel() {
        runTest("dummyStore_v1", this.nodeCapacity * 5 * 10, this.nodeCapacity * 10, TimeUnit.MILLISECONDS.convert(10L, TimeUnit.SECONDS));
    }

    @Test
    public void testQuotaEnforcementAtStoreLevel() {
        String composeKafkaTopic = Version.composeKafkaTopic(Utils.getUniqueString("store"), 1);
        Instance instance = (Instance) Mockito.mock(Instance.class);
        ((Instance) Mockito.doReturn(this.thisNodeId).when(instance)).getNodeId();
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ((Partition) Mockito.doReturn(0).when(partition)).getId();
        ArrayList arrayList = new ArrayList();
        ReplicaState replicaState = (ReplicaState) Mockito.mock(ReplicaState.class);
        ((ReplicaState) Mockito.doReturn(instance.getNodeId()).when(replicaState)).getParticipantId();
        ((ReplicaState) Mockito.doReturn(ExecutionStatus.COMPLETED.name()).when(replicaState)).getVenicePushStatus();
        arrayList.add(replicaState);
        Mockito.when(this.customizedViewRepository.getReplicaStates(composeKafkaTopic, partition.getId())).thenReturn(arrayList);
        PartitionAssignment partitionAssignment = (PartitionAssignment) Mockito.mock(PartitionAssignment.class);
        ((PartitionAssignment) Mockito.doReturn(composeKafkaTopic).when(partitionAssignment)).getTopic();
        ((PartitionAssignment) Mockito.doReturn(Collections.singletonList(partition)).when(partitionAssignment)).getAllPartitions();
        Store store = (Store) Mockito.mock(Store.class);
        ((Store) Mockito.doReturn(5L).when(store)).getReadQuotaInCU();
        ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.storeRepository)).getStore((String) Mockito.any());
        this.quotaEnforcer.onExternalViewChange(partitionAssignment);
        runTest(composeKafkaTopic, 5 * 5 * 10, 5 * 10, 10000L);
    }

    @Test
    public void testQuotaEnforcementAtStoreLevelWithMultipleNodes() {
        String composeKafkaTopic = Version.composeKafkaTopic(Utils.getUniqueString("store"), 1);
        Instance instance = (Instance) Mockito.mock(Instance.class);
        ((Instance) Mockito.doReturn(this.thisNodeId).when(instance)).getNodeId();
        Instance instance2 = (Instance) Mockito.mock(Instance.class);
        ((Instance) Mockito.doReturn("otherNodeId").when(instance2)).getNodeId();
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ((Partition) Mockito.doReturn(0).when(partition)).getId();
        ArrayList arrayList = new ArrayList();
        ReplicaState replicaState = (ReplicaState) Mockito.mock(ReplicaState.class);
        ((ReplicaState) Mockito.doReturn(instance.getNodeId()).when(replicaState)).getParticipantId();
        ((ReplicaState) Mockito.doReturn(ExecutionStatus.COMPLETED.name()).when(replicaState)).getVenicePushStatus();
        arrayList.add(replicaState);
        ReplicaState replicaState2 = (ReplicaState) Mockito.mock(ReplicaState.class);
        ((ReplicaState) Mockito.doReturn(instance2.getNodeId()).when(replicaState2)).getParticipantId();
        ((ReplicaState) Mockito.doReturn(ExecutionStatus.COMPLETED.name()).when(replicaState2)).getVenicePushStatus();
        arrayList.add(replicaState2);
        Mockito.when(this.customizedViewRepository.getReplicaStates(composeKafkaTopic, partition.getId())).thenReturn(arrayList);
        PartitionAssignment partitionAssignment = (PartitionAssignment) Mockito.mock(PartitionAssignment.class);
        ((PartitionAssignment) Mockito.doReturn(composeKafkaTopic).when(partitionAssignment)).getTopic();
        ((PartitionAssignment) Mockito.doReturn(Collections.singletonList(partition)).when(partitionAssignment)).getAllPartitions();
        ((HelixCustomizedViewOfflinePushRepository) Mockito.doReturn(partitionAssignment).when(this.customizedViewRepository)).getPartitionAssignments(composeKafkaTopic);
        Store store = (Store) Mockito.mock(Store.class);
        ((Store) Mockito.doReturn(6L).when(store)).getReadQuotaInCU();
        ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.storeRepository)).getStore((String) Mockito.any());
        this.quotaEnforcer.onCustomizedViewChange(partitionAssignment);
        runTest(composeKafkaTopic, (6 / 2) * 5 * 10, (6 / 2) * 10, 10000L);
    }

    void runTest(String str, long j, long j2, long j3) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        RouterRequest routerRequest = (RouterRequest) Mockito.mock(RouterRequest.class);
        ((RouterRequest) Mockito.doReturn(str).when(routerRequest)).getResourceName();
        ((RouterRequest) Mockito.doReturn(RequestType.SINGLE_GET).when(routerRequest)).getRequestType();
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class);
        ((ChannelHandlerContext) Mockito.doAnswer(invocationOnMock -> {
            atomicInteger2.incrementAndGet();
            return null;
        }).when(channelHandlerContext)).writeAndFlush(Mockito.any());
        ((ChannelHandlerContext) Mockito.doAnswer(invocationOnMock2 -> {
            atomicInteger.incrementAndGet();
            return null;
        }).when(channelHandlerContext)).fireChannelRead(Mockito.any());
        for (int i = 0; i < j; i++) {
            this.quotaEnforcer.channelRead0(channelHandlerContext, routerRequest);
        }
        Assert.assertEquals(atomicInteger.get(), j, "Made " + j + " reads, and all should have been allowed");
        Assert.assertEquals(atomicInteger2.get(), 0, "Didn't exceed " + j + " reads, but " + atomicInteger2.get() + " were throttled");
        this.quotaEnforcer.channelRead0(channelHandlerContext, routerRequest);
        Assert.assertEquals(atomicInteger2.get(), 1, "After reading capacity of " + j + " next read should have been blocked");
        atomicInteger.set(0);
        atomicInteger2.set(0);
        this.currentTime = this.currentTime + j3 + 1;
        ((Clock) Mockito.doReturn(Long.valueOf(this.currentTime)).when(this.clock)).millis();
        for (int i2 = 0; i2 < j2; i2++) {
            this.quotaEnforcer.channelRead0(channelHandlerContext, routerRequest);
        }
        Assert.assertEquals(atomicInteger.get(), j2, "Made " + j2 + " reads after refill, and all should have been allowed");
        Assert.assertEquals(atomicInteger2.get(), 0, "After refill, reads should not be throttled");
        this.quotaEnforcer.channelRead0(channelHandlerContext, routerRequest);
        Assert.assertEquals(atomicInteger2.get(), 1, "After exhausting refill of " + j2 + " next read should have been blocked");
    }
}
