package com.linkedin.venice.client.store;

import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.exceptions.VeniceClientHttpException;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.client.store.streaming.TrackingStreamingCallback;
import com.linkedin.venice.client.store.streaming.VeniceResponseMap;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.compute.protocol.response.ComputeResponseRecordV1;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/client/store/StatTrackingStoreClientTest.class */
public class StatTrackingStoreClientTest {
    /** Mockito mock of the inner store client; created in {@code setUp()}. */
    private InternalAvroStoreClient<String, Object> mockStoreClient;
    /** Store name generated in {@code setUp()}. */
    private String storeName;
    /** {@code "." + storeName}; prefix of the metric names looked up by the tests. */
    private String metricPrefix;
    /** Avro record schema (JSON) returned by {@code SimpleStoreClient.getLatestValueSchema()}. */
    private static final String VALUE_SCHEMA = "{\n\t\"type\": \"record\",\n\t\"name\": \"record_schema\",\n\t\"fields\": [\n\t\t{\"name\": \"int_field\", \"type\": \"int\", \"default\": 0, \"doc\": \"doc for int_field\"},\n\t\t{\"name\": \"float_field\", \"type\": \"float\", \"doc\": \"doc for float_field\"},\n\t\t{\n\t\t\t\"name\": \"record_field\",\n\t\t\t\"namespace\": \"com.linkedin.test\",\n\t\t\t\"type\": {\n\t\t\t\t\"name\": \"Record1\",\n\t\t\t\t\"type\": \"record\",\n\t\t\t\t\"fields\": [\n\t\t\t\t\t{\"name\": \"nested_field1\", \"type\": \"double\", \"doc\": \"doc for nested field\"}\n\t\t\t\t]\n\t\t\t}\n\t\t},\n\t\t{\"name\": \"float_array_field1\", \"type\": {\"type\": \"array\", \"items\": \"float\"}},\n\t\t{\"name\": \"float_array_field2\", \"type\": {\"type\": \"array\", \"items\": \"float\"}},\n\t\t{\"name\": \"int_array_field2\", \"type\": {\"type\": \"array\", \"items\": \"int\"}}\n\t]\n}";
    /** Keys passed to the compute request in {@code testCompute()}; filled by the static initializer. */
    private static final Set<String> keys = new HashSet<>();
    /** Dot-product operand used in {@code testCompute()}; assigned by the static initializer. */
    private static final List<Float> dotProductParam;

    /* loaded from: input_file:com/linkedin/venice/client/store/StatTrackingStoreClientTest$MultiGetStreamTestWithExceptionStoreClient.class */
    /**
     * Test store client whose {@code streamingBatchGet} delivers no records and instead reports a
     * pre-configured exception to the callback.
     */
    private static class MultiGetStreamTestWithExceptionStoreClient<K, V> extends SimpleStoreClient<K, V> {
        /** Exception handed to the callback on every streaming batch-get. */
        private final VeniceClientException veniceException;

        /**
         * @param transportClient forwarded to {@link SimpleStoreClient}
         * @param storeName forwarded to {@link SimpleStoreClient}
         * @param needSchemaReader boolean flag forwarded to {@link SimpleStoreClient}
         *        (name assumed from usage; confirm against the superclass constructor)
         * @param deserializationExecutor forwarded to {@link SimpleStoreClient}
         * @param failure exception reported to callbacks
         */
        public MultiGetStreamTestWithExceptionStoreClient(TransportClient transportClient, String storeName, boolean needSchemaReader, Executor deserializationExecutor, VeniceClientException failure) {
            super(transportClient, storeName, needSchemaReader, deserializationExecutor);
            this.veniceException = failure;
        }

        /**
         * Calls {@code Utils.sleep(5L)}, then notifies the callback of deserialization completion
         * (with the counts 10 and 5) and of overall completion, both carrying the configured exception.
         * Callbacks that are not a {@link TrackingStreamingCallback} are ignored.
         */
        public void streamingBatchGet(Set<K> requestedKeys, StreamingCallback<K, V> callback) {
            if (!(callback instanceof TrackingStreamingCallback)) {
                return;
            }
            TrackingStreamingCallback tracker = (TrackingStreamingCallback) callback;
            Utils.sleep(5L);
            tracker.onDeserializationCompletion(Optional.of(this.veniceException), 10, 5);
            tracker.onCompletion(Optional.of(this.veniceException));
        }
    }

    /* loaded from: input_file:com/linkedin/venice/client/store/StatTrackingStoreClientTest$SimpleStoreClient.class */
    /**
     * Minimal {@link AbstractAvroStoreClient} for tests: supplies a mocked schema reader with a string key
     * schema and a fixed value schema, and returns {@code null} from the remaining hooks it overrides.
     */
    private static class SimpleStoreClient<K, V> extends AbstractAvroStoreClient<K, V> {
        /**
         * @param transportClient forwarded to the superclass
         * @param storeName store name used to build the default generic client config
         * @param needSchemaReader boolean flag forwarded to the superclass
         *        (name assumed from usage; confirm against the superclass constructor)
         * @param deserializationExecutor executor set on the client config
         */
        public SimpleStoreClient(TransportClient transportClient, String storeName, boolean needSchemaReader, Executor deserializationExecutor) {
            super(transportClient, needSchemaReader, ClientConfig.defaultGenericClientConfig(storeName).setDeserializationExecutor(deserializationExecutor));
        }

        /** Always {@code null} in this test client. */
        protected AbstractAvroStoreClient<K, V> getStoreClientForSchemaReader() {
            return null;
        }

        /** Always {@code null} in this test client. */
        public RecordDeserializer<V> getDataRecordDeserializer(int schemaId) throws VeniceClientException {
            return null;
        }

        /** Builds a fresh mocked {@link SchemaReader} whose key schema is the Avro {@code STRING} type. */
        protected SchemaReader getSchemaReader() {
            Schema stringKeySchema = Schema.create(Schema.Type.STRING);
            SchemaReader mockReader = Mockito.mock(SchemaReader.class);
            Mockito.doReturn(stringKeySchema).when(mockReader).getKeySchema();
            return mockReader;
        }

        /** Parses and returns the test-wide {@code VALUE_SCHEMA}. */
        public Schema getLatestValueSchema() {
            return Schema.parse(StatTrackingStoreClientTest.VALUE_SCHEMA);
        }
    }

    /* loaded from: input_file:com/linkedin/venice/client/store/StatTrackingStoreClientTest$StoreClientForMultiGetStreamTest.class */
    /**
     * Test store client whose {@code streamingBatchGet} answers from an in-memory map, either for every
     * requested key or (when {@code fullResponse} is false) for at most one key.
     */
    private static class StoreClientForMultiGetStreamTest<K, V> extends SimpleStoreClient<K, V> {
        /** Values served for the requested keys; keys missing from the map are answered with {@code null}. */
        private final Map<K, V> resultMap;
        /** When true every requested key gets a record callback. */
        private final boolean fullResponse;

        /**
         * @param transportClient forwarded to {@link SimpleStoreClient}
         * @param storeName forwarded to {@link SimpleStoreClient}
         * @param needSchemaReader boolean flag forwarded to {@link SimpleStoreClient}
         *        (name assumed from usage; confirm against the superclass constructor)
         * @param deserializationExecutor forwarded to {@link SimpleStoreClient}
         * @param results values to serve
         * @param respondToAllKeys whether every requested key receives a record callback
         */
        public StoreClientForMultiGetStreamTest(TransportClient transportClient, String storeName, boolean needSchemaReader, Executor deserializationExecutor, Map<K, V> results, boolean respondToAllKeys) {
            super(transportClient, storeName, needSchemaReader, deserializationExecutor);
            this.resultMap = results;
            this.fullResponse = respondToAllKeys;
        }

        /**
         * Calls {@code Utils.sleep(5L)}, delivers the records described on the class, then signals successful
         * deserialization completion (with the counts 10 and 5) and successful overall completion.
         * Callbacks that are not a {@link TrackingStreamingCallback} are ignored.
         */
        public void streamingBatchGet(Set<K> requestedKeys, StreamingCallback<K, V> callback) {
            if (!(callback instanceof TrackingStreamingCallback)) {
                return;
            }
            TrackingStreamingCallback tracker = (TrackingStreamingCallback) callback;
            Utils.sleep(5L);
            if (this.fullResponse) {
                for (K key : requestedKeys) {
                    V value = this.resultMap.containsKey(key) ? this.resultMap.get(key) : null;
                    tracker.onRecordReceived(key, value);
                }
            } else if (requestedKeys.size() > 1) {
                // Partial response: answer only the first key, with no value.
                tracker.onRecordReceived(requestedKeys.iterator().next(), null);
            }
            tracker.onDeserializationCompletion(Optional.empty(), 10, 5);
            tracker.onCompletion(Optional.empty());
        }
    }

    /**
     * Generates a unique store name, derives the metric-name prefix from it and creates a mocked inner
     * client that reports that store name.
     */
    @BeforeTest
    public void setUp() {
        this.storeName = Utils.getUniqueString("store");
        this.metricPrefix = "." + this.storeName;
        this.mockStoreClient = Mockito.mock(InternalAvroStoreClient.class);
        Mockito.doReturn(this.storeName).when(this.mockStoreClient).getStoreName();
    }

    /**
     * A successful single-key {@code get} through {@link StatTrackingStoreClient} must record the request
     * and healthy-request rates and leave the unhealthy-request rate at zero.
     */
    @Test
    public void testGet() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> mockInnerFuture = new CompletableFuture<>();
        mockInnerFuture.complete(Mockito.mock(Object.class));
        CompletableFuture<Object> delayedFuture = mockInnerFuture.handle((value, throwable) -> {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                // Do not swallow the interruption: restore the flag for whoever runs this callback.
                Thread.currentThread().interrupt();
            }
            return value;
        });
        ((InternalAvroStoreClient) Mockito.doReturn(delayedFuture).when(this.mockStoreClient)).get((String) Mockito.any(), (Optional) Mockito.any(), Mockito.anyLong());
        MetricsRepository metricsRepository = new MetricsRepository();
        new StatTrackingStoreClient(this.mockStoreClient, ClientConfig.defaultGenericClientConfig(this.mockStoreClient.getStoreName()).setMetricsRepository(metricsRepository)).get("key").get();
        Map metrics = metricsRepository.metrics();
        Metric requestRate = (Metric) metrics.get(this.metricPrefix + "--request.OccurrenceRate");
        Metric healthyRate = (Metric) metrics.get(this.metricPrefix + "--healthy_request.OccurrenceRate");
        Metric unhealthyRate = (Metric) metrics.get(this.metricPrefix + "--unhealthy_request.OccurrenceRate");
        Assert.assertTrue(requestRate.value() > 0.0d);
        Assert.assertTrue(healthyRate.value() > 0.0d);
        Assert.assertEquals(Double.valueOf(unhealthyRate.value()), Double.valueOf(0.0d));
    }

    /**
     * Requests ten keys from an inner client that holds values for five of them and answers every key.
     * The batch-get result must equal the five-entry map, and the multiget streaming metrics must show
     * a healthy request with ten requested and ten successful keys.
     */
    @Test
    public void testMultiGet() throws ExecutionException, InterruptedException {
        Map<String, String> expectedValues = new HashMap<>();
        Set<String> requestedKeys = new HashSet<>();
        for (int idx = 0; idx < 10; idx++) {
            String key = "key_" + idx;
            requestedKeys.add(key);
            if (idx < 5) {
                expectedValues.put(key, "value_" + idx);
            }
        }
        MetricsRepository repository = new MetricsRepository();
        StoreClientForMultiGetStreamTest innerClient = new StoreClientForMultiGetStreamTest(Mockito.mock(TransportClient.class), this.storeName, true, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), expectedValues, true);
        StatTrackingStoreClient statClient = new StatTrackingStoreClient(innerClient, ClientConfig.defaultGenericClientConfig(this.storeName).setMetricsRepository(repository));
        Assert.assertEquals((Map) statClient.batchGet(requestedKeys).get(), expectedValues);

        Map metrics = repository.metrics();
        String prefix = this.metricPrefix + "--multiget_streaming_";
        Metric requestRate = (Metric) metrics.get(prefix + "request.OccurrenceRate");
        Metric healthyRate = (Metric) metrics.get(prefix + "healthy_request.OccurrenceRate");
        Metric unhealthyRate = (Metric) metrics.get(prefix + "unhealthy_request.OccurrenceRate");
        Metric requestKeyCount = (Metric) metrics.get(prefix + "request_key_count.Avg");
        Metric successKeyCount = (Metric) metrics.get(prefix + "success_request_key_count.Avg");
        Metric successKeyRatio = (Metric) metrics.get(prefix + "success_request_key_ratio.SimpleRatioStat");
        Assert.assertTrue(requestRate.value() > 0.0d);
        Assert.assertTrue(healthyRate.value() > 0.0d);
        Assert.assertEquals(Double.valueOf(unhealthyRate.value()), Double.valueOf(0.0d));
        Assert.assertEquals(Double.valueOf(requestKeyCount.value()), Double.valueOf(10.0d));
        Assert.assertEquals(Double.valueOf(successKeyCount.value()), Double.valueOf(10.0d));
        Assert.assertTrue(successKeyRatio.value() > 0.0d, "Success Key Ratio should be positive");
    }

    /**
     * A batch-get of ten keys against an inner client that answers only one key must fail with an
     * {@link ExecutionException} whose message mentions "Received partial response".
     */
    @Test(expectedExceptions = {ExecutionException.class}, expectedExceptionsMessageRegExp = ".*Received partial response.*")
    public void testMultiGetWithPartialResponse() throws ExecutionException, InterruptedException {
        Set<String> requestedKeys = new HashSet<>();
        for (int idx = 0; idx < 10; idx++) {
            requestedKeys.add("key_" + idx);
        }
        StoreClientForMultiGetStreamTest innerClient = new StoreClientForMultiGetStreamTest(Mockito.mock(TransportClient.class), this.storeName, true, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), Collections.emptyMap(), false);
        StatTrackingStoreClient statClient = new StatTrackingStoreClient(innerClient, ClientConfig.defaultGenericClientConfig(this.storeName).setMetricsRepository(new MetricsRepository()));
        statClient.batchGet(requestedKeys).get();
    }

    /**
     * When the inner client's {@code get} future fails with an HTTP 400 exception, the wrapped call must
     * throw {@link ExecutionException} and the request, unhealthy-request and http_400 rates must be
     * positive while the healthy-request rate stays at zero.
     */
    @Test
    public void testGetWithException() throws InterruptedException {
        CompletableFuture<Object> failedFuture = new CompletableFuture<>();
        failedFuture.completeExceptionally(new VeniceClientHttpException("Inner mock exception", HttpResponseStatus.BAD_REQUEST.code()));
        ((InternalAvroStoreClient) Mockito.doReturn(failedFuture).when(this.mockStoreClient)).get((String) Mockito.any(), (Optional) Mockito.any(), Mockito.anyLong());

        MetricsRepository repository = new MetricsRepository();
        StatTrackingStoreClient statClient = new StatTrackingStoreClient(this.mockStoreClient, ClientConfig.defaultGenericClientConfig(this.mockStoreClient.getStoreName()).setMetricsRepository(repository));
        try {
            statClient.get("key").get();
            Assert.fail("ExecutionException should be thrown");
        } catch (ExecutionException expected) {
            // Expected: the failure of the inner future surfaces through get().
        }

        Map metrics = repository.metrics();
        Metric requestRate = (Metric) metrics.get(this.metricPrefix + "--request.OccurrenceRate");
        Metric healthyRate = (Metric) metrics.get(this.metricPrefix + "--healthy_request.OccurrenceRate");
        Metric unhealthyRate = (Metric) metrics.get(this.metricPrefix + "--unhealthy_request.OccurrenceRate");
        Metric http400Rate = (Metric) metrics.get(this.metricPrefix + "--http_400_request.OccurrenceRate");
        Assert.assertTrue(requestRate.value() > 0.0d);
        Assert.assertEquals(Double.valueOf(healthyRate.value()), Double.valueOf(0.0d));
        Assert.assertTrue(unhealthyRate.value() > 0.0d);
        Assert.assertTrue(http400Rate.value() > 0.0d);
    }

    /**
     * When the inner client reports an HTTP 500 exception for a streaming batch-get, {@code batchGet}
     * must throw {@link ExecutionException} and the multiget streaming request, unhealthy-request and
     * http_500 rates must be positive while the healthy-request rate stays at zero.
     */
    @Test
    public void testMultiGetWithException() throws InterruptedException {
        // The shared mockStoreClient is not involved here: the failure comes from the dedicated
        // exception-reporting inner client, so no stubbing of the mock is needed.
        MetricsRepository metricsRepository = new MetricsRepository();
        StatTrackingStoreClient statTrackingStoreClient = new StatTrackingStoreClient(new MultiGetStreamTestWithExceptionStoreClient((TransportClient) Mockito.mock(TransportClient.class), this.storeName, true, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), new VeniceClientHttpException(500)), ClientConfig.defaultGenericClientConfig(this.storeName).setMetricsRepository(metricsRepository));
        Set<String> requestedKeys = new HashSet<>();
        requestedKeys.add("key");
        try {
            statTrackingStoreClient.batchGet(requestedKeys).get();
            Assert.fail("ExecutionException should be thrown");
        } catch (ExecutionException expected) {
            // Expected: the exception reported by the inner client surfaces through get().
        }
        Map metrics = metricsRepository.metrics();
        Metric requestRate = (Metric) metrics.get(this.metricPrefix + "--multiget_streaming_request.OccurrenceRate");
        Metric healthyRate = (Metric) metrics.get(this.metricPrefix + "--multiget_streaming_healthy_request.OccurrenceRate");
        Metric unhealthyRate = (Metric) metrics.get(this.metricPrefix + "--multiget_streaming_unhealthy_request.OccurrenceRate");
        Metric http500Rate = (Metric) metrics.get(this.metricPrefix + "--multiget_streaming_http_500_request.OccurrenceRate");
        Assert.assertTrue(requestRate.value() > 0.0d);
        Assert.assertEquals(Double.valueOf(healthyRate.value()), Double.valueOf(0.0d));
        Assert.assertTrue(unhealthyRate.value() > 0.0d);
        Assert.assertTrue(http500Rate.value() > 0.0d);
    }

    /**
     * Disabled test. Serves a canned two-record compute response through a mocked transport client and
     * checks both the decoded records and the compute metrics recorded for store "test_store".
     */
    @Test(enabled = false)
    public void testCompute() throws ExecutionException, InterruptedException {
        TransportClient transportClient = (TransportClient) Mockito.mock(TransportClient.class);
        Schema resultSchema = Schema.parse("{\n\t\"type\": \"record\",\n\t\"name\": \"test_store_VeniceComputeResult\", \"doc\" : \"\",\n\t\"fields\": [\n\t\t{\"name\": \"int_field\", \"type\": \"int\", \"default\": 0, \"doc\": \"\"},\n\t\t{\"name\": \"dot_product_for_float_array_field1\", \"type\": [\"null\",\"float\"], \"doc\": \"\", \"default\" : null},\n\t\t{\"name\": \"veniceComputationError\", \"type\": {\"type\": \"map\", \"values\": \"string\"}, \"doc\": \"\", \"default\" : {}}]}");
        RecordSerializer resultSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(resultSchema);
        ArrayList responseRecords = new ArrayList();

        // First response record: answers the key at index 0.
        GenericData.Record firstResult = new GenericData.Record(resultSchema);
        firstResult.put("int_field", 1);
        firstResult.put("dot_product_for_float_array_field1", Float.valueOf(1.1f));
        firstResult.put("veniceComputationError", Collections.emptyMap());
        ComputeResponseRecordV1 firstResponse = new ComputeResponseRecordV1();
        firstResponse.keyIndex = 0;
        firstResponse.value = ByteBuffer.wrap(resultSerializer.serialize(firstResult));

        // Second response record: answers the key at index 1.
        GenericData.Record secondResult = new GenericData.Record(resultSchema);
        secondResult.put("int_field", 2);
        secondResult.put("dot_product_for_float_array_field1", Float.valueOf(1.2f));
        secondResult.put("veniceComputationError", Collections.emptyMap());
        ComputeResponseRecordV1 secondResponse = new ComputeResponseRecordV1();
        secondResponse.keyIndex = 1;
        secondResponse.value = ByteBuffer.wrap(resultSerializer.serialize(secondResult));

        responseRecords.add(firstResponse);
        responseRecords.add(secondResponse);
        TransportClientResponse transportClientResponse = new TransportClientResponse(ReadAvroProtocolDefinition.COMPUTE_RESPONSE_V1.getProtocolVersion(), CompressionStrategy.NO_OP, SerializerDeserializerFactory.getAvroGenericSerializer(ComputeResponseRecordV1.SCHEMA$).serializeObjects(responseRecords));
        CompletableFuture<TransportClientResponse> transportFuture = new CompletableFuture<>();
        transportFuture.complete(transportClientResponse);
        // Stub with the future returned by handle(), as testGet does, instead of discarding it.
        CompletableFuture<TransportClientResponse> delayedFuture = transportFuture.handle((response, throwable) -> {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                // Do not swallow the interruption: restore the flag for whoever runs this callback.
                Thread.currentThread().interrupt();
            }
            return response;
        });
        ((TransportClient) Mockito.doReturn(delayedFuture).when(transportClient)).post((String) Mockito.any(), (Map) Mockito.any(), (byte[]) Mockito.any());
        SimpleStoreClient simpleStoreClient = new SimpleStoreClient(transportClient, "test_store", true, AbstractAvroStoreClient.getDefaultDeserializationExecutor());
        MetricsRepository metricsRepository = new MetricsRepository();
        Map computeResult = (Map) new StatTrackingStoreClient(simpleStoreClient, ClientConfig.defaultGenericClientConfig("test_store").setMetricsRepository(metricsRepository)).compute().project(new String[]{"int_field"}).dotProduct("float_array_field1", dotProductParam, "dot_product_for_float_array_field1").execute(keys).get();

        // TestNG's assertEquals takes (actual, expected).
        Assert.assertEquals(computeResult.size(), 2);
        Assert.assertNotNull(computeResult.get("key1"));
        GenericRecord key1Record = (GenericRecord) computeResult.get("key1");
        Assert.assertEquals(key1Record.get("int_field"), 1);
        Assert.assertEquals(key1Record.get("dot_product_for_float_array_field1"), Float.valueOf(1.1f));
        Assert.assertNotNull(computeResult.get("key2"));
        GenericRecord key2Record = (GenericRecord) computeResult.get("key2");
        Assert.assertEquals(key2Record.get("int_field"), 2);
        Assert.assertEquals(key2Record.get("dot_product_for_float_array_field1"), Float.valueOf(1.2f));

        Map metrics = metricsRepository.metrics();
        String str = ".test_store";
        Metric requestRate = (Metric) metrics.get(str + "--compute_request.OccurrenceRate");
        Metric healthyRate = (Metric) metrics.get(str + "--compute_healthy_request.OccurrenceRate");
        Metric unhealthyRate = (Metric) metrics.get(str + "--compute_unhealthy_request.OccurrenceRate");
        Metric deserializationTime = (Metric) metrics.get(str + "--compute_response_deserialization_time.Avg");
        Assert.assertTrue(requestRate.value() > 0.0d);
        Assert.assertTrue(healthyRate.value() > 0.0d);
        Assert.assertEquals(Double.valueOf(unhealthyRate.value()), Double.valueOf(0.0d));
        // Wait until the deserialization-time metric has a value before asserting on it.
        TestUtils.waitForNonDeterministicCompletion(5L, TimeUnit.SECONDS, () -> {
            return !Double.isNaN(deserializationTime.value());
        });
        Assert.assertTrue(deserializationTime.value() > 0.0d);
    }

    /**
     * Issues a callback-style streaming batch-get for ten keys against an inner client that answers every
     * key with {@code null}. The request, healthy-request and duplicate-key-count metrics must be
     * positive and the unhealthy-request rate must stay at zero.
     */
    @Test
    public void multiGetStreamTest() {
        String uniqueStoreName = Utils.getUniqueString("test_store");
        StoreClientForMultiGetStreamTest innerClient = new StoreClientForMultiGetStreamTest(Mockito.mock(TransportClient.class), uniqueStoreName, true, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), Collections.emptyMap(), true);
        MetricsRepository repository = new MetricsRepository();
        StatTrackingStoreClient statClient = new StatTrackingStoreClient(innerClient, ClientConfig.defaultGenericClientConfig(uniqueStoreName).setMetricsRepository(repository));

        Set<String> requestedKeys = new HashSet<>();
        for (int idx = 0; idx < 10; idx++) {
            requestedKeys.add("key_" + idx);
        }
        // The caller-side callback does nothing; only the recorded metrics are inspected.
        StreamingCallback<String, GenericRecord> noOpCallback = new StreamingCallback<String, GenericRecord>() {
            public void onRecordReceived(String key, GenericRecord value) {
            }

            public void onCompletion(Optional<Exception> exception) {
            }
        };
        statClient.streamingBatchGet(requestedKeys, noOpCallback);

        Map metrics = repository.metrics();
        String prefix = "." + uniqueStoreName + "--multiget_streaming_";
        Metric requestRate = (Metric) metrics.get(prefix + "request.OccurrenceRate");
        Metric healthyRate = (Metric) metrics.get(prefix + "healthy_request.OccurrenceRate");
        Metric unhealthyRate = (Metric) metrics.get(prefix + "unhealthy_request.OccurrenceRate");
        Metric duplicateKeyRate = (Metric) metrics.get(prefix + "success_request_duplicate_key_count.Rate");
        Assert.assertTrue(requestRate.value() > 0.0d);
        Assert.assertTrue(healthyRate.value() > 0.0d);
        Assert.assertEquals(Double.valueOf(unhealthyRate.value()), Double.valueOf(0.0d));
        Assert.assertTrue(duplicateKeyRate.value() > 0.0d);
    }

    /**
     * Issues a callback-style streaming batch-get against an inner client that reports an HTTP 500
     * exception. The request, unhealthy-request, duplicate-key-count and http_500 metrics must be
     * positive and the healthy-request rate must stay at zero.
     */
    @Test
    public void multiGetStreamTestWithException() {
        MultiGetStreamTestWithExceptionStoreClient innerClient = new MultiGetStreamTestWithExceptionStoreClient(Mockito.mock(TransportClient.class), this.storeName, false, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), new VeniceClientHttpException(500));
        MetricsRepository repository = new MetricsRepository();
        StatTrackingStoreClient statClient = new StatTrackingStoreClient(innerClient, ClientConfig.defaultGenericClientConfig(this.storeName).setMetricsRepository(repository));

        Set<String> requestedKeys = new HashSet<>();
        for (int idx = 0; idx < 10; idx++) {
            requestedKeys.add("key_" + idx);
        }
        // The caller-side callback does nothing; only the recorded metrics are inspected.
        StreamingCallback<String, GenericRecord> noOpCallback = new StreamingCallback<String, GenericRecord>() {
            public void onRecordReceived(String key, GenericRecord value) {
            }

            public void onCompletion(Optional<Exception> exception) {
            }
        };
        statClient.streamingBatchGet(requestedKeys, noOpCallback);

        Map metrics = repository.metrics();
        String prefix = this.metricPrefix + "--multiget_streaming_";
        Metric requestRate = (Metric) metrics.get(prefix + "request.OccurrenceRate");
        Metric healthyRate = (Metric) metrics.get(prefix + "healthy_request.OccurrenceRate");
        Metric unhealthyRate = (Metric) metrics.get(prefix + "unhealthy_request.OccurrenceRate");
        Metric duplicateKeyRate = (Metric) metrics.get(prefix + "success_request_duplicate_key_count.Rate");
        Metric http500Rate = (Metric) metrics.get(prefix + "http_500_request.OccurrenceRate");
        Assert.assertTrue(requestRate.value() > 0.0d);
        Assert.assertEquals(Double.valueOf(healthyRate.value()), Double.valueOf(0.0d));
        Assert.assertTrue(unhealthyRate.value() > 0.0d);
        Assert.assertTrue(duplicateKeyRate.value() > 0.0d);
        Assert.assertTrue(http500Rate.value() > 0.0d);
    }

    /**
     * Future-style streaming batch-get where the inner client delivers ten records on a background thread
     * (a mock value for even-numbered keys, {@code null} for odd-numbered keys), reports deserialization
     * completion with an exception and never signals overall completion. After the background thread has
     * finished, the future is read with a 1 ms timeout; the returned map must be a non-full response with
     * both existing and non-existing keys, and the app-timed-out metrics must be positive.
     */
    @Test
    public void multiGetStreamTestForPartialResponse() throws InterruptedException, ExecutionException, TimeoutException {
        final CountDownLatch resultLatch = new CountDownLatch(1);

        /** Inner client that pushes a fixed record sequence to the callback from its own thread. */
        class PartialResponseStoreClient extends SimpleStoreClient<String, GenericRecord> {
            private final VeniceClientException veniceException;

            PartialResponseStoreClient(TransportClient transportClient, String clientStoreName, boolean needSchemaReader, Executor deserializationExecutor, VeniceClientException veniceException) {
                super(transportClient, clientStoreName, needSchemaReader, deserializationExecutor);
                this.veniceException = veniceException;
            }

            public void streamingBatchGet(Set<String> requestedKeys, StreamingCallback<String, GenericRecord> callback) {
                new Thread(() -> {
                    for (int i = 0; i < 10; i += 2) {
                        callback.onRecordReceived("key_" + i, Mockito.mock(GenericRecord.class));
                        callback.onRecordReceived("key_" + (i + 1), null);
                    }
                    if (callback instanceof TrackingStreamingCallback) {
                        ((TrackingStreamingCallback) callback).onDeserializationCompletion(Optional.of(veniceException), 10, 5);
                    }
                    // onCompletion is deliberately never invoked; the test reads the future via a timeout.
                    resultLatch.countDown();
                }).start();
            }
        }

        String uniqueStoreName = Utils.getUniqueString("test_store");
        SimpleStoreClient<String, GenericRecord> innerClient = new PartialResponseStoreClient(Mockito.mock(TransportClient.class), uniqueStoreName, false, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), new VeniceClientHttpException(500));
        MetricsRepository metricsRepository = new MetricsRepository();
        StatTrackingStoreClient statTrackingStoreClient = new StatTrackingStoreClient(innerClient, ClientConfig.defaultGenericClientConfig(uniqueStoreName).setMetricsRepository(metricsRepository));
        Set<String> requestedKeys = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            requestedKeys.add("key_" + i);
        }
        CompletableFuture responseFuture = statTrackingStoreClient.streamingBatchGet(requestedKeys);
        // Wait until the background thread has delivered every record before reading the future.
        resultLatch.await();
        VeniceResponseMap partialResponse = (VeniceResponseMap) responseFuture.get(1L, TimeUnit.MILLISECONDS);
        Assert.assertFalse(partialResponse.isFullResponse());
        Assert.assertTrue(partialResponse.size() > 0);
        Assert.assertTrue(partialResponse.getNonExistingKeys().size() > 0);
        Map metrics = metricsRepository.metrics();
        String prefix = "." + uniqueStoreName;
        Metric timedOutRate = (Metric) metrics.get(prefix + "--multiget_streaming_app_timed_out_request.OccurrenceRate");
        Metric timedOutResultRatio = (Metric) metrics.get(prefix + "--multiget_streaming_app_timed_out_request_result_ratio.Avg");
        Assert.assertTrue(timedOutRate.value() > 0.0d);
        Assert.assertTrue(timedOutResultRatio.value() > 0.0d);
    }

    /**
     * Future-style streaming compute where the inner client delivers ten records on a background thread
     * (a mock record for even-numbered keys, {@code null} for odd-numbered keys), reports deserialization
     * completion with an exception and never signals overall completion. After the background thread has
     * finished, the future is read with a 1 ms timeout; the returned map must be a non-full response with
     * both existing and non-existing keys, and the compute app-timed-out metrics must be positive.
     */
    @Test
    public void computeStreamTestForPartialResponse() throws InterruptedException, ExecutionException, TimeoutException {
        final CountDownLatch resultLatch = new CountDownLatch(1);

        /** Inner client that pushes a fixed compute-record sequence to the callback from its own thread. */
        class PartialComputeResponseStoreClient extends SimpleStoreClient<String, GenericRecord> {
            private final VeniceClientException veniceException;

            PartialComputeResponseStoreClient(TransportClient transportClient, String clientStoreName, boolean needSchemaReader, Executor deserializationExecutor, VeniceClientException veniceException) {
                super(transportClient, clientStoreName, needSchemaReader, deserializationExecutor);
                this.veniceException = veniceException;
            }

            public void compute(ComputeRequestWrapper computeRequestWrapper, Set<String> requestedKeys, Schema resultSchema, StreamingCallback<String, ComputeGenericRecord> callback, long preRequestTimeInNS, BinaryEncoder reusedEncoder, ByteArrayOutputStream reusedOutputStream) {
                new Thread(() -> {
                    for (int i = 0; i < 10; i += 2) {
                        callback.onRecordReceived("key_" + i, Mockito.mock(ComputeGenericRecord.class));
                        callback.onRecordReceived("key_" + (i + 1), null);
                    }
                    if (callback instanceof TrackingStreamingCallback) {
                        ((TrackingStreamingCallback) callback).onDeserializationCompletion(Optional.of(veniceException), 10, 5);
                    }
                    // onCompletion is deliberately never invoked; the test reads the future via a timeout.
                    resultLatch.countDown();
                }).start();
            }
        }

        String uniqueStoreName = Utils.getUniqueString("test_store");
        SimpleStoreClient<String, GenericRecord> innerClient = new PartialComputeResponseStoreClient(Mockito.mock(TransportClient.class), uniqueStoreName, false, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), new VeniceClientHttpException(500));
        MetricsRepository metricsRepository = new MetricsRepository();
        StatTrackingStoreClient statTrackingStoreClient = new StatTrackingStoreClient(innerClient, ClientConfig.defaultGenericClientConfig(uniqueStoreName).setMetricsRepository(metricsRepository));
        Set<String> requestedKeys = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            requestedKeys.add("key_" + i);
        }
        CompletableFuture responseFuture = statTrackingStoreClient.compute().project(new String[]{"int_field"}).streamingExecute(requestedKeys);
        // Wait until the background thread has delivered every record before reading the future.
        resultLatch.await();
        VeniceResponseMap partialResponse = (VeniceResponseMap) responseFuture.get(1L, TimeUnit.MILLISECONDS);
        Assert.assertFalse(partialResponse.isFullResponse());
        Assert.assertTrue(partialResponse.size() > 0);
        Assert.assertTrue(partialResponse.getNonExistingKeys().size() > 0);
        Map metrics = metricsRepository.metrics();
        String prefix = "." + uniqueStoreName;
        Metric timedOutRate = (Metric) metrics.get(prefix + "--compute_streaming_app_timed_out_request.OccurrenceRate");
        Metric timedOutResultRatio = (Metric) metrics.get(prefix + "--compute_streaming_app_timed_out_request_result_ratio.Avg");
        Assert.assertTrue(timedOutRate.value() > 0.0d);
        Assert.assertTrue(timedOutResultRatio.value() > 0.0d);
    }

    // Populates the compute-test key set and assigns the dot-product operand.
    static {
        Collections.addAll(keys, "key1", "key2");
        dotProductParam = Arrays.asList(0.1f, 0.2f);
    }
}
