package com.linkedin.venice.endToEnd;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.endToEnd.TestBatch;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.schema.vson.VsonAvroSchemaAdapter;
import com.linkedin.venice.schema.vson.VsonSchema;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.KeyAndValueSchemas;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/TestVsonStoreBatch.class */
/**
 * End-to-end tests that push Vson input files into a Venice cluster with the batch push job and then
 * read the pushed data back through two store clients: one built from
 * {@code ClientConfig.defaultGenericClientConfig} and one built from
 * {@code ClientConfig.defaultVsonGenericClientConfig}.
 *
 * <p>Each test supplies three pieces to {@code testBatchStore}: a writer that produces the input file
 * and reports the key/value schemas, a hook that tweaks the push-job properties, and a validator that
 * asserts on what the two clients return.
 */
public class TestVsonStoreBatch {
    /** Per-test timeout in milliseconds; referenced by every {@code @Test} so the value lives in one place. */
    private static final int TEST_TIMEOUT = 60000;

    /** Number of consecutive keys (starting at 0) that every validator reads back. */
    private static final int RECORD_COUNT = 100;

    /** Push-job property naming the key field of the input; an empty value is used by most tests. */
    private static final String KEY_FIELD_PROP = "key.field";

    /** Push-job property naming the value field of the input; an empty value is used by most tests. */
    private static final String VALUE_FIELD_PROP = "value.field";

    /**
     * Validator for the simple Vson file: key {@code i} must map to the string form of {@code i + 100}
     * on both clients. The first client's value is compared via {@code toString()}, the second
     * client's value is compared directly against the expected {@link String}.
     */
    private static final TestBatch.VPJValidator SIMPLE_RECORDS_VALIDATOR = (avroClient, vsonClient, metricsRepository) -> {
        for (int i = 0; i < RECORD_COUNT; i++) {
            Integer key = i;
            String expected = String.valueOf(i + 100);
            Assert.assertEquals(avroClient.get(key).get().toString(), expected);
            Assert.assertEquals(vsonClient.get(key).get(), expected);
        }
    };

    /**
     * Validator for pushes that select the "recs" field of the multi-level file: each value is a list
     * whose first element carries {@code member_id == i} and {@code score == (float) i}. The first
     * client must return {@link GenericRecord} elements, the second {@link HashMap} elements.
     */
    private static final TestBatch.VPJValidator MULTI_LEVEL_RECS_VALIDATOR = (avroClient, vsonClient, metricsRepository) -> {
        for (int i = 0; i < RECORD_COUNT; i++) {
            Integer key = i;
            Integer expectedMemberId = i;
            Float expectedScore = Float.valueOf(i);

            GenericRecord avroFirstRec = (GenericRecord) ((List<?>) avroClient.get(key).get()).get(0);
            Assert.assertEquals(avroFirstRec.get("member_id"), expectedMemberId);
            Assert.assertEquals(avroFirstRec.get("score"), expectedScore);

            HashMap<?, ?> vsonFirstRec = (HashMap<?, ?>) ((List<?>) vsonClient.get(key).get()).get(0);
            Assert.assertEquals(vsonFirstRec.get("member_id"), expectedMemberId);
            Assert.assertEquals(vsonFirstRec.get("score"), expectedScore);
        }
    };

    /**
     * Input writer for the "recs" selected-field tests: writes the second multi-level Vson file and
     * reports the Avro translation of the key schema and of the "recs" sub-schema of the value.
     */
    private static final TestBatch.InputFileWriter MULTI_LEVEL_RECS_WRITER = inputDir -> {
        // The element types of the returned Pair are not declared in this file, hence the explicit casts.
        Pair<?, ?> vsonSchemas = TestWriteUtils.writeMultiLevelVsonFile2(inputDir);
        String keySchemaStr = ((VsonSchema) vsonSchemas.getFirst()).toString();
        String recsSchemaStr = ((VsonSchema) vsonSchemas.getSecond()).recordSubtype("recs").toString();
        return new KeyAndValueSchemas(VsonAvroSchemaAdapter.parse(keySchemaStr), VsonAvroSchemaAdapter.parse(recsSchemaStr));
    };

    private VeniceClusterWrapper veniceCluster;
    private ControllerClient controllerClient;

    /** Starts one Venice cluster for the whole class and opens a controller client against its leader controller. */
    @BeforeClass
    public void setUp() {
        veniceCluster = ServiceFactory.getVeniceCluster();
        String leaderControllerUrl = veniceCluster.getLeaderVeniceController().getControllerUrl();
        controllerClient = new ControllerClient(veniceCluster.getClusterName(), leaderControllerUrl);
    }

    /** Closes the controller client first and the cluster second, ignoring close failures. */
    @AfterClass
    public void cleanUp() {
        IOUtils.closeQuietly(controllerClient);
        IOUtils.closeQuietly(veniceCluster);
    }

    /** Simple Vson file with both field properties explicitly set to the empty string. */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreWithSimpleRecords() throws Exception {
        testBatchStore(inputDir -> TestWriteUtils.writeSimpleVsonFile(inputDir), props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.setProperty(VALUE_FIELD_PROP, "");
        }, SIMPLE_RECORDS_VALIDATOR);
    }

    /** Simple Vson file with the key-field property removed and the value-field property empty. */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreWithMissingKeyField() throws Exception {
        testBatchStore(inputDir -> TestWriteUtils.writeSimpleVsonFile(inputDir), props -> {
            props.remove(KEY_FIELD_PROP);
            props.setProperty(VALUE_FIELD_PROP, "");
        }, SIMPLE_RECORDS_VALIDATOR);
    }

    /** Simple Vson file with the key-field property empty and the value-field property removed. */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreWithMissingValueField() throws Exception {
        testBatchStore(inputDir -> TestWriteUtils.writeSimpleVsonFile(inputDir), props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.remove(VALUE_FIELD_PROP);
        }, SIMPLE_RECORDS_VALIDATOR);
    }

    /** Simple Vson file with both field properties removed. */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreWithMissingKeyAndValueFields() throws Exception {
        testBatchStore(inputDir -> TestWriteUtils.writeSimpleVsonFile(inputDir), props -> {
            props.remove(KEY_FIELD_PROP);
            props.remove(VALUE_FIELD_PROP);
        }, SIMPLE_RECORDS_VALIDATOR);
    }

    /**
     * Complex Vson file: values expose {@code member_id == i + 100} and a {@code score} that is
     * {@code (float) i}, except for every tenth key where it must be {@code null}.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreWithComplexRecords() throws Exception {
        testBatchStore(inputDir -> TestWriteUtils.writeComplexVsonFile(inputDir), props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.setProperty(VALUE_FIELD_PROP, "");
        }, (avroClient, vsonClient, metricsRepository) -> {
            for (int i = 0; i < RECORD_COUNT; i++) {
                Integer key = i;
                GenericData.Record avroValue = (GenericData.Record) avroClient.get(key).get();
                Map<?, ?> vsonValue = (Map<?, ?>) vsonClient.get(key).get();

                Integer expectedMemberId = i + 100;
                Float expectedScore = i % 10 != 0 ? Float.valueOf(i) : null;

                Assert.assertEquals(avroValue.get("member_id"), expectedMemberId);
                Assert.assertEquals(vsonValue.get("member_id"), expectedMemberId);
                Assert.assertEquals(avroValue.get("score"), expectedScore);
                Assert.assertEquals(vsonValue.get("score"), expectedScore);
            }
        });
    }

    /** Byte keys and short values: key {@code (byte) i} must map to {@code (short) (i - 50)} on the Vson client. */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreCanProcessByteAndShort() throws Exception {
        testBatchStore(inputDir -> TestWriteUtils.writeVsonByteAndShort(inputDir), props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.setProperty(VALUE_FIELD_PROP, "");
        }, (avroClient, vsonClient, metricsRepository) -> {
            for (int i = 0; i < RECORD_COUNT; i++) {
                Byte key = (byte) i;
                Short expected = (short) (i - 50);
                Assert.assertEquals(vsonClient.get(key).get(), expected);
            }
        });
    }

    /** Multi-level records: {@code level1.level21.field1} must equal {@code i + 100} on both clients. */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreMultiLevelRecordsSchema() throws Exception {
        testBatchStore(inputDir -> TestWriteUtils.writeMultiLevelVsonFile(inputDir), props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.setProperty(VALUE_FIELD_PROP, "");
        }, (avroClient, vsonClient, metricsRepository) -> {
            for (int i = 0; i < RECORD_COUNT; i++) {
                Integer key = i;
                Integer expected = i + 100;

                GenericRecord avroRoot = (GenericRecord) avroClient.get(key).get();
                GenericRecord avroLevel1 = (GenericRecord) avroRoot.get("level1");
                GenericRecord avroLevel21 = (GenericRecord) avroLevel1.get("level21");
                Assert.assertEquals(avroLevel21.get("field1"), expected);

                HashMap<?, ?> vsonRoot = (HashMap<?, ?>) vsonClient.get(key).get();
                HashMap<?, ?> vsonLevel1 = (HashMap<?, ?>) vsonRoot.get("level1");
                HashMap<?, ?> vsonLevel21 = (HashMap<?, ?>) vsonLevel1.get("level21");
                Assert.assertEquals(vsonLevel21.get("field1"), expected);
            }
        });
    }

    /**
     * Complex Vson file with only the "score" field selected as the value, so the store's value schema
     * is the schema of that field and each value is the score itself ({@code null} for every tenth key).
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreWithSelectedField() throws Exception {
        testBatchStore(inputDir -> {
            KeyAndValueSchemas fullSchemas = TestWriteUtils.writeComplexVsonFile(inputDir);
            return new KeyAndValueSchemas(
                    fullSchemas.getKey(),
                    VsonAvroSchemaAdapter.stripFromUnion(fullSchemas.getValue()).getField("score").schema());
        }, props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.setProperty(VALUE_FIELD_PROP, "score");
        }, (avroClient, vsonClient, metricsRepository) -> {
            for (int i = 0; i < RECORD_COUNT; i++) {
                Integer key = i;
                Float expectedScore = i % 10 != 0 ? Float.valueOf(i) : null;
                Assert.assertEquals(avroClient.get(key).get(), expectedScore);
                Assert.assertEquals(vsonClient.get(key).get(), expectedScore);
            }
        });
    }

    /** Multi-level Vson file with only the "recs" field selected as the value. */
    @Test(timeOut = TEST_TIMEOUT)
    public void testVsonStoreMultiLevelRecordsSchemaWithSelectedField() throws Exception {
        testBatchStore(MULTI_LEVEL_RECS_WRITER, props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.setProperty(VALUE_FIELD_PROP, "recs");
        }, MULTI_LEVEL_RECS_VALIDATOR);
    }

    /**
     * Runs the "recs" selected-field push and keeps the store, then runs a second push into the same
     * store that uses the first version's Kafka topic as its input, and validates the same data again.
     * The store is deleted only after the second push.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testKafkaInputBatchJobWithVsonStoreMultiLevelRecordsSchemaWithSelectedField() throws Exception {
        // First push: regular file input; the store must survive so that it can be re-pushed below.
        String storeName = testBatchStore(MULTI_LEVEL_RECS_WRITER, props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.setProperty(VALUE_FIELD_PROP, "recs");
        }, MULTI_LEVEL_RECS_VALIDATOR, new UpdateStoreQueryParams(), Optional.empty(), false);

        // Second push: Kafka input. The store already exists, so the schemas reported by the writer
        // are not used for store creation and NULL placeholders are enough.
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.NULL)),
                props -> {
                    props.setProperty("source.kafka", "true");
                    props.setProperty("kafka.input.topic", Version.composeKafkaTopic(storeName, 1));
                    props.setProperty("kafka.input.broker.url", veniceCluster.getKafka().getAddress());
                    props.setProperty("kafka.input.max.records.per.mapper", "5");
                },
                MULTI_LEVEL_RECS_VALIDATOR,
                new UpdateStoreQueryParams(),
                Optional.of(storeName),
                true);
    }

    /** Pushes the "name" field of the user-schema Vson file into a store configured with ZSTD_WITH_DICT compression. */
    @Test(timeOut = TEST_TIMEOUT)
    public void testZstdCompressingVsonRecord() throws Exception {
        testBatchStore(inputDir -> {
            // The element types of the returned Pair are not declared in this file, hence the explicit casts.
            Pair<?, ?> schemas = TestWriteUtils.writeSimpleVsonFileWithUserSchema(inputDir);
            return new KeyAndValueSchemas(
                    (Schema) schemas.getFirst(),
                    VsonAvroSchemaAdapter.stripFromUnion((Schema) schemas.getSecond()).getField("name").schema());
        }, props -> {
            props.setProperty(KEY_FIELD_PROP, "");
            props.setProperty(VALUE_FIELD_PROP, "name");
        }, TestBatch.getSimpleFileWithUserSchemaValidatorForZstd(),
                new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT));
    }

    /** Shorthand for {@code testBatchStore} with default (empty) store update parameters. */
    private String testBatchStore(TestBatch.InputFileWriter inputFileWriter, Consumer<Properties> extraProps, TestBatch.VPJValidator dataValidator) throws Exception {
        return testBatchStore(inputFileWriter, extraProps, dataValidator, new UpdateStoreQueryParams());
    }

    /** Shorthand for {@code testBatchStore} that creates a fresh store and deletes it after validation. */
    private String testBatchStore(TestBatch.InputFileWriter inputFileWriter, Consumer<Properties> extraProps, TestBatch.VPJValidator dataValidator, UpdateStoreQueryParams storeParams) throws Exception {
        return testBatchStore(inputFileWriter, extraProps, dataValidator, storeParams, Optional.empty(), true);
    }

    /**
     * Writes an input file, pushes it with the batch push job, and validates the result through two
     * store clients.
     *
     * @param inputFileWriter writes the input into a fresh temp directory and reports the key/value schemas
     * @param extraProps hook that adjusts the default push-job properties before the push runs
     * @param dataValidator assertions to run against the two clients once the push has finished
     * @param storeParams store settings applied when a new store is created
     * @param existingStoreName name of an already existing store to push into; when empty, a uniquely
     *     named store is created from the schemas reported by {@code inputFileWriter}
     * @param deleteStoreAfterValidation whether to disable and delete the store before returning
     * @return the name of the store that was pushed to
     * @throws Exception whatever the writer, the push job, or the validator throws
     */
    private String testBatchStore(TestBatch.InputFileWriter inputFileWriter, Consumer<Properties> extraProps, TestBatch.VPJValidator dataValidator, UpdateStoreQueryParams storeParams, Optional<String> existingStoreName, boolean deleteStoreAfterValidation) throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        KeyAndValueSchemas schemas = inputFileWriter.write(inputDir);
        String storeName = existingStoreName.orElseGet(() -> Utils.getUniqueString("store"));

        AvroGenericStoreClient avroClient = null;
        AvroGenericStoreClient vsonClient = null;
        try {
            String inputDirUri = "file://" + inputDir.getAbsolutePath();
            Properties pushJobProps = IntegrationTestPushUtils.defaultVPJPropsWithoutD2Routing(veniceCluster, inputDirUri, storeName);
            extraProps.accept(pushJobProps);

            if (!existingStoreName.isPresent()) {
                // Only the store needs to exist; the client returned by the helper is closed right away.
                IntegrationTestPushUtils.createStoreForJob(
                        veniceCluster.getClusterName(),
                        schemas.getKey().toString(),
                        schemas.getValue().toString(),
                        pushJobProps,
                        storeParams).close();
            }
            TestWriteUtils.runPushJob("Test Batch push job", pushJobProps);

            MetricsRepository metricsRepository = new MetricsRepository();
            avroClient = ClientFactory.getAndStartGenericAvroClient(
                    ClientConfig.defaultGenericClientConfig(storeName)
                            .setVeniceURL(veniceCluster.getRandomRouterURL())
                            .setMetricsRepository(metricsRepository));
            vsonClient = ClientFactory.getAndStartGenericAvroClient(
                    ClientConfig.defaultVsonGenericClientConfig(storeName)
                            .setVeniceURL(veniceCluster.getRandomRouterURL()));

            veniceCluster.refreshAllRouterMetaData();
            dataValidator.validate(avroClient, vsonClient, metricsRepository);
            return storeName;
        } finally {
            // Single cleanup path for success and failure alike, so nothing is closed or deleted twice.
            IOUtils.closeQuietly(avroClient);
            IOUtils.closeQuietly(vsonClient);
            if (deleteStoreAfterValidation) {
                // Reads and writes are turned off before the delete request is issued.
                controllerClient.enableStoreReadWrites(storeName, false);
                controllerClient.deleteStore(storeName);
            }
        }
    }
}
