package com.linkedin.venice.endToEnd;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.luben.zstd.Zstd;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.RecordTooLargeException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixBaseRoutingRepository;
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerCreateOptions;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.InstanceStatus;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.StoreStatus;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.PartitionStatus;
import com.linkedin.venice.pushmonitor.ReplicaStatus;
import com.linkedin.venice.samza.SamzaExitMode;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.samza.VeniceSystemProducer;
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.systemstore.schemas.StoreProperties;
import com.linkedin.venice.utils.AvroRecordUtils;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.CompletableFutureCallback;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemProducer;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/TestHybrid.class */
public class TestHybrid {
    private static final Logger LOGGER = LogManager.getLogger(TestHybrid.class);
    public static final int STREAMING_RECORD_SIZE = 1024;
    private static final long MIN_COMPACTION_LAG = 86400000;
    private VeniceClusterWrapper sharedVenice;
    private VeniceClusterWrapper ingestionIsolationEnabledSharedVenice;

    @BeforeClass(alwaysRun = true)
    public void setUp() {
        this.sharedVenice = setUpCluster(false, false);
        this.ingestionIsolationEnabledSharedVenice = setUpCluster(true, false);
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.sharedVenice});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.ingestionIsolationEnabledSharedVenice});
    }

    @Test(timeOut = 180000)
    public void testHybridInitializationOnMultiColo() throws IOException {
        Properties properties = new Properties();
        properties.setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(3L));
        properties.setProperty("rocksdb.plain.table.format.enabled", "false");
        properties.setProperty("server.database.checksum.verification.enabled", "true");
        properties.setProperty("server.database.sync.bytes.interval.for.deferred.write.mode", "300");
        VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(1, 2, 1, 1, VeniceClusterWrapper.NUM_RECORDS, false, false, properties);
        try {
            ZkServerWrapper zkServer = ServiceFactory.getZkServer();
            try {
                VeniceControllerWrapper veniceController = ServiceFactory.getVeniceController(new VeniceControllerCreateOptions.Builder(veniceCluster.getClusterName(), zkServer, veniceCluster.getKafka()).childControllers(new VeniceControllerWrapper[]{veniceCluster.getLeaderVeniceController()}).build());
                try {
                    ControllerClient controllerClient = new ControllerClient(veniceCluster.getClusterName(), veniceController.getControllerUrl());
                    try {
                        TopicManager topicManager = IntegrationTestPushUtils.getTopicManagerRepo(30000L, 100L, 0L, veniceCluster.getKafka().getAddress(), veniceCluster.getPubSubTopicRepository()).getTopicManager();
                        try {
                            String uniqueString = Utils.getUniqueString("multi-colo-hybrid-store");
                            controllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\"");
                            controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setHybridRewindSeconds(25L).setHybridOffsetLagThreshold(2L));
                            HybridStoreConfigImpl hybridStoreConfigImpl = new HybridStoreConfigImpl(25L, 2L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP);
                            Assert.assertEquals(controllerClient.getStore(uniqueString).getStore().getCurrentVersion(), 0, "The newly created store must have a current version of 0");
                            int version = controllerClient.emptyPush(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L).getVersion();
                            Assert.assertNotEquals(Integer.valueOf(version), 0, "requesting a topic for a push should provide a non zero version number");
                            TestUtils.waitForNonDeterministicAssertion(100L, TimeUnit.SECONDS, true, () -> {
                                JobStatusQueryResponse queryJobStatus = controllerClient.queryJobStatus(Version.composeKafkaTopic(uniqueString, version));
                                Assert.assertFalse(queryJobStatus.isError(), "Error in getting JobStatusResponse: " + queryJobStatus.getError());
                                Assert.assertEquals(queryJobStatus.getStatus(), "COMPLETED");
                            });
                            Assert.assertTrue(topicManager.containsTopicAndAllPartitionsAreOnline(this.sharedVenice.getPubSubTopicRepository().getTopic(Version.composeRealTimeTopic(uniqueString))));
                            StoreProperties prefillAvroRecordWithDefaultValue = AvroRecordUtils.prefillAvroRecordWithDefaultValue(new StoreProperties());
                            prefillAvroRecordWithDefaultValue.name = uniqueString;
                            prefillAvroRecordWithDefaultValue.owner = "owner";
                            prefillAvroRecordWithDefaultValue.createdTime = System.currentTimeMillis();
                            ZKStore zKStore = new ZKStore(prefillAvroRecordWithDefaultValue);
                            Assert.assertEquals(topicManager.getTopicRetention(this.sharedVenice.getPubSubTopicRepository().getTopic(Version.composeRealTimeTopic(uniqueString))), TopicManager.getExpectedRetentionTimeInMs(zKStore, hybridStoreConfigImpl), "RT retention not configured properly");
                            hybridStoreConfigImpl.setRewindTimeInSeconds(600L);
                            controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(600L));
                            Assert.assertEquals(topicManager.getTopicRetention(this.sharedVenice.getPubSubTopicRepository().getTopic(Version.composeRealTimeTopic(uniqueString))), TopicManager.getExpectedRetentionTimeInMs(zKStore, hybridStoreConfigImpl), "RT retention not updated properly");
                            if (topicManager != null) {
                                topicManager.close();
                            }
                            controllerClient.close();
                            if (veniceController != null) {
                                veniceController.close();
                            }
                            if (zkServer != null) {
                                zkServer.close();
                            }
                            if (veniceCluster != null) {
                                veniceCluster.close();
                            }
                        } catch (Throwable th) {
                            if (topicManager != null) {
                                try {
                                    topicManager.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    } catch (Throwable th3) {
                        try {
                            controllerClient.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                        throw th3;
                    }
                } catch (Throwable th5) {
                    if (veniceController != null) {
                        try {
                            veniceController.close();
                        } catch (Throwable th6) {
                            th5.addSuppressed(th6);
                        }
                    }
                    throw th5;
                }
            } catch (Throwable th7) {
                if (zkServer != null) {
                    try {
                        zkServer.close();
                    } catch (Throwable th8) {
                        th7.addSuppressed(th8);
                    }
                }
                throw th7;
            }
        } catch (Throwable th9) {
            if (veniceCluster != null) {
                try {
                    veniceCluster.close();
                } catch (Throwable th10) {
                    th9.addSuppressed(th10);
                }
            }
            throw th9;
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "testPermutations", parallel = false)
    public static Object[][] testPermutations() {
        return new Object[]{new Object[]{false, false, BufferReplayPolicy.REWIND_FROM_EOP}, new Object[]{false, true, BufferReplayPolicy.REWIND_FROM_EOP}, new Object[]{true, false, BufferReplayPolicy.REWIND_FROM_EOP}, new Object[]{true, true, BufferReplayPolicy.REWIND_FROM_EOP}, new Object[]{false, false, BufferReplayPolicy.REWIND_FROM_SOP}, new Object[]{false, true, BufferReplayPolicy.REWIND_FROM_SOP}, new Object[]{true, false, BufferReplayPolicy.REWIND_FROM_SOP}, new Object[]{true, true, BufferReplayPolicy.REWIND_FROM_SOP}};
    }

    @Test(dataProvider = "testPermutations", timeOut = 180000, groups = {"flaky"})
    public void testHybridEndToEnd(boolean z, boolean z2, BufferReplayPolicy bufferReplayPolicy) throws Exception {
        LOGGER.info("About to create VeniceClusterWrapper");
        Properties properties = new Properties();
        properties.setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(3L));
        if (z2) {
            properties.setProperty("venice.writer.max.size.for.user.payload.per.message.in.bytes", Integer.toString(512));
        }
        SystemProducer systemProducer = null;
        VeniceClusterWrapper veniceClusterWrapper = this.sharedVenice;
        try {
            LOGGER.info("Finished creating VeniceClusterWrapper");
            String uniqueString = Utils.getUniqueString("hybrid-store");
            File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
            String str = "file://" + tempDataDirectory.getAbsolutePath();
            Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory);
            Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(veniceClusterWrapper, str, uniqueString);
            ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(veniceClusterWrapper.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
            try {
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
                try {
                    TopicManager topicManager = IntegrationTestPushUtils.getTopicManagerRepo(30000L, 100L, 0L, veniceClusterWrapper.getKafka().getAddress(), this.sharedVenice.getPubSubTopicRepository()).getTopicManager();
                    try {
                        Cache cache = (Cache) Mockito.mock(Cache.class);
                        Mockito.when(cache.getIfPresent(Mockito.any())).thenReturn((Object) null);
                        topicManager.setTopicConfigCache(cache);
                        Assert.assertFalse(createStoreForJob.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L).setChunkingEnabled(z2).setHybridBufferReplayPolicy(bufferReplayPolicy)).isError());
                        IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                        PubSubTopic topic = this.sharedVenice.getPubSubTopicRepository().getTopic(Version.composeKafkaTopic(uniqueString, 1));
                        Assert.assertTrue(topicManager.isTopicCompactionEnabled(topic), "topic: " + topic + " should have compaction enabled");
                        Assert.assertEquals(topicManager.getTopicMinLogCompactionLagMs(topic), MIN_COMPACTION_LAG, "topic:" + topic + " should have min compaction lag config set to " + MIN_COMPACTION_LAG);
                        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                            for (int i = 1; i < 100; i++) {
                                try {
                                    String num = Integer.toString(i);
                                    Object obj = andStartGenericAvroClient.get(num).get();
                                    Assert.assertNotNull(obj, "Key " + i + " should not be missing!");
                                    Assert.assertEquals(obj.toString(), "test_name_" + num);
                                } catch (Exception e) {
                                    throw new VeniceException(e);
                                }
                            }
                        });
                        SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(veniceClusterWrapper, uniqueString, Version.PushType.STREAM, new Pair[0]);
                        for (int i = 1; i <= 10; i++) {
                            IntegrationTestPushUtils.sendCustomSizeStreamingRecord(samzaProducer, uniqueString, i, 1024);
                        }
                        if (z) {
                            samzaProducer.stop();
                        }
                        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
                            try {
                                checkLargeRecord(andStartGenericAvroClient, 2);
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        });
                        IntegrationTestPushUtils.runVPJ(defaultVPJProps, 2, createStoreForJob);
                        PubSubTopic topic2 = this.sharedVenice.getPubSubTopicRepository().getTopic(Version.composeKafkaTopic(uniqueString, 2));
                        Assert.assertTrue(topicManager.isTopicCompactionEnabled(topic2), "topic: " + topic2 + " should have compaction enabled");
                        Assert.assertEquals(topicManager.getTopicMinLogCompactionLagMs(topic2), MIN_COMPACTION_LAG, "topic:" + topic2 + " should have min compaction lag config set to " + MIN_COMPACTION_LAG);
                        checkLargeRecord(andStartGenericAvroClient, 2);
                        Assert.assertEquals(andStartGenericAvroClient.get("19").get().toString(), "test_name_19");
                        LOGGER.info("***** Sleeping to get outside of rewind time: {} seconds", 10L);
                        Utils.sleep(TimeUnit.MILLISECONDS.convert(10L, TimeUnit.SECONDS));
                        if (z) {
                            samzaProducer = IntegrationTestPushUtils.getSamzaProducer(veniceClusterWrapper, uniqueString, Version.PushType.STREAM, new Pair[0]);
                        }
                        for (int i2 = 10; i2 <= 20; i2++) {
                            IntegrationTestPushUtils.sendCustomSizeStreamingRecord(samzaProducer, uniqueString, i2, 1024);
                        }
                        TestUtils.waitForNonDeterministicAssertion(15L, TimeUnit.SECONDS, () -> {
                            try {
                                checkLargeRecord(andStartGenericAvroClient, 19);
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        });
                        IntegrationTestPushUtils.runVPJ(defaultVPJProps, 3, createStoreForJob);
                        PubSubTopic topic3 = this.sharedVenice.getPubSubTopicRepository().getTopic(Version.composeKafkaTopic(uniqueString, 3));
                        Assert.assertTrue(topicManager.isTopicCompactionEnabled(topic3), "topic: " + topic3 + " should have compaction enabled");
                        TestUtils.waitForNonDeterministicAssertion(15L, TimeUnit.SECONDS, () -> {
                            try {
                                checkLargeRecord(andStartGenericAvroClient, 19);
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        });
                        Assert.assertEquals(andStartGenericAvroClient.get("2").get().toString(), "test_name_2");
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                            StoreResponse store = createStoreForJob.getStore(uniqueString);
                            Assert.assertFalse(store.isError());
                            List list = (List) store.getStore().getVersions().stream().map((v0) -> {
                                return v0.getNumber();
                            }).collect(Collectors.toList());
                            Assert.assertFalse(list.contains(1), "After version 3 comes online, version 1 should be retired");
                            Assert.assertTrue(list.contains(2));
                            Assert.assertTrue(list.contains(3));
                        });
                        createStoreForJob.listInstancesStatuses(false).getInstancesStatusMap().keySet().forEach(str2 -> {
                            LOGGER.info("Replicas for {}: {}", str2, Arrays.toString(createStoreForJob.listStorageNodeReplicas(str2).getReplicas()));
                        });
                        int port = veniceClusterWrapper.getVeniceServers().get(0).getPort();
                        veniceClusterWrapper.stopVeniceServer(port);
                        TestUtils.waitForNonDeterministicAssertion(15L, TimeUnit.SECONDS, true, true, () -> {
                            Assert.assertEquals(StoreStatus.UNDER_REPLICATED.toString(), (String) createStoreForJob.listStoresStatuses().getStoreStatusMap().get(uniqueString), "Should be UNDER_REPLICATED");
                            Assert.assertTrue(createStoreForJob.listInstancesStatuses(false).getInstancesStatusMap().entrySet().stream().filter(entry -> {
                                return ((String) entry.getKey()).contains(Integer.toString(port));
                            }).map((v0) -> {
                                return v0.getValue();
                            }).allMatch(str3 -> {
                                return str3.equals(InstanceStatus.DISCONNECTED.toString());
                            }), "Storage Node on port " + port + " should be DISCONNECTED");
                        });
                        veniceClusterWrapper.restartVeniceServer(port);
                        TestUtils.waitForNonDeterministicAssertion(15L, TimeUnit.SECONDS, true, true, () -> {
                            Assert.assertEquals(StoreStatus.FULLLY_REPLICATED.toString(), (String) createStoreForJob.listStoresStatuses().getStoreStatusMap().get(uniqueString), "Should be FULLLY_REPLICATED");
                        });
                        if (topicManager != null) {
                            topicManager.close();
                        }
                        if (andStartGenericAvroClient != null) {
                            andStartGenericAvroClient.close();
                        }
                        if (createStoreForJob != null) {
                            createStoreForJob.close();
                        }
                        if (samzaProducer != null) {
                            samzaProducer.stop();
                        }
                    } catch (Throwable th) {
                        if (topicManager != null) {
                            try {
                                topicManager.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (andStartGenericAvroClient != null) {
                        try {
                            andStartGenericAvroClient.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th5) {
            if (0 != 0) {
                systemProducer.stop();
            }
            throw th5;
        }
    }

    private static VeniceCompressor getVeniceCompressor(CompressionStrategy compressionStrategy, String str, int i, VeniceClusterWrapper veniceClusterWrapper, CloseableHttpAsyncClient closeableHttpAsyncClient) throws IOException, ExecutionException, InterruptedException {
        CompressorFactory compressorFactory = new CompressorFactory();
        if (!compressionStrategy.equals(CompressionStrategy.ZSTD_WITH_DICT)) {
            return compressorFactory.getCompressor(compressionStrategy);
        }
        try {
            InputStream content = ((HttpResponse) closeableHttpAsyncClient.execute(new HttpGet("http://" + veniceClusterWrapper.getVeniceServers().get(0).getAddress() + "/" + QueryAction.DICTIONARY.toString().toLowerCase() + "/" + str + "/" + i), (FutureCallback) null).get()).getEntity().getContent();
            try {
                VeniceCompressor createCompressorWithDictionary = compressorFactory.createCompressorWithDictionary(IOUtils.toByteArray(content), Zstd.maxCompressionLevel());
                if (content != null) {
                    content.close();
                }
                return createCompressorWithDictionary;
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            throw e;
        }
    }

    private void checkLargeRecord(AvroGenericStoreClient avroGenericStoreClient, int i) throws ExecutionException, InterruptedException {
        String num = Integer.toString(i);
        String obj = avroGenericStoreClient.get(num).get().toString();
        Assert.assertEquals(obj.length(), 1024, "Expected a large record for key '" + num + "' but instead got: '" + obj + "'.");
        String substring = Integer.toString(i).substring(0, 1);
        for (int i2 = 0; i2 < obj.length(); i2++) {
            Assert.assertEquals(obj.substring(i2, i2 + 1), substring);
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test(timeOut = 180000, enabled = false)
    public void testSamzaBatchLoad() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("persistence.type", PersistenceType.ROCKS_DB.name());
        properties.setProperty("rocksdb.plain.table.format.enabled", "false");
        properties.setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(1L));
        SystemProducer systemProducer = null;
        VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(1, 3, 1, 2, VeniceClusterWrapper.NUM_RECORDS, false, false, properties);
        try {
            try {
                Admin veniceAdmin = veniceCluster.getLeaderVeniceController().getVeniceAdmin();
                String clusterName = veniceCluster.getClusterName();
                String uniqueString = Utils.getUniqueString("test-store");
                veniceAdmin.createStore(clusterName, uniqueString, "tester", "\"string\"", "\"string\"");
                veniceAdmin.updateStore(clusterName, uniqueString, new UpdateStoreQueryParams().setPartitionCount(1).setHybridRewindSeconds(25L).setHybridOffsetLagThreshold(2L).setChunkingEnabled(true));
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertFalse(veniceAdmin.getStore(clusterName, uniqueString).containsVersion(1));
                    Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString).getCurrentVersion(), 0);
                });
                VeniceSystemFactory veniceSystemFactory = new VeniceSystemFactory();
                VeniceSystemProducer producer = veniceSystemFactory.getProducer("venice", new MapConfig(IntegrationTestPushUtils.getSamzaProducerConfig(veniceCluster, uniqueString, Version.PushType.STREAM_REPROCESSING)), (MetricsRegistry) null);
                producer.start();
                if (producer instanceof VeniceSystemProducer) {
                    producer.setExitMode(SamzaExitMode.NO_OP);
                }
                for (int i = 10; i >= 1; i--) {
                    IntegrationTestPushUtils.sendStreamingRecord(producer, uniqueString, i);
                }
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertTrue(veniceAdmin.getStore(clusterName, uniqueString).containsVersion(1));
                    Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString).getCurrentVersion(), 0);
                });
                Utils.sleep(TimeUnit.SECONDS.toMillis(Integer.parseInt(properties.getProperty("server.promotion.to.leader.replica.delay.seconds")) + 2));
                String composeKafkaTopic = Version.composeKafkaTopic(uniqueString, 1);
                HelixBaseRoutingRepository routingDataRepository = veniceCluster.getRandomVeniceRouter().getRoutingDataRepository();
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertNotNull(routingDataRepository.getLeaderInstance(composeKafkaTopic, 0));
                });
                Instance leaderInstance = routingDataRepository.getLeaderInstance(composeKafkaTopic, 0);
                veniceCluster.stopVeniceServer(leaderInstance.getPort());
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    Instance leaderInstance2 = routingDataRepository.getLeaderInstance(composeKafkaTopic, 0);
                    Assert.assertNotNull(leaderInstance2);
                    Assert.assertNotEquals(Integer.valueOf(leaderInstance.getPort()), Integer.valueOf(leaderInstance2.getPort()));
                    Assert.assertTrue(routingDataRepository.getPartitionAssignments(composeKafkaTopic).getPartition(0).getWorkingInstances().size() == 2);
                });
                for (int i2 = 21; i2 <= 30; i2++) {
                    IntegrationTestPushUtils.sendCustomSizeStreamingRecord(producer, uniqueString, i2, 1048576);
                }
                for (int i3 = 31; i3 <= 40; i3++) {
                    IntegrationTestPushUtils.sendStreamingRecord(producer, uniqueString, i3);
                }
                Assert.assertEquals(veniceSystemFactory.getNumberOfActiveSystemProducers(), 1);
                producer.getInternalProducer().broadcastEndOfPush(new HashMap());
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertTrue(veniceAdmin.getStore(clusterName, uniqueString).containsVersion(1));
                    Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString).getCurrentVersion(), 1);
                    Assert.assertEquals(veniceSystemFactory.getNumberOfActiveSystemProducers(), 0);
                });
                SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(veniceCluster, uniqueString, Version.PushType.STREAM, new Pair[0]);
                try {
                    AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceCluster.getRandomRouterURL()));
                    try {
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                            for (int i4 = 1; i4 <= 10; i4++) {
                                String num = Integer.toString(i4);
                                Object obj = andStartGenericAvroClient.get(num).get();
                                Assert.assertNotNull(obj);
                                Assert.assertEquals(obj.toString(), "stream_" + num);
                            }
                        });
                        Assert.assertNull(andStartGenericAvroClient.get(Integer.toString(11)).get(), "This record should not be found");
                        for (int i4 = 21; i4 <= 30; i4++) {
                            Assert.assertNotNull(andStartGenericAvroClient.get(Integer.toString(i4)).get(), "Key " + i4 + " should not be missing!");
                        }
                        for (int i5 = 31; i5 <= 40; i5++) {
                            String num = Integer.toString(i5);
                            Assert.assertEquals(andStartGenericAvroClient.get(num).get().toString(), "stream_" + num);
                        }
                        samzaProducer.start();
                        for (int i6 = 11; i6 <= 20; i6++) {
                            IntegrationTestPushUtils.sendStreamingRecord(samzaProducer, uniqueString, i6);
                        }
                        Assert.assertThrows(RecordTooLargeException.class, () -> {
                            IntegrationTestPushUtils.sendCustomSizeStreamingRecord(samzaProducer, uniqueString, 0, 1048576);
                        });
                        Assert.assertTrue(veniceAdmin.getStore(clusterName, uniqueString).containsVersion(1));
                        Assert.assertFalse(veniceAdmin.getStore(clusterName, uniqueString).containsVersion(2));
                        Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString).getCurrentVersion(), 1);
                        long millis = TimeUnit.SECONDS.toMillis(Long.parseLong(properties.getProperty("server.promotion.to.leader.replica.delay.seconds")));
                        long millis2 = TimeUnit.SECONDS.toMillis(3L);
                        LOGGER.info("normalTimeForConsuming: {} ms; extraWaitTime: {} ms", Long.valueOf(millis2), Long.valueOf(millis));
                        Utils.sleep(millis2 + millis);
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, false, true, () -> {
                            for (int i7 = 1; i7 < 20; i7++) {
                                String num2 = Integer.toString(i7);
                                Assert.assertEquals(andStartGenericAvroClient.get(num2).get().toString(), "stream_" + num2);
                            }
                            Assert.assertNull(andStartGenericAvroClient.get(Integer.toString(41)).get(), "This record should not be found");
                        });
                        if (andStartGenericAvroClient != null) {
                            andStartGenericAvroClient.close();
                        }
                        samzaProducer.stop();
                        if (producer != null) {
                            producer.stop();
                        }
                        if (veniceCluster != null) {
                            veniceCluster.close();
                        }
                    } catch (Throwable th) {
                        if (andStartGenericAvroClient != null) {
                            try {
                                andStartGenericAvroClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    samzaProducer.stop();
                    throw th3;
                }
            } catch (Throwable th4) {
                if (0 != 0) {
                    systemProducer.stop();
                }
                throw th4;
            }
        } catch (Throwable th5) {
            if (veniceCluster != null) {
                try {
                    veniceCluster.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    @Test(timeOut = 180000, enabled = false)
    public void testMultiStreamReprocessingSystemProducers() {
        VeniceSystemProducer veniceSystemProducer = null;
        VeniceSystemProducer veniceSystemProducer2 = null;
        try {
            VeniceClusterWrapper veniceClusterWrapper = this.sharedVenice;
            Admin veniceAdmin = veniceClusterWrapper.getLeaderVeniceController().getVeniceAdmin();
            String clusterName = veniceClusterWrapper.getClusterName();
            String uniqueString = Utils.getUniqueString("test-store1");
            String uniqueString2 = Utils.getUniqueString("test-store2");
            veniceAdmin.createStore(clusterName, uniqueString, "tester", "\"string\"", "\"string\"");
            veniceAdmin.createStore(clusterName, uniqueString2, "tester", "\"string\"", "\"string\"");
            UpdateStoreQueryParams hybridOffsetLagThreshold = new UpdateStoreQueryParams().setHybridRewindSeconds(25L).setHybridOffsetLagThreshold(2L);
            veniceAdmin.updateStore(clusterName, uniqueString, hybridOffsetLagThreshold);
            veniceAdmin.updateStore(clusterName, uniqueString2, hybridOffsetLagThreshold);
            veniceClusterWrapper.getVeniceRouters().get(0).refresh();
            Assert.assertFalse(veniceAdmin.getStore(clusterName, uniqueString).containsVersion(1));
            Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString).getCurrentVersion(), 0);
            Assert.assertFalse(veniceAdmin.getStore(clusterName, uniqueString2).containsVersion(1));
            Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString2).getCurrentVersion(), 0);
            VeniceSystemFactory veniceSystemFactory = new VeniceSystemFactory();
            veniceSystemProducer = veniceSystemFactory.getProducer("venice", new MapConfig(IntegrationTestPushUtils.getSamzaProducerConfig(veniceClusterWrapper, uniqueString, Version.PushType.STREAM_REPROCESSING)), (MetricsRegistry) null);
            veniceSystemProducer.start();
            veniceSystemProducer2 = veniceSystemFactory.getProducer("venice", new MapConfig(IntegrationTestPushUtils.getSamzaProducerConfig(veniceClusterWrapper, uniqueString2, Version.PushType.STREAM_REPROCESSING)), (MetricsRegistry) null);
            veniceSystemProducer2.start();
            if (veniceSystemProducer instanceof VeniceSystemProducer) {
                veniceSystemProducer.setExitMode(SamzaExitMode.NO_OP);
            }
            if (veniceSystemProducer2 instanceof VeniceSystemProducer) {
                veniceSystemProducer2.setExitMode(SamzaExitMode.NO_OP);
            }
            for (int i = 10; i >= 1; i--) {
                IntegrationTestPushUtils.sendStreamingRecord(veniceSystemProducer, uniqueString, i);
                IntegrationTestPushUtils.sendStreamingRecord(veniceSystemProducer2, uniqueString2, i);
            }
            Assert.assertEquals(veniceSystemFactory.getNumberOfActiveSystemProducers(), 2);
            Utils.sleep(500L);
            veniceClusterWrapper.useControllerClient(controllerClient -> {
                controllerClient.writeEndOfPush(uniqueString, 1);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertTrue(veniceAdmin.getStore(clusterName, uniqueString).containsVersion(1));
                    Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString).getCurrentVersion(), 1);
                    Assert.assertEquals(veniceSystemFactory.getNumberOfActiveSystemProducers(), 1);
                });
                controllerClient.writeEndOfPush(uniqueString2, 1);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertTrue(veniceAdmin.getStore(clusterName, uniqueString2).containsVersion(1));
                    Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString2).getCurrentVersion(), 1);
                    Assert.assertEquals(veniceSystemFactory.getNumberOfActiveSystemProducers(), 0);
                });
            });
            if (veniceSystemProducer != null) {
                veniceSystemProducer.stop();
            }
            if (veniceSystemProducer2 != null) {
                veniceSystemProducer2.stop();
            }
        } catch (Throwable th) {
            if (veniceSystemProducer != null) {
                veniceSystemProducer.stop();
            }
            if (veniceSystemProducer2 != null) {
                veniceSystemProducer2.stop();
            }
            throw th;
        }
    }

    @Test(timeOut = 180000)
    public void testLeaderHonorLastTopicSwitchMessage() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(10L));
        VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, VeniceClusterWrapper.NUM_RECORDS, false, false, properties);
        try {
            ControllerClient controllerClient = new ControllerClient(veniceCluster.getClusterName(), veniceCluster.getAllControllersURLs());
            try {
                String uniqueString = Utils.getUniqueString("hybrid-store");
                controllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\"");
                controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setHybridRewindSeconds(25L).setHybridOffsetLagThreshold(2L));
                VersionCreationResponse assertCommand = TestUtils.assertCommand(controllerClient.emptyPush(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L));
                Assert.assertEquals(assertCommand.getVersion(), 1, "Version number should become 1 after an empty-push");
                int partitions = assertCommand.getPartitions();
                PubSubTopic topic = this.sharedVenice.getPubSubTopicRepository().getTopic(uniqueString + "_tmp1_rt");
                PubSubTopic topic2 = this.sharedVenice.getPubSubTopicRepository().getTopic(uniqueString + "_tmp2_rt");
                TopicManager topicManager = veniceCluster.getLeaderVeniceController().getVeniceAdmin().getTopicManager();
                topicManager.createTopic(topic, partitions, 1, true);
                topicManager.createTopic(topic2, partitions, 1, true);
                Properties properties2 = new Properties();
                properties2.put("kafka.bootstrap.servers", veniceCluster.getKafka().getAddress());
                AvroSerializer avroSerializer = new AvroSerializer(Schema.parse("\"string\""));
                VeniceWriter createVeniceWriter = TestUtils.getVeniceWriterFactory(properties2).createVeniceWriter(new VeniceWriterOptions.Builder(topic.getName()).build());
                for (int i = 0; i < 10; i++) {
                    try {
                        createVeniceWriter.put(avroSerializer.serialize("key_" + i), avroSerializer.serialize("value_" + i), 1);
                    } finally {
                    }
                }
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
                createVeniceWriter = TestUtils.getVeniceWriterFactory(properties2).createVeniceWriter(new VeniceWriterOptions.Builder(topic2.getName()).build());
                for (int i2 = 10; i2 < 20; i2++) {
                    try {
                        createVeniceWriter.put(avroSerializer.serialize("key_" + i2), avroSerializer.serialize("value_" + i2), 1);
                    } finally {
                    }
                }
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertEquals(TestUtils.assertCommand(controllerClient.getStore(uniqueString)).getStore().getCurrentVersion(), 1);
                });
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceCluster.getRandomRouterURL()));
                try {
                    createVeniceWriter = TestUtils.getVeniceWriterFactory(properties2).createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeRealTimeTopic(uniqueString)).build());
                    try {
                        createVeniceWriter.broadcastTopicSwitch(Collections.singletonList(veniceCluster.getKafka().getAddress()), topic.getName(), -1L, (Map) null);
                        createVeniceWriter.broadcastTopicSwitch(Collections.singletonList(veniceCluster.getKafka().getAddress()), topic2.getName(), -1L, (Map) null);
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                            for (int i3 = 10; i3 < 20; i3++) {
                                try {
                                    Assert.assertEquals(andStartGenericAvroClient.get("key_" + i3).get(), new Utf8("value_" + i3));
                                } catch (Exception e) {
                                    LOGGER.error("Caught exception in client.get()", e);
                                    Assert.fail(e.getMessage());
                                }
                            }
                            for (int i4 = 0; i4 < 10; i4++) {
                                try {
                                    Assert.assertNull(andStartGenericAvroClient.get("key_" + i4).get());
                                } catch (Exception e2) {
                                    LOGGER.error("Caught exception in client.get()", e2);
                                    Assert.fail(e2.getMessage());
                                    return;
                                }
                            }
                        });
                        if (createVeniceWriter != null) {
                            createVeniceWriter.close();
                        }
                        if (andStartGenericAvroClient != null) {
                            andStartGenericAvroClient.close();
                        }
                        controllerClient.close();
                        if (veniceCluster != null) {
                            veniceCluster.close();
                        }
                    } finally {
                        if (createVeniceWriter != null) {
                            try {
                                createVeniceWriter.close();
                            } catch (Throwable th) {
                                th.addSuppressed(th);
                            }
                        }
                    }
                } catch (Throwable th2) {
                    if (andStartGenericAvroClient != null) {
                        try {
                            andStartGenericAvroClient.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        } catch (Throwable th4) {
            if (veniceCluster != null) {
                try {
                    veniceCluster.close();
                } catch (Throwable th5) {
                    th4.addSuppressed(th5);
                }
            }
            throw th4;
        }
    }

    @Test(timeOut = 180000, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testLeaderCanReleaseLatch(boolean z) {
        VeniceClusterWrapper veniceClusterWrapper = z ? this.ingestionIsolationEnabledSharedVenice : this.sharedVenice;
        Admin veniceAdmin = veniceClusterWrapper.getLeaderVeniceController().getVeniceAdmin();
        String clusterName = veniceClusterWrapper.getClusterName();
        String uniqueString = Utils.getUniqueString("test-store");
        SystemProducer systemProducer = null;
        try {
            ControllerClient controllerClient = new ControllerClient(clusterName, veniceClusterWrapper.getAllControllersURLs());
            try {
                controllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\"");
                controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setHybridRewindSeconds(25L).setHybridOffsetLagThreshold(1L));
                controllerClient.emptyPush(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L);
                SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(veniceClusterWrapper, uniqueString, Version.PushType.STREAM, new Pair[0]);
                for (int i = 0; i < 10; i++) {
                    IntegrationTestPushUtils.sendStreamingRecord(samzaProducer, uniqueString, i);
                }
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
                try {
                    TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, true, () -> {
                        Assert.assertEquals(veniceAdmin.getStore(clusterName, uniqueString).getCurrentVersion(), 1);
                    });
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                        for (int i2 = 0; i2 < 10; i2++) {
                            try {
                                String num = Integer.toString(i2);
                                Object obj = andStartGenericAvroClient.get(num).get();
                                Assert.assertNotNull(obj, "Did not find key " + i2 + " in store before restarting SN.");
                                Assert.assertEquals(obj.toString(), "stream_" + num);
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        }
                    });
                    VeniceServerWrapper veniceServerWrapper = veniceClusterWrapper.getVeniceServers().get(0);
                    veniceClusterWrapper.stopVeniceServer(veniceServerWrapper.getPort());
                    for (int i2 = 10; i2 < 20; i2++) {
                        IntegrationTestPushUtils.sendStreamingRecord(samzaProducer, uniqueString, i2);
                    }
                    veniceClusterWrapper.restartVeniceServer(veniceServerWrapper.getPort());
                    String composeKafkaTopic = Version.composeKafkaTopic(uniqueString, 1);
                    HelixExternalViewRepository routingDataRepository = veniceClusterWrapper.getLeaderVeniceController().getVeniceHelixAdmin().getHelixVeniceClusterResources(clusterName).getRoutingDataRepository();
                    TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, true, () -> {
                        Assert.assertNotNull(routingDataRepository.getLeaderInstance(composeKafkaTopic, 0));
                    });
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                        for (int i3 = 10; i3 < 20; i3++) {
                            try {
                                String num = Integer.toString(i3);
                                Object obj = andStartGenericAvroClient.get(num).get();
                                Assert.assertNotNull(obj, "Did not find key " + i3 + " in store after restarting SN.");
                                Assert.assertEquals(obj.toString(), "stream_" + num);
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        }
                    });
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    controllerClient.close();
                    if (samzaProducer != null) {
                        samzaProducer.stop();
                    }
                } catch (Throwable th) {
                    if (andStartGenericAvroClient != null) {
                        try {
                            andStartGenericAvroClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (0 != 0) {
                systemProducer.stop();
            }
            throw th3;
        }
    }

    @Test(timeOut = 180000)
    public void testHybridMultipleVersions() throws Exception {
        new Properties().setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(1L));
        VeniceClusterWrapper veniceClusterWrapper = this.sharedVenice;
        UpdateStoreQueryParams partitionCount = new UpdateStoreQueryParams().setHybridRewindSeconds(2000000L).setHybridOffsetLagThreshold(10L).setPartitionCount(2);
        String uniqueString = Utils.getUniqueString("store");
        veniceClusterWrapper.useControllerClient(controllerClient -> {
            controllerClient.createNewStore(uniqueString, "owner", "\"int\"", "\"int\"");
            controllerClient.updateStore(uniqueString, partitionCount);
        });
        veniceClusterWrapper.createVersion(uniqueString, "\"int\"", "\"int\"", IntStream.range(0, 10).mapToObj(i -> {
            return new AbstractMap.SimpleEntry(Integer.valueOf(i), Integer.valueOf(i));
        }));
        AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
        try {
            TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                for (Integer num = 0; num.intValue() < 10; num = Integer.valueOf(num.intValue() + 1)) {
                    Assert.assertEquals(andStartGenericAvroClient.get(num).get(), num);
                }
            });
            SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(veniceClusterWrapper, uniqueString, Version.PushType.STREAM, new Pair[0]);
            for (int i2 = 0; i2 < 10; i2++) {
                IntegrationTestPushUtils.sendStreamingRecord(samzaProducer, uniqueString, Integer.valueOf(i2), Integer.valueOf(i2 + 1));
            }
            samzaProducer.stop();
            TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                for (int i3 = 0; i3 < 10; i3++) {
                    Assert.assertEquals(andStartGenericAvroClient.get(Integer.valueOf(i3)).get(), Integer.valueOf(i3 + 1));
                }
            });
            veniceClusterWrapper.createVersion(uniqueString, "\"int\"", "\"int\"", IntStream.range(0, 10).mapToObj(i3 -> {
                return new AbstractMap.SimpleEntry(Integer.valueOf(i3), Integer.valueOf(i3 + 2));
            }));
            TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                for (int i4 = 0; i4 < 10; i4++) {
                    Assert.assertEquals(andStartGenericAvroClient.get(Integer.valueOf(i4)).get(), Integer.valueOf(i4 + 1));
                }
            });
            if (andStartGenericAvroClient != null) {
                andStartGenericAvroClient.close();
            }
        } catch (Throwable th) {
            if (andStartGenericAvroClient != null) {
                try {
                    andStartGenericAvroClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testHybridWithZeroLagThreshold() throws Exception {
        UpdateStoreQueryParams partitionCount = new UpdateStoreQueryParams().setHybridRewindSeconds(2000000L).setHybridOffsetLagThreshold(0L).setPartitionCount(2);
        String uniqueString = Utils.getUniqueString("store");
        this.sharedVenice.useControllerClient(controllerClient -> {
            controllerClient.createNewStore(uniqueString, "owner", "\"int\"", "\"int\"");
            controllerClient.updateStore(uniqueString, partitionCount);
        });
        this.sharedVenice.createVersion(uniqueString, "\"int\"", "\"int\"", IntStream.range(0, 10).mapToObj(i -> {
            return new AbstractMap.SimpleEntry(Integer.valueOf(i), Integer.valueOf(i));
        }));
    }

    @Test(timeOut = 180000, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testHybridStoreTimeLagThresholdWithEmptyRT(boolean z) throws Exception {
        SystemProducer systemProducer = null;
        VeniceClusterWrapper veniceClusterWrapper = this.sharedVenice;
        try {
            String uniqueString = Utils.getUniqueString("hybrid-store");
            File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
            String str = "file://" + tempDataDirectory.getAbsolutePath();
            Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory);
            Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(veniceClusterWrapper, str, uniqueString);
            ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(veniceClusterWrapper.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
            try {
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
                try {
                    TopicManager topicManager = IntegrationTestPushUtils.getTopicManagerRepo(30000L, 100L, MIN_COMPACTION_LAG, veniceClusterWrapper.getKafka().getAddress(), this.sharedVenice.getPubSubTopicRepository()).getTopicManager();
                    try {
                        Assert.assertFalse(createStoreForJob.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(-1L).setHybridTimeLagThreshold(30L)).isError());
                        IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                            for (int i = 1; i < 100; i++) {
                                try {
                                    String num = Integer.toString(i);
                                    Object obj = andStartGenericAvroClient.get(num).get();
                                    Assert.assertNotNull(obj, "Key " + i + " should not be missing!");
                                    Assert.assertEquals(obj.toString(), "test_name_" + num);
                                } catch (Exception e) {
                                    throw new VeniceException(e);
                                }
                            }
                        });
                        if (!z) {
                            systemProducer = IntegrationTestPushUtils.getSamzaProducer(veniceClusterWrapper, uniqueString, Version.PushType.STREAM, new Pair[0]);
                            for (int i = 1; i <= 10; i++) {
                                IntegrationTestPushUtils.sendCustomSizeStreamingRecord(systemProducer, uniqueString, i, 1024);
                            }
                            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                                try {
                                    checkLargeRecord(andStartGenericAvroClient, 2);
                                } catch (Exception e) {
                                    throw new VeniceException(e);
                                }
                            });
                        }
                        Iterator<VeniceServerWrapper> it = veniceClusterWrapper.getVeniceServers().iterator();
                        while (it.hasNext()) {
                            veniceClusterWrapper.stopAndRestartVeniceServer(it.next().getPort());
                        }
                        Utils.sleep(5000L);
                        if (z) {
                            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                                for (int i2 = 1; i2 < 100; i2++) {
                                    try {
                                        String num = Integer.toString(i2);
                                        Object obj = andStartGenericAvroClient.get(num).get();
                                        Assert.assertNotNull(obj, "Key " + i2 + " should not be missing!");
                                        Assert.assertEquals(obj.toString(), "test_name_" + num);
                                    } catch (Exception e) {
                                        throw new VeniceException(e);
                                    }
                                }
                            });
                        } else {
                            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                                try {
                                    checkLargeRecord(andStartGenericAvroClient, 2);
                                } catch (Exception e) {
                                    throw new VeniceException(e);
                                }
                            });
                        }
                        if (topicManager != null) {
                            topicManager.close();
                        }
                        if (andStartGenericAvroClient != null) {
                            andStartGenericAvroClient.close();
                        }
                        if (createStoreForJob != null) {
                            createStoreForJob.close();
                        }
                        if (systemProducer != null) {
                            systemProducer.stop();
                        }
                    } catch (Throwable th) {
                        if (topicManager != null) {
                            try {
                                topicManager.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (andStartGenericAvroClient != null) {
                        try {
                            andStartGenericAvroClient.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th5) {
            if (0 != 0) {
                systemProducer.stop();
            }
            throw th5;
        }
    }

    @Test(dataProvider = "Boolean-Compression", dataProviderClass = DataProviderUtils.class, timeOut = 60000)
    public void testDuplicatedMessagesWontBePersisted(boolean z, CompressionStrategy compressionStrategy) throws Exception {
        new Properties().setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(3L));
        SystemProducer systemProducer = null;
        VeniceClusterWrapper veniceClusterWrapper = z ? this.ingestionIsolationEnabledSharedVenice : this.sharedVenice;
        try {
            LOGGER.info("Finished creating VeniceClusterWrapper");
            String uniqueString = Utils.getUniqueString("hybrid-store");
            File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
            String str = "file://" + tempDataDirectory.getAbsolutePath();
            Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory);
            Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(veniceClusterWrapper, str, uniqueString);
            ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(veniceClusterWrapper.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
            try {
                Assert.assertFalse(createStoreForJob.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L).setCompressionStrategy(compressionStrategy).setPartitionCount(1)).isError());
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                String str2 = "duplicated_message_test_key_1";
                String str3 = "duplicated_message_test_value_1";
                String str4 = "duplicated_message_test_value_2";
                String str5 = "duplicated_message_test_key_2";
                Properties properties = new Properties();
                properties.put("kafka.bootstrap.servers", veniceClusterWrapper.getKafka().getAddress());
                AvroSerializer avroSerializer = new AvroSerializer(Schema.parse("\"string\""));
                AvroGenericDeserializer avroGenericDeserializer = new AvroGenericDeserializer(Schema.parse("\"string\""), Schema.parse("\"string\""));
                VeniceWriter createVeniceWriter = TestUtils.getVeniceWriterFactory(properties).createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeRealTimeTopic(uniqueString)).build());
                try {
                    Pair<KafkaKey, KafkaMessageEnvelope> kafkaKeyAndValueEnvelope = getKafkaKeyAndValueEnvelope(avroSerializer.serialize("duplicated_message_test_key_1"), avroSerializer.serialize("duplicated_message_test_value_1"), 1, createVeniceWriter.getProducerGUID(), 100, 1, -1L);
                    createVeniceWriter.put((KafkaKey) kafkaKeyAndValueEnvelope.getFirst(), (KafkaMessageEnvelope) kafkaKeyAndValueEnvelope.getSecond(), new CompletableFutureCallback(new CompletableFuture()), 0, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER);
                    Pair<KafkaKey, KafkaMessageEnvelope> kafkaKeyAndValueEnvelope2 = getKafkaKeyAndValueEnvelope(avroSerializer.serialize("duplicated_message_test_key_1"), avroSerializer.serialize("duplicated_message_test_value_2"), 1, createVeniceWriter.getProducerGUID(), 100, 2, -1L);
                    createVeniceWriter.put((KafkaKey) kafkaKeyAndValueEnvelope2.getFirst(), (KafkaMessageEnvelope) kafkaKeyAndValueEnvelope2.getSecond(), new CompletableFutureCallback(new CompletableFuture()), 0, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER);
                    Pair<KafkaKey, KafkaMessageEnvelope> kafkaKeyAndValueEnvelope3 = getKafkaKeyAndValueEnvelope(avroSerializer.serialize("duplicated_message_test_key_1"), avroSerializer.serialize("duplicated_message_test_value_1"), 1, createVeniceWriter.getProducerGUID(), 100, 1, -1L);
                    createVeniceWriter.put((KafkaKey) kafkaKeyAndValueEnvelope3.getFirst(), (KafkaMessageEnvelope) kafkaKeyAndValueEnvelope3.getSecond(), new CompletableFutureCallback(new CompletableFuture()), 0, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER);
                    Pair<KafkaKey, KafkaMessageEnvelope> kafkaKeyAndValueEnvelope4 = getKafkaKeyAndValueEnvelope(avroSerializer.serialize("duplicated_message_test_key_2"), avroSerializer.serialize("duplicated_message_test_value_1"), 1, createVeniceWriter.getProducerGUID(), 100, 3, -1L);
                    createVeniceWriter.put((KafkaKey) kafkaKeyAndValueEnvelope4.getFirst(), (KafkaMessageEnvelope) kafkaKeyAndValueEnvelope4.getSecond(), new CompletableFutureCallback(new CompletableFuture()), 0, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER);
                    if (createVeniceWriter != null) {
                        createVeniceWriter.close();
                    }
                    CloseableHttpAsyncClient createDefault = HttpAsyncClients.createDefault();
                    try {
                        createDefault.start();
                        Base64.Encoder urlEncoder = Base64.getUrlEncoder();
                        VeniceCompressor veniceCompressor = getVeniceCompressor(compressionStrategy, uniqueString, 1, veniceClusterWrapper, createDefault);
                        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, true, () -> {
                            for (VeniceServerWrapper veniceServerWrapper : veniceClusterWrapper.getVeniceServers()) {
                                HttpGet httpGet = new HttpGet("http://" + veniceServerWrapper.getAddress() + "/storage/" + Version.composeKafkaTopic(uniqueString, 1) + "/0/" + urlEncoder.encodeToString(avroSerializer.serialize(str5)) + "?f=b64");
                                HttpResponse httpResponse = (HttpResponse) createDefault.execute(httpGet, (FutureCallback) null).get();
                                InputStream content = ((HttpResponse) createDefault.execute(httpGet, (FutureCallback) null).get()).getEntity().getContent();
                                try {
                                    byte[] byteArray = IOUtils.toByteArray(content);
                                    Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200, "Response did not return 200: " + new String(byteArray));
                                    Assert.assertEquals(avroGenericDeserializer.deserialize(veniceCompressor.decompress(byteArray, 0, byteArray.length)).toString(), str3);
                                    if (content != null) {
                                        content.close();
                                    }
                                    HttpGet httpGet2 = new HttpGet("http://" + veniceServerWrapper.getAddress() + "/storage/" + Version.composeKafkaTopic(uniqueString, 1) + "/0/" + urlEncoder.encodeToString(avroSerializer.serialize(str2)) + "?f=b64");
                                    HttpResponse httpResponse2 = (HttpResponse) createDefault.execute(httpGet2, (FutureCallback) null).get();
                                    content = ((HttpResponse) createDefault.execute(httpGet2, (FutureCallback) null).get()).getEntity().getContent();
                                    try {
                                        byte[] byteArray2 = IOUtils.toByteArray(content);
                                        Assert.assertEquals(httpResponse2.getStatusLine().getStatusCode(), 200, "Response did not return 200: " + new String(byteArray2));
                                        Assert.assertEquals(avroGenericDeserializer.deserialize(veniceCompressor.decompress(byteArray2, 0, byteArray2.length)).toString(), str4);
                                        if (content != null) {
                                            content.close();
                                        }
                                    } finally {
                                    }
                                } finally {
                                }
                            }
                        });
                        if (createDefault != null) {
                            createDefault.close();
                        }
                        if (createStoreForJob != null) {
                            createStoreForJob.close();
                        }
                    } catch (Throwable th) {
                        if (createDefault != null) {
                            try {
                                createDefault.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (createVeniceWriter != null) {
                        try {
                            createVeniceWriter.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } finally {
            if (0 != 0) {
                systemProducer.stop();
            }
        }
    }

    @Test(timeOut = 180000)
    public void testOffsetRecordSyncedForIngestionIsolationHandover() throws Exception {
        SystemProducer systemProducer = null;
        VeniceClusterWrapper veniceClusterWrapper = this.ingestionIsolationEnabledSharedVenice;
        try {
            String uniqueString = Utils.getUniqueString("hybrid-store");
            File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
            String str = "file://" + tempDataDirectory.getAbsolutePath();
            Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory);
            Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(veniceClusterWrapper, str, uniqueString);
            ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(veniceClusterWrapper.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
            try {
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
                try {
                    Assert.assertFalse(createStoreForJob.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(0L).setHybridOffsetLagThreshold(1000L)).isError());
                    IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                        for (int i = 1; i < 100; i++) {
                            try {
                                String num = Integer.toString(i);
                                Object obj = andStartGenericAvroClient.get(num).get();
                                Assert.assertNotNull(obj, "Key " + i + " should not be missing!");
                                Assert.assertEquals(obj.toString(), "test_name_" + num);
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        }
                    });
                    SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(veniceClusterWrapper, uniqueString, Version.PushType.STREAM, new Pair[0]);
                    for (int i = 1; i <= 10; i++) {
                        IntegrationTestPushUtils.sendCustomSizeStreamingRecord(samzaProducer, uniqueString, i, 1024);
                    }
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                        try {
                            checkLargeRecord(andStartGenericAvroClient, 2);
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    });
                    int version = createStoreForJob.emptyPush(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L).getVersion();
                    Assert.assertNotEquals(Integer.valueOf(version), 0, "requesting a topic for a push should provide a non zero version number");
                    TestUtils.waitForNonDeterministicAssertion(100L, TimeUnit.SECONDS, true, () -> {
                        JobStatusQueryResponse queryJobStatus = createStoreForJob.queryJobStatus(Version.composeKafkaTopic(uniqueString, version));
                        Assert.assertFalse(queryJobStatus.isError(), "Error in getting JobStatusResponse: " + queryJobStatus.getError());
                        Assert.assertEquals(queryJobStatus.getStatus(), "COMPLETED");
                    });
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                        for (int i2 = 1; i2 <= 10; i2++) {
                            try {
                                Assert.assertNull(andStartGenericAvroClient.get(Integer.toString(i2)).get(), (String) null);
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        }
                    });
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    if (createStoreForJob != null) {
                        createStoreForJob.close();
                    }
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                        try {
                            Iterator it = veniceClusterWrapper.getVeniceServers().get(0).getVeniceServer().getHelixParticipationService().getVeniceOfflinePushMonitorAccessor().getOfflinePushStatusAndItsPartitionStatuses(uniqueString + "_v2").getPartitionStatuses().iterator();
                            while (it.hasNext()) {
                                Iterator it2 = ((PartitionStatus) it.next()).getReplicaStatuses().iterator();
                                while (it2.hasNext()) {
                                    Assert.assertEquals(((ReplicaStatus) it2.next()).getCurrentStatus(), ExecutionStatus.COMPLETED);
                                }
                            }
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    });
                    if (samzaProducer != null) {
                        samzaProducer.stop();
                    }
                } catch (Throwable th) {
                    if (andStartGenericAvroClient != null) {
                        try {
                            andStartGenericAvroClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (0 != 0) {
                systemProducer.stop();
            }
            throw th3;
        }
    }

    @Test(timeOut = 180000)
    public void testVersionSwapDeferredWithHybrid() throws Exception {
        new Properties().setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(3L));
        VeniceClusterWrapper veniceClusterWrapper = this.sharedVenice;
        LOGGER.info("Finished creating VeniceClusterWrapper");
        String uniqueString = Utils.getUniqueString("hybrid-store");
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        String str = "file://" + tempDataDirectory.getAbsolutePath();
        Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory);
        Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(veniceClusterWrapper, str, uniqueString);
        ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(veniceClusterWrapper.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
            try {
                Assert.assertFalse(createStoreForJob.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L).setPartitionCount(1)).isError());
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                defaultVPJProps.put("defer.version.swap", "true");
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                Properties properties = new Properties();
                properties.put("kafka.bootstrap.servers", veniceClusterWrapper.getKafka().getAddress());
                properties.put("venice.writer.max.elapsed.time.for.segment.in.ms", "0");
                AvroSerializer avroSerializer = new AvroSerializer(Schema.parse("\"string\""));
                String str2 = "foo_object_";
                for (int i = 0; i < 2; i++) {
                    VeniceWriter createVeniceWriter = TestUtils.getVeniceWriterFactory(properties).createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeRealTimeTopic(uniqueString)).build());
                    try {
                        for (int i2 = (i * 50) + 1; i2 <= (i * 50) + 50; i2++) {
                            createVeniceWriter.put(avroSerializer.serialize(String.valueOf(i2)), avroSerializer.serialize("foo_object_" + i2), 1);
                        }
                        if (createVeniceWriter != null) {
                            createVeniceWriter.close();
                        }
                    } catch (Throwable th) {
                        if (createVeniceWriter != null) {
                            try {
                                createVeniceWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                createStoreForJob.overrideSetActiveVersion(uniqueString, 2);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    for (int i3 = 1; i3 <= 100; i3++) {
                        try {
                            String num = Integer.toString(i3);
                            Object obj = andStartGenericAvroClient.get(num).get();
                            Assert.assertNotNull(obj, "Key " + i3 + " should not be missing!");
                            Assert.assertEquals(obj.toString(), str2 + num);
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    }
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                if (createStoreForJob != null) {
                    createStoreForJob.close();
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createStoreForJob != null) {
                try {
                    createStoreForJob.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test(timeOut = 180000)
    public void testHybridDIVEnhancement() throws Exception {
        new Properties().setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(3L));
        VeniceClusterWrapper veniceClusterWrapper = this.sharedVenice;
        LOGGER.info("Finished creating VeniceClusterWrapper");
        String uniqueString = Utils.getUniqueString("hybrid-store");
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        String str = "file://" + tempDataDirectory.getAbsolutePath();
        Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory);
        Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(veniceClusterWrapper, str, uniqueString);
        ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(veniceClusterWrapper.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
            try {
                Assert.assertFalse(createStoreForJob.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L).setPartitionCount(1)).isError());
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                Properties properties = new Properties();
                properties.put("kafka.bootstrap.servers", veniceClusterWrapper.getKafka().getAddress());
                properties.put("venice.writer.max.elapsed.time.for.segment.in.ms", "0");
                AvroSerializer avroSerializer = new AvroSerializer(Schema.parse("\"string\""));
                String str2 = "hybrid_DIV_enhancement_";
                for (int i = 0; i < 2; i++) {
                    VeniceWriter createVeniceWriter = TestUtils.getVeniceWriterFactory(properties).createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeRealTimeTopic(uniqueString)).build());
                    try {
                        for (int i2 = (i * 50) + 1; i2 <= (i * 50) + 50; i2++) {
                            createVeniceWriter.put(avroSerializer.serialize(String.valueOf(i2)), avroSerializer.serialize("hybrid_DIV_enhancement_" + i2), 1);
                        }
                        if (createVeniceWriter != null) {
                            createVeniceWriter.close();
                        }
                    } catch (Throwable th) {
                        if (createVeniceWriter != null) {
                            try {
                                createVeniceWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    for (int i3 = 1; i3 <= 100; i3++) {
                        try {
                            String num = Integer.toString(i3);
                            Object obj = andStartGenericAvroClient.get(num).get();
                            Assert.assertNotNull(obj, "Key " + i3 + " should not be missing!");
                            Assert.assertEquals(obj.toString(), str2 + num);
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    }
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                if (createStoreForJob != null) {
                    createStoreForJob.close();
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createStoreForJob != null) {
                try {
                    createStoreForJob.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test(timeOut = 120000, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testHybridWithAmplificationFactor(boolean z) throws Exception {
        VeniceClusterWrapper veniceClusterWrapper = z ? this.ingestionIsolationEnabledSharedVenice : this.sharedVenice;
        UpdateStoreQueryParams amplificationFactor = new UpdateStoreQueryParams().setPartitionCount(1).setReplicationFactor(2).setAmplificationFactor(5);
        String uniqueString = Utils.getUniqueString("store");
        ControllerClient controllerClient = new ControllerClient(veniceClusterWrapper.getClusterName(), veniceClusterWrapper.getAllControllersURLs());
        try {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\""));
            TestUtils.assertCommand(controllerClient.updateStore(uniqueString, amplificationFactor));
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                StoreResponse assertCommand = TestUtils.assertCommand(controllerClient.getStore(uniqueString));
                Assert.assertEquals(assertCommand.getStore().getReplicationFactor(), 2, "Replication factor has not been set to the expected value of '2'.");
                Assert.assertEquals(assertCommand.getStore().getPartitionerConfig().getAmplificationFactor(), 5, "Amplification factor has not been set to the expected value of '5'.");
            });
            veniceClusterWrapper.createVersion(uniqueString, "\"string\"", "\"string\"", IntStream.range(0, 20).mapToObj(i -> {
                return new AbstractMap.SimpleEntry(String.valueOf(i), String.valueOf(i));
            }));
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
            try {
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                    for (int i2 = 0; i2 < 20; i2++) {
                        Assert.assertEquals(andStartGenericAvroClient.get(String.valueOf(i2)).get().toString(), String.valueOf(i2));
                    }
                });
                TestUtils.assertCommand(controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setAmplificationFactor(3).setHybridRewindSeconds(2000000L).setHybridOffsetLagThreshold(10L)));
                veniceClusterWrapper.createVersion(uniqueString, "\"string\"", "\"string\"", IntStream.range(0, 20).mapToObj(i2 -> {
                    return new AbstractMap.SimpleEntry(String.valueOf(i2), String.valueOf(i2));
                }));
                TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, true, () -> {
                    for (Integer num = 0; num.intValue() < 20; num = Integer.valueOf(num.intValue() + 1)) {
                        Assert.assertEquals(andStartGenericAvroClient.get(String.valueOf(num)).get().toString(), String.valueOf(num));
                    }
                });
                SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(veniceClusterWrapper, uniqueString, Version.PushType.STREAM, new Pair[0]);
                for (int i3 = 0; i3 < 20; i3++) {
                    IntegrationTestPushUtils.sendCustomSizeStreamingRecord(samzaProducer, uniqueString, i3, 1024);
                }
                samzaProducer.stop();
                AtomicInteger atomicInteger = new AtomicInteger(0);
                TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, true, () -> {
                    for (int i4 = atomicInteger.get(); i4 < 20; i4++) {
                        checkLargeRecord(andStartGenericAvroClient, i4);
                        atomicInteger.set(i4);
                    }
                });
                TestUtils.assertCommand(controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setAmplificationFactor(5)));
                veniceClusterWrapper.createVersion(uniqueString, "\"string\"", "\"string\"", IntStream.range(0, 20).mapToObj(i4 -> {
                    return new AbstractMap.SimpleEntry(String.valueOf(i4), String.valueOf(i4));
                }));
                atomicInteger.set(0);
                TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, true, () -> {
                    for (int i5 = atomicInteger.get(); i5 < 20; i5++) {
                        checkLargeRecord(andStartGenericAvroClient, i5);
                        atomicInteger.set(i5);
                    }
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                controllerClient.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                controllerClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test(timeOut = 180000)
    public void testHybridWithPartitionWiseConsumer() throws Exception {
        new Properties().setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(1L));
        VeniceClusterWrapper upCluster = setUpCluster(false, true);
        try {
            UpdateStoreQueryParams partitionCount = new UpdateStoreQueryParams().setHybridRewindSeconds(2000000L).setHybridOffsetLagThreshold(10L).setPartitionCount(4);
            String uniqueString = Utils.getUniqueString("store");
            upCluster.useControllerClient(controllerClient -> {
                controllerClient.createNewStore(uniqueString, "owner", "\"int\"", "\"int\"");
                controllerClient.updateStore(uniqueString, partitionCount);
            });
            upCluster.createVersion(uniqueString, "\"int\"", "\"int\"", IntStream.range(0, 10).mapToObj(i -> {
                return new AbstractMap.SimpleEntry(Integer.valueOf(i), Integer.valueOf(i));
            }));
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(upCluster.getRandomRouterURL()));
            try {
                TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                    for (Integer num = 0; num.intValue() < 10; num = Integer.valueOf(num.intValue() + 1)) {
                        Assert.assertEquals(andStartGenericAvroClient.get(num).get(), num);
                    }
                });
                SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(upCluster, uniqueString, Version.PushType.STREAM, new Pair[0]);
                for (int i2 = 0; i2 < 10; i2++) {
                    IntegrationTestPushUtils.sendStreamingRecord(samzaProducer, uniqueString, Integer.valueOf(i2), Integer.valueOf(i2));
                }
                samzaProducer.stop();
                TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                    for (int i3 = 0; i3 < 10; i3++) {
                        Assert.assertEquals(andStartGenericAvroClient.get(Integer.valueOf(i3)).get(), Integer.valueOf(i3));
                    }
                });
                upCluster.createVersion(uniqueString, "\"int\"", "\"int\"", IntStream.range(10, 20).mapToObj(i3 -> {
                    return new AbstractMap.SimpleEntry(Integer.valueOf(i3), Integer.valueOf(i3));
                }));
                SystemProducer samzaProducer2 = IntegrationTestPushUtils.getSamzaProducer(upCluster, uniqueString, Version.PushType.STREAM, new Pair[0]);
                for (int i4 = 10; i4 < 20; i4++) {
                    IntegrationTestPushUtils.sendStreamingRecord(samzaProducer2, uniqueString, Integer.valueOf(i4), Integer.valueOf(i4));
                }
                samzaProducer2.stop();
                TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                    for (int i5 = 0; i5 < 20; i5++) {
                        Assert.assertEquals(andStartGenericAvroClient.get(Integer.valueOf(i5)).get(), Integer.valueOf(i5));
                    }
                });
                upCluster.createVersion(uniqueString, "\"int\"", "\"int\"", IntStream.range(20, 30).mapToObj(i5 -> {
                    return new AbstractMap.SimpleEntry(Integer.valueOf(i5), Integer.valueOf(i5));
                }));
                SystemProducer samzaProducer3 = IntegrationTestPushUtils.getSamzaProducer(upCluster, uniqueString, Version.PushType.STREAM, new Pair[0]);
                for (int i6 = 20; i6 < 30; i6++) {
                    IntegrationTestPushUtils.sendStreamingRecord(samzaProducer3, uniqueString, Integer.valueOf(i6), Integer.valueOf(i6));
                }
                TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                    for (int i7 = 0; i7 < 30; i7++) {
                        Assert.assertEquals(andStartGenericAvroClient.get(Integer.valueOf(i7)).get(), Integer.valueOf(i7));
                    }
                });
                AvroSerializer avroSerializer = new AvroSerializer(Schema.parse("\"int\""));
                AvroGenericDeserializer avroGenericDeserializer = new AvroGenericDeserializer(Schema.parse("\"int\""), Schema.parse("\"int\""));
                CloseableHttpAsyncClient createDefault = HttpAsyncClients.createDefault();
                try {
                    createDefault.start();
                    Base64.Encoder urlEncoder = Base64.getUrlEncoder();
                    TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, true, () -> {
                        VeniceServerWrapper veniceServerWrapper = upCluster.getVeniceServers().get(1);
                        int i7 = 0;
                        for (int i8 = 0; i8 < 30; i8++) {
                            for (int i9 = 0; i9 < 4; i9++) {
                                HttpGet httpGet = new HttpGet("http://" + veniceServerWrapper.getAddress() + "/storage/" + Version.composeKafkaTopic(uniqueString, 2) + "/" + i9 + "/" + urlEncoder.encodeToString(avroSerializer.serialize(Integer.valueOf(i8))) + "?f=b64");
                                HttpResponse httpResponse = (HttpResponse) createDefault.execute(httpGet, (FutureCallback) null).get();
                                InputStream content = ((HttpResponse) createDefault.execute(httpGet, (FutureCallback) null).get()).getEntity().getContent();
                                try {
                                    byte[] byteArray = IOUtils.toByteArray(content);
                                    if (httpResponse.getStatusLine().getStatusCode() == 200) {
                                        Assert.assertEquals(avroGenericDeserializer.deserialize((Object) null, byteArray), Integer.valueOf(i8));
                                        i7++;
                                    }
                                    if (content != null) {
                                        content.close();
                                    }
                                } catch (Throwable th) {
                                    if (content != null) {
                                        try {
                                            content.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    }
                                    throw th;
                                }
                            }
                        }
                        Assert.assertEquals(i7, 30);
                    });
                    if (createDefault != null) {
                        createDefault.close();
                    }
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    if (upCluster != null) {
                        upCluster.close();
                    }
                } catch (Throwable th) {
                    if (createDefault != null) {
                        try {
                            createDefault.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (upCluster != null) {
                try {
                    upCluster.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    private static Pair<KafkaKey, KafkaMessageEnvelope> getKafkaKeyAndValueEnvelope(byte[] bArr, byte[] bArr2, int i, GUID guid, int i2, int i3, long j) {
        KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, bArr);
        Put put = new Put();
        put.putValue = ByteBuffer.wrap(bArr2);
        put.schemaId = i;
        put.replicationMetadataVersionId = -1;
        put.replicationMetadataPayload = ByteBuffer.wrap(new byte[0]);
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        kafkaMessageEnvelope.messageType = MessageType.PUT.getValue();
        kafkaMessageEnvelope.payloadUnion = put;
        ProducerMetadata producerMetadata = new ProducerMetadata();
        producerMetadata.producerGUID = guid;
        producerMetadata.segmentNumber = i2;
        producerMetadata.messageSequenceNumber = i3;
        producerMetadata.messageTimestamp = System.currentTimeMillis();
        kafkaMessageEnvelope.producerMetadata = producerMetadata;
        kafkaMessageEnvelope.leaderMetadataFooter = new LeaderMetadata();
        kafkaMessageEnvelope.leaderMetadataFooter.upstreamOffset = j;
        return Pair.create(kafkaKey, kafkaMessageEnvelope);
    }

    private static VeniceClusterWrapper setUpCluster(boolean z, boolean z2) {
        Properties properties = new Properties();
        properties.setProperty("default.partition.max.count", "5");
        VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(1, 0, 0, 2, VeniceClusterWrapper.NUM_RECORDS, false, false, properties);
        veniceCluster.addVeniceRouter(new Properties());
        Properties properties2 = new Properties();
        properties2.setProperty("persistence.type", PersistenceType.ROCKS_DB.name());
        properties2.setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(1L));
        properties2.setProperty("rocksdb.plain.table.format.enabled", "false");
        properties2.setProperty("server.database.checksum.verification.enabled", "true");
        properties2.setProperty("server.database.sync.bytes.interval.for.deferred.write.mode", "300");
        properties2.setProperty("ssl.to.kakfa", "false");
        properties2.setProperty("server.consumer.pool.size.per.kafka.cluster", "3");
        properties2.setProperty("server.dedicated.drainer.queue.for.sorted.input.enabled", "true");
        if (z) {
            TestUtils.addIngestionIsolationToProperties(properties2);
        }
        if (z2) {
            properties2.setProperty("default.partition.max.count", "4");
            properties2.setProperty("server.shared.consumer.assignment.strategy", KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name());
        }
        veniceCluster.addVeniceServer(new Properties(), properties2);
        veniceCluster.addVeniceServer(new Properties(), properties2);
        return veniceCluster;
    }
}
