package com.linkedin.venice.client.store;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.stats.ClientStats;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.client.store.transport.TransportClientStreamingCallback;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compute.protocol.response.ComputeResponseRecordV1;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.TestUtils;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/client/store/AbstractAvroStoreClientTest.class */
public class AbstractAvroStoreClientTest {
    /**
     * Value schema of the fake store. It is what {@code SimpleStoreClient#getLatestValueSchema()}
     * returns, and what {@code testCompute} expects every computed record to report as its value schema.
     */
    private static final Schema VALUE_SCHEMA = Schema.parse(
        "{\n"
            + "\t\"type\": \"record\",\n"
            + "\t\"name\": \"record_schema\",\n"
            + "\t\"fields\": [\n"
            + "\t\t{\"name\": \"int_field\", \"type\": \"int\", \"default\": 0, \"doc\": \"doc for int_field\"},\n"
            + "\t\t{\"name\": \"float_field\", \"type\": \"float\", \"doc\": \"doc for float_field\"},\n"
            + "\t\t{\n"
            + "\t\t\t\"name\": \"record_field\",\n"
            + "\t\t\t\"namespace\": \"com.linkedin.test\",\n"
            + "\t\t\t\"type\": {\n"
            + "\t\t\t\t\"name\": \"Record1\",\n"
            + "\t\t\t\t\"type\": \"record\",\n"
            + "\t\t\t\t\"fields\": [\n"
            + "\t\t\t\t\t{\"name\": \"nested_field1\", \"type\": \"double\", \"doc\": \"doc for nested field\"}\n"
            + "\t\t\t\t]\n"
            + "\t\t\t}\n"
            + "\t\t},\n"
            + "\t\t{\"name\": \"float_array_field1\", \"type\": {\"type\": \"array\", \"items\": \"float\"}},\n"
            + "\t\t{\"name\": \"float_array_field2\", \"type\": {\"type\": \"array\", \"items\": \"float\"}},\n"
            + "\t\t{\"name\": \"int_array_field2\", \"type\": {\"type\": \"array\", \"items\": \"int\"}}\n"
            + "\t]\n"
            + "}");

    /** Keys sent with every compute request; filled in by the static initializer of this class. */
    private static final Set<String> keys = new HashSet<>();

    /** Vector passed to {@code dotProduct(...)}; assigned in the static initializer. */
    private static final List<Float> dotProductParam;

    /** Vector passed to {@code cosineSimilarity(...)}; assigned in the static initializer. */
    private static final List<Float> cosineSimilarityParam;

    /** Vector passed to {@code hadamardProduct(...)}; assigned in the static initializer. */
    private static final List<Float> hadamardProductParam;

    /* loaded from: input_file:com/linkedin/venice/client/store/AbstractAvroStoreClientTest$ParameterizedComputeTransportClient.class */
    /**
     * Stub {@link TransportClient} for the compute tests. Only {@code streamPost} does anything: it
     * synchronously replays a canned response (schema-id header, optional body, optional failure)
     * into the supplied callback. {@code get} and {@code post} return {@code null} and {@code close}
     * is a no-op.
     */
    private static class ParameterizedComputeTransportClient extends TransportClient {
        /** Response headers reported to the streaming callback; populated once in the constructor. */
        private final Map<String, String> headerMap = new HashMap<>();
        /** Body delivered through {@code onDataReceived}; nothing is delivered when empty. */
        private final Optional<byte[]> responseBody;
        /** Value handed to {@code onCompletion}; may carry the failure the test wants to simulate. */
        private final Optional<VeniceClientException> completedException;

        /**
         * @param responseBody       bytes to stream back, or empty to send no data
         * @param completedException value to pass to the callback's {@code onCompletion}
         */
        public ParameterizedComputeTransportClient(Optional<byte[]> responseBody, Optional<VeniceClientException> completedException) {
            this.responseBody = responseBody;
            this.completedException = completedException;
            this.headerMap.put("X-VENICE-SCHEMA-ID", Integer.toString(ReadAvroProtocolDefinition.COMPUTE_RESPONSE_V1.getProtocolVersion()));
        }

        /** Not used by the compute tests; always returns {@code null}. */
        public CompletableFuture<TransportClientResponse> get(String str, Map<String, String> map) {
            return null;
        }

        /** Not used by the compute tests; always returns {@code null}. */
        public CompletableFuture<TransportClientResponse> post(String str, Map<String, String> map, byte[] bArr) {
            return null;
        }

        /**
         * Replays the canned response into {@code transportClientStreamingCallback}: headers first,
         * then the body if one was configured, then completion. The request arguments are ignored.
         */
        public void streamPost(String str, Map<String, String> map, byte[] bArr, TransportClientStreamingCallback transportClientStreamingCallback, int i) {
            // Each call gets its own copy so a callback that mutates the headers cannot affect later calls.
            Map<String, String> responseHeaders = new HashMap<>(this.headerMap);
            transportClientStreamingCallback.onHeaderReceived(responseHeaders);
            this.responseBody.ifPresent(body -> transportClientStreamingCallback.onDataReceived(ByteBuffer.wrap(body)));
            transportClientStreamingCallback.onCompletion(this.completedException);
        }

        /** Nothing to release. */
        public void close() throws IOException {
        }
    }

    /* loaded from: input_file:com/linkedin/venice/client/store/AbstractAvroStoreClientTest$SimpleStoreClient.class */
    /**
     * Smallest possible concrete {@link AbstractAvroStoreClient} for these tests. It has no value
     * deserializer, always reports {@code VALUE_SCHEMA} as the latest value schema, and can replace
     * the schema reader with a Mockito stub.
     */
    private static class SimpleStoreClient<K, V> extends AbstractAvroStoreClient<K, V> {
        /** {@code true} to answer {@link #getSchemaReader()} with a stub instead of the parent's reader. */
        private final boolean overrideGetSchemaReader;

        /** Same as the five-argument constructor with the schema reader stubbed. */
        public SimpleStoreClient(TransportClient client, String storeName, boolean parentFlag, Executor deserializationExecutor) {
            this(client, storeName, parentFlag, deserializationExecutor, true);
        }

        /**
         * @param client                  transport handed to the parent client
         * @param storeName               name used to build the default generic client config
         * @param parentFlag              forwarded unchanged as the parent constructor's boolean argument
         * @param deserializationExecutor executor installed on the client config
         * @param stubSchemaReader        whether {@link #getSchemaReader()} should return a stub
         */
        public SimpleStoreClient(TransportClient client, String storeName, boolean parentFlag, Executor deserializationExecutor, boolean stubSchemaReader) {
            super(client, parentFlag, ClientConfig.defaultGenericClientConfig(storeName).setDeserializationExecutor(deserializationExecutor));
            overrideGetSchemaReader = stubSchemaReader;
        }

        /** No value deserializer is provided by this test client. */
        public RecordDeserializer<V> getDataRecordDeserializer(int i) throws VeniceClientException {
            return null;
        }

        /**
         * Returns a fresh Mockito stub whose key schema is an Avro string when stubbing is enabled;
         * otherwise defers to the parent implementation.
         */
        protected SchemaReader getSchemaReader() {
            if (overrideGetSchemaReader) {
                SchemaReader stub = Mockito.mock(SchemaReader.class);
                Mockito.doReturn(Schema.create(Schema.Type.STRING)).when(stub).getKeySchema();
                return stub;
            }
            return super.getSchemaReader();
        }

        /** Always the fixed test value schema. */
        public Schema getLatestValueSchema() {
            return AbstractAvroStoreClientTest.VALUE_SCHEMA;
        }
    }

    /**
     * Issues a compute request (projection, dot product, cosine similarity, Hadamard product)
     * against a transport stub that streams back two pre-serialized result records, then checks
     * that both keys are returned with the expected field values and value schema.
     */
    @Test
    public void testCompute() throws ExecutionException, InterruptedException {
        Schema resultSchema = Schema.parse("{  \"type\": \"record\",          \"name\": \"test_store_VeniceComputeResult\",         \"doc\": \"\",                            \"fields\": [                 { \"name\": \"int_field\", \"type\": \"int\", \"doc\": \"\", \"default\": 0 },                      { \"name\": \"dot_product_for_float_array_field1\", \"type\": [\"null\",\"float\"], \"doc\": \"\", \"default\": null },                    { \"name\": \"cosine_similarity_for_float_array_field2\", \"type\": [\"null\",\"float\"], \"doc\": \"\", \"default\": null },                    { \"name\": \"hadamard_product_for_float_array_field1\", \"type\":[\"null\",{\"type\":\"array\",\"items\":\"float\"}],\"doc\":\"\",\"default\":null },                    { \"name\": \"veniceComputationError\", \"type\": { \"type\": \"map\", \"values\": \"string\" }, \"doc\": \"\", \"default\": { } }          ]        }       ");
        RecordSerializer resultSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(resultSchema);

        // Result record for key index 0.
        List<Float> firstHadamard = Arrays.asList(3.1f, 4.1f);
        GenericData.Record firstResult = new GenericData.Record(resultSchema);
        firstResult.put("int_field", 1);
        firstResult.put("dot_product_for_float_array_field1", 1.1f);
        firstResult.put("cosine_similarity_for_float_array_field2", 2.1f);
        firstResult.put("hadamard_product_for_float_array_field1", firstHadamard);
        firstResult.put("veniceComputationError", Collections.emptyMap());
        ComputeResponseRecordV1 firstResponse = new ComputeResponseRecordV1();
        firstResponse.keyIndex = 0;
        firstResponse.value = ByteBuffer.wrap(resultSerializer.serialize(firstResult));

        // Result record for key index 1.
        List<Float> secondHadamard = Arrays.asList(3.2f, 4.2f);
        GenericData.Record secondResult = new GenericData.Record(resultSchema);
        secondResult.put("int_field", 2);
        secondResult.put("dot_product_for_float_array_field1", 1.2f);
        secondResult.put("cosine_similarity_for_float_array_field2", 2.2f);
        secondResult.put("hadamard_product_for_float_array_field1", secondHadamard);
        secondResult.put("veniceComputationError", Collections.emptyMap());
        ComputeResponseRecordV1 secondResponse = new ComputeResponseRecordV1();
        secondResponse.keyIndex = 1;
        secondResponse.value = ByteBuffer.wrap(resultSerializer.serialize(secondResult));

        List<ComputeResponseRecordV1> responseRecords = new ArrayList<>();
        responseRecords.add(firstResponse);
        responseRecords.add(secondResponse);

        // Serialize the whole response and serve it from the stub transport.
        RecordSerializer responseSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(ComputeResponseRecordV1.SCHEMA$);
        byte[] responsePayload = responseSerializer.serializeObjects(responseRecords);
        TransportClient transport = new ParameterizedComputeTransportClient(Optional.of(responsePayload), Optional.empty());
        SimpleStoreClient client = new SimpleStoreClient(transport, "test_store", true, AbstractAvroStoreClient.getDefaultDeserializationExecutor());

        MetricsRepository metrics = new MetricsRepository();
        Optional<ClientStats> computeStats = Optional.of(ClientStats.getClientStats(metrics, "test_store", RequestType.COMPUTE, (ClientConfig) null));
        Optional<ClientStats> streamingStats = Optional.of(ClientStats.getClientStats(metrics, "test_store", RequestType.COMPUTE_STREAMING, (ClientConfig) null));
        Map<?, ?> results = (Map<?, ?>) client.compute(computeStats, streamingStats, 0L)
            .project(new String[]{"int_field"})
            .dotProduct("float_array_field1", dotProductParam, "dot_product_for_float_array_field1")
            .cosineSimilarity("float_array_field2", cosineSimilarityParam, "cosine_similarity_for_float_array_field2")
            .hadamardProduct("float_array_field1", hadamardProductParam, "hadamard_product_for_float_array_field1")
            .execute(keys)
            .get();

        Assert.assertEquals(results.size(), 2);

        Assert.assertNotNull(results.get("key1"));
        ComputeGenericRecord key1Record = (ComputeGenericRecord) results.get("key1");
        Assert.assertEquals(key1Record.getValueSchema(), VALUE_SCHEMA);
        Assert.assertEquals(key1Record.get("int_field"), 1);
        Assert.assertEquals(key1Record.get("dot_product_for_float_array_field1"), Float.valueOf(1.1f));
        Assert.assertEquals(key1Record.get("cosine_similarity_for_float_array_field2"), Float.valueOf(2.1f));
        Assert.assertEquals(key1Record.get("hadamard_product_for_float_array_field1"), firstHadamard);

        Assert.assertNotNull(results.get("key2"));
        ComputeGenericRecord key2Record = (ComputeGenericRecord) results.get("key2");
        Assert.assertEquals(key2Record.getValueSchema(), VALUE_SCHEMA);
        Assert.assertEquals(key2Record.get("int_field"), 2);
        Assert.assertEquals(key2Record.get("dot_product_for_float_array_field1"), Float.valueOf(1.2f));
        Assert.assertEquals(key2Record.get("cosine_similarity_for_float_array_field2"), Float.valueOf(2.2f));
        Assert.assertEquals(key2Record.get("hadamard_product_for_float_array_field1"), secondHadamard);
    }

    /**
     * Serves a single result record whose {@code veniceComputationError} map flags both computed
     * fields as failed. Verifies that the projected field is still readable, that reading either
     * failed field throws a {@link VeniceException} carrying the per-field error message, and that
     * no record is returned for the second key.
     */
    @Test
    public void testComputeFailure() throws ExecutionException, InterruptedException {
        Schema parse = Schema.parse("{  \"type\": \"record\",          \"name\": \"test_store_VeniceComputeResult\",         \"doc\": \"\",                            \"fields\": [                 { \"name\": \"int_field\", \"type\": \"int\", \"doc\": \"\", \"default\": 0 },                      { \"name\": \"dot_product_for_float_array_field1\", \"type\": [\"null\",\"float\"], \"doc\": \"\", \"default\": null },                    { \"name\": \"cosine_similarity_for_float_array_field2\", \"type\": [\"null\",\"float\"], \"doc\": \"\", \"default\": null },                    { \"name\": \"veniceComputationError\", \"type\": { \"type\": \"map\", \"values\": \"string\" }, \"doc\": \"\", \"default\": { } }          ]        }       ");
        RecordSerializer avroGenericSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(parse);

        // One result record (key index 0) with both computed fields marked as failed.
        GenericData.Record record = new GenericData.Record(parse);
        record.put("int_field", 1);
        record.put("dot_product_for_float_array_field1", Float.valueOf(0.0f));
        record.put("cosine_similarity_for_float_array_field2", Float.valueOf(0.0f));
        Map<String, String> computationErrors = new HashMap<>();
        computationErrors.put("dot_product_for_float_array_field1", "array length are different");
        computationErrors.put("cosine_similarity_for_float_array_field2", "NullPointerException");
        record.put("veniceComputationError", computationErrors);
        ComputeResponseRecordV1 computeResponseRecordV1 = new ComputeResponseRecordV1();
        computeResponseRecordV1.keyIndex = 0;
        computeResponseRecordV1.value = ByteBuffer.wrap(avroGenericSerializer.serialize(record));
        List<ComputeResponseRecordV1> responseRecords = new ArrayList<>();
        responseRecords.add(computeResponseRecordV1);

        // The serializer is kept raw, as in the rest of this test, so the typed list is accepted as-is.
        RecordSerializer responseSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(ComputeResponseRecordV1.SCHEMA$);
        byte[] responsePayload = responseSerializer.serializeObjects(responseRecords);
        SimpleStoreClient simpleStoreClient = new SimpleStoreClient(new ParameterizedComputeTransportClient(Optional.of(responsePayload), Optional.empty()), "test_store", true, AbstractAvroStoreClient.getDefaultDeserializationExecutor());
        MetricsRepository metricsRepository = new MetricsRepository();
        Map<?, ?> map = (Map<?, ?>) simpleStoreClient.compute(Optional.of(ClientStats.getClientStats(metricsRepository, "test_store", RequestType.COMPUTE, (ClientConfig) null)), Optional.of(ClientStats.getClientStats(metricsRepository, "test_store", RequestType.COMPUTE_STREAMING, (ClientConfig) null)), 0L).project(new String[]{"int_field"}).dotProduct("float_array_field1", dotProductParam, "dot_product_for_float_array_field1").cosineSimilarity("float_array_field2", cosineSimilarityParam, "cosine_similarity_for_float_array_field2").execute(keys).get();

        Assert.assertEquals(map.size(), 1);
        Assert.assertNotNull(map.get("key1"));
        GenericRecord genericRecord = (GenericRecord) map.get("key1");
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(genericRecord.get("int_field"), 1);

        // The specific exception type must be caught first; a preceding catch (Exception) would
        // make the VeniceException clause unreachable and turn the expected failure into Assert.fail.
        try {
            genericRecord.get("dot_product_for_float_array_field1");
            Assert.fail("An exception should be thrown when retrieving a failed computation result");
        } catch (VeniceException e) {
            Assert.assertTrue(e.getMessage().contains("computing this field: dot_product_for_float_array_field1, error message: array length are different"), "Error message doesn't contain: [computing this field: dot_product_for_float_array_field1, error message: array length are different], and received message is :" + e.getMessage());
        } catch (Exception e) {
            Assert.fail("Only VeniceException should be thrown", e);
        }
        try {
            genericRecord.get("cosine_similarity_for_float_array_field2");
            Assert.fail("An exception should be thrown for the failed cosine similarity computation");
        } catch (VeniceException e) {
            Assert.assertTrue(e.getMessage().contains("computing this field: cosine_similarity_for_float_array_field2, error message: NullPointerException"), "Error message doesn't contain: [computing this field: cosine_similarity_for_float_array_field2, error message: NullPointerException], and received message is :" + e.getMessage());
        } catch (Exception e) {
            Assert.fail("Only VeniceException should be thrown", e);
        }
        Assert.assertNull(map.get("key2"));
    }

    /**
     * Uses a transport stub that sends no body and completes with a {@code VeniceClientException};
     * waiting on the compute result must then fail with an {@link ExecutionException} whose
     * message mentions {@code mock_exception}.
     */
    @Test(timeOut = 3000, expectedExceptions = {ExecutionException.class}, expectedExceptionsMessageRegExp = ".*mock_exception.*")
    public void testComputeReceiveNon200Response() throws ExecutionException, InterruptedException {
        Optional<byte[]> noBody = Optional.empty();
        Optional<VeniceClientException> failure = Optional.of(new VeniceClientException("mock_exception"));
        TransportClient failingTransport = new ParameterizedComputeTransportClient(noBody, failure);
        SimpleStoreClient client = new SimpleStoreClient(failingTransport, "test_store", true, AbstractAvroStoreClient.getDefaultDeserializationExecutor());
        client.compute()
            .project(new String[]{"int_field"})
            .execute(keys)
            .get();
    }

    /**
     * Starts a client (with the real schema reader) on a transport whose key-schema request fails
     * 50 times before succeeding. The first {@code get} must fail with an initialization error;
     * with a 1 ms init sleep interval, a later {@code get} must succeed within 30 seconds.
     */
    @Test
    public void testStoreInitAsyncRetry() {
        SimpleStoreClient simpleStoreClient = new SimpleStoreClient(new TransportClient() {
            private final ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
            /** Number of key-schema requests seen so far. */
            private int retryCnt = 0;
            /** Key-schema requests up to and including this count are failed. */
            private final int totalFailedRetryCnt = 50;

            public CompletableFuture<TransportClientResponse> get(String path, Map<String, String> headers) {
                CompletableFuture<TransportClientResponse> future = new CompletableFuture<>();
                if (!path.contains("key_schema")) {
                    if (path.contains("storage")) {
                        // Storage reads complete with a null response.
                        future.complete(null);
                    } else {
                        future.completeExceptionally(new VeniceException("Fake request failure for path: " + path));
                    }
                    return future;
                }
                if (++this.retryCnt <= this.totalFailedRetryCnt) {
                    future.completeExceptionally(new VeniceException("Fake request failure for key schema request"));
                    return future;
                }
                SchemaResponse schemaResponse = new SchemaResponse();
                schemaResponse.setSchemaStr("\"string\"");
                schemaResponse.setId(1);
                try {
                    future.complete(new TransportClientResponse(-1, CompressionStrategy.NO_OP, this.objectMapper.writeValueAsBytes(schemaResponse)));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                return future;
            }

            public CompletableFuture<TransportClientResponse> post(String str, Map<String, String> map, byte[] bArr) {
                return null;
            }

            public void streamPost(String str, Map<String, String> map, byte[] bArr, TransportClientStreamingCallback transportClientStreamingCallback, int i) {
            }

            public void close() throws IOException {
            }
        }, "test_store_init_retry", true, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), false);
        simpleStoreClient.setAsyncStoreInitSleepIntervalMs(1L);
        simpleStoreClient.start();
        // Close the client even when an assertion below fails.
        try {
            String key = "test_key";
            VeniceException initFailure = Assert.expectThrows(VeniceException.class, () -> simpleStoreClient.get(key));
            Assert.assertTrue(initFailure.getMessage().contains("Failed to initializing Venice Client"));
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                try {
                    simpleStoreClient.get(key).get();
                } catch (Exception e) {
                    // Keep the cause so the final timeout report shows why the read failed.
                    Assert.fail("Failed to get key: " + key, e);
                }
            });
        } finally {
            simpleStoreClient.close();
        }
    }

    // Populates the shared request keys and the vector arguments used by the compute tests.
    static {
        Collections.addAll(keys, "key1", "key2");
        dotProductParam = Arrays.asList(0.1f, 0.2f);
        cosineSimilarityParam = Arrays.asList(0.3f, 0.4f);
        hadamardProductParam = Arrays.asList(0.5f, 0.6f);
    }
}
