package com.linkedin.venice.storagenode;

import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.ComputeGenericRecord;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compute.ComputeOperationUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.serialization.DefaultSerializer;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/venice/storagenode/ReadComputeValidationTest.class */
public class ReadComputeValidationTest {
    private static final String VALUE_PREFIX = "id_";
    private VeniceClusterWrapper veniceCluster;
    private String storeName;
    private String routerAddr;
    private VeniceKafkaSerializer keySerializer;
    private VeniceKafkaSerializer valueSerializer;
    private VeniceKafkaSerializer valueSerializer2;
    private VeniceKafkaSerializer valueSerializerSwapped;
    private CompressorFactory compressorFactory;
    private static final List<Float> MF_EMBEDDING = generateRandomFloatList(100);
    private static final List<Float> COMPANIES_EMBEDDING = generateRandomFloatList(100);
    private static final List<Float> PYMK_COSINE_SIMILARITY_EMBEDDING = generateRandomFloatList(100);
    private static final String VALUE_SCHEMA_FOR_COMPUTE = "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [                 { \"name\": \"id\", \"type\": \"string\" },                      { \"name\": \"name\", \"type\": \"string\" },                    {   \"default\": [], \n  \"name\": \"companiesEmbedding\",  \"type\": {  \"items\": \"float\",  \"type\": \"array\"   }  },          { \"name\": \"member_feature\", \"type\": { \"type\": \"array\", \"items\": \"float\" } }          ]        }       ";
    private static final String VALUE_SCHEMA_FOR_COMPUTE_2 = "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [                 { \"name\": \"id\", \"type\": \"string\" },                      { \"name\": \"name\", \"type\": \"string\" },                    { \"name\": \"member_feature\", \"type\": { \"type\": \"array\", \"items\": \"float\" } }          ]        }       ";
    private static final String VALUE_SCHEMA_FOR_COMPUTE_SWAPPED = "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [                 { \"name\": \"id\", \"type\": \"string\" },                      { \"name\": \"name\", \"type\": \"string\" },                    { \"name\": \"member_feature\", \"type\": { \"type\": \"array\", \"items\": \"float\" } },                 {   \"default\": [], \n  \"name\": \"companiesEmbedding\",  \"type\": {  \"items\": \"float\",  \"type\": \"array\"   }  }   ]        }       ";

    @BeforeClass(alwaysRun = true)
    public void setUp() throws VeniceClientException {
        this.veniceCluster = ServiceFactory.getVeniceCluster(1, 1, 0, 2, 100, false, false);
        Properties properties = new Properties();
        properties.put("server.compute.fast.avro.enabled", true);
        this.veniceCluster.addVeniceServer(new Properties(), properties);
        Properties properties2 = new Properties();
        properties2.put("router.long.tail.retry.for.single.get.threshold.ms", 1);
        properties2.put("router.long.tail.retry.for.batch.get.threshold.ms", "1-:1");
        this.veniceCluster.addVeniceRouter(properties2);
        this.routerAddr = "http://" + this.veniceCluster.getVeniceRouters().get(0).getAddress();
        this.storeName = Version.parseStoreFromKafkaTopicName(this.veniceCluster.getNewStoreVersion("\"int\"", VALUE_SCHEMA_FOR_COMPUTE).getKafkaTopic());
        this.keySerializer = new VeniceAvroKafkaSerializer("\"int\"");
        this.valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
        this.valueSerializer2 = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE_2);
        this.valueSerializerSwapped = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE_SWAPPED);
        this.compressorFactory = new CompressorFactory();
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        if (this.veniceCluster != null) {
            this.veniceCluster.close();
        }
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.compressorFactory});
    }

    @Test
    public void testComputeMissingField() throws Exception {
        CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
        UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams();
        updateStoreQueryParams.setCompressionStrategy(compressionStrategy);
        updateStoreQueryParams.setReadComputationEnabled(true);
        updateStoreQueryParams.setChunkingEnabled(false);
        this.veniceCluster.updateStore(this.storeName, updateStoreQueryParams);
        VersionCreationResponse newVersion = this.veniceCluster.getNewVersion(this.storeName);
        int version = newVersion.getVersion();
        String kafkaTopic = newVersion.getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.veniceCluster.getKafka().getAddress());
        VeniceWriter<Object, byte[], byte[]> createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(this.keySerializer).setValueSerializer(new DefaultSerializer()).setChunkingEnabled(false).build());
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setVeniceURL(this.routerAddr).setUseFastAvro(true));
            try {
                pushSyntheticDataToStore(kafkaTopic, 100, createVeniceWriter, version, VALUE_SCHEMA_FOR_COMPUTE, this.valueSerializer, false, 1);
                HashSet hashSet = new HashSet();
                hashSet.add(1);
                hashSet.add(2);
                andStartGenericAvroClient.compute().cosineSimilarity("companiesEmbedding", PYMK_COSINE_SIMILARITY_EMBEDDING, "companiesEmbedding_score").cosineSimilarity("member_feature", PYMK_COSINE_SIMILARITY_EMBEDDING, "member_feature_score").execute(hashSet).get();
                new ControllerClient(this.veniceCluster.getClusterName(), this.veniceCluster.getRandomVeniceController().getControllerUrl()).addValueSchema(this.storeName, VALUE_SCHEMA_FOR_COMPUTE_2);
                this.veniceCluster.stopAndRestartVeniceServer(this.veniceCluster.getVeniceServers().get(0).getPort());
                VersionCreationResponse newVersion2 = this.veniceCluster.getNewVersion(this.storeName);
                int version2 = newVersion2.getVersion();
                String kafkaTopic2 = newVersion2.getKafkaTopic();
                pushSyntheticDataToStore(kafkaTopic2, 100, veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic2).setKeySerializer(this.keySerializer).setValueSerializer(new DefaultSerializer()).setChunkingEnabled(false).build()), version2, VALUE_SCHEMA_FOR_COMPUTE_2, this.valueSerializer2, true, 2);
                this.veniceCluster.stopAndRestartVeniceServer(this.veniceCluster.getVeniceServers().get(0).getPort());
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    ((Map) andStartGenericAvroClient.compute().cosineSimilarity("companiesEmbedding", PYMK_COSINE_SIMILARITY_EMBEDDING, "companiesEmbedding_score").cosineSimilarity("member_feature", PYMK_COSINE_SIMILARITY_EMBEDDING, "member_feature_score").execute(hashSet).get()).forEach((num, computeGenericRecord) -> {
                        Assert.assertEquals(((HashMap) computeGenericRecord.get("__veniceComputationError__")).size(), 1);
                    });
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testComputeSwappedFields() throws Exception {
        CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
        UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams();
        updateStoreQueryParams.setCompressionStrategy(compressionStrategy);
        updateStoreQueryParams.setReadComputationEnabled(true);
        updateStoreQueryParams.setChunkingEnabled(false);
        this.veniceCluster.updateStore(this.storeName, updateStoreQueryParams);
        VersionCreationResponse newVersion = this.veniceCluster.getNewVersion(this.storeName);
        int version = newVersion.getVersion();
        String kafkaTopic = newVersion.getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.veniceCluster.getKafka().getAddress());
        VeniceWriter<Object, byte[], byte[]> createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(this.keySerializer).build());
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setVeniceURL(this.routerAddr).setUseFastAvro(true));
            try {
                pushSyntheticDataToStore(kafkaTopic, 100, createVeniceWriter, version, VALUE_SCHEMA_FOR_COMPUTE_SWAPPED, this.valueSerializer, false, 1);
                HashSet hashSet = new HashSet();
                hashSet.add(1);
                hashSet.add(2);
                andStartGenericAvroClient.compute().cosineSimilarity("companiesEmbedding", PYMK_COSINE_SIMILARITY_EMBEDDING, "companiesEmbedding_score").cosineSimilarity("member_feature", PYMK_COSINE_SIMILARITY_EMBEDDING, "member_feature_score").execute(hashSet).get();
                new ControllerClient(this.veniceCluster.getClusterName(), this.veniceCluster.getRandomVeniceController().getControllerUrl()).addValueSchema(this.storeName, VALUE_SCHEMA_FOR_COMPUTE_2);
                this.veniceCluster.stopAndRestartVeniceServer(this.veniceCluster.getVeniceServers().get(0).getPort());
                VersionCreationResponse newVersion2 = this.veniceCluster.getNewVersion(this.storeName);
                int version2 = newVersion2.getVersion();
                String kafkaTopic2 = newVersion2.getKafkaTopic();
                pushSyntheticDataToStore(kafkaTopic2, 100, veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic2).setKeySerializer(this.keySerializer).build()), version2, VALUE_SCHEMA_FOR_COMPUTE_SWAPPED, this.valueSerializerSwapped, false, 2);
                this.veniceCluster.stopAndRestartVeniceServer(this.veniceCluster.getVeniceServers().get(0).getPort());
                ((Map) andStartGenericAvroClient.compute().project(new String[]{"member_feature"}).execute(hashSet).get()).forEach((num, computeGenericRecord) -> {
                    Assert.assertEquals(((HashMap) computeGenericRecord.get("__veniceComputationError__")).size(), 0);
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception {
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"int\"");
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer("{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [           {\"name\": \"id\", \"type\": \"string\" },                {\"name\": \"name\", \"type\": \"string\" },              {\"name\": \"member_feature\", \"type\": [\"null\",{\"type\":\"array\",\"items\":\"float\"}],\"default\": null}  ]  }  ");
        VersionCreationResponse newStoreVersion = this.veniceCluster.getNewStoreVersion("\"int\"", "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [           {\"name\": \"id\", \"type\": \"string\" },                {\"name\": \"name\", \"type\": \"string\" },              {\"name\": \"member_feature\", \"type\": [\"null\",{\"type\":\"array\",\"items\":\"float\"}],\"default\": null}  ]  }  ");
        Assert.assertFalse(newStoreVersion.isError());
        String kafkaTopic = newStoreVersion.getKafkaTopic();
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(newStoreVersion.getKafkaTopic());
        CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
        UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams();
        updateStoreQueryParams.setCompressionStrategy(compressionStrategy);
        updateStoreQueryParams.setReadComputationEnabled(true);
        updateStoreQueryParams.setChunkingEnabled(false);
        Assert.assertFalse(this.veniceCluster.updateStore(parseStoreFromKafkaTopicName, updateStoreQueryParams).isError());
        Schema parse = Schema.parse("{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [           {\"name\": \"id\", \"type\": \"string\" },                {\"name\": \"name\", \"type\": \"string\" },              {\"name\": \"member_feature\", \"type\": [\"null\",{\"type\":\"array\",\"items\":\"float\"}],\"default\": null}  ]  }  ");
        GenericData.Record record = new GenericData.Record(parse);
        List asList = Arrays.asList(Float.valueOf(1.0f), Float.valueOf(2.0f), Float.valueOf(3.0f));
        record.put("id", "1");
        record.put("name", "companiesEmbedding");
        record.put("member_feature", asList);
        GenericData.Record record2 = new GenericData.Record(parse);
        record2.put("id", "2");
        record2.put("name", "companiesEmbedding");
        record2.put("member_feature", (Object) null);
        HashMap hashMap = new HashMap(2);
        hashMap.put(1, record);
        hashMap.put(2, record2);
        VeniceWriter<Object, byte[], byte[]> createVeniceWriter = TestUtils.getVeniceWriterFactory(this.veniceCluster.getKafka().getAddress()).createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).setKeySerializer(veniceAvroKafkaSerializer).build());
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(parseStoreFromKafkaTopicName).setVeniceURL(this.routerAddr).setUseFastAvro(false));
            try {
                pushRecordsToStore(kafkaTopic, hashMap, createVeniceWriter, veniceAvroKafkaSerializer2, 1);
                Set of = Utils.setOf(new Integer[]{1, 2});
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, false, true, () -> {
                    Map map = (Map) andStartGenericAvroClient.compute().dotProduct("member_feature", asList, "dot_product_result").hadamardProduct("member_feature", asList, "hadamard_product_result").cosineSimilarity("member_feature", asList, "cosine_similarity_result").execute(of).get();
                    map.forEach((num, computeGenericRecord) -> {
                        Assert.assertEquals(((HashMap) computeGenericRecord.get("__veniceComputationError__")).size(), 0);
                    });
                    Assert.assertNull(((ComputeGenericRecord) map.get(2)).get("dot_product_result"));
                    Assert.assertNull(((ComputeGenericRecord) map.get(2)).get("hadamard_product_result"));
                    Assert.assertNull(((ComputeGenericRecord) map.get(2)).get("cosine_similarity_result"));
                    Assert.assertEquals(((ComputeGenericRecord) map.get(1)).get("dot_product_result"), Float.valueOf(ComputeOperationUtils.dotProduct(asList, asList)));
                    Assert.assertEquals(((ComputeGenericRecord) map.get(1)).get("hadamard_product_result"), ComputeOperationUtils.hadamardProduct(asList, asList));
                    Assert.assertEquals(((ComputeGenericRecord) map.get(1)).get("cosine_similarity_result"), Float.valueOf(1.0f));
                    Exception exc = null;
                    try {
                    } catch (Exception e) {
                        exc = e;
                    }
                    Assert.assertNotNull(exc);
                    Assert.assertTrue(exc instanceof VeniceClientException);
                    Assert.assertEquals(((VeniceClientException) exc).getErrorType(), ErrorType.GENERAL_ERROR);
                    Assert.assertEquals(exc.getMessage(), "COUNT field: member_feature isn't 'ARRAY' or 'MAP' type");
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void pushRecordsToStore(String str, Map<Integer, GenericRecord> map, VeniceWriter<Object, byte[], byte[]> veniceWriter, VeniceKafkaSerializer veniceKafkaSerializer, int i) throws Exception {
        veniceWriter.broadcastStartOfPush(false, false, CompressionStrategy.NO_OP, Collections.emptyMap());
        for (Map.Entry<Integer, GenericRecord> entry : map.entrySet()) {
            veniceWriter.put(entry.getKey(), this.compressorFactory.getCompressor(CompressionStrategy.NO_OP).compress(veniceKafkaSerializer.serialize(str, entry.getValue())), i).get();
        }
        veniceWriter.broadcastEndOfPush(Collections.emptyMap());
    }

    private void pushSyntheticDataToStore(String str, int i, VeniceWriter<Object, byte[], byte[]> veniceWriter, int i2, String str2, VeniceKafkaSerializer veniceKafkaSerializer, boolean z, int i3) throws Exception {
        veniceWriter.broadcastStartOfPush(false, false, CompressionStrategy.NO_OP, new HashMap());
        Schema parse = Schema.parse(str2);
        for (int i4 = 0; i4 < i; i4++) {
            GenericData.Record record = new GenericData.Record(parse);
            record.put("id", VALUE_PREFIX + i4);
            record.put("name", "companiesEmbedding");
            if (!z) {
                record.put("companiesEmbedding", COMPANIES_EMBEDDING);
            }
            record.put("member_feature", MF_EMBEDDING);
            veniceWriter.put(Integer.valueOf(i4), this.compressorFactory.getCompressor(CompressionStrategy.NO_OP).compress(veniceKafkaSerializer.serialize(str, record)), i3).get();
        }
        veniceWriter.broadcastEndOfPush(new HashMap());
        ControllerClient controllerClient = new ControllerClient(this.veniceCluster.getClusterName(), this.veniceCluster.getAllControllersURLs());
        try {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                if (controllerClient.queryJobStatus(str).getStatus().equals(ExecutionStatus.ERROR.name())) {
                    throw new VeniceException("Push failed.");
                }
                int currentVersion = controllerClient.getStore(this.storeName).getStore().getCurrentVersion();
                if (currentVersion == i2) {
                    this.veniceCluster.refreshAllRouterMetaData();
                }
                Assert.assertEquals(currentVersion, i2, "New version not online yet.");
            });
            controllerClient.close();
        } catch (Throwable th) {
            try {
                controllerClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private static List<Float> generateRandomFloatList(int i) {
        ThreadLocalRandom current = ThreadLocalRandom.current();
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(Float.valueOf(current.nextFloat()));
        }
        return arrayList;
    }
}
