package com.linkedin.venice.pushstatushelper;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.common.PushStatusStoreUtils;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushstatus.PushStatusKey;
import com.linkedin.venice.pushstatus.PushStatusValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.class */
public class PushStatusStoreReaderTest {
    // D2 service name handed to every PushStatusStoreReader constructed in these tests.
    private static final String CLUSTER_DISCOVERY_D2_SERVICE_NAME = "venice-discovery_test";
    // Mock D2 client passed to the reader's constructor; recreated before each test by setUp().
    private D2Client d2ClientMock;
    // Mock store client that the spied reader is stubbed to return from getVeniceClient(...); recreated by setUp().
    private AvroSpecificStoreClient<PushStatusKey, PushStatusValue> storeClientMock;
    // Store version used when building push status keys and when querying the reader.
    private static final int storeVersion = 42;
    // Store name passed to the reader methods under test.
    private static final String storeName = "venice-test-push-status-store";
    // Incremental push version used when building keys and when querying partition statuses.
    private static final String incPushVersion = "ip-2022";
    // Default number of partitions for which fixture keys are generated.
    private static final int partitionCount = 2;
    // Default number of "instance-N" entries generated per partition in the fixture data.
    private static final int replicationFactor = 3;

    /**
     * Creates fresh mocks before every test method so that stubbing and recorded
     * invocations from one test are not visible to the next.
     */
    @BeforeMethod
    public void setUp() {
        d2ClientMock = Mockito.mock(D2Client.class);
        storeClientMock = Mockito.mock(AvroSpecificStoreClient.class);
    }

    /**
     * Builds fixture data: one server incremental push key per partition, each mapped to a
     * {@link PushStatusValue} whose {@code instances} map reports
     * {@link ExecutionStatus#END_OF_INCREMENTAL_PUSH_RECEIVED} for every generated instance.
     *
     * @param version store version encoded into every generated key
     * @param incrementalPushVersion incremental push version encoded into every generated key
     * @param numberOfPartitions number of keys to generate, for partition ids {@code 0} (inclusive)
     *     to {@code numberOfPartitions} (exclusive)
     * @param numberOfReplicas number of {@code "instance-N"} entries placed in each value
     * @return a new mutable map from push status key to its push status value
     */
    private Map<PushStatusKey, PushStatusValue> getPushStatusInstanceData(
            int version, String incrementalPushVersion, int numberOfPartitions, int numberOfReplicas) {
        Map<PushStatusKey, PushStatusValue> statusesByKey = new HashMap<>();
        int endOfIncrementalPush = ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED.getValue();
        for (int partitionId = 0; partitionId < numberOfPartitions; partitionId++) {
            PushStatusValue value = new PushStatusValue();
            value.instances = new HashMap<>();
            for (int replica = 0; replica < numberOfReplicas; replica++) {
                value.instances.put("instance-" + replica, endOfIncrementalPush);
            }
            PushStatusKey key = PushStatusStoreUtils.getServerIncrementalPushKey(
                    version, partitionId, incrementalPushVersion, "SERVER_SIDE_INCREMENTAL_PUSH_STATUS");
            statusesByKey.put(key, value);
        }
        return statusesByKey;
    }

    /**
     * Stubs the batch-get future to complete with an empty map and checks that every
     * per-partition entry returned by {@code getPartitionStatuses} is an empty map.
     */
    @Test(description = "Expect empty results when push status info is not available for any of the partition")
    public void testGetPartitionStatusesWhenPushStatusesAreNotAvailable() throws ExecutionException, InterruptedException {
        Map<PushStatusKey, PushStatusValue> fixture = getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);
        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture batchGetFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.batchGet(fixture.keySet())).thenReturn(batchGetFuture);
        Mockito.when(batchGetFuture.get()).thenReturn(Collections.emptyMap());

        Map<?, ?> partitionStatuses = readerSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);

        // Whatever partitions are reported, none of them may carry replica statuses.
        for (Object replicaStatuses : partitionStatuses.values()) {
            Assert.assertEqualsDeep((Map<?, ?>) replicaStatuses, Collections.emptyMap());
        }
    }

    /**
     * Stubs the batch-get future to complete with {@code null} instead of a result map and
     * expects {@code getPartitionStatuses} to fail with a {@link VeniceException}.
     */
    @Test(expectedExceptions = {VeniceException.class}, description = "Expect exception when result when push status read fails for some partitions")
    public void testGetPartitionStatusesWhenPushStatusReadFailsForSomePartitions() throws ExecutionException, InterruptedException {
        Map<PushStatusKey, PushStatusValue> fixture = getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);
        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture batchGetFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.batchGet(fixture.keySet())).thenReturn(batchGetFuture);
        Mockito.when(batchGetFuture.get()).thenReturn(null);

        // The call itself must throw; the test outcome is decided solely by expectedExceptions,
        // so no assertion on a return value is made here.
        readerSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);
    }

    /**
     * Makes the batch-get future throw an {@link ExecutionException} from {@code get()} and
     * expects {@code getPartitionStatuses} to surface the failure as a {@link VeniceException}.
     */
    @Test(expectedExceptions = {VeniceException.class}, description = "Expect an exception when push status store client throws an exception")
    public void testGetPartitionStatusesWhenStoreClientThrowsException() throws ExecutionException, InterruptedException {
        Map<PushStatusKey, PushStatusValue> fixture = getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);
        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture failingFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.batchGet(fixture.keySet())).thenReturn(failingFuture);
        ExecutionException failure = new ExecutionException(new Throwable("Mock execution exception"));
        Mockito.when(failingFuture.get()).thenThrow(failure);

        readerSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);
    }

    /**
     * Stubs the batch-get future to return the full fixture and checks that, for every fixture
     * key, the statuses reported for its partition equal the fixture's instance map.
     */
    @Test(description = "Expect statuses of all replicas when store returns all replica statuses")
    public void testGetPartitionStatusesWhenStoreReturnStatusesOfAllReplicas() throws ExecutionException, InterruptedException {
        Map<PushStatusKey, PushStatusValue> fixture = getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);
        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture batchGetFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.batchGet(fixture.keySet())).thenReturn(batchGetFuture);
        Mockito.when(batchGetFuture.get()).thenReturn(fixture);

        Map<?, ?> partitionStatuses = readerSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);

        Assert.assertNotEquals(partitionStatuses.size(), 0);
        for (Map.Entry<PushStatusKey, PushStatusValue> expected : fixture.entrySet()) {
            Integer partitionId = Integer.valueOf(PushStatusStoreUtils.getPartitionIdFromServerIncrementalPushKey(expected.getKey()));
            Object actualStatuses = partitionStatuses.get(partitionId);
            Assert.assertEqualsDeep((Map<?, ?>) actualStatuses, expected.getValue().instances);
        }
    }

    /**
     * Replaces the fixture value for partition 0 with {@code null} and checks that partition 0
     * is reported with an empty status map while partition 1 keeps its fixture statuses.
     */
    @Test(description = "Expect empty status when statuses for replicas of a partition is missing")
    public void testGetPartitionStatusesWhenStatusOfPartitionIsMissing() throws ExecutionException, InterruptedException {
        PushStatusKey partitionZeroKey = PushStatusStoreUtils.getServerIncrementalPushKey(storeVersion, 0, incPushVersion, "SERVER_SIDE_INCREMENTAL_PUSH_STATUS");
        PushStatusKey partitionOneKey = PushStatusStoreUtils.getServerIncrementalPushKey(storeVersion, 1, incPushVersion, "SERVER_SIDE_INCREMENTAL_PUSH_STATUS");
        Map<PushStatusKey, PushStatusValue> fixture = getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);
        // Partition 0 has a key but no value at all.
        fixture.put(partitionZeroKey, null);

        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture batchGetFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.batchGet(fixture.keySet())).thenReturn(batchGetFuture);
        Mockito.when(batchGetFuture.get()).thenReturn(fixture);

        Map<?, ?> partitionStatuses = readerSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);

        Assert.assertNotEquals(partitionStatuses.size(), 0);
        Assert.assertEqualsDeep((Map<?, ?>) partitionStatuses.get(0), Collections.emptyMap());
        Assert.assertEqualsDeep((Map<?, ?>) partitionStatuses.get(1), fixture.get(partitionOneKey).instances);
    }

    /**
     * Replaces the fixture value for partition 0 with a freshly constructed
     * {@link PushStatusValue} (its {@code instances} never populated by this test) and checks
     * that partition 0 is reported with an empty status map while partition 1 keeps its
     * fixture statuses.
     */
    @Test(description = "Expect empty status when instance info for replicas of a partition is missing")
    public void testGetPartitionStatusesWhenInstanceInfoOfPartitionIsMissing() throws ExecutionException, InterruptedException {
        PushStatusKey partitionZeroKey = PushStatusStoreUtils.getServerIncrementalPushKey(storeVersion, 0, incPushVersion, "SERVER_SIDE_INCREMENTAL_PUSH_STATUS");
        PushStatusKey partitionOneKey = PushStatusStoreUtils.getServerIncrementalPushKey(storeVersion, 1, incPushVersion, "SERVER_SIDE_INCREMENTAL_PUSH_STATUS");
        Map<PushStatusKey, PushStatusValue> fixture = getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);
        // Partition 0 has a value, but no instance information was filled in.
        fixture.put(partitionZeroKey, new PushStatusValue());

        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture batchGetFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.batchGet(fixture.keySet())).thenReturn(batchGetFuture);
        Mockito.when(batchGetFuture.get()).thenReturn(fixture);

        Map<?, ?> partitionStatuses = readerSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);

        Assert.assertNotEquals(partitionStatuses.size(), 0);
        Assert.assertEqualsDeep((Map<?, ?>) partitionStatuses.get(0), Collections.emptyMap());
        Assert.assertEqualsDeep((Map<?, ?>) partitionStatuses.get(1), fixture.get(partitionOneKey).instances);
    }

    /**
     * Generates 1055 single-replica partitions, stubs one batch-get per consecutive slice of at
     * most 256 keys, and checks that the reader returns the statuses of every partition while
     * issuing exactly one batch-get per slice (five in total).
     */
    @Test(description = "Expect all statuses even when number of partitions are greater than the batchGetLimit")
    public void testGetPartitionStatusesWhenNumberOfPartitionsAreGreaterThanBatchGetLimit() throws ExecutionException, InterruptedException {
        final int totalPartitions = 1055;
        final int batchGetLimit = 256;
        // 1055 keys in slices of 256 -> 4 full slices plus one partial slice.
        final int expectedBatchGetCalls = 5;

        Map<PushStatusKey, PushStatusValue> fixture = getPushStatusInstanceData(storeVersion, incPushVersion, totalPartitions, 1);
        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());

        ArrayList<Set<PushStatusKey>> expectedKeyBatches = new ArrayList<>();
        for (int batchStart = 0; batchStart < totalPartitions; batchStart += batchGetLimit) {
            int batchEnd = Math.min(batchStart + batchGetLimit, totalPartitions);
            Map<PushStatusKey, PushStatusValue> batch = new HashMap<>();
            for (int partitionId = batchStart; partitionId < batchEnd; partitionId++) {
                PushStatusKey key = PushStatusStoreUtils.getServerIncrementalPushKey(storeVersion, partitionId, incPushVersion, "SERVER_SIDE_INCREMENTAL_PUSH_STATUS");
                batch.put(key, fixture.get(key));
            }
            expectedKeyBatches.add(batch.keySet());

            // Each slice gets its own future so the reader receives only that slice's values.
            CompletableFuture batchFuture = Mockito.mock(CompletableFuture.class);
            Mockito.when(storeClientMock.batchGet(Mockito.eq(batch.keySet()))).thenReturn(batchFuture);
            Mockito.when(batchFuture.get()).thenReturn(batch);
        }

        Map<?, ?> partitionStatuses = readerSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, totalPartitions, batchGetLimit);

        Assert.assertNotEquals(partitionStatuses.size(), 0);
        for (Map.Entry<PushStatusKey, PushStatusValue> expected : fixture.entrySet()) {
            Integer partitionId = Integer.valueOf(PushStatusStoreUtils.getPartitionIdFromServerIncrementalPushKey(expected.getKey()));
            Object actualStatuses = partitionStatuses.get(partitionId);
            Assert.assertEqualsDeep((Map<?, ?>) actualStatuses, expected.getValue().instances);
        }

        Mockito.verify(storeClientMock, Mockito.times(expectedBatchGetCalls)).batchGet(Mockito.anySet());
        for (Set<PushStatusKey> keyBatch : expectedKeyBatches) {
            Mockito.verify(storeClientMock).batchGet(keyBatch);
        }
    }

    /**
     * Makes the store client throw a {@link VeniceClientException} from {@code get(key)} and
     * expects {@code getSupposedlyOngoingIncrementalPushVersions} to fail with a
     * {@link VeniceException}. The interaction checks run in a {@code finally} block because
     * statements placed after the throwing call would never execute.
     */
    @Test(description = "Expect an exception if venice system store client throws an exception", expectedExceptions = {VeniceException.class})
    public void testGetSupposedlyOngoingIncrementalPushVersionsWithClientException() {
        PushStatusKey ongoingPushesKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion);
        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.get(ongoingPushesKey)).thenThrow(VeniceClientException.class);

        try {
            readerSpy.getSupposedlyOngoingIncrementalPushVersions(storeName, storeVersion);
        } finally {
            // Runs whether or not the call throws; a failed verification replaces the expected
            // exception and therefore fails the test.
            Mockito.verify(storeClientMock).get(ongoingPushesKey);
            Mockito.verify(readerSpy).getVeniceClient((String) Mockito.any());
        }
    }

    /**
     * Stubs the store client's future to complete with {@code null} (no value stored under the
     * ongoing-incremental-pushes key) and expects an empty map from
     * {@code getSupposedlyOngoingIncrementalPushVersions}.
     */
    @Test(description = "Expect an empty result when key-value for ongoing incremental pushes doesn't exist")
    public void testGetSupposedlyOngoingIncrementalPushVersionsWhenIncPushVersionsDoesNotExist() throws ExecutionException, InterruptedException {
        PushStatusKey ongoingPushesKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion);
        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture getFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.get(ongoingPushesKey)).thenReturn(getFuture);
        Mockito.when(getFuture.get()).thenReturn(null);

        Assert.assertEqualsDeep(readerSpy.getSupposedlyOngoingIncrementalPushVersions(storeName, storeVersion), Collections.emptyMap());

        Mockito.verify(getFuture).get();
        Mockito.verify(storeClientMock).get(ongoingPushesKey);
    }

    /**
     * Stubs the store client's future to return a {@link PushStatusValue} whose
     * {@code instances} field is {@code null} and expects an empty map from
     * {@code getSupposedlyOngoingIncrementalPushVersions}.
     */
    @Test(description = "Expect an empty result when inc push versions are missing in the returned result")
    public void testGetSupposedlyOngoingIncrementalPushVersionsWhenIncPushVersionsAreMissing() throws ExecutionException, InterruptedException {
        PushStatusValue valueWithoutInstances = new PushStatusValue();
        valueWithoutInstances.instances = null;
        PushStatusKey ongoingPushesKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion);

        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture getFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.get(ongoingPushesKey)).thenReturn(getFuture);
        Mockito.when(getFuture.get()).thenReturn(valueWithoutInstances);

        Map<?, ?> ongoingPushes = readerSpy.getSupposedlyOngoingIncrementalPushVersions(storeName, storeVersion);
        Assert.assertEqualsDeep(ongoingPushes, Collections.emptyMap());

        Mockito.verify(getFuture).get();
        Mockito.verify(storeClientMock).get(ongoingPushesKey);
    }

    /**
     * Stubs the store client's future to return a value holding three incremental push
     * versions and expects {@code getSupposedlyOngoingIncrementalPushVersions} to return
     * exactly that instance map.
     */
    @Test(description = "Expect all inc push versions when inc push versions are found in push status store")
    public void testGetSupposedlyOngoingIncrementalPushVersionsWhenIncPushVersionsAreAvailable() throws ExecutionException, InterruptedException {
        PushStatusValue storedValue = new PushStatusValue();
        storedValue.instances = new HashMap<>();
        for (String incPush : new String[] {"inc_push_v1", "inc_push_v2", "inc_push_v3"}) {
            storedValue.instances.put(incPush, 7);
        }
        PushStatusKey ongoingPushesKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion);

        PushStatusStoreReader readerSpy = Mockito.spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10L));
        CompletableFuture getFuture = Mockito.mock(CompletableFuture.class);
        Mockito.doReturn(storeClientMock).when(readerSpy).getVeniceClient((String) Mockito.any());
        Mockito.when(storeClientMock.get(ongoingPushesKey)).thenReturn(getFuture);
        Mockito.when(getFuture.get()).thenReturn(storedValue);

        Map<?, ?> ongoingPushes = readerSpy.getSupposedlyOngoingIncrementalPushVersions(storeName, storeVersion);
        Assert.assertEqualsDeep(ongoingPushes, storedValue.instances);

        Mockito.verify(getFuture).get();
        Mockito.verify(storeClientMock).get(ongoingPushesKey);
    }
}
