package com.linkedin.venice.endToEnd;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.client.AvroGenericDaVinciClient;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.NonLocalAccessPolicy;
import com.linkedin.davinci.client.StorageClass;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.ComputeGenericRecord;
import com.linkedin.venice.client.store.predicate.Predicate;
import com.linkedin.venice.client.store.predicate.PredicateBuilder;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compute.ComputeOperationUtils;
import com.linkedin.venice.integration.utils.DaVinciTestContext;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/DaVinciComputeTest.class */
public class DaVinciComputeTest {
    private static final Logger LOGGER = LogManager.getLogger(DaVinciComputeTest.class);
    private static final int TEST_TIMEOUT = 120000;
    private VeniceClusterWrapper cluster;
    private D2Client d2Client;
    private final List<Float> mfEmbedding = generateRandomFloatList(100);
    private final List<Float> companiesEmbedding = generateRandomFloatList(100);
    private final List<Float> pymkCosineSimilarityEmbedding = generateRandomFloatList(100);
    private static final String KEY_PREFIX = "key_";
    private static final String VALUE_PREFIX = "id_";
    private static final int MAX_KEY_LIMIT = 1000;
    private static final String NON_EXISTING_KEY1 = "a_unknown_key";
    private static final String NON_EXISTING_KEY2 = "z_unknown_key";
    private static final int NON_EXISTING_KEY_NUM = 2;
    private static final String VALUE_SCHEMA_FOR_COMPUTE = "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [                 { \"name\": \"id\", \"type\": \"string\" },                      { \"name\": \"name\", \"type\": \"string\" },                    {   \"default\": [], \n  \"name\": \"companiesEmbedding\",  \"type\": {  \"items\": \"float\",  \"type\": \"array\"   }  },          { \"name\": \"member_feature\", \"type\": { \"type\": \"array\", \"items\": \"float\" } }          ]        }       ";
    private static final String VALUE_SCHEMA_FOR_COMPUTE_MISSING_FIELD = "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [                 { \"name\": \"id\", \"type\": \"string\" },                      { \"name\": \"name\", \"type\": \"string\" },                    { \"name\": \"member_feature\", \"type\": { \"type\": \"array\", \"items\": \"float\" } }          ]        }       ";
    private static final String VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD = "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [           {\"name\": \"id\", \"type\": \"string\" },                {\"name\": \"name\", \"type\": \"string\" },              {\"name\": \"member_feature\", \"type\": [\"null\",{\"type\":\"array\",\"items\":\"float\"}],\"default\": null}  ]  }  ";
    private static final String VALUE_SCHEMA_FOR_COMPUTE_SWAPPED = "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [                 { \"name\": \"id\", \"type\": \"string\" },                      { \"name\": \"name\", \"type\": \"string\" },                    { \"name\": \"member_feature\", \"type\": { \"type\": \"array\", \"items\": \"float\" } },                 {   \"default\": [], \n  \"name\": \"companiesEmbedding\",  \"type\": {  \"items\": \"float\",  \"type\": \"array\"   }  }   ]        }       ";
    private static final String KEY_SCHEMA_STEAMING_COMPUTE = "\"string\"";
    private static final String VALUE_SCHEMA_STREAMING_COMPUTE = "{\n\"type\": \"record\",\n\"name\": \"test_value_schema\",\n\"fields\": [\n  {\"name\": \"int_field\", \"type\": \"int\"},\n  {\"name\": \"float_field\", \"type\": \"float\"}\n]\n}";
    private static final String KEY_SCHEMA_PARTIAL_KEY_LOOKUP = "{\"type\":\"record\",\"name\":\"KeyRecord\",\"namespace\":\"example.partialKeyLookup\",\"fields\":[    {\"name\":\"id\",\"type\":\"string\"},   {\"name\":\"companyId\",\"type\":\"int\"},    {\"name\":\"name\",\"type\":\"string\"}  ]}";

    @BeforeClass
    public void setUp() {
        Utils.thisIsLocalhost();
        Properties properties = new Properties();
        properties.put("server.promotion.to.leader.replica.delay.seconds", 1L);
        this.cluster = ServiceFactory.getVeniceCluster(1, NON_EXISTING_KEY_NUM, 1, 1, 100, false, false, properties);
        this.d2Client = new D2ClientBuilder().setZkHosts(this.cluster.getZk().getAddress()).setZkSessionTimeout(3L, TimeUnit.SECONDS).setZkStartupTimeout(3L, TimeUnit.SECONDS).build();
        D2ClientUtils.startClient(this.d2Client);
    }

    @AfterClass
    public void cleanUp() {
        if (this.d2Client != null) {
            D2ClientUtils.shutdownClient(this.d2Client);
        }
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.cluster});
    }

    @AfterMethod
    public void verifyPostConditions(Method method) {
        try {
            Assert.assertThrows(NullPointerException.class, AvroGenericDaVinciClient::getBackend);
        } catch (AssertionError e) {
            throw new AssertionError(method.getName() + " leaked DaVinciBackend.", e);
        }
    }

    public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception {
        String uniqueString = Utils.getUniqueString("store");
        this.cluster.useControllerClient(controllerClient -> {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, getClass().getName(), "\"int\"", VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD));
        });
        this.cluster.createMetaSystemStore(uniqueString);
        String kafkaTopic = this.cluster.getNewVersion(uniqueString).getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.cluster.getKafka().getAddress());
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"int\"");
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD);
        HashSet<Integer> hashSet = new HashSet<Integer>() { // from class: com.linkedin.venice.endToEnd.DaVinciComputeTest.1
            {
                add(1);
                add(Integer.valueOf(DaVinciComputeTest.NON_EXISTING_KEY_NUM));
            }
        };
        DaVinciTestContext genericAvroDaVinciFactoryAndClientWithRetries = ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries(this.d2Client, new MetricsRepository(), Optional.empty(), this.cluster.getZk().getAddress(), uniqueString, new DaVinciConfig(), TestUtils.getIngestionIsolationPropertyMap());
        VeniceWriter<Object, Object, byte[]> createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer2).build());
        try {
            CachingDaVinciClientFactory daVinciClientFactory = genericAvroDaVinciFactoryAndClientWithRetries.getDaVinciClientFactory();
            try {
                DaVinciClient daVinciClient = genericAvroDaVinciFactoryAndClientWithRetries.getDaVinciClient();
                try {
                    Schema parse = Schema.parse(VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD);
                    List asList = Arrays.asList(Float.valueOf(1.0f), Float.valueOf(2.0f), Float.valueOf(3.0f));
                    GenericData.Record record = new GenericData.Record(parse);
                    GenericData.Record record2 = new GenericData.Record(parse);
                    record.put("id", "1");
                    record.put("name", "companiesEmbedding");
                    record.put("member_feature", asList);
                    record2.put("id", "2");
                    record2.put("name", "companiesEmbedding");
                    record2.put("member_feature", (Object) null);
                    HashMap hashMap = new HashMap(NON_EXISTING_KEY_NUM);
                    hashMap.put(1, record);
                    hashMap.put(Integer.valueOf(NON_EXISTING_KEY_NUM), record2);
                    pushRecordsToStore(hashMap, createVeniceWriter, 1);
                    daVinciClient.subscribeAll().get();
                    Consumer consumer = map -> {
                        map.forEach((num, computeGenericRecord) -> {
                            Assert.assertEquals(((HashMap) computeGenericRecord.get("__veniceComputationError__")).size(), 0);
                        });
                        Assert.assertNull(((ComputeGenericRecord) map.get(Integer.valueOf(NON_EXISTING_KEY_NUM))).get("dot_product_result"));
                        Assert.assertNull(((ComputeGenericRecord) map.get(Integer.valueOf(NON_EXISTING_KEY_NUM))).get("hadamard_product_result"));
                        Assert.assertNull(((ComputeGenericRecord) map.get(Integer.valueOf(NON_EXISTING_KEY_NUM))).get("cosine_similarity_result"));
                        Assert.assertEquals(((ComputeGenericRecord) map.get(1)).get("dot_product_result"), Float.valueOf(ComputeOperationUtils.dotProduct(asList, asList)));
                        Assert.assertEquals(((ComputeGenericRecord) map.get(1)).get("hadamard_product_result"), ComputeOperationUtils.hadamardProduct(asList, asList));
                        Assert.assertEquals(((ComputeGenericRecord) map.get(1)).get("cosine_similarity_result"), Float.valueOf(1.0f));
                    };
                    consumer.accept((Map) daVinciClient.compute().dotProduct("member_feature", asList, "dot_product_result").hadamardProduct("member_feature", asList, "hadamard_product_result").cosineSimilarity("member_feature", asList, "cosine_similarity_result").execute(hashSet).get());
                    consumer.accept((Map) daVinciClient.compute().dotProduct("member_feature", asList, "dot_product_result").hadamardProduct("member_feature", asList, "hadamard_product_result").cosineSimilarity("member_feature", asList, "cosine_similarity_result").streamingExecute(hashSet).get());
                    daVinciClient.unsubscribeAll();
                    if (daVinciClient != null) {
                        daVinciClient.close();
                    }
                    if (daVinciClientFactory != null) {
                        daVinciClientFactory.close();
                    }
                    if (createVeniceWriter != null) {
                        createVeniceWriter.close();
                    }
                } catch (Throwable th) {
                    if (daVinciClient != null) {
                        try {
                            daVinciClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test(timeOut = 240000)
    public void testReadComputeMissingField() throws Exception {
        String uniqueString = Utils.getUniqueString("store");
        this.cluster.useControllerClient(controllerClient -> {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, getClass().getName(), "\"int\"", VALUE_SCHEMA_FOR_COMPUTE));
        });
        this.cluster.createMetaSystemStore(uniqueString);
        String kafkaTopic = this.cluster.getNewVersion(uniqueString).getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.cluster.getKafka().getAddress());
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"int\"");
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer3 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE_MISSING_FIELD);
        HashSet<Integer> hashSet = new HashSet<Integer>() { // from class: com.linkedin.venice.endToEnd.DaVinciComputeTest.2
            {
                add(1);
                add(Integer.valueOf(DaVinciComputeTest.NON_EXISTING_KEY_NUM));
            }
        };
        DaVinciTestContext genericAvroDaVinciFactoryAndClientWithRetries = ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries(this.d2Client, new MetricsRepository(), Optional.empty(), this.cluster.getZk().getAddress(), uniqueString, new DaVinciConfig(), TestUtils.getIngestionIsolationPropertyMap());
        VeniceWriter<Object, Object, byte[]> createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer2).build());
        try {
            CachingDaVinciClientFactory daVinciClientFactory = genericAvroDaVinciFactoryAndClientWithRetries.getDaVinciClientFactory();
            try {
                DaVinciClient daVinciClient = genericAvroDaVinciFactoryAndClientWithRetries.getDaVinciClient();
                try {
                    pushSyntheticDataToStore(createVeniceWriter, VALUE_SCHEMA_FOR_COMPUTE, false, 1, 100);
                    daVinciClient.subscribeAll().get();
                    ((Map) daVinciClient.compute().cosineSimilarity("companiesEmbedding", this.pymkCosineSimilarityEmbedding, "companiesEmbedding_score").cosineSimilarity("member_feature", this.pymkCosineSimilarityEmbedding, "member_feature_score").execute(hashSet).get()).forEach((num, computeGenericRecord) -> {
                        Assert.assertEquals(((HashMap) computeGenericRecord.get("__veniceComputationError__")).size(), 0);
                    });
                    ((Map) daVinciClient.compute().cosineSimilarity("companiesEmbedding", this.pymkCosineSimilarityEmbedding, "companiesEmbedding_score").cosineSimilarity("member_feature", this.pymkCosineSimilarityEmbedding, "member_feature_score").streamingExecute(hashSet).get()).forEach((num2, computeGenericRecord2) -> {
                        Assert.assertEquals(((HashMap) computeGenericRecord2.get("__veniceComputationError__")).size(), 0);
                    });
                    daVinciClient.unsubscribeAll();
                    if (daVinciClient != null) {
                        daVinciClient.close();
                    }
                    if (daVinciClientFactory != null) {
                        daVinciClientFactory.close();
                    }
                    if (createVeniceWriter != null) {
                        createVeniceWriter.close();
                    }
                    this.cluster.useControllerClient(controllerClient2 -> {
                        TestUtils.assertCommand(controllerClient2.addValueSchema(uniqueString, VALUE_SCHEMA_FOR_COMPUTE_MISSING_FIELD));
                    });
                    String kafkaTopic2 = this.cluster.getNewVersion(uniqueString).getKafkaTopic();
                    DaVinciTestContext genericAvroDaVinciFactoryAndClientWithRetries2 = ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries(this.d2Client, new MetricsRepository(), Optional.empty(), this.cluster.getZk().getAddress(), uniqueString, new DaVinciConfig(), TestUtils.getIngestionIsolationPropertyMap());
                    createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic2).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer3).build());
                    try {
                        CachingDaVinciClientFactory daVinciClientFactory2 = genericAvroDaVinciFactoryAndClientWithRetries2.getDaVinciClientFactory();
                        try {
                            daVinciClient = genericAvroDaVinciFactoryAndClientWithRetries2.getDaVinciClient();
                            try {
                                pushSyntheticDataToStore(createVeniceWriter, VALUE_SCHEMA_FOR_COMPUTE_MISSING_FIELD, true, NON_EXISTING_KEY_NUM, 100);
                                daVinciClient.subscribeAll().get();
                                ((Map) daVinciClient.compute().cosineSimilarity("companiesEmbedding", this.pymkCosineSimilarityEmbedding, "companiesEmbedding_score").cosineSimilarity("member_feature", this.pymkCosineSimilarityEmbedding, "member_feature_score").execute(hashSet).get()).forEach((num3, computeGenericRecord3) -> {
                                    Assert.assertEquals(((HashMap) computeGenericRecord3.get("__veniceComputationError__")).size(), 1);
                                });
                                ((Map) daVinciClient.compute().cosineSimilarity("companiesEmbedding", this.pymkCosineSimilarityEmbedding, "companiesEmbedding_score").cosineSimilarity("member_feature", this.pymkCosineSimilarityEmbedding, "member_feature_score").streamingExecute(hashSet).get()).forEach((num4, computeGenericRecord4) -> {
                                    Assert.assertEquals(((HashMap) computeGenericRecord4.get("__veniceComputationError__")).size(), 1);
                                });
                                daVinciClient.unsubscribeAll();
                                if (daVinciClient != null) {
                                    daVinciClient.close();
                                }
                                if (daVinciClientFactory2 != null) {
                                    daVinciClientFactory2.close();
                                }
                                if (createVeniceWriter != null) {
                                    createVeniceWriter.close();
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th) {
                if (daVinciClientFactory != null) {
                    try {
                        daVinciClientFactory.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } finally {
        }
    }

    @Test(timeOut = 360000)
    public void testReadComputeSwappedFields() throws Exception {
        String uniqueString = Utils.getUniqueString("store");
        this.cluster.useControllerClient(controllerClient -> {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, getClass().getName(), "\"int\"", VALUE_SCHEMA_FOR_COMPUTE_SWAPPED));
        });
        this.cluster.createMetaSystemStore(uniqueString);
        String kafkaTopic = this.cluster.getNewVersion(uniqueString).getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.cluster.getKafka().getAddress());
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"int\"");
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer3 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE_SWAPPED);
        HashSet<Integer> hashSet = new HashSet<Integer>() { // from class: com.linkedin.venice.endToEnd.DaVinciComputeTest.3
            {
                add(1);
                add(Integer.valueOf(DaVinciComputeTest.NON_EXISTING_KEY_NUM));
            }
        };
        DaVinciTestContext genericAvroDaVinciFactoryAndClientWithRetries = ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries(this.d2Client, new MetricsRepository(), Optional.empty(), this.cluster.getZk().getAddress(), uniqueString, new DaVinciConfig(), TestUtils.getIngestionIsolationPropertyMap());
        VeniceWriter<Object, Object, byte[]> createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer2).build());
        try {
            CachingDaVinciClientFactory daVinciClientFactory = genericAvroDaVinciFactoryAndClientWithRetries.getDaVinciClientFactory();
            try {
                DaVinciClient daVinciClient = genericAvroDaVinciFactoryAndClientWithRetries.getDaVinciClient();
                try {
                    pushSyntheticDataToStore(createVeniceWriter, VALUE_SCHEMA_FOR_COMPUTE_SWAPPED, false, 1, 100);
                    daVinciClient.subscribeAll().get();
                    ((Map) daVinciClient.compute().cosineSimilarity("companiesEmbedding", this.pymkCosineSimilarityEmbedding, "companiesEmbedding_score").cosineSimilarity("member_feature", this.pymkCosineSimilarityEmbedding, "member_feature_score").execute(hashSet).get()).forEach((num, computeGenericRecord) -> {
                        Assert.assertEquals(((HashMap) computeGenericRecord.get("__veniceComputationError__")).size(), 0);
                    });
                    daVinciClient.unsubscribeAll();
                    if (daVinciClient != null) {
                        daVinciClient.close();
                    }
                    if (daVinciClientFactory != null) {
                        daVinciClientFactory.close();
                    }
                    if (createVeniceWriter != null) {
                        createVeniceWriter.close();
                    }
                    this.cluster.useControllerClient(controllerClient2 -> {
                        TestUtils.assertCommand(controllerClient2.addValueSchema(uniqueString, VALUE_SCHEMA_FOR_COMPUTE_MISSING_FIELD));
                    });
                    String kafkaTopic2 = this.cluster.getNewVersion(uniqueString).getKafkaTopic();
                    DaVinciTestContext genericAvroDaVinciFactoryAndClientWithRetries2 = ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries(this.d2Client, new MetricsRepository(), Optional.empty(), this.cluster.getZk().getAddress(), uniqueString, new DaVinciConfig(), TestUtils.getIngestionIsolationPropertyMap());
                    createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic2).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer3).build());
                    try {
                        CachingDaVinciClientFactory daVinciClientFactory2 = genericAvroDaVinciFactoryAndClientWithRetries2.getDaVinciClientFactory();
                        try {
                            daVinciClient = genericAvroDaVinciFactoryAndClientWithRetries2.getDaVinciClient();
                            try {
                                pushSyntheticDataToStore(createVeniceWriter, VALUE_SCHEMA_FOR_COMPUTE_SWAPPED, false, NON_EXISTING_KEY_NUM, 100);
                                daVinciClient.subscribeAll().get();
                                ((Map) daVinciClient.compute().cosineSimilarity("member_feature", this.pymkCosineSimilarityEmbedding, "member_feature_score").execute(hashSet).get()).forEach((num2, computeGenericRecord2) -> {
                                    Assert.assertEquals(((HashMap) computeGenericRecord2.get("__veniceComputationError__")).size(), 0);
                                });
                                daVinciClient.unsubscribeAll();
                                if (daVinciClient != null) {
                                    daVinciClient.close();
                                }
                                if (daVinciClientFactory2 != null) {
                                    daVinciClientFactory2.close();
                                }
                                if (createVeniceWriter != null) {
                                    createVeniceWriter.close();
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th) {
                if (daVinciClientFactory != null) {
                    try {
                        daVinciClientFactory.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } finally {
        }
    }

    @Test(timeOut = 240000)
    public void testComputeStreamingExecute() throws ExecutionException, InterruptedException {
        String uniqueString = Utils.getUniqueString("store");
        this.cluster.useControllerClient(controllerClient -> {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, getClass().getName(), KEY_SCHEMA_STEAMING_COMPUTE, VALUE_SCHEMA_STREAMING_COMPUTE));
        });
        this.cluster.createMetaSystemStore(uniqueString);
        String kafkaTopic = this.cluster.getNewVersion(uniqueString).getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.cluster.getKafka().getAddress());
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_STEAMING_COMPUTE);
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STREAMING_COMPUTE);
        TreeSet treeSet = new TreeSet();
        treeSet.add(NON_EXISTING_KEY1);
        for (int i = 0; i < 998; i++) {
            treeSet.add(KEY_PREFIX + i);
        }
        treeSet.add(NON_EXISTING_KEY2);
        DaVinciConfig daVinciConfig = new DaVinciConfig();
        daVinciConfig.setNonLocalAccessPolicy(NonLocalAccessPolicy.QUERY_VENICE);
        DaVinciTestContext genericAvroDaVinciFactoryAndClientWithRetries = ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries(this.d2Client, new MetricsRepository(), Optional.empty(), this.cluster.getZk().getAddress(), uniqueString, daVinciConfig, TestUtils.getIngestionIsolationPropertyMap());
        VeniceWriter<Object, Object, byte[]> createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer2).build());
        try {
            DaVinciClient daVinciClient = genericAvroDaVinciFactoryAndClientWithRetries.getDaVinciClient();
            try {
                pushDataToStoreForStreamingCompute(createVeniceWriter, VALUE_SCHEMA_STREAMING_COMPUTE, 1, 10000);
                daVinciClient.subscribeAll().get();
                final AtomicInteger atomicInteger = new AtomicInteger(0);
                final VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                daVinciClient.compute().project(new String[]{"int_field"}).streamingExecute(treeSet, new StreamingCallback<String, ComputeGenericRecord>() { // from class: com.linkedin.venice.endToEnd.DaVinciComputeTest.4
                    public void onRecordReceived(String str, ComputeGenericRecord computeGenericRecord) {
                        atomicInteger.incrementAndGet();
                        if (computeGenericRecord != null) {
                            veniceConcurrentHashMap.put(str, computeGenericRecord);
                        }
                    }

                    public void onCompletion(Optional<Exception> optional) {
                        countDownLatch.countDown();
                        optional.ifPresent(exc -> {
                            Assert.fail("Exception: " + exc + " is not expected");
                        });
                    }
                });
                countDownLatch.await();
                Assert.assertEquals(atomicInteger.get(), MAX_KEY_LIMIT);
                Assert.assertEquals(veniceConcurrentHashMap.size(), 998);
                verifyStreamingComputeResult(veniceConcurrentHashMap);
                Map<String, ComputeGenericRecord> map = (Map) daVinciClient.compute().project(new String[]{"int_field"}).streamingExecute(treeSet).get();
                Assert.assertEquals(map.size(), 998);
                verifyStreamingComputeResult(map);
                daVinciClient.unsubscribeAll();
                if (daVinciClient != null) {
                    daVinciClient.close();
                }
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 120000)
    public void testPartialKeyLookupWithRocksDBBlockBasedTable() throws ExecutionException, InterruptedException {
        String uniqueString = Utils.getUniqueString("store");
        this.cluster.useControllerClient(controllerClient -> {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, getClass().getName(), KEY_SCHEMA_PARTIAL_KEY_LOOKUP, VALUE_SCHEMA_FOR_COMPUTE));
        });
        this.cluster.createMetaSystemStore(uniqueString);
        String kafkaTopic = this.cluster.getNewVersion(uniqueString).getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.cluster.getKafka().getAddress());
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_PARTIAL_KEY_LOOKUP);
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
        MetricsRepository metricsRepository = new MetricsRepository();
        VeniceProperties build = new PropertyBuilder().put("client.use.system.store.repository", true).put("client.system.store.repository.refresh.interval.seconds", 1).put("data.base.path", Utils.getTempDataDirectory().getAbsolutePath()).put("persistence.type", PersistenceType.ROCKS_DB).build();
        VeniceWriter<GenericRecord, GenericRecord, byte[]> createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer2).build());
        try {
            CachingDaVinciClientFactory cachingDaVinciClientFactory = new CachingDaVinciClientFactory(this.d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, metricsRepository, build);
            try {
                DaVinciClient andStartGenericAvroClient = cachingDaVinciClientFactory.getAndStartGenericAvroClient(uniqueString, new DaVinciConfig().setStorageClass(StorageClass.DISK));
                try {
                    pushSyntheticDataToStoreForPartialKeyLookup(createVeniceWriter, KEY_SCHEMA_PARTIAL_KEY_LOOKUP, VALUE_SCHEMA_FOR_COMPUTE, false, 1, 100);
                    andStartGenericAvroClient.subscribeAll().get();
                    final VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
                    final CountDownLatch countDownLatch = new CountDownLatch(1);
                    Predicate and = PredicateBuilder.and(new Predicate[]{PredicateBuilder.equalTo("id", "key_abcdefgh_1"), PredicateBuilder.equalTo("companyId", 0)});
                    Schema parse = new Schema.Parser().parse(KEY_SCHEMA_PARTIAL_KEY_LOOKUP);
                    GenericData.Record record = new GenericData.Record(parse);
                    record.put("id", "key_abcdefgh_1");
                    record.put("companyId", 0);
                    record.put("name", "name_4");
                    Assert.assertNotNull((GenericRecord) andStartGenericAvroClient.get(record).get());
                    andStartGenericAvroClient.compute().project(new String[]{"id", "name", "companiesEmbedding", "member_feature"}).cosineSimilarity("companiesEmbedding", this.pymkCosineSimilarityEmbedding, "companiesEmbedding_score").cosineSimilarity("member_feature", this.pymkCosineSimilarityEmbedding, "member_feature_score").executeWithFilter(and, new StreamingCallback<GenericRecord, GenericRecord>() { // from class: com.linkedin.venice.endToEnd.DaVinciComputeTest.5
                        public void onRecordReceived(GenericRecord genericRecord, GenericRecord genericRecord2) {
                            if (genericRecord2 != null) {
                                veniceConcurrentHashMap.put(genericRecord, genericRecord2);
                            }
                        }

                        public void onCompletion(Optional<Exception> optional) {
                            countDownLatch.countDown();
                            if (optional.isPresent()) {
                                Assert.fail("Exception: " + optional.get() + " is not expected");
                            }
                        }
                    });
                    countDownLatch.await();
                    Assert.assertEquals(veniceConcurrentHashMap.size(), 16);
                    veniceConcurrentHashMap.forEach((genericRecord, genericRecord2) -> {
                        Assert.assertEquals(genericRecord.getSchema(), parse);
                        Assert.assertEquals(genericRecord.get("id").toString(), "key_abcdefgh_1");
                        Assert.assertEquals(genericRecord.get("companyId"), 0);
                        Assert.assertEquals(genericRecord2.getSchema().getFields().size(), 7);
                        Assert.assertEquals(((HashMap) genericRecord2.get("__veniceComputationError__")).size(), 0);
                    });
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    cachingDaVinciClientFactory.close();
                    if (createVeniceWriter != null) {
                        createVeniceWriter.close();
                    }
                } catch (Throwable th) {
                    if (andStartGenericAvroClient != null) {
                        try {
                            andStartGenericAvroClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test(timeOut = 120000)
    public void testPartialKeyLookupWithRocksDBPlainTable() throws ExecutionException, InterruptedException {
        String uniqueString = Utils.getUniqueString("store");
        this.cluster.useControllerClient(controllerClient -> {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, getClass().getName(), KEY_SCHEMA_PARTIAL_KEY_LOOKUP, VALUE_SCHEMA_FOR_COMPUTE));
        });
        this.cluster.createMetaSystemStore(uniqueString);
        String kafkaTopic = this.cluster.getNewVersion(uniqueString).getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.cluster.getKafka().getAddress());
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_PARTIAL_KEY_LOOKUP);
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
        MetricsRepository metricsRepository = new MetricsRepository();
        VeniceProperties build = new PropertyBuilder().put("client.use.system.store.repository", true).put("client.system.store.repository.refresh.interval.seconds", 1).put("data.base.path", Utils.getTempDataDirectory().getAbsolutePath()).put("persistence.type", PersistenceType.ROCKS_DB).build();
        VeniceWriter<GenericRecord, GenericRecord, byte[]> createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer2).build());
        try {
            CachingDaVinciClientFactory cachingDaVinciClientFactory = new CachingDaVinciClientFactory(this.d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, metricsRepository, build);
            try {
                DaVinciClient andStartGenericAvroClient = cachingDaVinciClientFactory.getAndStartGenericAvroClient(uniqueString, new DaVinciConfig());
                try {
                    pushSyntheticDataToStoreForPartialKeyLookup(createVeniceWriter, KEY_SCHEMA_PARTIAL_KEY_LOOKUP, VALUE_SCHEMA_FOR_COMPUTE, false, 1, 100);
                    andStartGenericAvroClient.subscribeAll().get();
                    final boolean[] zArr = {false};
                    andStartGenericAvroClient.compute().project(new String[]{"id", "name", "companiesEmbedding", "member_feature"}).executeWithFilter(PredicateBuilder.and(new Predicate[]{PredicateBuilder.equalTo("id", "key_abcdefgh_1"), PredicateBuilder.equalTo("companyId", 0)}), new StreamingCallback<GenericRecord, GenericRecord>() { // from class: com.linkedin.venice.endToEnd.DaVinciComputeTest.6
                        public void onRecordReceived(GenericRecord genericRecord, GenericRecord genericRecord2) {
                            Assert.fail("No records should have been found from the store engine.");
                        }

                        public void onCompletion(Optional<Exception> optional) {
                            zArr[0] = true;
                            Assert.assertTrue(optional.isPresent());
                            Assert.assertEquals(optional.get().getMessage(), "Get by key prefix is not supported with RocksDB PlainTable Format.");
                        }
                    });
                    Assert.assertTrue(zArr[0]);
                    final CountDownLatch countDownLatch = new CountDownLatch(1);
                    final AtomicInteger atomicInteger = new AtomicInteger(0);
                    andStartGenericAvroClient.compute().project(new String[]{"id", "name", "companiesEmbedding", "member_feature"}).executeWithFilter((Predicate) null, new StreamingCallback<GenericRecord, GenericRecord>() { // from class: com.linkedin.venice.endToEnd.DaVinciComputeTest.7
                        public void onRecordReceived(GenericRecord genericRecord, GenericRecord genericRecord2) {
                            atomicInteger.incrementAndGet();
                        }

                        public void onCompletion(Optional<Exception> optional) {
                            countDownLatch.countDown();
                            optional.ifPresent(exc -> {
                                Assert.fail("Exception: " + exc + " is not expected");
                            });
                        }
                    });
                    countDownLatch.await();
                    Assert.assertEquals(atomicInteger.get(), 100, "Should receive all key-value pairs");
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    cachingDaVinciClientFactory.close();
                    if (createVeniceWriter != null) {
                        createVeniceWriter.close();
                    }
                } catch (Throwable th) {
                    if (andStartGenericAvroClient != null) {
                        try {
                            andStartGenericAvroClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    private void pushSyntheticDataToStore(VeniceWriter<Object, Object, byte[]> veniceWriter, String str, boolean z, int i, int i2) throws ExecutionException, InterruptedException {
        veniceWriter.broadcastStartOfPush(Collections.emptyMap());
        Schema parse = Schema.parse(str);
        for (int i3 = 0; i3 < i2; i3++) {
            GenericData.Record record = new GenericData.Record(parse);
            record.put("id", VALUE_PREFIX + i3);
            record.put("name", "companiesEmbedding");
            if (!z) {
                record.put("companiesEmbedding", this.companiesEmbedding);
            }
            record.put("member_feature", this.mfEmbedding);
            veniceWriter.put(Integer.valueOf(i3), record, i).get();
        }
        veniceWriter.broadcastEndOfPush(Collections.emptyMap());
    }

    private void pushRecordsToStore(Map<Integer, GenericRecord> map, VeniceWriter<Object, Object, byte[]> veniceWriter, int i) throws Exception {
        veniceWriter.broadcastStartOfPush(Collections.emptyMap());
        for (Map.Entry<Integer, GenericRecord> entry : map.entrySet()) {
            veniceWriter.put(entry.getKey(), entry.getValue(), i).get();
        }
        veniceWriter.broadcastEndOfPush(Collections.emptyMap());
    }

    private void pushDataToStoreForStreamingCompute(VeniceWriter<Object, Object, byte[]> veniceWriter, String str, int i, int i2) throws ExecutionException, InterruptedException {
        veniceWriter.broadcastStartOfPush(Collections.emptyMap());
        Schema parse = new Schema.Parser().parse(str);
        for (int i3 = 0; i3 < i2; i3++) {
            GenericData.Record record = new GenericData.Record(parse);
            record.put("int_field", Integer.valueOf(i3));
            record.put("float_field", Float.valueOf(i3 + 100.0f));
            veniceWriter.put(KEY_PREFIX + i3, record, i).get();
        }
        veniceWriter.broadcastEndOfPush(Collections.emptyMap());
    }

    private void verifyStreamingComputeResult(Map<String, ComputeGenericRecord> map) {
        for (int i = 0; i < 998; i++) {
            GenericRecord genericRecord = map.get(KEY_PREFIX + i);
            Assert.assertEquals(genericRecord.get("int_field"), Integer.valueOf(i));
            Assert.assertNull(genericRecord.get("float_field"));
        }
    }

    private void pushSyntheticDataToStoreForPartialKeyLookup(VeniceWriter<GenericRecord, GenericRecord, byte[]> veniceWriter, String str, String str2, boolean z, int i, int i2) throws ExecutionException, InterruptedException {
        veniceWriter.broadcastStartOfPush(Collections.emptyMap());
        Schema parse = new Schema.Parser().parse(str);
        Schema parse2 = new Schema.Parser().parse(str2);
        for (int i3 = 0; i3 < i2; i3++) {
            GenericData.Record record = new GenericData.Record(parse);
            record.put("id", KEY_PREFIX + "abcdefgh_" + (i3 % 3));
            record.put("companyId", Integer.valueOf(i3 % NON_EXISTING_KEY_NUM));
            record.put("name", "name_" + i3);
            GenericData.Record record2 = new GenericData.Record(parse2);
            record2.put("id", VALUE_PREFIX + i3);
            record2.put("name", "companiesEmbedding");
            if (!z) {
                record2.put("companiesEmbedding", this.companiesEmbedding);
            }
            record2.put("member_feature", this.mfEmbedding);
            veniceWriter.put(record, record2, i).get();
        }
        veniceWriter.broadcastEndOfPush(Collections.emptyMap());
    }

    private List<Float> generateRandomFloatList(int i) {
        ThreadLocalRandom current = ThreadLocalRandom.current();
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(Float.valueOf(current.nextFloat()));
        }
        return arrayList;
    }
}
