package com.linkedin.venice.fastclient;

import com.beust.jcommander.internal.Lists;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.fastclient.utils.TestClientSimulator;
import com.linkedin.venice.read.RequestType;
import io.tehuti.Metric;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.util.Utf8;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.class */
public class BatchGetAvroStoreClientUnitTest {
    /** Logger scoped to this test class. */
    private static final Logger LOGGER = LogManager.getLogger(BatchGetAvroStoreClientUnitTest.class);
    /** Timeout in seconds; presumably applied when waiting on client futures — confirm at the usage sites. */
    private static final long TIME_OUT_IN_SECONDS = 10;
    /**
     * Long-tail retry threshold (milliseconds, per the name). The long-tail retry tests also pass it
     * as the first argument when scripting the retry requests.
     */
    private static final int RETRY_THRESHOLD_IN_MS = 50;
    /** Upper bound handed to {@code generateKeyValues(0, NUM_KEYS)} in the multi-route tests. */
    private static final int NUM_KEYS = 12;
    /** Partition count handed to {@code partitionKeys(...)} in the multi-route tests. */
    private static final int NUM_PARTITIONS = 3;

    /**
     * Streaming batch get against a single partition served by a single route.
     * One request is scripted together with its response; afterwards only the healthy-request
     * metric may be above zero, while the unhealthy and all retry-related metrics must not be.
     */
    @Test
    public void testSimpleStreamingBatchGet() throws InterruptedException, ExecutionException, TimeoutException {
        final String host = "https://host1.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        simulator.generateKeyValues(0, 1000)
                .partitionKeys(1)
                .assignRouteToPartitions(host, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host, 0)
                .respondToRequestWithKeyValues(5, 1)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        // Every metric key is "." + store name + a metric-specific suffix.
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Streaming batch get whose only scripted response carries a first argument of 20000
     * (presumably a time offset far beyond the client-side wait — confirm against the simulator).
     * The verification helper is invoked with its boolean flag set to {@code true}, and none of
     * the five multiget metrics checked below may be above zero afterwards.
     */
    @Test
    public void testSimpleStreamingBatchGettingTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        final String host = "https://host1.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        simulator.generateKeyValues(0, 2)
                .partitionKeys(2)
                .assignRouteToPartitions(host, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host, 0)
                .respondToRequestWithKeyValues(20000, 1)
                .simulate();
        // NOTE(review): the trailing 'true' presumably tells the helper to expect a timeout — confirm.
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture(),
                true);

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Scripts a response without any preceding expected request. The test passes only when an
     * {@link IllegalStateException} with the message "Must have a corresponding request" is thrown.
     */
    @Test(
            expectedExceptions = {IllegalStateException.class},
            expectedExceptionsMessageRegExp = "Must have a corresponding request")
    public void testSimpleStreamingBatchGetWithoutRequest() throws InterruptedException, ExecutionException, TimeoutException {
        final String host = "https://host1.linkedin.com";
        new TestClientSimulator()
                .generateKeyValues(0, 2)
                .partitionKeys(1)
                .assignRouteToPartitions(host, 0)
                .respondToRequestWithKeyValues(1, 1)
                .simulate();
    }

    /**
     * Scripts the expected request with a first argument of 2 and its response with a first
     * argument of 1, i.e. the response is placed ahead of the request. The test passes only when an
     * {@link IllegalStateException} with the message "Request should happen before response" is thrown.
     */
    @Test(
            expectedExceptions = {IllegalStateException.class},
            expectedExceptionsMessageRegExp = "Request should happen before response")
    public void testSimpleStreamingBatchGetWithBadTimeLine() throws InterruptedException, ExecutionException, TimeoutException {
        final String host = "https://host1.linkedin.com";
        new TestClientSimulator()
                .generateKeyValues(0, 2)
                .partitionKeys(1)
                .assignRouteToPartitions(host, 0)
                .expectRequestWithKeysForPartitionOnRoute(2, 1, host, 0)
                .respondToRequestWithKeyValues(1, 1)
                .simulate();
    }

    /**
     * Scripts two separate expected requests (partition 0 and partition 1) against the same route,
     * with no responses. The test passes only when an {@link AssertionError} whose message matches
     * "Unexpected key.*" is thrown.
     */
    @Test(
            expectedExceptions = {AssertionError.class},
            expectedExceptionsMessageRegExp = "Unexpected key.*")
    public void testSimpleStreamingBatchGetWithBadTimeLineWithSameRoute() throws InterruptedException, ExecutionException, TimeoutException {
        final String host = "https://host1.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        simulator.generateKeyValues(0, 10)
                .partitionKeys(2)
                .assignRouteToPartitions(host, 0, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host, 1)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Same single-partition, single-route scenario as the simple streaming batch get, but with
     * long-tail retry switched on for single get only. The batch-get metrics must show a healthy
     * request and nothing above zero for the unhealthy and retry-related metrics.
     */
    @Test
    public void testSimpleStreamingBatchGetAndLongTailRetryEnabledForSingleGet() throws InterruptedException, ExecutionException, TimeoutException {
        final String host = "https://host1.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // NOTE(review): a constant named *_IN_MS is passed to a setter named *InMicroseconds — confirm the intended unit.
        simulator.generateKeyValues(0, 1000)
                .setLongTailRetryEnabledForSingleGet(true)
                .setLongTailRetryThresholdForSingleGetInMicroseconds(RETRY_THRESHOLD_IN_MS)
                .partitionKeys(1)
                .assignRouteToPartitions(host, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host, 0)
                .respondToRequestWithKeyValues(5, 1)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Streaming batch get across five partitions (ids 0 through 4) that are all served by one route
     * and covered by a single scripted request. Only the healthy-request metric may be above zero.
     */
    @Test
    public void testSimpleStreamingBatchGetMultiplePartitions() throws InterruptedException, ExecutionException, TimeoutException {
        final String host = "https://host1.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // Partition ids are written out literally; the 3 is a partition id, not the NUM_PARTITIONS count.
        simulator.generateKeyValues(0, 1000)
                .partitionKeys(5)
                .assignRouteToPartitions(host, 0, 1, 2, 3, 4)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host, 0, 1, 2, 3, 4)
                .respondToRequestWithKeyValues(5, 1)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Streaming batch get over three partitions, each served by its own route. Three requests are
     * scripted with the same first argument (1) and answered with first arguments 5, 6 and 7.
     * Only the healthy-request metric may be above zero.
     */
    @Test
    public void testStreamingBatchGetMultipleRoutesAndPartitions() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 below pairs an expected request with its response (apparently a request id),
        // so it is spelled out rather than borrowed from NUM_PARTITIONS.
        simulator.generateKeyValues(0, NUM_KEYS)
                .partitionKeys(NUM_PARTITIONS)
                .assignRouteToPartitions(host0, 0)
                .assignRouteToPartitions(host1, 1)
                .assignRouteToPartitions(host2, 2)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(6, 2)
                .respondToRequestWithKeyValues(7, 3)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Variant of the three-route, three-partition scenario in which the expected requests carry
     * distinct first arguments (1, 2, 3) while all three responses share the first argument 5.
     * Only the healthy-request metric may be above zero.
     */
    @Test
    public void testStreamingBatchGetMultipleRoutesAndPartitionsV2() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // Literal 3s are written out; they continue the 1, 2, 3 sequence and are not the partition count.
        simulator.generateKeyValues(0, NUM_KEYS)
                .partitionKeys(NUM_PARTITIONS)
                .assignRouteToPartitions(host0, 0)
                .assignRouteToPartitions(host1, 1)
                .assignRouteToPartitions(host2, 2)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(2, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(3, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(5, 2)
                .respondToRequestWithKeyValues(5, 3)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Streaming batch get where every route serves two partitions and every partition is therefore
     * reachable through two routes. Replica lookups are scripted per partition, then one request per
     * partition is expected on host0, host1 and host2 respectively. Only the healthy-request metric
     * may be above zero.
     */
    @Test
    public void testStreamingBatchGetMultiplePartitionsPerRoute() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 below pairs the third expected request with its response; it is not the partition count.
        simulator.generateKeyValues(0, NUM_KEYS)
                .partitionKeys(NUM_PARTITIONS)
                .assignRouteToPartitions(host0, 0, 1)
                .assignRouteToPartitions(host1, 1, 2)
                .assignRouteToPartitions(host2, 2, 0)
                .expectReplicaRequestForPartitionAndRespondWithReplicas(0, Lists.newArrayList(host0, host2))
                .expectReplicaRequestForPartitionAndRespondWithReplicas(1, Lists.newArrayList(host1, host0))
                .expectReplicaRequestForPartitionAndRespondWithReplicas(2, Lists.newArrayList(host2, host1))
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(6, 2)
                .respondToRequestWithKeyValues(7, 3)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Long-tail retry setup in which all three original requests are answered with first arguments
     * 5, 6 and 10 — all below {@code RETRY_THRESHOLD_IN_MS} — and no retry request is scripted.
     * The healthy-request metric must be above zero; the unhealthy and retry metrics must not be.
     */
    @Test
    public void testStreamingBatchGetLongTailRetryMultiplePartitionsNoRetryTriggered() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 pairs the third expected request with its response; it is not the partition count.
        setupLongTailRetryWithMultiplePartitions(simulator)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(6, 2)
                .respondToRequestWithKeyValues(10, 3)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Rate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Long-tail retry scenario in which the original request for partition 2 (on host2) never gets
     * a scripted response. A retry request for partition 2 is expected on host1 at
     * {@code RETRY_THRESHOLD_IN_MS} and is answered. The test expects a healthy request, a recorded
     * long-tail retry, and a maximum of 4 for both the retried-key and retry-success-key counts.
     */
    @Test
    public void testStreamingBatchGetLongTailRetryMultiplePartitionsNoOrigResponse() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 identifies the third original request; it is not the partition count.
        setupLongTailRetryWithMultiplePartitions(simulator)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(6, 2)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 4, host1, 2)
                .respondToRequestWithKeyValues(55, 4)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Max").value() == 4.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Max").value() == 4.0d);
    }

    /**
     * Long-tail retry scenario in which the retry for partition 2 is answered (first argument 55)
     * before the original request's response arrives (first argument 70). The test expects a healthy
     * request, a recorded long-tail retry, and a maximum of 4 for both the retried-key and
     * retry-success-key counts.
     */
    @Test
    public void testStreamingBatchGetLongTailRetryMultiplePartitionsOrigResponseLate() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 identifies the third original request; it is not the partition count.
        setupLongTailRetryWithMultiplePartitions(simulator)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(6, 2)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 4, host1, 2)
                .respondToRequestWithKeyValues(55, 4)
                .respondToRequestWithKeyValues(70, 3)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Max").value() == 4.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Max").value() == 4.0d);
    }

    /**
     * Long-tail retry scenario in which the original request for partition 2 is answered (first
     * argument 55) before the retry's response (first argument 70). The test expects a healthy
     * request, a recorded long-tail retry with a maximum of 4 retried keys, and a retry-success
     * key-count rate that is not above zero.
     */
    @Test
    public void testStreamingBatchGetLongTailRetryMultiplePartitionsOrigResponseEarly() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 identifies the third original request; it is not the partition count.
        setupLongTailRetryWithMultiplePartitions(simulator)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(6, 2)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 4, host1, 2)
                .respondToRequestWithKeyValues(55, 3)
                .respondToRequestWithKeyValues(70, 4)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Max").value() == 4.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Long-tail retry scenario with two retries: only the first original request is answered before
     * {@code RETRY_THRESHOLD_IN_MS}, so retries are expected for partition 1 (on host0) and
     * partition 2 (on host1). Afterwards the second original request and the second retry are
     * answered. The test expects a healthy request, a recorded long-tail retry, a maximum of 8
     * retried keys and a maximum of 4 retry-success keys.
     */
    @Test
    public void testStreamingBatchGetLongTailRetryMultiplePartitionsMixedResponse() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 identifies the third original request; it is not the partition count.
        setupLongTailRetryWithMultiplePartitions(simulator)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 4, host0, 1)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 5, host1, 2)
                .respondToRequestWithKeyValues(55, 2)
                .respondToRequestWithKeyValues(60, 5)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Max").value() == 8.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Max").value() == 4.0d);
    }

    /**
     * Long-tail retry scenario in which the third original request is answered with an error
     * (status argument 500, first argument 40) before the retries are expected at
     * {@code RETRY_THRESHOLD_IN_MS}. Retries for partitions 1 and 2 are then scripted; the second
     * original request and the second retry are answered. The test expects a healthy request, a
     * recorded long-tail retry, a maximum of 8 retried keys and a maximum of 4 retry-success keys.
     */
    @Test
    public void testStreamingBatchGetLongTailRetryOriginalRequestErrorBeforeRetry() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 identifies the third original request; it is not the partition count.
        setupLongTailRetryWithMultiplePartitions(simulator)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithError(40, 3, 500)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 4, host0, 1)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 5, host1, 2)
                .respondToRequestWithKeyValues(55, 2)
                .respondToRequestWithKeyValues(60, 5)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Max").value() == 8.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Max").value() == 4.0d);
    }

    /**
     * Long-tail retry scenario in which the third original request is answered with an error
     * (status argument 500, first argument 53) after the retries have been expected at
     * {@code RETRY_THRESHOLD_IN_MS}. The second original request and the second retry are answered
     * afterwards. The test expects a healthy request, a recorded long-tail retry, a maximum of 8
     * retried keys and a maximum of 4 retry-success keys.
     */
    @Test
    public void testStreamingBatchGetLongTailRetryOriginalRequestErrorAfterRetry() throws InterruptedException, ExecutionException, TimeoutException {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        TestClientSimulator simulator = new TestClientSimulator();
        // The literal 3 identifies the third original request; it is not the partition count.
        setupLongTailRetryWithMultiplePartitions(simulator)
                .expectRequestWithKeysForPartitionOnRoute(1, 1, host0, 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, host1, 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, host2, 2)
                .respondToRequestWithKeyValues(5, 1)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 4, host0, 1)
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 5, host1, 2)
                .respondToRequestWithError(53, 3, 500)
                .respondToRequestWithKeyValues(55, 2)
                .respondToRequestWithKeyValues(60, 5)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        final String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Max").value() == 8.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Max").value() == 4.0d);
    }

    /**
     * Long-tail retry scenario: the retry request (id 4) is answered with a 500 while the
     * original request to host2 is answered with key/values later, and the batch get is expected
     * to succeed.
     *
     * <p>NOTE(review): the simulator script is read here as {@code (time, requestId, ...)} because
     * the request ids line up with the {@code respondToRequest*} calls - confirm against
     * {@code TestClientSimulator}. Under that reading the literal {@code 3} below is a request id.
     */
    @Test
    public void testStreamingBatchGetLongTailRetryRequestError() throws InterruptedException, ExecutionException, TimeoutException {
        TestClientSimulator simulator = new TestClientSimulator();
        setupLongTailRetryWithMultiplePartitions(simulator)
                // Original requests, one per host.
                .expectRequestWithKeysForPartitionOnRoute(1, 1, "https://host0.linkedin.com", 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, "https://host1.linkedin.com", 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, "https://host2.linkedin.com", 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(6, 2)
                // Request expected once the retry threshold is reached; it is answered with an error.
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 4, "https://host1.linkedin.com", 2)
                .respondToRequestWithError(55, 4, 500)
                // Request 3 is answered with key/values last.
                .respondToRequestWithKeyValues(70, 3)
                .simulate();
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture());

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        // Every metric name is "." + store name + "--" + metric suffix.
        String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Max").value() == 4.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Long-tail retry scenario: both the retry request (id 4) and the original request to host2
     * are answered with a 500, so the batch get is invoked with the expected-error flag set.
     *
     * <p>NOTE(review): the simulator script is read here as {@code (time, requestId, ...)} because
     * the request ids line up with the {@code respondToRequest*} calls - confirm against
     * {@code TestClientSimulator}. Under that reading the literal {@code 3} below is a request id.
     */
    @Test
    public void testStreamingBatchGetLongTailBothError() throws InterruptedException, ExecutionException, TimeoutException {
        TestClientSimulator simulator = new TestClientSimulator();
        setupLongTailRetryWithMultiplePartitions(simulator)
                // Original requests, one per host.
                .expectRequestWithKeysForPartitionOnRoute(1, 1, "https://host0.linkedin.com", 0)
                .expectRequestWithKeysForPartitionOnRoute(1, 2, "https://host1.linkedin.com", 1)
                .expectRequestWithKeysForPartitionOnRoute(1, 3, "https://host2.linkedin.com", 2)
                .respondToRequestWithKeyValues(5, 1)
                .respondToRequestWithKeyValues(6, 2)
                // Request expected once the retry threshold is reached; it is answered with an error.
                .expectRequestWithKeysForPartitionOnRoute(RETRY_THRESHOLD_IN_MS, 4, "https://host1.linkedin.com", 2)
                .respondToRequestWithError(55, 4, 500)
                // Request 3 is answered with an error as well.
                .respondToRequestWithError(70, 3, 500)
                .simulate();
        // 'true': an exception from the batch get is tolerated by the helper.
        callStreamingBatchGetAndVerifyResults(
                simulator.getFastClient(),
                simulator.getRequestedKeyValues(),
                simulator.getSimulatorCompleteFuture(),
                true);

        Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
        // Every metric name is "." + store name + "--" + metric suffix.
        String metricPrefix = "." + TestClientSimulator.UNIT_TEST_STORE_NAME;
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_healthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertTrue(metrics.get(metricPrefix + "--multiget_unhealthy_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_long_tail_retry_request.OccurrenceRate").value() > 0.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_key_count.Max").value() == 4.0d);
        Assert.assertFalse(metrics.get(metricPrefix + "--multiget_retry_request_success_key_count.Rate").value() > 0.0d);
    }

    /**
     * Applies the configuration shared by the long-tail retry tests to {@code testClientSimulator}
     * and returns the simulator that the fluent chain yields, so callers can keep chaining.
     *
     * <p>The chain generates key/values with arguments {@code (0, NUM_KEYS)}, partitions them with
     * {@code NUM_PARTITIONS}, enables long-tail retry for batch get with a threshold of
     * {@code RETRY_THRESHOLD_IN_MS * 1000} (= 50000) microseconds, assigns two partitions to each
     * of three hosts, and scripts a two-host replica response for each partition.
     *
     * @param testClientSimulator simulator to configure
     * @return the result of the last fluent call on the simulator
     */
    private TestClientSimulator setupLongTailRetryWithMultiplePartitions(TestClientSimulator testClientSimulator) {
        final String host0 = "https://host0.linkedin.com";
        final String host1 = "https://host1.linkedin.com";
        final String host2 = "https://host2.linkedin.com";
        return testClientSimulator
                .generateKeyValues(0, NUM_KEYS)
                .partitionKeys(NUM_PARTITIONS)
                .setLongTailRetryEnabledForBatchGet(true)
                .setLongTailRetryThresholdForBatchGetInMicroSeconds(RETRY_THRESHOLD_IN_MS * 1000)
                .assignRouteToPartitions(host0, 0, 1)
                .assignRouteToPartitions(host1, 1, 2)
                .assignRouteToPartitions(host2, 2, 0)
                .expectReplicaRequestForPartitionAndRespondWithReplicas(0, Lists.newArrayList(host0, host2))
                .expectReplicaRequestForPartitionAndRespondWithReplicas(1, Lists.newArrayList(host1, host0))
                .expectReplicaRequestForPartitionAndRespondWithReplicas(2, Lists.newArrayList(host2, host1));
    }

    /**
     * Convenience overload for scenarios in which no exception is tolerated: forwards all
     * arguments to the four-argument variant with its boolean flag set to {@code false}.
     */
    private void callStreamingBatchGetAndVerifyResults(AvroGenericStoreClient<String, Utf8> avroGenericStoreClient, Map<String, String> map, CompletableFuture<Integer> completableFuture) throws ExecutionException, InterruptedException, TimeoutException {
        final boolean exceptionTolerated = false;
        callStreamingBatchGetAndVerifyResults(avroGenericStoreClient, map, completableFuture, exceptionTolerated);
    }

    /**
     * Issues a streaming batch get for every key of {@code map} and waits, for at most
     * {@code TIME_OUT_IN_SECONDS} seconds, until both the streaming callback and
     * {@code completableFuture} have completed.
     *
     * <p>While records arrive the callback checks that the key {@code "nonExisting"} carries a
     * {@code null} value and that no other key is delivered twice.
     *
     * <p>The outcome checks run on the calling thread after the wait. They used to live in a
     * {@code whenComplete} action whose returned stage was discarded, so a failing assertion
     * there could never fail the test.
     *
     * <p>NOTE(review): the received values are only used for duplicate detection; they are never
     * compared with the expected values in {@code map}. Also, when {@code z} is {@code true} and
     * no exception occurs the method still returns normally - confirm whether either should be
     * tightened.
     *
     * @param avroGenericStoreClient client under test
     * @param map expected key/value pairs; only its key set is sent to the client
     * @param completableFuture additional future that must complete before the method returns
     * @param z {@code true} when an exception (including a timeout) while waiting is tolerated
     * @throws InterruptedException if waiting is interrupted and {@code z} is {@code false}
     * @throws ExecutionException if either future fails and {@code z} is {@code false}
     * @throws TimeoutException if the wait times out and {@code z} is {@code false}
     */
    private void callStreamingBatchGetAndVerifyResults(AvroGenericStoreClient<String, Utf8> avroGenericStoreClient, Map<String, String> map, CompletableFuture<Integer> completableFuture, boolean z) throws InterruptedException, ExecutionException, TimeoutException {
        final Map<String, String> receivedValues = new ConcurrentHashMap<>();
        // Set exactly once, by the first exception-free onCompletion call.
        final AtomicBoolean completedCleanly = new AtomicBoolean();
        final CompletableFuture<Integer> callbackDone = new CompletableFuture<>();
        avroGenericStoreClient.streamingBatchGet(map.keySet(), new StreamingCallback<String, Utf8>() {
            public void onRecordReceived(String str, Utf8 utf8) {
                LOGGER.info("Record received {}:{}", str, utf8);
                if ("nonExisting".equals(str)) {
                    Assert.assertNull(utf8);
                    return;
                }
                // putIfAbsent makes the duplicate check a single atomic map operation.
                if (receivedValues.putIfAbsent(str, utf8.toString()) != null) {
                    Assert.fail("Duplicate value received for key " + str);
                }
            }

            public void onCompletion(Optional<Exception> optional) {
                LOGGER.info("OnCompletion called . Exception: {} isComplete: {} ", optional, completedCleanly.get());
                if (optional.isPresent()) {
                    callbackDone.completeExceptionally(optional.get());
                    return;
                }
                Assert.assertEquals(optional, Optional.empty());
                // A second exception-free completion would fail this compareAndSet.
                Assert.assertTrue(completedCleanly.compareAndSet(false, true));
                callbackDone.complete(0);
            }
        });

        CompletableFuture<Void> bothDone = CompletableFuture.allOf(callbackDone, completableFuture);
        try {
            bothDone.get(TIME_OUT_IN_SECONDS, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("Exception received", e);
            if (!z) {
                throw e;
            }
            // An expected failure must not have been preceded by a clean completion.
            Assert.assertFalse(completedCleanly.get());
            LOGGER.info("Test completed successfully because was expecting an exception");
            return;
        }
        LOGGER.info("Test completed successfully");
        Assert.assertTrue(completedCleanly.get());
    }

    /**
     * Fetches the stats that {@code clientConfig} holds for
     * {@link RequestType#MULTI_GET_STREAMING} and returns the metrics map of the metrics
     * repository behind them.
     *
     * @param clientConfig client configuration to read the stats from
     * @return the map returned by the repository's {@code metrics()} call, keyed by {@code String}
     */
    private Map<String, ? extends Metric> getStats(ClientConfig clientConfig) {
        return clientConfig
                .getStats(RequestType.MULTI_GET_STREAMING)
                .getMetricsRepository()
                .metrics();
    }
}
