package com.linkedin.venice.endToEnd;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.system.store.MetaStoreDataType;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.DictionaryUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.KeyAndValueSchemas;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongBinaryOperator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/venice/endToEnd/TestBatch.class */
public abstract class TestBatch {
    // Timeout budget for test methods. Presumably milliseconds (the unit of TestNG's `timeOut`) — TODO confirm.
    protected static final int TEST_TIMEOUT = 60000;
    // NOTE(review): the name suggests a retry limit; confirm every usage before changing the value.
    private static final int MAX_RETRY_ATTEMPTS = 3;
    // Cluster under test: assigned in setUp() from initializeVeniceCluster() and closed in cleanUp().
    protected VeniceClusterWrapper veniceCluster;
    private static final Logger LOGGER = LogManager.getLogger(TestBatch.class);
    // Two temp directories obtained from Utils.getTempDataDirectory(). testEarlyDeleteBackupStore looks for
    // "<path>/rocksdb/<store>_v<N>" under each of them.
    protected static final String BASE_DATA_PATH_1 = Utils.getTempDataDirectory().getAbsolutePath();
    protected static final String BASE_DATA_PATH_2 = Utils.getTempDataDirectory().getAbsolutePath();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/venice/endToEnd/TestBatch$InputFileWriter.class */
    /**
     * Callback that produces the input data of a push job.
     *
     * <p>The tests in this class implement it with lambdas that hand {@code file} to one of the
     * {@code TestWriteUtils.write...} helpers and wrap the resulting schema(s) in a
     * {@link KeyAndValueSchemas}. {@code file} is presumably the input directory of the push job —
     * TODO confirm against the {@code testBatchStore} helpers.
     */
    @FunctionalInterface
    public interface InputFileWriter {
        /**
         * Writes the input data and reports the schemas it was written with.
         *
         * @param file location to write the input data to
         * @return the key and value schemas of the written data
         * @throws IOException if writing the input data fails
         */
        KeyAndValueSchemas write(File file) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/endToEnd/TestBatch$MaxLong.class */
    /**
     * {@link StatCounter} that keeps the largest value passed to {@code add}.
     *
     * <p>NOTE(review): the counter is seeded with {@link Integer#MIN_VALUE} even though it stores longs,
     * so values below the int range never replace the seed — confirm this is intended.
     */
    public static class MaxLong extends StatCounter {
        private static final long serialVersionUID = 1L;

        public MaxLong() {
            // Integer.MIN_VALUE widens to -2147483648L, the seed the counter starts from and resets to.
            super(Integer.MIN_VALUE, Math::max);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/endToEnd/TestBatch$MinLong.class */
    /**
     * {@link StatCounter} that keeps the smallest value passed to {@code add}.
     *
     * <p>NOTE(review): the counter is seeded with {@link Integer#MAX_VALUE} even though it stores longs,
     * so values above the int range never replace the seed — confirm this is intended.
     */
    public static class MinLong extends StatCounter {
        private static final long serialVersionUID = 1L;

        public MinLong() {
            // Integer.MAX_VALUE widens to 2147483647L, the seed the counter starts from and resets to.
            super(Integer.MAX_VALUE, Math::min);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/endToEnd/TestBatch$StatCounter.class */
    /**
     * An {@link AtomicLong} that folds every added value into its current value with a fixed
     * {@link LongBinaryOperator}, and can be put back to the value it was created with.
     */
    public static class StatCounter extends AtomicLong {
        private static final long serialVersionUID = 1L;
        /** Value the counter starts from and returns to on {@link #reset()}. */
        final long initialValue;
        /** Combines the current value (left operand) with an added value (right operand). */
        final LongBinaryOperator accumulator;

        /**
         * @param j starting value, also restored by {@link #reset()}
         * @param longBinaryOperator function used by {@link #add(long)} to merge a new value into the counter
         */
        public StatCounter(long j, LongBinaryOperator longBinaryOperator) {
            super(j);
            initialValue = j;
            accumulator = longBinaryOperator;
        }

        /**
         * Atomically merges {@code j} into the counter using the accumulator.
         *
         * @param j value to merge
         */
        public void add(long j) {
            // The previous value is not needed; only the atomic update matters.
            getAndAccumulate(j, accumulator);
        }

        /** Restores the counter to the value it was constructed with. */
        public void reset() {
            set(initialValue);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/endToEnd/TestBatch$TotalLong.class */
    /** {@link StatCounter} that starts at zero and keeps the running sum of every value passed to {@code add}. */
    public static class TotalLong extends StatCounter {
        private static final long serialVersionUID = 1L;

        public TotalLong() {
            super(0L, (runningTotal, delta) -> runningTotal + delta);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/venice/endToEnd/TestBatch$VPJValidator.class */
    /**
     * Callback that checks the content of a store, implemented as a lambda by the tests in this class
     * and handed to the {@code testBatchStore} / {@code testRepush} helpers.
     *
     * <p>The validators visible in this file only read through the first client and ignore the other two
     * arguments. What the second client and the metrics repository are bound to is decided by the caller —
     * TODO confirm against the helper that invokes the validator.
     */
    @FunctionalInterface
    public interface VPJValidator {
        /**
         * Runs the assertions of a test.
         *
         * @param avroGenericStoreClient client used to read the store under test
         * @param avroGenericStoreClient2 additional client supplied by the caller
         * @param metricsRepository metrics repository supplied by the caller
         * @throws Exception if a read fails; assertion failures propagate as errors
         */
        void validate(AvroGenericStoreClient avroGenericStoreClient, AvroGenericStoreClient avroGenericStoreClient2, MetricsRepository metricsRepository) throws Exception;
    }

    /**
     * Supplies the Venice cluster the tests of the concrete subclass run against.
     *
     * <p>Invoked from {@link #setUp()}, which stores the result in {@code veniceCluster};
     * {@link #cleanUp()} closes it.
     *
     * @return the cluster wrapper to use for this test class
     */
    public abstract VeniceClusterWrapper initializeVeniceCluster();

    /** Obtains the cluster from the subclass before any test of the class runs. */
    @BeforeClass(alwaysRun = true)
    public void setUp() {
        VeniceClusterWrapper cluster = initializeVeniceCluster();
        veniceCluster = cluster;
    }

    /** Hands the cluster to {@code Utils.closeQuietlyWithErrorLogged} once all tests of the class have run. */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Closeable[] resources = {veniceCluster};
        Utils.closeQuietlyWithErrorLogged(resources);
    }

    /**
     * Creates a store without pushing any data and verifies that both a single get and a batch get
     * fail with an error telling the user to push data to the store.
     *
     * <p>The client is managed by try-with-resources so that it stays open for both requests and is
     * closed exactly once, whether the assertions pass or fail.
     */
    @Test(timeOut = 60000)
    public void testStoreWithNoVersionThrows400() {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        KeyAndValueSchemas schemas = new KeyAndValueSchemas(Schema.parse("\"string\""), Schema.parse("\"string\""));
        String storeName = Utils.getUniqueString("store");
        // Only the store creation is needed here; the handle returned by createStoreForJob is closed right away.
        IntegrationTestPushUtils.createStoreForJob(this.veniceCluster, schemas.getKey().toString(), schemas.getValue().toString(), IntegrationTestPushUtils.defaultVPJProps(this.veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName)).close();

        try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(this.veniceCluster.getRandomRouterURL()))) {
            try {
                client.get("key1").get();
                // Assert.fail throws an Error, so it is not swallowed by the catch below.
                Assert.fail("Single get request on store with no push should fail");
            } catch (Exception e) {
                Assert.assertTrue(String.valueOf(e.getMessage()).contains("Please push data to that store"), "Unexpected single get failure: " + e);
            }

            try {
                HashSet<String> keys = new HashSet<>();
                keys.add("key2");
                keys.add("key3");
                client.batchGet(keys).get();
                Assert.fail("Batch get request on store with no push should fail");
            } catch (Exception e) {
                Assert.assertTrue(String.valueOf(e.getMessage()).contains("Please push data to that store"), "Unexpected batch get failure: " + e);
            }
        }
    }

    /**
     * Pushes input containing duplicate keys twice: first with duplicates disallowed, which must be
     * rejected with a {@link VeniceException}, then with duplicates allowed, which must succeed.
     */
    @Test(timeOut = 60000)
    public void testDuplicateKey() throws Exception {
        boolean rejected = false;
        try {
            testStoreWithDuplicateKeys(false);
        } catch (VeniceException expected) {
            // Expected: the push is refused when duplicate keys are not allowed.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail();
        }
        testStoreWithDuplicateKeys(true);
    }

    /**
     * Pushes an Avro file that contains duplicate keys and validates the outcome.
     *
     * @param z whether the push job sets {@code allow.duplicate.key=true}; when {@code false} the
     *          validator fails the test if it is ever reached, because the push should have been rejected
     */
    private void testStoreWithDuplicateKeys(boolean z) throws Exception {
        VPJValidator validator = (client, ignoredClient, ignoredMetrics) -> {
            if (z) {
                for (int id = 0; id < 100; id++) {
                    boolean everyTenth = id % 10 == 0;
                    if (!everyTenth) {
                        Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name" + id);
                        continue;
                    }
                    // For every tenth id only key "0" is probed, and only for presence.
                    Assert.assertNotNull(client.get("0").get());
                }
            } else {
                Assert.fail("Push should have failed since duplicate keys are not allowed");
            }
        };
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithDuplicateKey(inputDir)),
                props -> {
                    if (z) {
                        props.setProperty("allow.duplicate.key", "true");
                    }
                },
                validator);
    }

    /** Pushes an empty Avro file with default job properties; succeeding without an exception is the whole check. */
    @Test(timeOut = 60000)
    public void testEmptyPush() throws Exception {
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeEmptyAvroFileWithUserSchema(inputDir)),
                props -> {
                    // no extra push job properties
                },
                (client, ignoredClient, ignoredMetrics) -> {
                    // nothing to read back from an empty push
                });
    }

    /**
     * Pushes data whose schema carries an invalid default value.
     *
     * @param z value of {@code extended.schema.validity.check.enabled}; when {@code true} the push must
     *          fail with a message containing "Invalid default", otherwise it must succeed and all
     *          100 records must be readable
     */
    @Test(timeOut = 60000, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testDataPushWithSchemaWithAWrongDefault(boolean z) throws Exception {
        VPJValidator validator = (client, ignoredClient, ignoredMetrics) -> {
            for (int id = 0; id < 100; id++) {
                Object value = client.get(Integer.toString(id)).get();
                Assert.assertTrue(value instanceof GenericRecord, "The returned value must be a ''GenericRecord' for key: " + id);
                GenericRecord valueRecord = (GenericRecord) value;
                Assert.assertEquals(valueRecord.get("key").toString(), Integer.toString(id));
                Assert.assertEquals(Float.valueOf(valueRecord.get("score").toString()), Float.valueOf(100.0f));
            }
        };

        Exception pushFailure = null;
        try {
            testBatchStore(
                    inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithASchemaWithAWrongDefaultValue(inputDir, 100)),
                    props -> props.setProperty("extended.schema.validity.check.enabled", Boolean.toString(z)),
                    validator);
        } catch (Exception e) {
            pushFailure = e;
        }

        if (!z) {
            Assert.assertNull(pushFailure);
            return;
        }
        Assert.assertTrue(pushFailure != null && pushFailure.getMessage().contains("Invalid default"));
    }

    /**
     * Pushes 100 records into a GZIP-compressed store, validates them with single and batch gets,
     * then repushes the store and validates again.
     *
     * @param z value of {@code compression.metric.collection.enabled}
     * @param z2 value of {@code use.mapper.to.build.dictionary}
     */
    @Test(dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testCompressingRecord(boolean z, boolean z2) throws Exception {
        VPJValidator validator = (client, ignoredClient, ignoredMetrics) -> {
            // Single gets for keys 1..100.
            for (int id = 1; id <= 100; id++) {
                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
            }
            // Batch gets, ten keys at a time: 1..10, 11..20, ..., 91..100.
            for (int base = 0; base < 100; base += 10) {
                HashSet<String> keys = new HashSet<>();
                for (int offset = 1; offset <= 10; offset++) {
                    keys.add(Integer.toString(base + offset));
                }
                Map<?, ?> values = (Map<?, ?>) client.batchGet(keys).get();
                Assert.assertEquals(values.size(), 10);
                for (int offset = 1; offset <= 10; offset++) {
                    int id = base + offset;
                    Assert.assertEquals(((CharSequence) values.get(Integer.toString(id))).toString(), "test_name_" + id);
                }
            }
        };

        String storeName = testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> {
                    props.setProperty("compression.metric.collection.enabled", String.valueOf(z));
                    props.setProperty("use.mapper.to.build.dictionary", String.valueOf(z2));
                },
                validator,
                new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.GZIP));
        testRepush(storeName, validator);
    }

    /**
     * Pushes into a ZSTD_WITH_DICT store and reads key "1" immediately. The read either returns the
     * expected value or fails with the "Compressor not available ... Dictionary not downloaded" error;
     * any other {@link ExecutionException} fails the test.
     */
    @Test(timeOut = 60000)
    public void testZstdCompressingAvroRecordCanFailWhenNoFallbackAvailable() throws Exception {
        VPJValidator validator = (client, ignoredClient, ignoredMetrics) -> {
            try {
                Assert.assertEquals(client.get("1").get().toString(), "test_name_1");
            } catch (ExecutionException e) {
                // String.matches requires the whole message to match, including the trailing newline.
                String toleratedMessage = ".* Compressor not available for resource " + client.getStoreName() + "\\. Dictionary not downloaded\\.\\n";
                if (!e.getMessage().matches(toleratedMessage)) {
                    Assert.fail("Unexpected exception message", e);
                }
            }
        };
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> {
                    // no extra push job properties
                },
                validator,
                new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a validator for the 100-record simple data set ("test_name_&lt;key&gt;" for keys 1..100).
     *
     * <p>The read of key 1 is retried through {@code TestUtils.waitForNonDeterministicAssertion} with a
     * 10 second budget; the remaining keys are then checked once with single gets and with batch gets
     * of ten keys each.
     *
     * @return the validator
     */
    public static VPJValidator getSimpleFileWithUserSchemaValidatorForZstd() {
        return (client, ignoredClient, ignoredMetrics) -> {
            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, true, () -> {
                Assert.assertEquals(client.get(Integer.toString(1)).get().toString(), "test_name_1");
            });
            // Key 1 was covered above; continue with 2..100.
            for (int id = 2; id <= 100; id++) {
                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
            }
            // Batch gets: 1..10, 11..20, ..., 91..100.
            for (int base = 0; base < 100; base += 10) {
                HashSet<String> keys = new HashSet<>();
                for (int offset = 1; offset <= 10; offset++) {
                    keys.add(Integer.toString(base + offset));
                }
                Map<?, ?> values = (Map<?, ?>) client.batchGet(keys).get();
                Assert.assertEquals(values.size(), 10);
                for (int offset = 1; offset <= 10; offset++) {
                    int id = base + offset;
                    Assert.assertEquals(((CharSequence) values.get(Integer.toString(id))).toString(), "test_name_" + id);
                }
            }
        };
    }

    /**
     * Pushes into a ZSTD_WITH_DICT store with {@code zstd.compression.level=17} and validates the data
     * with the retrying validator from {@link #getSimpleFileWithUserSchemaValidatorForZstd()}.
     */
    @Test(timeOut = 60000)
    public void testZstdCompressingAvroRecordWhenNoFallbackAvailableWithSleep() throws Exception {
        final int compressionLevel = 17;
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> props.setProperty("zstd.compression.level", Integer.toString(compressionLevel)),
                getSimpleFileWithUserSchemaValidatorForZstd(),
                new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT));
    }

    /**
     * Pushes the simple data set into a new store, then pushes the alternate data set into the same
     * store after switching it to ZSTD_WITH_DICT, repushes it, and finally checks that the dictionary
     * read from the version 4 topic differs from the one read from the version 2 topic.
     *
     * @param z value of {@code compression.metric.collection.enabled}
     * @param z2 value of {@code use.mapper.to.build.dictionary}
     */
    @Test(timeOut = 120000, dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testZstdCompressingAvroRecordWhenFallbackAvailable(boolean z, boolean z2) throws Exception {
        // First push: creates the store with its default settings.
        String storeName = testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> {
                    props.setProperty("compression.metric.collection.enabled", String.valueOf(z));
                    props.setProperty("use.mapper.to.build.dictionary", String.valueOf(z2));
                },
                getSimpleFileWithUserSchemaValidatorForZstd());

        VPJValidator alternateDataValidator = (client, ignoredClient, ignoredMetrics) -> {
            Utils.sleep(1000L);
            for (int id = 1; id <= 100; id++) {
                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "alternate_test_name_" + id);
            }
            for (int base = 0; base < 100; base += 10) {
                HashSet<String> keys = new HashSet<>();
                for (int offset = 1; offset <= 10; offset++) {
                    keys.add(Integer.toString(base + offset));
                }
                Map<?, ?> values = (Map<?, ?>) client.batchGet(keys).get();
                Assert.assertEquals(values.size(), 10);
                for (int offset = 1; offset <= 10; offset++) {
                    int id = base + offset;
                    Assert.assertEquals(((CharSequence) values.get(Integer.toString(id))).toString(), "alternate_test_name_" + id);
                }
            }
        };

        // Second push: same store, now with dictionary compression.
        // NOTE(review): the partition count reuses MAX_RETRY_ATTEMPTS (3); the two look unrelated — confirm.
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeAlternateSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> {
                    props.setProperty("compression.metric.collection.enabled", String.valueOf(z));
                    props.setProperty("use.mapper.to.build.dictionary", String.valueOf(z2));
                },
                alternateDataValidator,
                storeName,
                new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT).setPartitionCount(MAX_RETRY_ATTEMPTS));

        String sourceTopic = Version.composeKafkaTopic(storeName, 2);
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("kafka.bootstrap.servers", this.veniceCluster.getKafka().getAddress());
        VeniceProperties kafkaConfig = new VeniceProperties(kafkaProps);
        ByteBuffer sourceDictionary = DictionaryUtils.readDictionaryFromKafka(sourceTopic, kafkaConfig);

        testRepush(storeName, alternateDataValidator);

        ByteBuffer repushedDictionary = DictionaryUtils.readDictionaryFromKafka(Version.composeKafkaTopic(storeName, 4), kafkaConfig);
        Assert.assertNotEquals(repushedDictionary, sourceDictionary, "The dict of repushed version should be different from the source version");
    }

    /**
     * Runs a multi-version push on a store with the DELETE_ON_NEW_PUSH_START backup strategy and then
     * waits up to 5 seconds for the RocksDB directory of version 1 to be gone from both data paths
     * while the directory of version 2 exists under at least one of them.
     */
    @Test(timeOut = 60000)
    public void testEarlyDeleteBackupStore() throws Exception {
        VPJValidator validator = (client, ignoredClient, ignoredMetrics) -> {
            for (int id = 1; id <= 100; id++) {
                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
            }
        };
        String storeName = testBatchStoreMultiVersionPush(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> {
                    // no extra push job properties
                },
                validator,
                new UpdateStoreQueryParams().setBackupStrategy(BackupStrategy.DELETE_ON_NEW_PUSH_START));

        TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
            String relativeStorePath = "/rocksdb/" + storeName;
            File v1UnderPath1 = new File(BASE_DATA_PATH_1 + relativeStorePath + "_v1");
            File v2UnderPath1 = new File(BASE_DATA_PATH_1 + relativeStorePath + "_v2");
            File v1UnderPath2 = new File(BASE_DATA_PATH_2 + relativeStorePath + "_v1");
            File v2UnderPath2 = new File(BASE_DATA_PATH_2 + relativeStorePath + "_v2");
            // The backup version must be deleted everywhere; the current one must exist somewhere.
            Assert.assertFalse(v1UnderPath1.exists() || v1UnderPath2.exists());
            Assert.assertTrue(v2UnderPath1.exists() || v2UnderPath2.exists());
        });
    }

    /**
     * Batch-pushes keys 1..100 into a store with incremental push enabled, then runs an incremental
     * push and checks that keys 1..50 keep their batch values while keys 51..150 carry the
     * incremental values ("test_name_" followed by twice the key).
     */
    @Test(timeOut = 60000)
    public void testIncrementalPush() throws Exception {
        // Step 1: regular batch push that creates the store.
        String storeName = testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir)),
                props -> {
                    // no extra push job properties
                },
                (client, ignoredClient, ignoredMetrics) -> {
                    for (int id = 1; id <= 100; id++) {
                        Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
                    }
                },
                new UpdateStoreQueryParams().setIncrementalPushEnabled(true));

        // Step 2: incremental push on top of the batch data.
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema2(inputDir)),
                props -> props.setProperty("incremental.push", "true"),
                (client, ignoredClient, ignoredMetrics) -> {
                    for (int id = 1; id <= 50; id++) {
                        Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
                    }
                    for (int id = 51; id <= 150; id++) {
                        Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + (id * 2));
                    }
                },
                storeName,
                new UpdateStoreQueryParams().setIncrementalPushEnabled(true));
    }

    /**
     * Same scenario as the plain incremental push test, but on a ZSTD_WITH_DICT store that also has
     * hybrid settings (offset lag threshold 10, rewind 0 seconds).
     *
     * @param z value of {@code compression.metric.collection.enabled}
     * @param z2 value of {@code use.mapper.to.build.dictionary}
     */
    @Test(timeOut = 60000, dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testIncrementalPushWithCompression(boolean z, boolean z2) throws Exception {
        // Step 1: batch push that creates the compressed, incremental-push-enabled store.
        String storeName = testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> {
                    props.setProperty("compression.metric.collection.enabled", String.valueOf(z));
                    props.setProperty("use.mapper.to.build.dictionary", String.valueOf(z2));
                },
                getSimpleFileWithUserSchemaValidatorForZstd(),
                new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT).setIncrementalPushEnabled(true).setHybridOffsetLagThreshold(10L).setHybridRewindSeconds(0L));

        // Step 2: incremental push; no store update is requested this time (null).
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema2(inputDir)),
                props -> {
                    props.setProperty("incremental.push", "true");
                    props.setProperty("compression.metric.collection.enabled", String.valueOf(z));
                    props.setProperty("use.mapper.to.build.dictionary", String.valueOf(z2));
                },
                (client, ignoredClient, ignoredMetrics) -> {
                    for (int id = 1; id <= 50; id++) {
                        Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
                    }
                    for (int id = 51; id <= 150; id++) {
                        Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + (id * 2));
                    }
                },
                storeName,
                null);
    }

    /**
     * Runs three pushes against one store (batch, incremental, batch again) and checks after the last
     * one that keys 1..100 hold the batch values and keys 101..150, written only by the incremental
     * push, are gone. Start, success and failure are logged with a random attempt label.
     */
    @Test(timeOut = 60000)
    public void testIncrementalPushWritesToRealTimeTopicWithPolicy() throws Exception {
        String qualifiedTestName = getClass().getSimpleName() + ".testIncrementalPushWritesToRealTimeTopicWithPolicy()";
        String attemptLabel = "attempt [" + Math.random() + "] of " + qualifiedTestName;
        LOGGER.info("Start of {}", attemptLabel);
        try {
            // Push 1: batch data into a new hybrid, chunked, incremental-push-enabled store.
            String storeName = testBatchStore(
                    inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir)),
                    props -> {
                        // no extra push job properties
                    },
                    (client, ignoredClient, ignoredMetrics) -> {
                        for (int id = 1; id <= 100; id++) {
                            Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
                        }
                    },
                    new UpdateStoreQueryParams().setAmplificationFactor(2).setIncrementalPushEnabled(true).setChunkingEnabled(true).setHybridOffsetLagThreshold(10L).setHybridRewindSeconds(0L));

            // Push 2: incremental push overriding/adding keys 51..150.
            testBatchStore(
                    inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema2(inputDir)),
                    props -> props.setProperty("incremental.push", "true"),
                    (client, ignoredClient, ignoredMetrics) -> {
                        for (int id = 51; id <= 150; id++) {
                            Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + (id * 2));
                        }
                    },
                    storeName,
                    null);

            // Push 3: a fresh batch push; validation is retried for up to 30 seconds.
            testBatchStore(
                    inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir)),
                    props -> {
                        // no extra push job properties
                    },
                    (client, ignoredClient, ignoredMetrics) -> {
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                            for (int id = 1; id <= 100; id++) {
                                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
                            }
                            for (int id = 101; id <= 150; id++) {
                                Assert.assertNull(client.get(Integer.toString(id)).get());
                            }
                        });
                    },
                    storeName,
                    null);
            LOGGER.info("Successful end of {}", attemptLabel);
        } catch (Throwable failure) {
            // Log with the attempt label before letting the failure reach TestNG unchanged.
            LOGGER.error("Caught throwable in {}", attemptLabel, failure);
            throw failure;
        }
    }

    /**
     * Pushes 100 records, then reads the VALUE_SCHEMAS_WRITTEN_PER_STORE_VERSION entry for version 1
     * from the store's meta system store. The test fails if that read throws; the returned value is
     * only printed, not inspected.
     *
     * <p>The meta store client is managed by try-with-resources, and the key parts are collected in a
     * plain {@link HashMap} rather than an anonymous subclass.
     */
    @Test(timeOut = 60000)
    public void testMetaStoreSchemaValidation() throws Exception {
        String storeName = testBatchStore(inputDir -> {
            return new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false));
        }, props -> {
        }, (client, ignoredClient, ignoredMetrics) -> {
            for (int id = 1; id <= 100; id++) {
                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
            }
        }, null, new UpdateStoreQueryParams(), false, true);

        HashMap<String, String> keyParts = new HashMap<>();
        keyParts.put("KEY_STORE_NAME", storeName);
        keyParts.put("KEY_VERSION_NUMBER", Integer.toString(1));

        String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
        try (AvroGenericStoreClient<Object, Object> metaStoreClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(metaStoreName).setVeniceURL(this.veniceCluster.getRandomRouterURL()))) {
            try {
                // Key construction stays inside the try so that any failure is reported through Assert.fail.
                System.out.println(metaStoreClient.get(MetaStoreDataType.VALUE_SCHEMAS_WRITTEN_PER_STORE_VERSION.getStoreMetaKey(keyParts)).get());
            } catch (Exception e) {
                Assert.fail("get request to fetch schema from meta store fails", e);
            }
        }
    }

    /** Pushes 100 records, repushes the store through {@code testRepush}, and validates after each step. */
    @Test(timeOut = 60000)
    public void testKafkaInputBatchJob() throws Exception {
        VPJValidator validator = (client, ignoredClient, ignoredMetrics) -> {
            for (int id = 1; id <= 100; id++) {
                String expected = "test_name_" + id;
                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), expected);
            }
        };
        String storeName = testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> {
                    // no extra push job properties
                },
                validator);
        testRepush(storeName, validator);
    }

    /**
     * Same flow as the Kafka input batch job test, on a store configured with active-active
     * replication, native replication and hybrid settings (rewind 5 seconds, offset lag threshold 2).
     */
    @Test(timeOut = 60000)
    public void testKafkaInputAAStore() throws Exception {
        VPJValidator validator = (client, ignoredClient, ignoredMetrics) -> {
            for (int id = 1; id <= 100; id++) {
                String expected = "test_name_" + id;
                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), expected);
            }
        };
        String storeName = testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false)),
                props -> {
                    // no extra push job properties
                },
                validator,
                new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true).setHybridRewindSeconds(5L).setHybridOffsetLagThreshold(2L).setNativeReplicationEnabled(true));
        testRepush(storeName, validator);
    }

    /**
     * Pushes a single record into a store with {@code MAX_RETRY_ATTEMPTS} (3) partitions, then runs a
     * Kafka-sourced push that reads the version 1 topic with at most 2 records per mapper, and checks
     * that the record is still readable.
     */
    @Test(timeOut = 60000)
    public void testReducerCountValidation() throws Exception {
        final int recordCount = 1;
        VPJValidator validator = (client, ignoredClient, ignoredMetrics) -> {
            for (int id = 1; id <= recordCount; id++) {
                Assert.assertEquals(client.get(Integer.toString(id)).get().toString(), "test_name_" + id);
            }
        };

        // NOTE(review): the partition count reuses MAX_RETRY_ATTEMPTS (3); the two look unrelated — confirm.
        String storeName = testBatchStore(
                inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false, 1)),
                props -> {
                    // no extra push job properties
                },
                validator,
                new UpdateStoreQueryParams().setPartitionCount(MAX_RETRY_ATTEMPTS));

        // Second push reads from Kafka, so no input file is written; NULL schemas are passed as placeholders.
        testBatchStore(
                inputDir -> new KeyAndValueSchemas(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.NULL)),
                props -> {
                    props.setProperty("source.kafka", "true");
                    props.setProperty("kafka.input.topic", Version.composeKafkaTopic(storeName, 1));
                    props.setProperty("kafka.input.broker.url", this.veniceCluster.getKafka().getAddress());
                    props.setProperty("kafka.input.max.records.per.mapper", "2");
                },
                validator,
                storeName,
                new UpdateStoreQueryParams());
    }

    /**
     * Pushes an ETL-formatted input ({@code source.etl=true}) with record key and record value schemas
     * and checks that keys 1..50 return "test_name_&lt;key&gt;" records while keys 51..100 return null.
     *
     * <p>The schema JSON is declared once and each lambda parses it a single time instead of once per
     * loop iteration.
     */
    @Test(timeOut = 60000)
    public void testBatchFromETL() throws Exception {
        final String keySchemaJson = "{\n"
                + "    \"type\":\"record\",\n"
                + "    \"name\":\"key\",\n"
                + "    \"namespace\":\"com.linkedin.venice.testkey\",\n"
                + "    \"fields\":[\n"
                + "        {\n"
                + "            \"name\":\"key\",\n"
                + "            \"type\":\"string\"\n"
                + "        }\n"
                + "    ]\n"
                + "}";
        final String valueSchemaJson = "{\n"
                + "    \"type\":\"record\",\n"
                + "    \"name\":\"value\",\n"
                + "    \"namespace\":\"com.linkedin.venice.testvalue\",\n"
                + "    \"fields\":[\n"
                + "        {\n"
                + "            \"name\":\"value\",\n"
                + "            \"type\":\"string\"\n"
                + "        }\n"
                + "    ],\n"
                + "    \"version\":10\n"
                + "}";

        testBatchStore(inputDir -> {
            TestWriteUtils.writeETLFileWithUserSchema(inputDir, false);
            return new KeyAndValueSchemas(Schema.parse(keySchemaJson), Schema.parse(valueSchemaJson));
        }, props -> {
            props.setProperty("source.etl", "true");
        }, (client, ignoredClient, ignoredMetrics) -> {
            // Parsed once; every record below shares these schema instances.
            Schema keySchema = Schema.parse(keySchemaJson);
            Schema valueSchema = Schema.parse(valueSchemaJson);

            for (int id = 1; id <= 50; id++) {
                GenericData.Record key = new GenericData.Record(keySchema);
                GenericData.Record expectedValue = new GenericData.Record(valueSchema);
                key.put("key", Integer.toString(id));
                expectedValue.put("value", "test_name_" + id);
                Assert.assertEquals(client.get(key).get().toString(), expectedValue.toString());
            }
            // The second half of the key range must not be present in the store.
            for (int id = 51; id <= 100; id++) {
                GenericData.Record key = new GenericData.Record(keySchema);
                key.put("key", Integer.toString(id));
                Assert.assertNull(client.get(key).get());
            }
        });
    }

    /**
     * Pushes an ETL-formatted input ({@code source.etl=true}) whose value schema is the union
     * {@code ["int", "string", "null"]} and checks that keys 1..25 return "string_&lt;key&gt;",
     * keys 26..50 return the key as an integer, and keys 51..100 return null.
     *
     * <p>The key schema JSON is declared once and each lambda parses it a single time instead of once
     * per loop iteration.
     */
    @Test(timeOut = 60000)
    public void testBatchFromETLWithForUnionWithNullSchema() throws Exception {
        final String keySchemaJson = "{\n"
                + "    \"type\":\"record\",\n"
                + "    \"name\":\"key\",\n"
                + "    \"namespace\":\"com.linkedin.venice.testkey\",\n"
                + "    \"fields\":[\n"
                + "        {\n"
                + "            \"name\":\"key\",\n"
                + "            \"type\":\"string\"\n"
                + "        }\n"
                + "    ]\n"
                + "}";

        testBatchStore(inputDir -> {
            TestWriteUtils.writeETLFileWithUnionWithNullSchema(inputDir, false);
            return new KeyAndValueSchemas(Schema.parse(keySchemaJson), Schema.parse("[\"int\", \"string\", \"null\"]"));
        }, props -> {
            props.setProperty("source.etl", "true");
        }, (client, ignoredClient, ignoredMetrics) -> {
            // Parsed once; every key record below shares this schema instance.
            Schema keySchema = Schema.parse(keySchemaJson);

            // String branch of the union.
            for (int id = 1; id <= 25; id++) {
                GenericData.Record key = new GenericData.Record(keySchema);
                key.put("key", Integer.toString(id));
                Assert.assertEquals(client.get(key).get().toString(), "string_" + id);
            }
            // Int branch of the union.
            for (int id = 26; id <= 50; id++) {
                GenericData.Record key = new GenericData.Record(keySchema);
                key.put("key", Integer.toString(id));
                Assert.assertEquals(client.get(key).get(), Integer.valueOf(id));
            }
            // Remaining keys must not be present in the store.
            for (int id = 51; id <= 100; id++) {
                GenericData.Record key = new GenericData.Record(keySchema);
                key.put("key", Integer.toString(id));
                Assert.assertNull(client.get(key).get());
            }
        });
    }

    /**
     * Pushes an ETL-sourced batch whose value schema is the union {@code ["int", "string"]} (no null branch)
     * and checks the values read back through the Avro client: keys 1-25 must read as
     * {@code "string_<key>"}, keys 26-50 must read as the integer equal to the key, and keys 51-100 must
     * read as {@code null}.
     */
    @Test(timeOut = 60000)
    public void testBatchFromETLWithForUnionWithoutNullSchema() throws Exception {
        // Single-field record schema shared by the store definition and by every lookup key below.
        final String keySchemaJson = "{\n"
            + "    \"type\":\"record\",\n"
            + "    \"name\":\"key\",\n"
            + "    \"namespace\":\"com.linkedin.venice.testkey\",\n"
            + "    \"fields\":[\n"
            + "        {\n"
            + "            \"name\":\"key\",\n"
            + "            \"type\":\"string\"\n"
            + "        }\n"
            + "    ]\n"
            + "}";
        testBatchStore(inputDir -> {
            TestWriteUtils.writeETLFileWithUnionWithoutNullSchema(inputDir, false);
            return new KeyAndValueSchemas(Schema.parse(keySchemaJson), Schema.parse("[\"int\", \"string\"]"));
        }, props -> props.setProperty("source.etl", "true"), (avroClient, vsonClient, metricsRepository) -> {
            // Parse the key schema once; it is identical for all lookups, only the record instance changes.
            Schema keySchema = Schema.parse(keySchemaJson);
            for (int id = 1; id <= 25; id++) {
                GenericData.Record key = new GenericData.Record(keySchema);
                key.put("key", Integer.toString(id));
                Assert.assertEquals(avroClient.get(key).get().toString(), "string_" + id);
            }
            for (int id = 26; id <= 50; id++) {
                GenericData.Record key = new GenericData.Record(keySchema);
                key.put("key", Integer.toString(id));
                Assert.assertEquals(avroClient.get(key).get(), Integer.valueOf(id));
            }
            for (int id = 51; id <= 100; id++) {
                GenericData.Record key = new GenericData.Record(keySchema);
                key.put("key", Integer.toString(id));
                Assert.assertNull(avroClient.get(key).get());
            }
        });
    }

    /**
     * Convenience overload: forwards to the four-argument {@code testBatchStore} with a freshly constructed
     * {@link UpdateStoreQueryParams}.
     *
     * @param inputFileWriter forwarded unchanged
     * @param consumer forwarded unchanged
     * @param vPJValidator forwarded unchanged
     * @return whatever the delegate returns
     * @throws Exception propagated from the delegate
     */
    protected String testBatchStore(
            InputFileWriter inputFileWriter,
            Consumer<Properties> consumer,
            VPJValidator vPJValidator) throws Exception {
        UpdateStoreQueryParams defaultStoreParams = new UpdateStoreQueryParams();
        return testBatchStore(inputFileWriter, consumer, vPJValidator, defaultStoreParams);
    }

    /**
     * Convenience overload: forwards to the six-argument {@code testBatchStore}, passing {@code null} for the
     * store name and {@code false} for the trailing flag.
     *
     * @param inputFileWriter forwarded unchanged
     * @param consumer forwarded unchanged
     * @param vPJValidator forwarded unchanged
     * @param updateStoreQueryParams forwarded unchanged
     * @return whatever the delegate returns
     * @throws Exception propagated from the delegate
     */
    private String testBatchStore(
            InputFileWriter inputFileWriter,
            Consumer<Properties> consumer,
            VPJValidator vPJValidator,
            UpdateStoreQueryParams updateStoreQueryParams) throws Exception {
        final String noStoreName = null;
        final boolean flag = false;
        return testBatchStore(inputFileWriter, consumer, vPJValidator, noStoreName, updateStoreQueryParams, flag);
    }

    /**
     * Forwards to the six-argument {@code testBatchStore}, passing {@code null} for the store name and
     * {@code true} for the trailing flag.
     *
     * @param inputFileWriter forwarded unchanged
     * @param consumer forwarded unchanged
     * @param vPJValidator forwarded unchanged
     * @param updateStoreQueryParams forwarded unchanged
     * @return whatever the delegate returns
     * @throws Exception propagated from the delegate
     */
    private String testBatchStoreMultiVersionPush(
            InputFileWriter inputFileWriter,
            Consumer<Properties> consumer,
            VPJValidator vPJValidator,
            UpdateStoreQueryParams updateStoreQueryParams) throws Exception {
        final String noStoreName = null;
        final boolean flag = true;
        return testBatchStore(inputFileWriter, consumer, vPJValidator, noStoreName, updateStoreQueryParams, flag);
    }

    /**
     * Runs a Kafka-sourced push against the store named {@code str} twice: first with
     * {@code kafka.input.combiner.enabled} set to {@code "true"}, then with it set to {@code "false"}.
     * Each run is validated with {@code vPJValidator}.
     *
     * @param str store name handed to {@code testBatchStore}
     * @param vPJValidator validation callback handed to {@code testBatchStore}
     * @throws Exception propagated from {@code testBatchStore}
     */
    private void testRepush(String str, VPJValidator vPJValidator) throws Exception {
        String[] combinerSettings = {"true", "false"};
        for (int idx = 0; idx < combinerSettings.length; idx++) {
            final String combinerEnabled = combinerSettings[idx];
            testBatchStore(
                inputDir -> new KeyAndValueSchemas(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.NULL)),
                props -> {
                    props.setProperty("source.kafka", "true");
                    props.setProperty("kafka.input.broker.url", this.veniceCluster.getKafka().getAddress());
                    props.setProperty("kafka.input.max.records.per.mapper", "5");
                    props.setProperty("kafka.input.combiner.enabled", combinerEnabled);
                },
                vPJValidator,
                str,
                new UpdateStoreQueryParams());
        }
    }

    /**
     * Convenience overload: forwards to the six-argument {@code testBatchStore} with {@code false} as the
     * trailing flag.
     *
     * @param inputFileWriter forwarded unchanged
     * @param consumer forwarded unchanged
     * @param vPJValidator forwarded unchanged
     * @param str forwarded unchanged
     * @param updateStoreQueryParams forwarded unchanged
     * @return whatever the delegate returns
     * @throws Exception propagated from the delegate
     */
    private String testBatchStore(
            InputFileWriter inputFileWriter,
            Consumer<Properties> consumer,
            VPJValidator vPJValidator,
            String str,
            UpdateStoreQueryParams updateStoreQueryParams) throws Exception {
        final boolean flag = false;
        return testBatchStore(inputFileWriter, consumer, vPJValidator, str, updateStoreQueryParams, flag);
    }

    /**
     * Convenience overload: forwards to the seven-argument {@code testBatchStore} with {@code false} as the
     * final flag.
     *
     * @param inputFileWriter forwarded unchanged
     * @param consumer forwarded unchanged
     * @param vPJValidator forwarded unchanged
     * @param str forwarded unchanged
     * @param updateStoreQueryParams forwarded unchanged
     * @param z forwarded unchanged
     * @return whatever the delegate returns
     * @throws Exception propagated from the delegate
     */
    private String testBatchStore(
            InputFileWriter inputFileWriter,
            Consumer<Properties> consumer,
            VPJValidator vPJValidator,
            String str,
            UpdateStoreQueryParams updateStoreQueryParams,
            boolean z) throws Exception {
        final boolean lastFlag = false;
        return testBatchStore(inputFileWriter, consumer, vPJValidator, str, updateStoreQueryParams, z, lastFlag);
    }

    /**
     * Core driver behind the {@code testBatchStore} overloads: writes input data, prepares the store, runs
     * one or three push jobs, then hands freshly started clients to the validator.
     *
     * @param inputFileWriter writes the input into a temp data directory and returns the key/value schemas
     * @param consumer applied to the default VPJ properties before they are used
     * @param vPJValidator invoked with an Avro client, a VSON client and the metrics repository attached to
     *        the Avro client
     * @param str store to push to; when {@code StringUtils.isEmpty(str)} holds (callers in this file pass
     *        {@code null}), a unique store name is generated and the store is created
     * @param updateStoreQueryParams passed to store creation; for a pre-existing store it is applied as a
     *        store update when non-null
     * @param z when {@code true}, two additional push jobs are run with the same properties
     * @param z2 when {@code true}, an empty push to the store's meta system store is issued and awaited
     *        before the batch push
     * @return the name of the store that was pushed to
     * @throws Exception propagated from any step, including the validator
     */
    private String testBatchStore(InputFileWriter inputFileWriter, Consumer<Properties> consumer, VPJValidator vPJValidator, String str, UpdateStoreQueryParams updateStoreQueryParams, boolean z, boolean z2) throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        KeyAndValueSchemas schemas = inputFileWriter.write(inputDir);
        boolean createNewStore = StringUtils.isEmpty(str);
        String storeName = createNewStore ? Utils.getUniqueString("store") : str;
        Properties vpjProps = IntegrationTestPushUtils.defaultVPJProps(this.veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
        consumer.accept(vpjProps);
        if (createNewStore) {
            IntegrationTestPushUtils.createStoreForJob(this.veniceCluster.getClusterName(), schemas.getKey().toString(), schemas.getValue().toString(), vpjProps, updateStoreQueryParams).close();
        } else if (updateStoreQueryParams != null) {
            IntegrationTestPushUtils.updateStore(this.veniceCluster.getClusterName(), vpjProps, updateStoreQueryParams);
        }
        if (z2) {
            // try-with-resources closes the controller client exactly once, on both success and failure.
            try (ControllerClient controllerClient = new ControllerClient(this.veniceCluster.getClusterName(), this.veniceCluster.getRandomRouterURL())) {
                // NOTE(review): the wait is 10000 SECONDS, which looks like it may have been meant as
                // milliseconds - confirm before changing.
                TestUtils.waitForNonDeterministicPushCompletion(controllerClient.emptyPush(VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName), storeName, 10000L).getKafkaTopic(), controllerClient, 10000L, TimeUnit.SECONDS);
            }
        }
        TestWriteUtils.runPushJob("Test Batch push job", vpjProps);
        if (z) {
            TestWriteUtils.runPushJob("Test Batch push job 2", vpjProps);
            TestWriteUtils.runPushJob("Test Batch push job 3", vpjProps);
        }
        this.veniceCluster.refreshAllRouterMetaData();
        MetricsRepository metricsRepository = new MetricsRepository();
        // Resources are closed in reverse declaration order: the VSON client first, then the Avro client.
        try (AvroGenericStoreClient avroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(this.veniceCluster.getRandomRouterURL()).setMetricsRepository(metricsRepository));
            AvroGenericStoreClient vsonClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultVsonGenericClientConfig(storeName).setVeniceURL(this.veniceCluster.getRandomRouterURL()))) {
            vPJValidator.validate(avroClient, vsonClient, metricsRepository);
            return storeName;
        }
    }

    /**
     * Expects {@code testStoreWithLargeValues(false)} to throw a {@link VeniceException} and
     * {@code testStoreWithLargeValues(true)} to complete normally.
     */
    @Test(timeOut = 60000)
    public void testLargeValues() throws Exception {
        boolean rejected;
        try {
            testStoreWithLargeValues(false);
            rejected = false;
        } catch (VeniceException expected) {
            // This is the outcome the test is looking for.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Pushing large values with chunking disabled should fail.");
        }
        testStoreWithLargeValues(true);
    }

    /**
     * First populates a store via {@code testStoreWithLargeValues(true)}, then expects the Kafka-input job
     * helper to throw a {@link VeniceException} when called with {@code false} and to complete normally
     * when called with {@code true}.
     *
     * @param z data-provider supplied flag, forwarded (boxed) to the private helper overload
     */
    @Test(timeOut = 180000, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testKafkaInputBatchJobWithLargeValues(boolean z) throws Exception {
        String storeName = testStoreWithLargeValues(true);
        boolean rejected;
        try {
            testKafkaInputBatchJobWithLargeValues(false, storeName, Boolean.valueOf(z));
            rejected = false;
        } catch (VeniceException expected) {
            // This is the outcome the test is looking for.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Re-pushing large values with chunking disabled should fail.");
        }
        testKafkaInputBatchJobWithLargeValues(true, storeName, Boolean.valueOf(z));
    }

    /**
     * Calls the three-argument {@code testStoreWithLargeValues} for store {@code str} with a property
     * customizer that switches the job to Kafka input.
     *
     * @param z forwarded as the first argument of {@code testStoreWithLargeValues}
     * @param str store name; written to {@code venice.store.name} and forwarded to the delegate
     * @param bool written, via {@code toString()}, to {@code send.control.messages.directly}
     * @throws Exception propagated from the delegate
     */
    private void testKafkaInputBatchJobWithLargeValues(boolean z, String str, Boolean bool) throws Exception {
        Consumer<Properties> kafkaInputOverrides = props -> {
            props.setProperty("source.kafka", "true");
            props.setProperty("venice.store.name", str);
            props.setProperty("kafka.input.broker.url", this.veniceCluster.getKafka().getAddress());
            props.setProperty("kafka.input.max.records.per.mapper", "5");
            props.setProperty("send.control.messages.directly", bool.toString());
        };
        testStoreWithLargeValues(z, kafkaInputOverrides, str);
    }

    /**
     * Convenience overload: forwards to the three-argument {@code testStoreWithLargeValues} with a no-op
     * property customizer and a {@code null} store name.
     *
     * @param z forwarded unchanged
     * @return whatever the delegate returns
     * @throws Exception propagated from the delegate
     */
    private String testStoreWithLargeValues(boolean z) throws Exception {
        Consumer<Properties> noOverrides = props -> {
        };
        return testStoreWithLargeValues(z, noOverrides, null);
    }

    /**
     * Pushes ten records with custom-sized values and validates them through both clients, first with
     * single gets and then with one batch get per client.
     *
     * <p>For key {@code k} (0-9) the validator expects a value of length {@code (3145728 / 10) * (k + 1)}
     * consisting solely of the first character of the key's decimal representation.
     *
     * @param z passed to {@code UpdateStoreQueryParams.setChunkingEnabled}
     * @param consumer property customizer forwarded to {@code testBatchStore}
     * @param str store name; when {@code null} the four-argument {@code testBatchStore} is used, otherwise
     *        the six-argument overload is called with this name
     * @return whatever {@code testBatchStore} returns
     * @throws Exception propagated from {@code testBatchStore}
     */
    private String testStoreWithLargeValues(boolean z, Consumer<Properties> consumer, String str) throws Exception {
        final int maxValueSize = 3 * 1024 * 1024;
        final int numberOfRecords = 10;
        InputFileWriter inputFileWriter = inputDir -> new KeyAndValueSchemas(
            TestWriteUtils.writeSimpleAvroFileWithCustomSize(inputDir, numberOfRecords, 0, maxValueSize));
        VPJValidator vPJValidator = (avroClient, vsonClient, metricsRepository) -> {
            HashSet<String> keys = new HashSet<>(10);
            for (int index = 0; index < numberOfRecords; index++) {
                int expectedSize = (maxValueSize / numberOfRecords) * (index + 1);
                String key = Integer.toString(index);
                keys.add(key);
                char[] fill = new char[expectedSize];
                Arrays.fill(fill, key.charAt(0));
                String expectedString = new String(fill);
                Utf8 expectedUtf8 = new Utf8(expectedString);
                LOGGER.info("About to query key: {}", index);
                // Retry the single get on VeniceClientException, rethrowing once the attempts are used up.
                Utf8 avroValue = null;
                int failedAttempts = 0;
                while (failedAttempts < MAX_RETRY_ATTEMPTS) {
                    try {
                        avroValue = (Utf8) avroClient.get(key).get();
                        break;
                    } catch (VeniceClientException e) {
                        failedAttempts++;
                        if (failedAttempts == MAX_RETRY_ATTEMPTS) {
                            throw e;
                        }
                        Thread.sleep(1000L);
                    }
                }
                Assert.assertNotNull(avroValue, "Avro client returned null value for key: " + key + ".");
                LOGGER.info("Received value of size: {} for key: {}", avroValue.length(), key);
                Assert.assertEquals(avroValue.toString().substring(0, 1), key, "Avro value does not begin with the expected prefix.");
                Assert.assertEquals(avroValue.length(), expectedSize, "Avro value does not have the expected size.");
                Assert.assertEquals(avroValue, expectedUtf8, "The entire large value should be filled with the same char: " + key);
                String vsonValue = (String) vsonClient.get(key).get();
                Assert.assertNotNull(vsonValue, "VSON client returned null value for key: " + key + ".");
                Assert.assertEquals(vsonValue.substring(0, 1), key, "VSON value does not begin with the expected prefix.");
                Assert.assertEquals(vsonValue.length(), expectedSize, "VSON value does not have the expected size.");
                Assert.assertEquals(vsonValue, expectedString, "The entire large value should be filled with the same char: " + key);
            }
            // Same expectations again, this time fetched with one batch get per client.
            Map<?, ?> avroResults = (Map<?, ?>) avroClient.batchGet(keys).get();
            Map<?, ?> vsonResults = (Map<?, ?>) vsonClient.batchGet(keys).get();
            for (String key : keys) {
                int expectedSize = (maxValueSize / numberOfRecords) * (Integer.parseInt(key) + 1);
                char[] fill = new char[expectedSize];
                Arrays.fill(fill, key.charAt(0));
                String expectedString = new String(fill);
                Utf8 expectedUtf8 = new Utf8(expectedString);
                Utf8 avroValue = (Utf8) avroResults.get(key);
                Assert.assertNotNull(avroValue, "Avro client returned null value for key: " + key + ".");
                LOGGER.info("Received value of size: {} for key: {}", avroValue.length(), key);
                Assert.assertEquals(avroValue.toString().substring(0, 1), key, "Avro value does not begin with the expected prefix.");
                Assert.assertEquals(avroValue.length(), expectedSize, "Avro value does not have the expected size.");
                Assert.assertEquals(avroValue, expectedUtf8, "The entire large value should be filled with the same char: " + key);
                String vsonValue = (String) vsonResults.get(key);
                Assert.assertNotNull(vsonValue, "VSON client returned null value for key: " + key + ".");
                Assert.assertEquals(vsonValue.substring(0, 1), key, "VSON value does not begin with the expected prefix.");
                Assert.assertEquals(vsonValue.length(), expectedSize, "VSON value does not have the expected size.");
                Assert.assertEquals(vsonValue, expectedString, "The entire large value should be filled with the same char: " + key);
            }
        };
        UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams().setChunkingEnabled(z);
        if (str == null) {
            return testBatchStore(inputFileWriter, consumer, vPJValidator, storeParams);
        }
        return testBatchStore(inputFileWriter, consumer, vPJValidator, str, storeParams, false);
    }

    /**
     * Pushes the file produced by {@code TestWriteUtils.writeSchemaWithUnknownFieldIntoAvroFile} and looks
     * up two records by composite key ({@code memberId} plus {@code source} enum), asserting on the first
     * two fields of each returned value.
     */
    @Test(timeOut = 60000)
    public void testRunJobWithSchemaThatContainsUnknownField() throws Exception {
        testBatchStore(
            inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeSchemaWithUnknownFieldIntoAvroFile(inputDir)),
            props -> {
            },
            (avroClient, vsonClient, metricsRepository) -> {
                // First lookup: key schema taken from the "without symbol doc" schema file.
                Schema firstKeySchema = AvroCompatibilityHelper
                    .parse(new String[]{TestWriteUtils.loadFileAsString("SchemaWithoutSymbolDoc.avsc")})
                    .getField("key")
                    .schema();
                GenericData.Record firstKey = new GenericData.Record(firstKeySchema);
                // The enum schema of the first key is reused for the enum symbols of both keys.
                Schema sourceEnumSchema = firstKey.getSchema().getField("source").schema();
                firstKey.put("memberId", 1L);
                firstKey.put("source", AvroCompatibilityHelper.newEnumSymbol(sourceEnumSchema, TestWriteUtils.TestRecordType.OFFLINE.toString()));
                IndexedRecord firstValue = (IndexedRecord) avroClient.get(firstKey).get();
                Assert.assertEquals(firstValue.get(0).toString(), "LOGO");
                Assert.assertEquals(firstValue.get(1), 1);

                // Second lookup: key schema taken from the "with symbol doc" schema file.
                Schema secondKeySchema = AvroCompatibilityHelper
                    .parse(new String[]{TestWriteUtils.loadFileAsString("SchemaWithSymbolDoc.avsc")})
                    .getField("key")
                    .schema();
                GenericData.Record secondKey = new GenericData.Record(secondKeySchema);
                secondKey.put("memberId", 2L);
                secondKey.put("source", AvroCompatibilityHelper.newEnumSymbol(sourceEnumSchema, TestWriteUtils.TestRecordType.NEARLINE.toString()));
                IndexedRecord secondValue = (IndexedRecord) avroClient.get(secondKey).get();
                Assert.assertEquals(secondValue.get(0).toString(), "INDUSTRY");
                Assert.assertEquals(secondValue.get(1), 2);
            });
    }

    /**
     * Disabled stress test: pushes 100000 records, then issues 2000 batch-get calls of 1000 keys each,
     * logging per-call latency, periodic min/max/average summaries, and a final report that includes
     * client-side multi-get metrics read from the metrics repository.
     */
    @Test(enabled = false)
    public void stressTestLargeMultiGet() throws Exception {
        final int sizeArg = 800; // passed as both trailing arguments of the file writer
        final int recordCount = 100000;
        UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams().setBatchGetLimit(1000).setReadQuotaInCU(2147483647L);
        testBatchStore(
            inputDir -> new KeyAndValueSchemas(
                TestWriteUtils.writeAvroFileWithManyFloatsAndCustomTotalSize(inputDir, recordCount, sizeArg, sizeArg)),
            props -> {
            },
            (avroClient, vsonClient, metricsRepository) -> {
                final String storeName = avroClient.getStoreName();
                final int batchCount = 200;
                final int callsPerBatch = 10;
                final int totalCalls = 10 * 200;
                final int keysPerCall = 1000;
                // Rolling trackers are reset after each periodic summary; global trackers never are.
                final MinLong rollingMin = new MinLong();
                final MaxLong rollingMax = new MaxLong();
                final TotalLong rollingTotal = new TotalLong();
                final MinLong globalMin = new MinLong();
                final MaxLong globalMax = new MaxLong();
                final TotalLong globalTotal = new TotalLong();
                final CompletableFuture[] inFlight = new CompletableFuture[10];
                final long startMs = System.currentTimeMillis();
                int call = 0;
                while (call < totalCalls) {
                    final int callIndex = call;
                    HashSet<String> keys = new HashSet<>(1000);
                    for (int offset = 0; offset < 1000; offset++) {
                        keys.add(String.valueOf(((call * 1000) + offset) % recordCount));
                    }
                    final long submittedAtNs = System.nanoTime();
                    inFlight[call % 10] = avroClient.batchGet(keys).thenAccept(response -> {
                        long completedAtNs = System.nanoTime();
                        Assert.assertEquals(((Map<?, ?>) response).size(), keysPerCall, "Not enough records returned!");
                        long elapsedNs = completedAtNs - submittedAtNs;
                        long elapsedMs = elapsedNs / 1000000;
                        LOGGER.info("Call #{}: {} ns ({} ms).", callIndex, elapsedNs, elapsedMs);
                        rollingMin.add(elapsedMs);
                        rollingMax.add(elapsedMs);
                        rollingTotal.add(elapsedMs);
                        globalMin.add(elapsedMs);
                        globalMax.add(elapsedMs);
                        globalTotal.add(elapsedMs);
                    });
                    if (call > 0 && call % 10 == 0) {
                        // Block until every slot has completed, then log and reset the rolling stats.
                        CompletableFuture.allOf(inFlight).thenAccept(ignored -> {
                            LOGGER.info("Min query time: {} ms.", rollingMin);
                            LOGGER.info("Max query time: {} ms.", rollingMax);
                            LOGGER.info("Average query time: {} ms.", rollingTotal.get() / callsPerBatch);
                            rollingMin.reset();
                            rollingMax.reset();
                            rollingTotal.reset();
                        }).get();
                    }
                    call++;
                }
                CompletableFuture.allOf(inFlight).thenAccept(ignored -> {
                    double elapsedMs = System.currentTimeMillis() - startMs;
                    LOGGER.info("Total query time: {} ms for {} total queries in {} batches of {} calls each.", elapsedMs, totalCalls, batchCount, callsPerBatch);
                    LOGGER.info("Throughput (per test metrics): {} queries / sec", new DecimalFormat("0.0").format(totalCalls / (elapsedMs / 1000.0d)));
                    LOGGER.info("Global min query time (per test metrics): {} ms.", globalMin);
                    LOGGER.info("Global max query time (per test metrics): {} ms.", globalMax);
                    LOGGER.info("Global average query time (per test metrics): {} ms.", globalTotal.get() / totalCalls);
                    Map<?, ?> metrics = metricsRepository.metrics();
                    String prefix = "." + storeName + "--" + RequestType.MULTI_GET.getMetricPrefix();

                    Metric serializationAvg = (Metric) metrics.get(prefix + "request_serialization_time.Avg");
                    Metric serializationP50 = (Metric) metrics.get(prefix + "request_serialization_time.50thPercentile");
                    Metric serializationP99 = (Metric) metrics.get(prefix + "request_serialization_time.99thPercentile");
                    LOGGER.info("Request serialization time                       (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.", Utils.round(serializationAvg.value(), 1), Utils.round(serializationP50.value(), 1), Utils.round(serializationP99.value(), 1));

                    Metric submissionAvg = (Metric) metrics.get(prefix + "request_submission_to_response_handling_time.Avg");
                    Metric submissionP50 = (Metric) metrics.get(prefix + "request_submission_to_response_handling_time.50thPercentile");
                    Metric submissionP99 = (Metric) metrics.get(prefix + "request_submission_to_response_handling_time.99thPercentile");
                    LOGGER.info("Request submission to response time              (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.", Utils.round(submissionAvg.value(), 1), Utils.round(submissionP50.value(), 1), Utils.round(submissionP99.value(), 1));

                    Metric deserializationAvg = (Metric) metrics.get(prefix + "response_deserialization_time.Avg");
                    Metric deserializationP50 = (Metric) metrics.get(prefix + "response_deserialization_time.50thPercentile");
                    Metric deserializationP99 = (Metric) metrics.get(prefix + "response_deserialization_time.99thPercentile");
                    LOGGER.info("Response deserialization time                    (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.", Utils.round(deserializationAvg.value(), 1), Utils.round(deserializationP50.value(), 1), Utils.round(deserializationP99.value(), 1));

                    Metric envelopeAvg = (Metric) metrics.get(prefix + "response_envelope_deserialization_time.Avg");
                    Metric envelopeP50 = (Metric) metrics.get(prefix + "response_envelope_deserialization_time.50thPercentile");
                    Metric envelopeP99 = (Metric) metrics.get(prefix + "response_envelope_deserialization_time.99thPercentile");
                    LOGGER.info("Response envelope deserialization time           (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.", Utils.round(envelopeAvg.value(), 1), Utils.round(envelopeP50.value(), 1), Utils.round(envelopeP99.value(), 1));

                    Metric recordsAvg = (Metric) metrics.get(prefix + "response_records_deserialization_time.Avg");
                    Metric recordsP50 = (Metric) metrics.get(prefix + "response_records_deserialization_time.50thPercentile");
                    Metric recordsP99 = (Metric) metrics.get(prefix + "response_records_deserialization_time.99thPercentile");
                    LOGGER.info("Response records deserialization time            (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.", Utils.round(recordsAvg.value(), 9), Utils.round(recordsP50.value(), 9), Utils.round(recordsP99.value(), 9));

                    Metric recordsSubmissionAvg = (Metric) metrics.get(prefix + "response_records_deserialization_submission_to_start_time.Avg");
                    Metric recordsSubmissionP50 = (Metric) metrics.get(prefix + "response_records_deserialization_submission_to_start_time.50thPercentile");
                    Metric recordsSubmissionP99 = (Metric) metrics.get(prefix + "response_records_deserialization_submission_to_start_time.99thPercentile");
                    LOGGER.info("Response records deserialization submission time (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.", Utils.round(recordsSubmissionAvg.value(), 9), Utils.round(recordsSubmissionP50.value(), 9), Utils.round(recordsSubmissionP99.value(), 9));

                    Metric latencyP50 = (Metric) metrics.get(prefix + "healthy_request_latency.50thPercentile");
                    Metric latencyP90 = (Metric) metrics.get(prefix + "healthy_request_latency.90thPercentile");
                    Metric latencyP99 = (Metric) metrics.get(prefix + "healthy_request_latency.99thPercentile");
                    LOGGER.info("Latency                                          (p50, p90, p99) : {} ms, \t{} ms, \t{} ms.", Utils.round(latencyP50.value(), 1), Utils.round(latencyP90.value(), 1), Utils.round(latencyP99.value(), 1));
                }).get();
            },
            storeParams);
    }

    /**
     * Pushes an empty Avro file, waits for the store's current version to become 1, then runs a
     * Kafka-input push that reads from the version-1 topic and waits for the current version to become 2.
     * No data validation is performed on either push.
     */
    @Test(timeOut = 60000)
    public void testKafkaInputBatchJobSucceedsWhenSourceTopicIsEmpty() throws Exception {
        VPJValidator noValidation = (avroClient, vsonClient, metricsRepository) -> {
        };

        // Step 1: regular push of an empty input file.
        String storeName = testBatchStore(
            inputDir -> new KeyAndValueSchemas(TestWriteUtils.writeEmptyAvroFileWithUserSchema(inputDir)),
            props -> {
            },
            noValidation);
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            int currentVersion = this.veniceCluster.getRandomVeniceController()
                .getVeniceAdmin()
                .getCurrentVersion(this.veniceCluster.getClusterName(), storeName);
            Assert.assertEquals(currentVersion, 1);
        });

        // Step 2: Kafka-input push sourced from the topic of version 1.
        testBatchStore(
            inputDir -> new KeyAndValueSchemas(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.NULL)),
            props -> {
                props.setProperty("source.kafka", "true");
                props.setProperty("kafka.input.topic", Version.composeKafkaTopic(storeName, 1));
                props.setProperty("kafka.input.broker.url", this.veniceCluster.getKafka().getAddress());
                props.setProperty("kafka.input.max.records.per.mapper", "5");
            },
            noValidation,
            storeName,
            new UpdateStoreQueryParams());
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            int currentVersion = this.veniceCluster.getRandomVeniceController()
                .getVeniceAdmin()
                .getCurrentVersion(this.veniceCluster.getClusterName(), storeName);
            Assert.assertEquals(currentVersion, 2);
        });
    }
}
