package com.linkedin.venice.endToEnd;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskBackdoor;
import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.davinci.replication.RmdWithValueSchemaId;
import com.linkedin.davinci.replication.merge.RmdSerDe;
import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache;
import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.update.UpdateBuilderImpl;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.util.Utf8;
import org.apache.logging.log4j.LogManager;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemProducer;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/PartialUpdateTest.class */
public class PartialUpdateTest {
    // NOTE(review): presumably the region and cluster counts of the test topology built in setUp() — confirm.
    private static final int NUMBER_OF_CHILD_DATACENTERS = 1;
    private static final int NUMBER_OF_CLUSTERS = 1;
    // Base per-test timeout in milliseconds (120000 ms = 2 minutes).
    private static final int TEST_TIMEOUT_MS = 120000;
    // Cluster name used for every ControllerClient and cluster lookup in this class.
    private static final String CLUSTER_NAME = "venice-cluster0";
    // Shared test environment; assigned once in setUp().
    private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    // The single parent controller of the environment; setUp() fails if there is not exactly one.
    private VeniceControllerWrapper parentController;
    // Child regions of the environment; the tests only ever use index 0.
    private List<VeniceMultiClusterWrapper> childDatacenters;

    /**
     * Builds the shared two-layer multi-region test environment and caches its child regions and
     * its parent controller for the tests in this class.
     *
     * <p>The controller-side properties set {@code controller.auto.materialize.meta.system.store}
     * to {@code false}; the third property set handed to the factory is left empty.
     *
     * @throws IllegalStateException if the environment does not expose exactly one parent controller
     */
    @BeforeClass(alwaysRun = true)
    public void setUp() {
        Properties emptyProps = new Properties();
        Properties controllerProps = new Properties();
        controllerProps.put("controller.auto.materialize.meta.system.store", false);

        // NOTE(review): the first two arguments are presumably the region and cluster counts
        // (both constants equal the literal 1 used before) — confirm against ServiceFactory.
        this.multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
                NUMBER_OF_CHILD_DATACENTERS,
                NUMBER_OF_CLUSTERS,
                1,
                1,
                2,
                1,
                2,
                Optional.of(new VeniceProperties(controllerProps)),
                Optional.of(new Properties(controllerProps)),
                Optional.of(new VeniceProperties(emptyProps)),
                false);
        this.childDatacenters = this.multiRegionMultiClusterWrapper.getChildRegions();

        List<VeniceControllerWrapper> allParentControllers = this.multiRegionMultiClusterWrapper.getParentControllers();
        int parentControllerCount = allParentControllers.size();
        if (parentControllerCount != 1) {
            throw new IllegalStateException("Expect only one parent controller. Got: " + parentControllerCount);
        }
        this.parentController = allParentControllers.get(0);
    }

    /**
     * Checks that a key written through partial update stays readable across two Kafka-sourced
     * re-pushes, the second of which runs after chunking has been enabled on the store, and that
     * further partial updates (to the existing key and to a new key) are still served afterwards.
     *
     * <p>Flow: create a hybrid write-compute store, empty-push version 1, stream one partial update,
     * re-push to version 2, enable chunking, re-push to version 3 with a zero rewind override, then
     * stream more partial updates and read both keys back.
     *
     * <p>The controller client, the store client and the Samza producer are all released on every
     * exit path.
     *
     * @throws IOException if one of the helpers used by the test propagates it
     */
    @Test
    public void testRepushWithChunkingFlagChanged() throws IOException {
        final String storeName = Utils.getUniqueString("reproduce");
        final String parentControllerUrl = this.parentController.getControllerUrl();
        final Schema keySchema = AvroCompatibilityHelper.parse(TestWriteUtils.loadFileAsString("UserKey.avsc"));
        final Schema valueSchema = AvroCompatibilityHelper.parse(TestWriteUtils.loadFileAsString("UserValue.avsc"));
        final Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema);

        try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl)) {
            TestUtils.assertCommand(
                    parentControllerClient.createNewStore(storeName, "test_owner", keySchema.toString(), valueSchema.toString()));
            UpdateStoreQueryParams hybridParams = new UpdateStoreQueryParams()
                    .setStorageQuotaInByte(-1L)
                    .setCompressionStrategy(CompressionStrategy.NO_OP)
                    .setWriteComputationEnabled(true)
                    .setHybridRewindSeconds(86400L)
                    .setHybridOffsetLagThreshold(10L);
            ControllerResponse hybridUpdateResponse =
                    parentControllerClient.retryableRequest(5, client -> client.updateStore(storeName, hybridParams));
            Assert.assertFalse(hybridUpdateResponse.isError(), "Update store got error: " + hybridUpdateResponse.getError());

            VersionCreationResponse emptyPushResponse = parentControllerClient.emptyPush(storeName, "test_push_id", 1000L);
            // Check the error flag first so a failed push is reported as such, not as a version mismatch.
            Assert.assertFalse(emptyPushResponse.isError(), "Empty push to parent colo should succeed");
            Assert.assertEquals(emptyPushResponse.getVersion(), 1);
            TestUtils.waitForNonDeterministicPushCompletion(
                    Version.composeKafkaTopic(storeName, 1), parentControllerClient, 30L, TimeUnit.SECONDS);

            VeniceClusterWrapper veniceCluster = this.childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
            SystemProducer veniceProducer =
                    IntegrationTestPushUtils.getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM);
            try (AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(
                    ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceCluster.getRandomRouterURL()))) {
                GenericData.Record existingKey = new GenericData.Record(keySchema);
                existingKey.put("learnerUrn", "urn:li:member:682787898");
                existingKey.put("query", "python");
                GenericData.Record newKey = new GenericData.Record(keySchema);
                newKey.put("learnerUrn", "urn:li:member:123");
                newKey.put("query", "python");

                // First partial update, written while chunking is still disabled.
                IntegrationTestPushUtils.sendStreamingRecord(
                        veniceProducer,
                        storeName,
                        existingKey,
                        new UpdateBuilderImpl(writeComputeSchema)
                                .setElementsToAddToListField(
                                        "blockedContentsUrns", Collections.singletonList("urn:li:lyndaCourse:751323"))
                                .build());

                // Re-push #1: Kafka-sourced push producing version 2.
                Properties repushProps =
                        IntegrationTestPushUtils.defaultVPJProps(this.multiRegionMultiClusterWrapper, "dummyInputPath", storeName);
                repushProps.setProperty("source.kafka", "true");
                repushProps.setProperty("kafka.input.broker.url", veniceCluster.getKafka().getAddress());
                repushProps.setProperty("kafka.input.max.records.per.mapper", "5");
                TestWriteUtils.runPushJob("Run repush job 1", repushProps);
                TestUtils.waitForNonDeterministicPushCompletion(
                        Version.composeKafkaTopic(storeName, 2), parentControllerClient, 30L, TimeUnit.SECONDS);
                waitUntilKeyIsReadable(storeReader, existingKey);

                // Flip the chunking flag, then re-push #2 (version 3) with the rewind overridden to 0.
                UpdateStoreQueryParams chunkingParams =
                        new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setChunkingEnabled(true);
                ControllerResponse chunkingUpdateResponse =
                        parentControllerClient.retryableRequest(5, client -> client.updateStore(storeName, chunkingParams));
                Assert.assertFalse(
                        chunkingUpdateResponse.isError(), "Update store got error: " + chunkingUpdateResponse.getError());
                repushProps.put("rewind.time.in.seconds.override", 0);
                TestWriteUtils.runPushJob("Run repush job 2", repushProps);
                TestUtils.waitForNonDeterministicPushCompletion(
                        Version.composeKafkaTopic(storeName, 3), parentControllerClient, 30L, TimeUnit.SECONDS);
                waitUntilKeyIsReadable(storeReader, existingKey);

                // More partial updates after the flag change: two for the old key, one for a new key.
                IntegrationTestPushUtils.sendStreamingRecord(
                        veniceProducer,
                        storeName,
                        existingKey,
                        new UpdateBuilderImpl(writeComputeSchema)
                                .setElementsToAddToListField(
                                        "blockedContentsUrns", Collections.singletonList("urn:li:lyndaCourse:1"))
                                .build());
                GenericRecord secondUpdate = new UpdateBuilderImpl(writeComputeSchema)
                        .setElementsToAddToListField("blockedContentsUrns", Collections.singletonList("urn:li:lyndaCourse:2"))
                        .build();
                IntegrationTestPushUtils.sendStreamingRecord(veniceProducer, storeName, existingKey, secondUpdate);
                IntegrationTestPushUtils.sendStreamingRecord(veniceProducer, storeName, newKey, secondUpdate);
                waitUntilKeyIsReadable(storeReader, newKey);
                waitUntilKeyIsReadable(storeReader, existingKey);
            } finally {
                // Stop the producer even when an assertion above fails.
                veniceProducer.stop();
            }
        }
    }

    /**
     * Retries for up to 10 seconds until a read of {@code key} through {@code storeReader} returns
     * a non-null value; any exception raised by the read is rethrown as a {@link VeniceException}.
     */
    private static void waitUntilKeyIsReadable(AvroGenericStoreClient<Object, Object> storeReader, Object key) {
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
            try {
                GenericRecord value = (GenericRecord) storeReader.get(key).get();
                Assert.assertNotNull(value, "key " + key + " should not be missing!");
            } catch (Exception e) {
                throw new VeniceException(e);
            }
        });
    }

    /**
     * Checks that partial updates can be applied on top of keys that were written by a batch push.
     *
     * <p>Flow: write an Avro input file, create a hybrid active-active store with write compute,
     * chunking and RMD chunking enabled, batch-push version 1 through {@code runVPJ}, stream a
     * {@code firstName} update for keys "1".."99", then verify every key carries the updated
     * {@code firstName} together with a {@code lastName} of {@code "last_name_" + key}.
     *
     * <p>Both controller clients, the store client and the Samza producer are each released exactly
     * once on every exit path.
     *
     * @throws IOException if one of the helpers used by the test propagates it
     */
    @Test
    public void testPartialUpdateOnBatchPushedKeys() throws IOException {
        final String storeName = Utils.getUniqueString("rmdChunking");
        final String parentControllerUrl = this.parentController.getControllerUrl();
        File inputDir = TestWriteUtils.getTempDataDirectory();
        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToRecordSchema(inputDir, true);
        String keySchemaStr = recordSchema.getField("key").schema().toString();
        String valueSchemaStr = recordSchema.getField("value").schema().toString();
        Properties vpjProperties = IntegrationTestPushUtils.defaultVPJProps(
                this.multiRegionMultiClusterWrapper, "file://" + inputDir.getAbsolutePath(), storeName);
        Schema valueSchema = AvroCompatibilityHelper.parse(valueSchemaStr);
        final Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema);
        VeniceClusterWrapper veniceCluster = this.childDatacenters.get(0).getClusters().get(CLUSTER_NAME);

        try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl);
                AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(
                        ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceCluster.getRandomRouterURL()))) {
            TestUtils.assertCommand(
                    parentControllerClient.createNewStore(storeName, "test_owner", keySchemaStr, valueSchema.toString()));
            UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                    .setStorageQuotaInByte(-1L)
                    .setCompressionStrategy(CompressionStrategy.NO_OP)
                    .setWriteComputationEnabled(true)
                    .setActiveActiveReplicationEnabled(true)
                    .setChunkingEnabled(true)
                    .setRmdChunkingEnabled(true)
                    .setHybridRewindSeconds(10L)
                    .setHybridOffsetLagThreshold(2L);
            ControllerResponse updateResponse =
                    parentControllerClient.retryableRequest(5, client -> client.updateStore(storeName, storeParams));
            Assert.assertFalse(updateResponse.isError(), "Update store got error: " + updateResponse.getError());

            // The batch push is tracked through a child-region controller; keep that client in its
            // own variable so the parent client above is still closed by its own resource scope.
            String childControllerUrl = this.childDatacenters.get(0).getRandomController().getControllerUrl();
            try (ControllerClient childControllerClient = new ControllerClient(CLUSTER_NAME, childControllerUrl)) {
                runVPJ(vpjProperties, 1, childControllerClient);
            }

            SystemProducer veniceProducer =
                    IntegrationTestPushUtils.getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM);
            try {
                for (int i = 1; i < 100; i++) {
                    GenericRecord partialUpdate =
                            new UpdateBuilderImpl(writeComputeSchema).setNewFieldValue("firstName", "new_name_" + i).build();
                    IntegrationTestPushUtils.sendStreamingRecord(veniceProducer, storeName, String.valueOf(i), partialUpdate);
                }

                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                    for (int i = 1; i < 100; i++) {
                        String key = String.valueOf(i);
                        try {
                            GenericRecord value = readValue(storeReader, key);
                            Assert.assertNotNull(value, "Key " + key + " should not be missing!");
                            Assert.assertEquals(value.get("firstName").toString(), "new_name_" + key);
                            Assert.assertEquals(value.get("lastName").toString(), "last_name_" + key);
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    }
                });
            } finally {
                // Stop the producer even when the verification above fails.
                veniceProducer.stop();
            }
        }
    }

    /**
     * End-to-end check of a store with chunking and RMD chunking enabled whose single key grows
     * to 30 * 10000 map entries through streamed partial updates.
     *
     * <p>Flow:
     * <ol>
     *   <li>Create a hybrid active-active store with write compute, chunking and RMD chunking,
     *       empty-push version 1 and confirm the RMD chunking flag on both store and version.</li>
     *   <li>Stream 30 partial updates to {@code key1}; update {@code i} adds 10000 new map entries
     *       and is sent with logical timestamp {@code i * 10 + 1}.</li>
     *   <li>Verify the value and the per-server RMD for version 1, run a Kafka-sourced re-push and
     *       verify both again for version 2.</li>
     *   <li>Send a delete with timestamp 290 and expect 10000 map entries to remain (presumably
     *       those of the last update, the only one with a newer timestamp, 291).</li>
     *   <li>Send a delete with timestamp 300 and expect the key to be gone while its RMD still
     *       records {@code topLevelFieldTimestamp == 300} for the map field.</li>
     * </ol>
     *
     * <p>Every client is closed exactly once and the producer is stopped on every exit path.
     *
     * @throws IOException if one of the helpers used by the test propagates it
     */
    @Test(timeOut = 4 * TEST_TIMEOUT_MS)
    public void testReplicationMetadataChunkingE2E() throws IOException {
        final String storeName = Utils.getUniqueString("rmdChunking");
        final String parentControllerUrl = this.parentController.getControllerUrl();
        Schema valueSchema = AvroCompatibilityHelper.parse(TestWriteUtils.loadFileAsString("CollectionRecordV1.avsc"));
        Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(valueSchema);
        final Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema);

        // Stand-in schema repository that serves exactly the schemas RmdSerDe is given below.
        ReadOnlySchemaRepository schemaRepo = Mockito.mock(ReadOnlySchemaRepository.class);
        Mockito.when(schemaRepo.getReplicationMetadataSchema(storeName, 1, 1)).thenReturn(new RmdSchemaEntry(1, 1, rmdSchema));
        Mockito.when(schemaRepo.getDerivedSchema(storeName, 1, 1)).thenReturn(new DerivedSchemaEntry(1, 1, writeComputeSchema));
        Mockito.when(schemaRepo.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema));
        final RmdSerDe rmdSerDe = new RmdSerDe(new StringAnnotatedStoreSchemaCache(storeName, schemaRepo), 1);

        try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl)) {
            TestUtils.assertCommand(
                    parentControllerClient.createNewStore(storeName, "test_owner", "{\"type\" : \"string\"}", valueSchema.toString()));
            UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                    .setStorageQuotaInByte(-1L)
                    .setCompressionStrategy(CompressionStrategy.NO_OP)
                    .setWriteComputationEnabled(true)
                    .setActiveActiveReplicationEnabled(true)
                    .setChunkingEnabled(true)
                    .setRmdChunkingEnabled(true)
                    .setHybridRewindSeconds(10L)
                    .setHybridOffsetLagThreshold(2L);
            ControllerResponse updateResponse =
                    parentControllerClient.retryableRequest(5, client -> client.updateStore(storeName, storeParams));
            Assert.assertFalse(updateResponse.isError(), "Update store got error: " + updateResponse.getError());

            VersionCreationResponse emptyPushResponse = parentControllerClient.emptyPush(storeName, "test_push_id", 1000L);
            // Check the error flag first so a failed push is reported as such, not as a version mismatch.
            Assert.assertFalse(emptyPushResponse.isError(), "Empty push to parent colo should succeed");
            Assert.assertEquals(emptyPushResponse.getVersion(), 1);
            TestUtils.waitForNonDeterministicPushCompletion(
                    Version.composeKafkaTopic(storeName, 1), parentControllerClient, 30L, TimeUnit.SECONDS);
            Assert.assertTrue(parentControllerClient.getStore(storeName).getStore().isRmdChunkingEnabled());
            Assert.assertTrue(
                    ((Version) parentControllerClient.getStore(storeName).getStore().getVersion(1).get()).isRmdChunkingEnabled());
        }

        VeniceClusterWrapper veniceCluster = this.childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
        SystemProducer veniceProducer =
                IntegrationTestPushUtils.getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM);
        final String key = "key1";
        final String primitiveFieldName = "name";
        final String mapFieldName = "stringMap";
        final int updateCount = 30;
        final int singleUpdateEntryCount = 10000;
        final int totalEntryCount = updateCount * singleUpdateEntryCount;

        try (AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceCluster.getRandomRouterURL()))) {
            // Each update adds a disjoint batch of map entries; timestamps are 1, 11, ..., 291.
            Map<String, String> newEntries = new HashMap<>();
            for (int i = 0; i < updateCount; i++) {
                UpdateBuilderImpl updateBuilder = new UpdateBuilderImpl(writeComputeSchema);
                updateBuilder.setNewFieldValue(primitiveFieldName, "Tottenham");
                newEntries.clear();
                for (int j = 0; j < singleUpdateEntryCount; j++) {
                    String suffix = String.valueOf((i * singleUpdateEntryCount) + j);
                    newEntries.put("key_" + suffix, "value_" + suffix);
                }
                updateBuilder.setEntriesToAddToMapField(mapFieldName, newEntries);
                IntegrationTestPushUtils.sendStreamingRecord(
                        veniceProducer, storeName, key, updateBuilder.build(), Long.valueOf((i * 10L) + 1));
            }

            // Version 1: value and RMD must both reflect every streamed entry.
            TestUtils.waitForNonDeterministicAssertion(240000L, TimeUnit.MILLISECONDS, true, () -> {
                try {
                    GenericRecord value = readValue(storeReader, key);
                    Assert.assertFalse(value == null);
                    Assert.assertEquals(value.get(primitiveFieldName).toString(), "Tottenham");
                    assertStringMapFieldSize(value, mapFieldName, totalEntryCount);
                } catch (Exception e) {
                    throw new VeniceException(e);
                }
            });
            validateRmdData(rmdSerDe, Version.composeKafkaTopic(storeName, 1), key, rmd -> {
                GenericRecord timestampRecord = (GenericRecord) rmd.getRmdRecord().get("timestamp");
                GenericRecord mapFieldTimestamps = (GenericRecord) timestampRecord.get(mapFieldName);
                Assert.assertEquals(((List<?>) mapFieldTimestamps.get("activeElementsTimestamps")).size(), totalEntryCount);
            });

            // Kafka-sourced re-push producing version 2, with the rewind overridden to 0.
            Properties repushProps =
                    IntegrationTestPushUtils.defaultVPJProps(this.multiRegionMultiClusterWrapper, "dummyInputPath", storeName);
            repushProps.setProperty("source.kafka", "true");
            repushProps.setProperty("kafka.input.broker.url", veniceCluster.getKafka().getAddress());
            repushProps.setProperty("kafka.input.max.records.per.mapper", "5");
            repushProps.put("rewind.time.in.seconds.override", 0);
            TestWriteUtils.runPushJob("Run repush job", repushProps);
            try (ControllerClient childControllerClient =
                    new ControllerClient(CLUSTER_NAME, this.childDatacenters.get(0).getControllerConnectString())) {
                TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                    Assert.assertEquals(childControllerClient.getStore(storeName).getStore().getCurrentVersion(), 2);
                });
            }
            veniceCluster.refreshAllRouterMetaData();

            // Version 2: same expectations as for version 1.
            TestUtils.waitForNonDeterministicAssertion(240000L, TimeUnit.MILLISECONDS, true, () -> {
                try {
                    GenericRecord value = readValue(storeReader, key);
                    Assert.assertFalse(value == null);
                    Assert.assertEquals(value.get(primitiveFieldName).toString(), "Tottenham");
                    assertStringMapFieldSize(value, mapFieldName, totalEntryCount);
                } catch (Exception e) {
                    throw new VeniceException(e);
                }
            });
            final String repushedTopic = Version.composeKafkaTopic(storeName, 2);
            validateRmdData(rmdSerDe, repushedTopic, key, rmd -> {
                GenericRecord timestampRecord = (GenericRecord) rmd.getRmdRecord().get("timestamp");
                GenericRecord mapFieldTimestamps = (GenericRecord) timestampRecord.get(mapFieldName);
                Assert.assertEquals(((List<?>) mapFieldTimestamps.get("activeElementsTimestamps")).size(), totalEntryCount);
            });

            // Delete at timestamp 290: the test expects one update's worth of entries to remain.
            IntegrationTestPushUtils.sendStreamingDeleteRecord(
                    veniceProducer, storeName, key, Long.valueOf((updateCount - 1) * 10L));
            TestUtils.waitForNonDeterministicAssertion(120000L, TimeUnit.MILLISECONDS, true, () -> {
                GenericRecord value = readValue(storeReader, key);
                Assert.assertFalse(value == null);
                assertStringMapFieldSize(value, mapFieldName, singleUpdateEntryCount);
            });
            validateRmdData(rmdSerDe, repushedTopic, key, rmd -> {
                GenericRecord timestampRecord = (GenericRecord) rmd.getRmdRecord().get("timestamp");
                GenericRecord mapFieldTimestamps = (GenericRecord) timestampRecord.get(mapFieldName);
                Assert.assertEquals(
                        ((List<?>) mapFieldTimestamps.get("activeElementsTimestamps")).size(), singleUpdateEntryCount);
                Assert.assertEquals(((List<?>) mapFieldTimestamps.get("deletedElementsTimestamps")).size(), 0);
            });

            // Delete at timestamp 300: the key must disappear while its RMD records the delete.
            IntegrationTestPushUtils.sendStreamingDeleteRecord(veniceProducer, storeName, key, Long.valueOf(updateCount * 10L));
            TestUtils.waitForNonDeterministicAssertion(120000L, TimeUnit.MILLISECONDS, true, () -> {
                Assert.assertTrue(readValue(storeReader, key) == null);
            });
            validateRmdData(rmdSerDe, repushedTopic, key, rmd -> {
                Assert.assertTrue(rmd.getRmdRecord().get("timestamp") instanceof GenericRecord);
                GenericRecord timestampRecord = (GenericRecord) rmd.getRmdRecord().get("timestamp");
                GenericRecord mapFieldTimestamps = (GenericRecord) timestampRecord.get(mapFieldName);
                Assert.assertEquals(mapFieldTimestamps.get("topLevelFieldTimestamp"), Long.valueOf(updateCount * 10L));
            });
        } finally {
            veniceProducer.stop();
        }
    }

    /**
     * Asserts that the map stored under {@code mapFieldName} in {@code valueRecord} has
     * {@code expectedSize} entries once its keys and values are converted to strings.
     */
    private static void assertStringMapFieldSize(GenericRecord valueRecord, String mapFieldName, int expectedSize) {
        Map<String, String> entries = new HashMap<>();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) valueRecord.get(mapFieldName)).entrySet()) {
            entries.put(entry.getKey().toString(), entry.getValue().toString());
        }
        Assert.assertEquals(entries.size(), expectedSize);
    }

    /**
     * Runs {@code consumer} against the replication metadata stored for one key on every Venice
     * server of cluster {@code venice-cluster0} in the first child region.
     *
     * <p>For each server the storage engine named {@code str} must exist and must hold replication
     * metadata for the key; otherwise an assertion fails before the consumer is invoked.
     *
     * @param rmdSerDe deserializer applied to the serialized metadata record
     * @param str name under which the storage engine is looked up (callers pass a version topic)
     * @param str2 string key whose replication metadata is fetched
     * @param consumer validation to apply to each deserialized metadata record
     */
    private void validateRmdData(RmdSerDe rmdSerDe, String str, String str2, Consumer<RmdWithValueSchemaId> consumer) {
        List<VeniceServerWrapper> servers = this.multiRegionMultiClusterWrapper.getChildRegions()
                .get(0)
                .getClusters()
                .get(CLUSTER_NAME)
                .getVeniceServers();
        for (VeniceServerWrapper server : servers) {
            AbstractStorageEngine engine = server.getVeniceServer().getStorageService().getStorageEngine(str);
            Assert.assertNotNull(engine);
            // NOTE(review): the literal 0 is presumably the partition id — confirm against SingleGetChunkingAdapter.
            ValueRecord rmdValue = SingleGetChunkingAdapter
                    .getReplicationMetadata(engine, 0, serializeStringKeyToByteArray(str2), true, (ReadResponse) null);
            Assert.assertFalse(rmdValue == null);
            RmdWithValueSchemaId deserializedRmd = rmdSerDe.deserializeValueSchemaIdPrependedRmdBytes(rmdValue.serialize());
            consumer.accept(deserializedRmd);
        }
    }

    @Test(timeOut = 120000)
    public void testUpdateWithSupersetSchema() throws IOException {
        String uniqueString = Utils.getUniqueString("store");
        String controllerUrl = this.parentController.getControllerUrl();
        Schema parse = AvroCompatibilityHelper.parse(new String[]{TestWriteUtils.loadFileAsString("writecompute/test/PersonV1.avsc")});
        Schema parse2 = AvroCompatibilityHelper.parse(new String[]{TestWriteUtils.loadFileAsString("writecompute/test/PersonV2.avsc")});
        String str = "name";
        ControllerClient controllerClient = new ControllerClient(CLUSTER_NAME, controllerUrl);
        try {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, "test_owner", "{\"type\" : \"string\"}", parse.toString()));
            UpdateStoreQueryParams hybridOffsetLagThreshold = new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setCompressionStrategy(CompressionStrategy.NO_OP).setWriteComputationEnabled(true).setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L);
            ControllerResponse retryableRequest = controllerClient.retryableRequest(5, controllerClient2 -> {
                return controllerClient2.updateStore(uniqueString, hybridOffsetLagThreshold);
            });
            Assert.assertFalse(retryableRequest.isError(), "Update store got error: " + retryableRequest.getError());
            VersionCreationResponse emptyPush = controllerClient.emptyPush(uniqueString, "test_push_id", 1000L);
            Assert.assertEquals(emptyPush.getVersion(), 1);
            Assert.assertFalse(emptyPush.isError(), "Empty push to parent colo should succeed");
            TestUtils.waitForNonDeterministicPushCompletion(Version.composeKafkaTopic(uniqueString, 1), controllerClient, 30L, TimeUnit.SECONDS);
            TestUtils.assertCommand(controllerClient.addValueSchema(uniqueString, parse2.toString()));
            controllerClient.close();
            SystemProducer systemProducer = null;
            VeniceClusterWrapper veniceClusterWrapper = this.childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
            try {
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()));
                try {
                    systemProducer = IntegrationTestPushUtils.getSamzaProducer(veniceClusterWrapper, uniqueString, Version.PushType.STREAM, new Pair[0]);
                    String str2 = "key1";
                    GenericData.Record record = new GenericData.Record(parse);
                    record.put("name", "Lebron");
                    record.put("age", 37);
                    IntegrationTestPushUtils.sendStreamingRecord(systemProducer, uniqueString, "key1", record);
                    TestUtils.waitForNonDeterministicAssertion(120L, TimeUnit.SECONDS, () -> {
                        try {
                            GenericRecord readValue = readValue(andStartGenericAvroClient, str2);
                            Assert.assertNotNull(readValue);
                            Assert.assertEquals(readValue.get(str).toString(), "Lebron");
                            Assert.assertEquals(readValue.get("age").toString(), "37");
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    });
                    UpdateBuilderImpl updateBuilderImpl = new UpdateBuilderImpl(WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(parse2));
                    updateBuilderImpl.setNewFieldValue("name", "Lebron James");
                    updateBuilderImpl.setNewFieldValue("hometown", "Akron");
                    IntegrationTestPushUtils.sendStreamingRecord(systemProducer, uniqueString, "key1", updateBuilderImpl.build());
                    TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
                        try {
                            GenericRecord readValue = readValue(andStartGenericAvroClient, str2);
                            Assert.assertNotNull(readValue);
                            Assert.assertEquals(readValue.get(str).toString(), "Lebron James");
                            Assert.assertEquals(readValue.get("age").toString(), "37");
                            Assert.assertEquals(readValue.get("hometown").toString(), "Akron");
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    });
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    if (systemProducer != null) {
                        systemProducer.stop();
                    }
                } finally {
                }
            } catch (Throwable th) {
                if (systemProducer != null) {
                    systemProducer.stop();
                }
                throw th;
            }
        } catch (Throwable th2) {
            try {
                controllerClient.close();
            } catch (Throwable th3) {
                th2.addSuppressed(th3);
            }
            throw th2;
        }
    }

    /**
     * Issues a single-key read against the given store client and waits for the returned future to complete.
     *
     * @param avroGenericStoreClient client used to perform the read
     * @param str key to look up
     * @return the future's result cast to {@link GenericRecord}; callers in this class treat {@code null} as "key absent"
     * @throws ExecutionException if the read future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting on the future
     */
    private GenericRecord readValue(AvroGenericStoreClient<Object, Object> avroGenericStoreClient, String str) throws ExecutionException, InterruptedException {
        Object fetched = avroGenericStoreClient.get(str).get();
        return (GenericRecord) fetched;
    }

    /**
     * End-to-end check of write-compute (partial update) against a store configured with hybrid settings,
     * chunking, write computation and the supplied compression strategy.
     *
     * <p>Steps exercised:
     * <ol>
     *   <li>Create the store through the parent controller, update its settings, register a second value schema
     *       (which adds an {@code age} field defaulting to -1) and the derived update schema for it.</li>
     *   <li>Run a push job for version 1 and verify keys "1".."99" are readable with {@code age == -1}.</li>
     *   <li>Stream a full record for key "101", then partial updates, a delete (null value), and further partial
     *       updates, asserting the value visible through the store client after every step.</li>
     * </ol>
     *
     * <p>All clients are managed with try-with-resources and the Samza producer is stopped in a {@code finally}
     * block, so nothing is left open when an assertion fails midway.
     *
     * @param z when {@code true}, {@code StoreIngestionTaskBackdoor.setPurgeTransientRecordBuffer(server, topic, false)}
     *          is invoked for every server of the child cluster on the version-1 topic before streaming starts
     * @param compressionStrategy compression strategy applied to the store
     * @throws Exception if any step of the test fails
     */
    @Test(timeOut = 120000, dataProvider = "Boolean-Compression", dataProviderClass = DataProviderUtils.class)
    public void testWriteComputeWithHybridLeaderFollowerLargeRecord(boolean z, CompressionStrategy compressionStrategy) throws Exception {
        String storeName = Utils.getUniqueString("write-compute-store");
        File inputDir = TestWriteUtils.getTempDataDirectory();
        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        String parentControllerUrl = this.parentController.getControllerUrl();
        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToRecordSchema(inputDir, true);
        VeniceClusterWrapper childCluster = this.childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
        Properties vpjProps = IntegrationTestPushUtils.defaultVPJProps(this.multiRegionMultiClusterWrapper, inputDirPath, storeName);
        // Second value schema: firstName / lastName / age (default -1). Used both for registration and for deriving the update schema.
        String valueSchemaWithAgeJson = "{  \"namespace\" : \"example.avro\",    \"type\": \"record\",     \"name\": \"nameRecord\",       \"fields\": [                  { \"name\": \"firstName\", \"type\": \"string\", \"default\": \"\" },         { \"name\": \"lastName\", \"type\": \"string\", \"default\": \"\" },         { \"name\": \"age\", \"type\": \"int\", \"default\": -1 }    ] } ";
        // Schema of the full record streamed for key 101: firstName / lastName only.
        String streamedRecordSchemaJson = "{  \"namespace\" : \"example.avro\",    \"type\": \"record\",     \"name\": \"nameRecord\",       \"fields\": [                  { \"name\": \"firstName\", \"type\": \"string\", \"default\": \"\" },         { \"name\": \"lastName\", \"type\": \"string\", \"default\": \"\" }    ] } ";

        // Not Closeable, so it is stopped explicitly in the finally block below.
        SystemProducer producer = null;
        try (ControllerClient controllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl);
                AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(childCluster.getRandomRouterURL()))) {
            // Store creation and configuration.
            TestUtils.assertCommand(controllerClient.createNewStore(storeName, "test_owner", recordSchema.getField("key").schema().toString(), recordSchema.getField("value").schema().toString()));
            UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                    .setHybridRewindSeconds(10L)
                    .setHybridOffsetLagThreshold(2L)
                    .setStorageQuotaInByte(-1L)
                    .setChunkingEnabled(true)
                    .setCompressionStrategy(compressionStrategy)
                    .setWriteComputationEnabled(true);
            Assert.assertFalse(controllerClient.updateStore(storeName, storeParams).isError());

            // Register the schema with the extra "age" field plus the update schema derived from it.
            SchemaResponse addValueSchema = controllerClient.addValueSchema(storeName, valueSchemaWithAgeJson);
            Assert.assertFalse(addValueSchema.isError());
            Schema updateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(AvroCompatibilityHelper.parse(new String[]{valueSchemaWithAgeJson}));
            Assert.assertFalse(controllerClient.addDerivedSchema(storeName, addValueSchema.getId(), updateSchema.toString()).isError());

            // Push version 1; the child controller client is only needed while the push job runs.
            try (ControllerClient childControllerClient = new ControllerClient(CLUSTER_NAME, this.childDatacenters.get(0).getRandomController().getControllerUrl())) {
                runVPJ(vpjProps, 1, childControllerClient);
            }

            // Pushed records must be readable, with "age" filled in from the schema default.
            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                for (int i = 1; i < 100; i++) {
                    try {
                        String batchKey = String.valueOf(i);
                        GenericRecord readValue = readValue(storeReader, batchKey);
                        Assert.assertNotNull(readValue, "Key " + batchKey + " should not be missing!");
                        Assert.assertEquals(readValue.get("firstName").toString(), "first_name_" + batchKey);
                        Assert.assertEquals(readValue.get("lastName").toString(), "last_name_" + batchKey);
                        Assert.assertEquals(readValue.get("age"), -1);
                    } catch (Exception e) {
                        throw new VeniceException(e);
                    }
                }
            });

            if (z) {
                String versionTopic = Version.composeKafkaTopic(storeName, 1);
                for (VeniceServerWrapper server: childCluster.getVeniceServers()) {
                    StoreIngestionTaskBackdoor.setPurgeTransientRecordBuffer(server, versionTopic, false);
                }
            }

            producer = IntegrationTestPushUtils.getSamzaProducer(childCluster, storeName, Version.PushType.STREAM, new Pair[0]);
            String streamKey = String.valueOf(101);

            // Step 1: full record put for a key that the push did not write.
            GenericData.Record fullRecord = new GenericData.Record(AvroCompatibilityHelper.parse(new String[]{streamedRecordSchemaJson}));
            char[] filler = new char[100];
            Arrays.fill(filler, 'f');
            String firstNameF = new String(filler);
            Arrays.fill(filler, 'l');
            String lastNameL = new String(filler);
            fullRecord.put("firstName", firstNameF);
            fullRecord.put("lastName", lastNameL);
            IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, streamKey, fullRecord);
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                try {
                    GenericRecord readValue = readValue(storeReader, streamKey);
                    Assert.assertNotNull(readValue, "Key " + streamKey + " should not be missing!");
                    Assert.assertEquals(readValue.get("firstName").toString(), firstNameF);
                    Assert.assertEquals(readValue.get("lastName").toString(), lastNameL);
                } catch (Exception e) {
                    throw new VeniceException(e);
                }
            });

            // Step 2: partial update of firstName and age; lastName must be preserved.
            Arrays.fill(filler, 'u');
            String firstNameU = new String(filler);
            UpdateBuilderImpl updateFirstNameAndAge = new UpdateBuilderImpl(updateSchema);
            updateFirstNameAndAge.setNewFieldValue("firstName", firstNameU);
            updateFirstNameAndAge.setNewFieldValue("age", 1);
            IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, streamKey, updateFirstNameAndAge.build());
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                try {
                    GenericRecord readValue = readValue(storeReader, streamKey);
                    Assert.assertNotNull(readValue, "Key " + streamKey + " should not be missing!");
                    Assert.assertEquals(readValue.get("firstName").toString(), firstNameU);
                    Assert.assertEquals(readValue.get("lastName").toString(), lastNameL);
                    Assert.assertEquals(readValue.get("age"), 1);
                } catch (Exception e) {
                    throw new VeniceException(e);
                }
            });

            // Step 3: partial update of firstName only.
            Arrays.fill(filler, 'v');
            String firstNameV = new String(filler);
            UpdateBuilderImpl updateFirstNameOnly = new UpdateBuilderImpl(updateSchema);
            updateFirstNameOnly.setNewFieldValue("firstName", firstNameV);
            IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, streamKey, updateFirstNameOnly.build());
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
                try {
                    GenericRecord readValue = readValue(storeReader, streamKey);
                    Assert.assertNotNull(readValue, "Key " + streamKey + " should not be missing!");
                    Assert.assertEquals(readValue.get("firstName").toString(), firstNameV);
                    Assert.assertEquals(readValue.get("lastName").toString(), lastNameL);
                } catch (Exception e) {
                    throw new VeniceException(e);
                }
            });

            // Step 4: delete the key by sending a null value.
            IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, streamKey, null);
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                try {
                    Assert.assertNull(readValue(storeReader, streamKey), "Key " + streamKey + " should be missing!");
                } catch (Exception e) {
                    throw new VeniceException(e);
                }
            });

            // Step 5: partial update on the deleted key, setting every field.
            Arrays.fill(filler, 'w');
            String firstNameW = new String(filler);
            Arrays.fill(filler, 'g');
            String lastNameG = new String(filler);
            UpdateBuilderImpl updateAllFields = new UpdateBuilderImpl(updateSchema);
            updateAllFields.setNewFieldValue("firstName", firstNameW);
            updateAllFields.setNewFieldValue("lastName", lastNameG);
            updateAllFields.setNewFieldValue("age", 2);
            IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, streamKey, updateAllFields.build());
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                try {
                    GenericRecord readValue = readValue(storeReader, streamKey);
                    Assert.assertNotNull(readValue, "Key " + streamKey + " should not be missing!");
                    Assert.assertEquals(readValue.get("firstName").toString(), firstNameW);
                    Assert.assertEquals(readValue.get("lastName").toString(), lastNameG);
                    Assert.assertEquals(readValue.get("age"), 2);
                } catch (Exception e) {
                    throw new VeniceException(e);
                }
            });

            // Step 6: one more firstName-only partial update on the re-created record.
            Arrays.fill(filler, 'x');
            String firstNameX = new String(filler);
            UpdateBuilderImpl updateFirstNameAgain = new UpdateBuilderImpl(updateSchema);
            updateFirstNameAgain.setNewFieldValue("firstName", firstNameX);
            IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, streamKey, updateFirstNameAgain.build());
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                try {
                    GenericRecord readValue = readValue(storeReader, streamKey);
                    Assert.assertNotNull(readValue, "Key " + streamKey + " should not be missing!");
                    Assert.assertEquals(readValue.get("firstName").toString(), firstNameX);
                    Assert.assertEquals(readValue.get("lastName").toString(), lastNameG);
                } catch (Exception e) {
                    throw new VeniceException(e);
                }
            });
        } finally {
            // Runs after the clients above have been closed, on both success and failure.
            if (producer != null) {
                producer.stop();
            }
        }
    }

    /**
     * Verifies that partial-update records sent through a Samza producer configured with
     * {@code Version.PushType.BATCH} become readable once end-of-push is written.
     *
     * <p>Steps exercised:
     * <ol>
     *   <li>Create the store through the parent controller, update its settings (hybrid settings, chunking,
     *       write computation), and register a value schema with an {@code age} field (default -1) together
     *       with its derived update schema.</li>
     *   <li>Send an empty push, then start a producer built from {@link VeniceSystemFactory} with the aggregate and
     *       parent-D2 settings applied below.</li>
     *   <li>Send the same partial update (firstName + lastName) for keys "0".."9", write end-of-push for
     *       version 2, and assert that each key reads back with the updated names and {@code age} of -1.</li>
     * </ol>
     *
     * <p>Both clients are managed with try-with-resources and the producer is stopped in a {@code finally} block,
     * so a failed assertion does not leave them open.
     *
     * @throws Exception if any step of the test fails
     */
    @Test(timeOut = 120000)
    public void testWriteComputeWithSamzaBatchJob() throws Exception {
        String storeName = Utils.getUniqueString("write-compute-store");
        File inputDir = TestWriteUtils.getTempDataDirectory();
        String parentControllerUrl = this.parentController.getControllerUrl();
        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToRecordSchema(inputDir, true);
        VeniceClusterWrapper childCluster = this.childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
        // Value schema with firstName / lastName / age (default -1); registered and used to derive the update schema.
        String valueSchemaWithAgeJson = "{  \"namespace\" : \"example.avro\",    \"type\": \"record\",     \"name\": \"nameRecord\",       \"fields\": [                  { \"name\": \"firstName\", \"type\": \"string\", \"default\": \"\" },         { \"name\": \"lastName\", \"type\": \"string\", \"default\": \"\" },         { \"name\": \"age\", \"type\": \"int\", \"default\": -1 }    ] } ";

        // Not Closeable, so it is stopped explicitly in the finally block below.
        SystemProducer producer = null;
        try (ControllerClient controllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl);
                AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(childCluster.getRandomRouterURL()))) {
            // Store creation and configuration.
            TestUtils.assertCommand(controllerClient.createNewStore(storeName, "test_owner", recordSchema.getField("key").schema().toString(), recordSchema.getField("value").schema().toString()));
            UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                    .setHybridRewindSeconds(10L)
                    .setHybridOffsetLagThreshold(2L)
                    .setStorageQuotaInByte(-1L)
                    .setWriteComputationEnabled(true)
                    .setChunkingEnabled(true);
            Assert.assertFalse(controllerClient.updateStore(storeName, storeParams).isError());

            SchemaResponse addValueSchema = controllerClient.addValueSchema(storeName, valueSchemaWithAgeJson);
            Assert.assertFalse(addValueSchema.isError());
            Schema updateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(AvroCompatibilityHelper.parse(new String[]{valueSchemaWithAgeJson}));
            Assert.assertFalse(controllerClient.addDerivedSchema(storeName, addValueSchema.getId(), updateSchema.toString()).isError());

            controllerClient.sendEmptyPushAndWait(storeName, "foopush", 10000L, 60000L);

            // Build a batch-mode producer from the factory with the aggregate and parent-D2 settings.
            VeniceSystemFactory systemFactory = new VeniceSystemFactory();
            Map<String, String> producerConfig = IntegrationTestPushUtils.getSamzaProducerConfig(childCluster, storeName, Version.PushType.BATCH);
            producerConfig.put("systems.venice.aggregate", "true");
            producerConfig.put("venice.parent.d2.zk.hosts", this.multiRegionMultiClusterWrapper.getZkServerWrapper().getAddress());
            producerConfig.put("venice.parent.controller.d2.service", VeniceControllerWrapper.PARENT_D2_SERVICE_NAME);
            producerConfig.put("deployment.id", Utils.getUniqueString("venice-push-id"));
            producer = systemFactory.getProducer("venice", new MapConfig(producerConfig), (MetricsRegistry) null);
            producer.start();

            // The same partial update (firstName + lastName only) is sent for every key.
            char[] filler = new char[5];
            Arrays.fill(filler, 'f');
            String expectedFirstName = new String(filler);
            Arrays.fill(filler, 'l');
            String expectedLastName = new String(filler);
            UpdateBuilderImpl updateBuilder = new UpdateBuilderImpl(updateSchema);
            updateBuilder.setNewFieldValue("firstName", expectedFirstName);
            updateBuilder.setNewFieldValue("lastName", expectedLastName);
            GenericRecord partialUpdate = updateBuilder.build();
            for (int i = 0; i < 10; i++) {
                IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, String.valueOf(i), partialUpdate);
            }

            controllerClient.writeEndOfPush(storeName, 2);

            // "age" was never set by the update, so it is expected to read back as the schema default.
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                for (int key = 0; key < 10; key++) {
                    try {
                        GenericRecord readValue = readValue(storeReader, Integer.toString(key));
                        Assert.assertNotNull(readValue, "Key " + key + " should not be missing!");
                        Assert.assertEquals(readValue.get("firstName").toString(), expectedFirstName);
                        Assert.assertEquals(readValue.get("lastName").toString(), expectedLastName);
                        Assert.assertEquals(readValue.get("age").toString(), "-1");
                    } catch (Exception e) {
                        throw new VeniceException(e);
                    }
                }
            });
        } finally {
            // Runs after the clients above have been closed, on both success and failure.
            if (producer != null) {
                producer.stop();
            }
        }
    }

    /**
     * Runs a {@link VenicePushJob} built from the given properties and then waits, via
     * {@code TestUtils.waitForNonDeterministicCompletion} with a 60-second limit, until the store named by the
     * {@code "venice.store.name"} property reports {@code i} as its current version.
     *
     * <p>The job is managed with try-with-resources, so it is closed exactly once on both the success and the
     * failure path.
     *
     * @param properties push job properties; must contain {@code "venice.store.name"} as a {@code String}
     * @param i version number expected to become current; also used as a suffix of the generated job name
     * @param controllerClient client used to query the store's current version
     */
    private void runVPJ(Properties properties, int i, ControllerClient controllerClient) {
        String jobName = Utils.getUniqueString("write-compute-job-" + i);
        try (VenicePushJob pushJob = new VenicePushJob(jobName, properties)) {
            pushJob.run();
            TestUtils.waitForNonDeterministicCompletion(
                    60L,
                    TimeUnit.SECONDS,
                    () -> controllerClient.getStore((String) properties.get("venice.store.name")).getStore().getCurrentVersion() == i);
        }
    }

    /**
     * Class-level teardown: hands the multi-region cluster wrapper to
     * {@code Utils.closeQuietlyWithErrorLogged}. Declared with {@code alwaysRun = true}.
     */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Closeable[] resourcesToClose = {this.multiRegionMultiClusterWrapper};
        Utils.closeQuietlyWithErrorLogged(resourcesToClose);
    }

    /**
     * Serializes a string key with an Avro {@code STRING} schema using a binary encoder.
     *
     * @param str key to serialize
     * @return the bytes written by the binary encoder for the key
     * @throws RuntimeException wrapping the {@link IOException} if writing or flushing the encoder fails
     */
    private byte[] serializeStringKeyToByteArray(String str) {
        Utf8 keyAsUtf8 = new Utf8(str);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(sink);
        GenericDatumWriter<Object> stringWriter = new GenericDatumWriter<>(Schema.create(Schema.Type.STRING));
        try {
            stringWriter.write(keyAsUtf8, encoder);
            // The result is read from the sink only after the encoder has been flushed.
            encoder.flush();
        } catch (IOException ioe) {
            throw new RuntimeException("Failed to write input: " + keyAsUtf8 + " to binary encoder", ioe);
        }
        return sink.toByteArray();
    }
}
