package com.linkedin.venice.endToEnd;

import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.ExceptionType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.TestVeniceServer;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.samza.VeniceSystemProducer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.status.BatchJobHeartbeatConfigs;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatKey;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.class */
public class TestPushJobWithNativeReplication {
    // Numeric settings for the test environment.
    private static final int TEST_TIMEOUT = 120000;
    private static final int NUMBER_OF_CHILD_DATACENTERS = 2;
    private static final int NUMBER_OF_CLUSTERS = 1;
    // Region name written to the "emergency.source.region" setting in setUp() and expected as the
    // default native-replication source in the cluster-level admin command test.
    private static final String DEFAULT_NATIVE_REPLICATION_SOURCE = "dc-0";

    // Environment handles; the wrapper and the two lists are populated in setUp().
    private List<VeniceMultiClusterWrapper> childDatacenters;
    private List<VeniceControllerWrapper> parentControllers;
    private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    private PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    private static final Logger LOGGER = LogManager.getLogger(TestPushJobWithNativeReplication.class);

    // One generated name per cluster: "venice-cluster0", "venice-cluster1", ...
    private static final String[] CLUSTER_NAMES =
            IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(index -> "venice-cluster" + index).toArray(String[]::new);
    // The batch-job heartbeat store is configured against the first generated cluster.
    private static final String VPJ_HEARTBEAT_STORE_CLUSTER = CLUSTER_NAMES[0];
    private static final String VPJ_HEARTBEAT_STORE_NAME = AvroProtocolDefinition.BATCH_JOB_HEARTBEAT.getSystemStoreName();

    /**
     * Body of a single native-replication test scenario, passed to {@code motherOfAllTests} as a lambda.
     *
     * <p>The parameter names are decompiler-generated; the roles below are taken from how the test
     * lambdas in this class use each argument.
     *
     * <p>NOTE(review): the decompiler reports that this interface was originally declared
     * {@code private}; confirm before narrowing the visibility again.
     *
     * @param controllerClient controller client the scenarios use to query and update stores
     * @param str cluster name; the scenarios use it to look up a cluster in a child datacenter
     * @param str2 name of the store under test; the scenarios use it for store lookups and reads
     * @param properties properties the scenarios hand to {@code VenicePushJob}
     * @param file not used by the scenarios visible in this file; presumably the input data
     *     directory - TODO confirm against {@code motherOfAllTests}
     * @throws Exception any failure raised while running the scenario
     */
    public interface NativeReplicationTest {
        void run(ControllerClient controllerClient, String str, String str2, Properties properties, File file) throws Exception;
    }

    /**
     * Supplies the argument rows for tests that use the {@code "storeSize"} data provider.
     *
     * <p>Each row is {@code {recordCount, partitionCount}}: the consuming tests pass the first value
     * to {@code motherOfAllTests} and use it as the upper bound of the keys they read back, and pass
     * the second value to {@code setPartitionCount}.
     *
     * @return two rows: a 50-record store with 2 partitions and a 1000-record store with 10 partitions
     */
    @DataProvider(name = "storeSize")
    public static Object[][] storeSize() {
        // A two-dimensional initializer is required here: a one-dimensional Object[] is not
        // assignable to the declared Object[][] return type. The partition count is written as a
        // plain literal because it is unrelated to the number of child datacenters.
        return new Object[][] { { 50, 2 }, { 1000, 10 } };
    }

    /**
     * Creates the multi-region, multi-cluster environment shared by every test in this class and
     * caches its child regions and parent controllers.
     */
    @BeforeClass(alwaysRun = true)
    public void setUp() {
        // NOTE(review): these settings appear to be controller-level configuration; confirm against
        // the ServiceFactory method signature.
        Properties controllerProps = new Properties();
        controllerProps.put("default.partition.max.count", 10);
        controllerProps.put(BatchJobHeartbeatConfigs.HEARTBEAT_STORE_CLUSTER_CONFIG.getConfigName(), VPJ_HEARTBEAT_STORE_CLUSTER);
        controllerProps.put(BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName(), true);
        controllerProps.setProperty("emergency.source.region", DEFAULT_NATIVE_REPLICATION_SOURCE);

        // Storage-node settings (every key is in the "server." or "rocksdb." namespace).
        Properties serverProps = new Properties();
        serverProps.put("server.promotion.to.leader.replica.delay.seconds", 1L);
        serverProps.setProperty("rocksdb.plain.table.format.enabled", "false");
        serverProps.setProperty("server.database.checksum.verification.enabled", "true");
        serverProps.setProperty("server.database.sync.bytes.interval.for.deferred.write.mode", "300");
        serverProps.setProperty("server.shared.kafka.producer.enabled", "true");
        serverProps.setProperty("server.kafka.producer.pool.size.per.kafka.cluster", "1");

        this.multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
                NUMBER_OF_CHILD_DATACENTERS,
                1,
                1,
                1,
                NUMBER_OF_CHILD_DATACENTERS,
                1,
                NUMBER_OF_CHILD_DATACENTERS,
                Optional.of(new VeniceProperties(controllerProps)),
                Optional.of(controllerProps),
                Optional.of(new VeniceProperties(serverProps)),
                false);
        this.childDatacenters = this.multiRegionMultiClusterWrapper.getChildRegions();
        this.parentControllers = this.multiRegionMultiClusterWrapper.getParentControllers();
    }

    /**
     * Closes the shared multi-region environment once all tests in this class have run.
     */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Closeable[] resources = { this.multiRegionMultiClusterWrapper };
        Utils.closeQuietlyWithErrorLogged(resources);
    }

    /**
     * Runs a batch push and verifies the result from child datacenter 1.
     *
     * <p>Checks, in order: the push job's Kafka URL equals the Kafka broker address of child
     * datacenter 0; every region reported by the controller has current version 1; every key from 1
     * to the record count reads back as {@code "test_name_<key>"} through a router of child
     * datacenter 1; and, for one partition of version 1 on the first server of that datacenter, the
     * stored local version-topic offset does not exceed the latest offset of the topic partition.
     *
     * @param i number of records in the store (first column of the {@code "storeSize"} provider)
     * @param i2 partition count of the store (second column of the {@code "storeSize"} provider)
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 120000, dataProvider = "storeSize")
    public void testNativeReplicationForBatchPush(int i, int i2) throws Exception {
        final int recordCount = i;
        final int partitionCount = i2;
        motherOfAllTests(
                "testNativeReplicationForBatchPush",
                updateStoreParams -> updateStoreParams.setPartitionCount(partitionCount)
                        .setAmplificationFactor(NUMBER_OF_CHILD_DATACENTERS),
                recordCount,
                (controllerClient, clusterName, storeName, props, inputDir) -> {
                    // The job is closed exactly once, before verification starts, so a failed
                    // verification can no longer trigger a second close().
                    try (VenicePushJob job = new VenicePushJob("Test push job", props)) {
                        job.run();
                        Assert.assertEquals(job.getKafkaUrl(), this.childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
                    }

                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                        for (int version : controllerClient.getStore(storeName).getStore().getColoToCurrentVersions().values()) {
                            Assert.assertEquals(version, 1);
                        }

                        VeniceMultiClusterWrapper childDataCenter = this.childDatacenters.get(1);
                        String routerUrl = childDataCenter.getClusters().get(clusterName).getRandomRouterURL();
                        try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
                                ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(routerUrl))) {
                            for (int key = 1; key <= recordCount; key++) {
                                Assert.assertEquals(client.get(Integer.toString(key)).get().toString(), "test_name_" + key);
                            }
                        }

                        String versionTopic = Version.composeKafkaTopic(storeName, 1);
                        TestVeniceServer server =
                                childDataCenter.getClusters().get(clusterName).getVeniceServers().get(0).getVeniceServer();
                        Set<Integer> partitionIds =
                                server.getStorageService().getStorageEngineRepository().getLocalStorageEngine(versionTopic).getPartitionIds();
                        Assert.assertFalse(partitionIds.isEmpty());
                        int partitionId = partitionIds.iterator().next();

                        long storedOffset =
                                server.getStorageMetadataService().getLastOffset(versionTopic, partitionId).getLocalVersionTopicOffset();
                        long latestTopicOffset = childDataCenter.getRandomController()
                                .getVeniceAdmin()
                                .getTopicManager()
                                .getPartitionLatestOffsetAndRetry(
                                        new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(versionTopic), partitionId),
                                        5);
                        Assert.assertTrue(storedOffset <= latestTopicOffset);
                    });
                });
    }

    /**
     * Restarts the leader server of the version topic while a push job is running on a background
     * thread, then verifies that the push still completes.
     *
     * <p>The first wait locates the leader instance of partition 0 of version 1 in child
     * datacenter 1 and restarts the server on that port. The second wait requires every region to
     * report current version 1 and every key from 1 to 10000 to read back as
     * {@code "test_name_<key>"} through a router of child datacenter 1.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 240000)
    public void testNativeReplicationWithLeadershipHandover() throws Exception {
        final int recordCount = 10000;
        motherOfAllTests(
                "testNativeReplicationWithLeadershipHandover",
                updateStoreParams -> updateStoreParams.setPartitionCount(1).setAmplificationFactor(NUMBER_OF_CHILD_DATACENTERS),
                recordCount,
                (controllerClient, clusterName, storeName, props, inputDir) -> {
                    Thread pushJobThread = new Thread(() -> TestWriteUtils.runPushJob("Test push job", props));
                    pushJobThread.start();
                    try {
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, false, () -> {
                            VeniceClusterWrapper childCluster = this.childDatacenters.get(1).getClusters().get(clusterName);
                            String versionTopic = Version.composeKafkaTopic(storeName, 1);
                            HelixExternalViewRepository routingDataRepo = childCluster.getLeaderVeniceController()
                                    .getVeniceHelixAdmin()
                                    .getHelixVeniceClusterResources(clusterName)
                                    .getRoutingDataRepository();
                            Assert.assertTrue(routingDataRepo.containsKafkaTopic(versionTopic));
                            Instance leaderNode = routingDataRepo.getLeaderInstance(versionTopic, 0);
                            Assert.assertNotNull(leaderNode);
                            LOGGER.info("Restart server port {}", Integer.valueOf(leaderNode.getPort()));
                            childCluster.stopAndRestartVeniceServer(leaderNode.getPort());
                        });

                        TestUtils.waitForNonDeterministicAssertion(120L, TimeUnit.SECONDS, true, () -> {
                            for (int version : controllerClient.getStore(storeName).getStore().getColoToCurrentVersions().values()) {
                                Assert.assertEquals(version, 1, "Current version should become 1!");
                            }
                            String routerUrl = this.childDatacenters.get(1).getClusters().get(clusterName).getRandomRouterURL();
                            try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
                                    ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(routerUrl))) {
                                for (int key = 1; key <= recordCount; key++) {
                                    Assert.assertEquals(client.get(Integer.toString(key)).get().toString(), "test_name_" + key);
                                }
                            }
                        });
                    } finally {
                        // Single shutdown path for success and failure, so the thread is never
                        // shut down twice.
                        TestUtils.shutdownThread(pushJobThread);
                    }
                });
    }

    /**
     * Runs a batch push, then reads the data through a Da Vinci client that is created with the
     * property map from {@code TestUtils.getIngestionIsolationPropertyMap()} and pointed at the
     * ZooKeeper address of the test cluster in child datacenter 1.
     *
     * <p>After {@code subscribeAll()} completes, every key from 1 to 100 must read back as
     * {@code "test_name_<key>"}.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 120000)
    public void testNativeReplicationWithIngestionIsolationInDaVinci() throws Exception {
        final int recordCount = 100;
        motherOfAllTests(
                "testNativeReplicationWithIngestionIsolationInDaVinci",
                updateStoreParams -> updateStoreParams.setPartitionCount(NUMBER_OF_CHILD_DATACENTERS)
                        .setAmplificationFactor(NUMBER_OF_CHILD_DATACENTERS),
                recordCount,
                (controllerClient, clusterName, storeName, props, inputDir) -> {
                    // The push job is closed once, before the Da Vinci phase begins; a failure in
                    // that phase can no longer close it a second time.
                    try (VenicePushJob job = new VenicePushJob("Test push job", props)) {
                        job.run();
                        Assert.assertEquals(job.getKafkaUrl(), this.childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
                    }

                    try (DaVinciClient<String, Object> daVinciClient = ServiceFactory.getGenericAvroDaVinciClientWithRetries(
                            storeName,
                            this.childDatacenters.get(1).getClusters().get(clusterName).getZk().getAddress(),
                            new DaVinciConfig(),
                            TestUtils.getIngestionIsolationPropertyMap())) {
                        daVinciClient.subscribeAll().get();
                        for (int key = 1; key <= recordCount; key++) {
                            Assert.assertEquals(daVinciClient.get(Integer.toString(key)).get().toString(), "test_name_" + key);
                        }
                    }
                });
    }

    /**
     * Verifies native replication for a hybrid store configured with the AGGREGATE data
     * replication policy.
     *
     * <p>After a batch push, the hybrid config of version 1 in child datacenter 1 must carry the
     * rewind time, offset lag threshold and replication policy that were set on the store. A
     * stream producer is then started, its bootstrap servers must equal the parent Kafka broker
     * address, and it sends records 1 to 10. Finally, through a router of child datacenter 1,
     * keys 1 to 10 must read as {@code "stream_<key>"} and keys 11 to 50 as
     * {@code "test_name_<key>"}.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 120000)
    public void testNativeReplicationForHybrid() throws Exception {
        motherOfAllTests(
                "testNativeReplicationForHybrid",
                updateStoreParams -> updateStoreParams.setPartitionCount(1)
                        .setAmplificationFactor(NUMBER_OF_CHILD_DATACENTERS)
                        .setHybridRewindSeconds(120000L)
                        .setHybridOffsetLagThreshold(2L)
                        .setHybridDataReplicationPolicy(DataReplicationPolicy.AGGREGATE),
                50,
                (controllerClient, clusterName, storeName, props, inputDir) -> {
                    TestWriteUtils.runPushJob("Test push job", props);

                    VeniceMultiClusterWrapper childDataCenter = this.childDatacenters.get(1);
                    HybridStoreConfig hybridConfig = ((Version) childDataCenter.getRandomController()
                            .getVeniceAdmin()
                            .getStore(clusterName, storeName)
                            .getVersion(1)
                            .get()).getHybridStoreConfig();
                    Assert.assertNotNull(hybridConfig);
                    Assert.assertEquals(hybridConfig.getRewindTimeInSeconds(), 120000L);
                    Assert.assertEquals(hybridConfig.getOffsetLagThresholdToGoOnline(), 2L);
                    Assert.assertEquals(hybridConfig.getDataReplicationPolicy(), DataReplicationPolicy.AGGREGATE);

                    HashMap<String, String> samzaConfig = new HashMap<>();
                    samzaConfig.put("systems.venice.push.type", Version.PushType.STREAM.toString());
                    samzaConfig.put("systems.venice.store", storeName);
                    samzaConfig.put("systems.venice.aggregate", "true");
                    samzaConfig.put("venice.child.d2.zk.hosts", this.childDatacenters.get(0).getZkServerWrapper().getAddress());
                    samzaConfig.put("venice.child.controller.d2.service", VeniceControllerWrapper.D2_SERVICE_NAME);
                    samzaConfig.put("venice.parent.d2.zk.hosts", this.multiRegionMultiClusterWrapper.getZkServerWrapper().getAddress());
                    samzaConfig.put("venice.parent.controller.d2.service", VeniceControllerWrapper.PARENT_D2_SERVICE_NAME);
                    samzaConfig.put("deployment.id", Utils.getUniqueString("venice-push-id"));
                    samzaConfig.put("ssl.enabled", "false");

                    try (VeniceSystemProducer producer =
                            new VeniceSystemFactory().getClosableProducer("venice", new MapConfig(samzaConfig), (MetricsRegistry) null)) {
                        producer.start();
                        Assert.assertEquals(
                                producer.getKafkaBootstrapServers(),
                                this.multiRegionMultiClusterWrapper.getParentKafkaBrokerWrapper().getAddress());
                        for (int recordId = 1; recordId <= 10; recordId++) {
                            IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, recordId);
                        }

                        String routerUrl = childDataCenter.getClusters().get(clusterName).getRandomRouterURL();
                        // try-with-resources closes the store client even when the wait below
                        // fails; previously the client was only closed on success.
                        try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
                                ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(routerUrl))) {
                            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
                                for (int version : controllerClient.getStore(storeName).getStore().getColoToCurrentVersions().values()) {
                                    Assert.assertEquals(version, 1);
                                }
                                // Keys 1-10 were overwritten by the streamed records.
                                for (int key = 1; key <= 10; key++) {
                                    Assert.assertEquals(client.get(Integer.toString(key)).get().toString(), "stream_" + key);
                                }
                                // Keys 11-50 still hold the batch-pushed values.
                                for (int key = 11; key <= 50; key++) {
                                    Assert.assertEquals(client.get(Integer.toString(key)).get().toString(), "test_name_" + key);
                                }
                            });
                        }
                    }
                });
    }

    /**
     * Runs a batch push followed by an incremental push and verifies the merged data.
     *
     * <p>The batch push must use the Kafka broker of child datacenter 0. The incremental push
     * reuses the same properties with {@code "incremental.push"} enabled, {@code "input.path"}
     * pointing at a fresh temp directory populated by
     * {@code TestWriteUtils.writeSimpleAvroFileWithUserSchema2}, and
     * {@code "send.control.messages.directly"} set. Verification is delegated to
     * {@code NativeReplicationTestUtils.verifyIncrementalPushData}.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 120000)
    public void testNativeReplicationForIncrementalPush() throws Exception {
        File incrementalInputDir = TestWriteUtils.getTempDataDirectory();
        motherOfAllTests(
                "testNativeReplicationForIncrementalPush",
                updateStoreParams -> updateStoreParams.setPartitionCount(1)
                        .setHybridOffsetLagThreshold(120000L)
                        .setHybridRewindSeconds(2L)
                        .setIncrementalPushEnabled(true),
                100,
                (controllerClient, clusterName, storeName, props, inputDir) -> {
                    // Each job sits in its own try-with-resources so it is closed even when
                    // run() or the assertion throws.
                    try (VenicePushJob batchJob = new VenicePushJob("Batch Push", props)) {
                        batchJob.run();
                        Assert.assertEquals(batchJob.getKafkaUrl(), this.childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
                    }

                    props.setProperty("incremental.push", "true");
                    props.put("input.path", incrementalInputDir);
                    props.put("send.control.messages.directly", true);
                    TestWriteUtils.writeSimpleAvroFileWithUserSchema2(incrementalInputDir);

                    try (VenicePushJob incrementalJob = new VenicePushJob("Incremental Push", props)) {
                        incrementalJob.run();
                    }

                    NativeReplicationTestUtils.verifyIncrementalPushData(
                            this.childDatacenters,
                            clusterName,
                            storeName,
                            150,
                            NUMBER_OF_CHILD_DATACENTERS);
                });
    }

    /**
     * Verifies that the batch-job heartbeat system store is populated in every child datacenter
     * after a push job runs with heartbeats enabled.
     *
     * <p>Steps: enable heartbeats in the job properties (with "last heartbeat is delete" turned
     * off); check the heartbeat store's native-replication config through controller clients of
     * child datacenters 0 and 1; run the push job; then, for every child datacenter, require that
     * all keys from 1 to the record count read back as {@code "test_name_<key>"} and that the
     * heartbeat store returns a non-null record with the {@code BatchJobHeartbeatValue} schema for
     * version 1 of the store.
     *
     * @param i number of records in the store (first column of the {@code "storeSize"} provider)
     * @param i2 partition count of the store (second column of the {@code "storeSize"} provider)
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 120000, dataProvider = "storeSize")
    public void testActiveActiveForHeartbeatSystemStores(int i, int i2) throws Exception {
        final int recordCount = i;
        final int partitionCount = i2;
        motherOfAllTests(
                "testActiveActiveForHeartbeatSystemStores",
                updateStoreParams -> updateStoreParams.setPartitionCount(partitionCount).setIncrementalPushEnabled(true),
                recordCount,
                (controllerClient, clusterName, storeName, props, inputDir) -> {
                    props.put(BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName(), true);
                    props.put(BatchJobHeartbeatConfigs.HEARTBEAT_LAST_HEARTBEAT_IS_DELETE_CONFIG.getConfigName(), false);

                    // The per-datacenter clients need names distinct from the lambda parameter:
                    // a local variable may not redeclare an enclosing lambda parameter.
                    try (ControllerClient dc0Client =
                            new ControllerClient(clusterName, this.childDatacenters.get(0).getControllerConnectString());
                            ControllerClient dc1Client =
                                    new ControllerClient(clusterName, this.childDatacenters.get(1).getControllerConnectString())) {
                        NativeReplicationTestUtils
                                .verifyDCConfigNativeRepl(Arrays.asList(dc0Client, dc1Client), VPJ_HEARTBEAT_STORE_NAME, true);
                    }

                    try (VenicePushJob job = new VenicePushJob("Test push job", props)) {
                        job.run();
                        Assert.assertEquals(job.getKafkaUrl(), this.childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
                    }

                    TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
                        // The per-datacenter clients are already closed here, so this must be the
                        // controller client handed to the scenario.
                        for (int version : controllerClient.getStore(storeName).getStore().getColoToCurrentVersions().values()) {
                            Assert.assertEquals(version, 1);
                        }

                        for (VeniceMultiClusterWrapper childDataCenter : this.childDatacenters) {
                            String routerUrl = childDataCenter.getClusters().get(clusterName).getRandomRouterURL();

                            try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
                                    ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(routerUrl))) {
                                for (int key = 1; key <= recordCount; key++) {
                                    Assert.assertEquals(client.get(Integer.toString(key)).get().toString(), "test_name_" + key);
                                }
                            }

                            try (AvroGenericStoreClient<BatchJobHeartbeatKey, GenericRecord> heartbeatClient =
                                    ClientFactory.getAndStartGenericAvroClient(
                                            ClientConfig.defaultGenericClientConfig(VPJ_HEARTBEAT_STORE_NAME).setVeniceURL(routerUrl))) {
                                BatchJobHeartbeatKey heartbeatKey = new BatchJobHeartbeatKey();
                                heartbeatKey.storeName = storeName;
                                heartbeatKey.storeVersion = 1;
                                GenericRecord heartbeatValue = heartbeatClient.get(heartbeatKey).get();
                                Assert.assertNotNull(heartbeatValue);
                                Assert.assertEquals(heartbeatValue.getSchema(), BatchJobHeartbeatValue.getClassSchema());
                            }
                        }
                    });
                });
    }

    /**
     * Exercises the cluster-level "configure native replication" admin command for each user store
     * type (batch-only, hybrid-only, incremental-push) and checks the resulting per-store
     * native-replication config after every toggle.
     *
     * <p>Besides the batch store created by {@code motherOfAllTests}, the scenario creates one
     * hybrid store and one incremental-push store. For each store type it disables native
     * replication cluster-wide, verifies the matching store is reported as disabled, then
     * re-enables it with the source region {@code "new-nr-source"} and verifies that too.
     *
     * <p>NOTE(review): the decompiled code named all three controller clients
     * {@code controllerClient}. The assignment below (scenario client issues the admin commands;
     * the verification list holds the scenario client plus one client per child datacenter) is
     * reconstructed from context - confirm against the original source.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 120000)
    public void testClusterLevelAdminCommandForNativeReplication() throws Exception {
        motherOfAllTests(
                "testClusterLevelAdminCommandForNativeReplication",
                updateStoreParams -> updateStoreParams.setPartitionCount(1),
                10,
                (controllerClient, clusterName, batchStoreName, props, inputDir) -> {
                    String hybridStoreName = Utils.getUniqueString("hybrid-store");
                    Assert.assertFalse(controllerClient.createNewStore(hybridStoreName, "", "\"string\"", "\"string\"").isError());
                    TestUtils.assertCommand(
                            controllerClient.updateStore(
                                    hybridStoreName,
                                    new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L)));

                    String incrementalPushStoreName = Utils.getUniqueString("incremental-push-store");
                    Assert.assertFalse(
                            controllerClient.createNewStore(incrementalPushStoreName, "", "\"string\"", "\"string\"").isError());
                    TestUtils.assertCommand(
                            controllerClient.updateStore(
                                    incrementalPushStoreName,
                                    new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));

                    Optional<String> defaultSourceRegion = Optional.of(DEFAULT_NATIVE_REPLICATION_SOURCE);
                    Optional<String> newSourceRegion = Optional.of("new-nr-source");

                    // Start by disabling native replication for batch-only stores.
                    TestUtils.assertCommand(
                            controllerClient.configureNativeReplicationForCluster(
                                    false,
                                    VeniceUserStoreType.BATCH_ONLY.toString(),
                                    Optional.empty(),
                                    Optional.empty()));

                    this.childDatacenters.get(0).getClusters().get(clusterName).useControllerClient(dc0Client -> {
                        this.childDatacenters.get(1).getClusters().get(clusterName).useControllerClient(dc1Client -> {
                            List<ControllerClient> allControllerClients = Arrays.asList(controllerClient, dc0Client, dc1Client);

                            NativeReplicationTestUtils.verifyDCConfigNativeRepl(allControllerClients, batchStoreName, false);
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, hybridStoreName, true, defaultSourceRegion);
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, incrementalPushStoreName, true, defaultSourceRegion);

                            // Re-enable batch-only with the new source region; disable hybrid-only.
                            TestUtils.assertCommand(
                                    controllerClient.configureNativeReplicationForCluster(
                                            true,
                                            VeniceUserStoreType.BATCH_ONLY.toString(),
                                            newSourceRegion,
                                            Optional.empty()));
                            TestUtils.assertCommand(
                                    controllerClient.configureNativeReplicationForCluster(
                                            false,
                                            VeniceUserStoreType.HYBRID_ONLY.toString(),
                                            Optional.empty(),
                                            Optional.empty()));
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, batchStoreName, true, newSourceRegion);
                            NativeReplicationTestUtils.verifyDCConfigNativeRepl(allControllerClients, hybridStoreName, false);
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, incrementalPushStoreName, true, defaultSourceRegion);

                            // Re-enable hybrid-only with the new source region; disable incremental-push.
                            TestUtils.assertCommand(
                                    controllerClient.configureNativeReplicationForCluster(
                                            true,
                                            VeniceUserStoreType.HYBRID_ONLY.toString(),
                                            newSourceRegion,
                                            Optional.empty()));
                            TestUtils.assertCommand(
                                    controllerClient.configureNativeReplicationForCluster(
                                            false,
                                            VeniceUserStoreType.INCREMENTAL_PUSH.toString(),
                                            Optional.empty(),
                                            Optional.empty()));
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, batchStoreName, true, newSourceRegion);
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, hybridStoreName, true, newSourceRegion);
                            NativeReplicationTestUtils.verifyDCConfigNativeRepl(allControllerClients, incrementalPushStoreName, false);

                            // Finally re-enable incremental-push with the new source region.
                            TestUtils.assertCommand(
                                    controllerClient.configureNativeReplicationForCluster(
                                            true,
                                            VeniceUserStoreType.INCREMENTAL_PUSH.toString(),
                                            newSourceRegion,
                                            Optional.empty()));
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, batchStoreName, true, newSourceRegion);
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, hybridStoreName, true, newSourceRegion);
                            NativeReplicationTestUtils
                                    .verifyDCConfigNativeRepl(allControllerClients, incrementalPushStoreName, true, newSourceRegion);
                        });
                    });
                });
    }

    /**
     * Runs a batch push, turns the store into a hybrid store with incremental push enabled, re-pushes it from its
     * version topic, and then re-pushes it a second time while an incremental push is still open.
     *
     * <p>The incremental push is finished by a background task once the store reports the version created by the
     * second re-push. Afterwards the test checks that the new version carries a rewind time of 86400 seconds and
     * that the ten incremental records are readable through a router of every child data center.
     *
     * @throws Exception if the shared test harness or any push job fails
     */
    @Test(timeOut = 120000)
    public void testMultiDataCenterRePushWithIncrementalPush() throws Exception {
        motherOfAllTests("testMultiDataCenterRePushWithIncrementalPush", updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1), 100, (controllerClient, clusterName, storeName, props, inputDir) -> {
            try (VenicePushJob initialPush = new VenicePushJob("Test push job", props)) {
                initialPush.run();
                // The Kafka URL used by the push job must match the broker of the first child data center.
                Assert.assertEquals(initialPush.getKafkaUrl(), this.childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
            }

            VeniceWriter<String, String, byte[]> incPushWriter = null;
            try {
                Assert.assertFalse(controllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true).setHybridOffsetLagThreshold(1L).setHybridRewindSeconds(86400L)).isError());

                // Switch the push job properties to Kafka-input mode and read from version topic 1.
                props.setProperty("source.kafka", "true");
                props.setProperty("kafka.input.broker.url", this.multiRegionMultiClusterWrapper.getParentKafkaBrokerWrapper().getAddress());
                props.setProperty("kafka.input.max.records.per.mapper", "5");
                props.setProperty("venice.writer.chunking.enabled", "false");
                props.setProperty("kafka.input.topic", Version.composeKafkaTopic(storeName, 1));
                try (VenicePushJob firstRePush = new VenicePushJob("Test re-push job re-push", props)) {
                    firstRePush.run();
                }

                // Open an incremental push through the writer factory of the leader parent controller.
                String incPushVersion = System.currentTimeMillis() + "_test_inc_push_to_rt";
                incPushWriter = startIncrementalPush(controllerClient, storeName, this.parentControllers.stream().filter(controller -> controller.isLeaderController(clusterName)).findAny().get().getVeniceAdmin().getVeniceWriterFactory(), incPushVersion);

                String latestVersionTopic = Version.composeKafkaTopic(storeName, controllerClient.getStore(storeName).getStore().getLargestUsedVersionNumber());
                String incValuePrefix = "inc_test_";
                int expectedRePushVersion = Version.parseVersionFromKafkaTopicName(latestVersionTopic) + 1;

                // Lambdas may only capture effectively final locals, hence the dedicated alias for the writer.
                final VeniceWriter<String, String, byte[]> writerForTask = incPushWriter;
                // NOTE: the returned future is discarded, so a failure inside this task is not propagated here;
                // it only becomes visible through the data verification at the end of the test.
                CompletableFuture.runAsync(() -> {
                    // Wait until the second re-push has created its version before writing the records.
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        Assert.assertEquals(controllerClient.getStore(storeName).getStore().getLargestUsedVersionNumber(), expectedRePushVersion);
                    });
                    for (int key = 1; key <= 10; key++) {
                        writerForTask.put(Integer.toString(key), incValuePrefix + key, 1);
                    }
                    writerForTask.broadcastEndOfIncrementalPush(incPushVersion, new HashMap<>());
                });

                // Second re-push, this time reading from the latest version topic.
                props.setProperty("kafka.input.topic", latestVersionTopic);
                try (VenicePushJob secondRePush = new VenicePushJob("Test re-push job re-push", props)) {
                    secondRePush.run();
                }

                Optional<Version> rePushedVersion = controllerClient.getStore(storeName).getStore().getVersion(expectedRePushVersion);
                Assert.assertTrue(rePushedVersion.isPresent());
                Assert.assertEquals(rePushedVersion.get().getHybridStoreConfig().getRewindTimeInSeconds(), 86400L);
                for (int dcIndex = 0; dcIndex < NUMBER_OF_CHILD_DATACENTERS; dcIndex++) {
                    verifyVeniceStoreData(storeName, this.childDatacenters.get(dcIndex).getClusters().get(clusterName).getRandomRouterURL(), incValuePrefix, 10);
                }
            } finally {
                // Close the writer on success and on failure alike.
                if (incPushWriter != null) {
                    incPushWriter.close();
                }
            }
        });
    }

    /**
     * Verifies that an empty push succeeds against the parent controller and is rejected by a child controller.
     *
     * <p>The store is always created through the parent controller; only the empty push itself is routed
     * according to {@code toParent}.
     *
     * @param toParent {@code true} to send the empty push to the first parent controller, {@code false} to send it
     *        to the controller(s) of the first child data center
     */
    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 120000)
    public void testEmptyPush(boolean toParent) {
        String clusterName = CLUSTER_NAMES[0];
        String storeName = Utils.getUniqueString("testEmptyPush");
        String parentControllerUrl = this.parentControllers.get(0).getControllerUrl();
        String childControllerUrl = this.childDatacenters.get(0).getControllerConnectString();

        // try-with-resources guarantees the client is closed even when an assertion fails.
        try (ControllerClient storeSetupClient = new ControllerClient(clusterName, parentControllerUrl)) {
            TestUtils.assertCommand(storeSetupClient.createNewStore(storeName, "test_owner", "\"int\"", "\"int\""));
            TestUtils.assertCommand(storeSetupClient.updateStore(storeName, new UpdateStoreQueryParams().setStorageQuotaInByte(1000L)));
        }

        try (ControllerClient pushClient = new ControllerClient(clusterName, toParent ? parentControllerUrl : childControllerUrl)) {
            VersionCreationResponse response = pushClient.emptyPush(storeName, "test_push_id", 1000L);
            if (toParent) {
                Assert.assertFalse(response.isError(), "Empty push to parent colo should succeed");
            } else {
                Assert.assertTrue(response.isError(), "Empty push to child colo should be blocked");
            }
        }
    }

    /**
     * Runs a push job whose properties are built from the first child data center instead of the multi-region
     * wrapper. The test passes only if a {@link VeniceException} whose message matches
     * "Failed to create new store version" is thrown.
     *
     * @throws IOException declared by the test-data helpers used to prepare the input directory
     */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*Failed to create new store version.*", timeOut = 120000)
    public void testPushDirectlyToChildRegion() throws IOException {
        String clusterName = CLUSTER_NAMES[0];
        File inputDir = TestWriteUtils.getTempDataDirectory();
        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);

        String inputDirPath = "file:" + inputDir.getAbsolutePath();
        String storeName = Utils.getUniqueString("testPushDirectlyToChildColo");
        Properties props = IntegrationTestPushUtils.defaultVPJProps(this.childDatacenters.get(0), inputDirPath, storeName);

        // The object returned by store creation is not needed afterwards, so it is closed right away.
        IntegrationTestPushUtils.createStoreForJob(clusterName, recordSchema, props).close();

        // This is the call that is expected to throw.
        TestWriteUtils.runPushJob("Test push job", props);
    }

    /**
     * Requests a batch-push topic twice for the same store with two different push ids and checks that the second
     * request is rejected with {@link ErrorType#CONCURRENT_BATCH_PUSH} / {@link ExceptionType#BAD_REQUEST}.
     */
    @Test(timeOut = 120000)
    public void testControllerBlocksConcurrentBatchPush() {
        String clusterName = CLUSTER_NAMES[0];
        String storeName = Utils.getUniqueString("testControllerBlocksConcurrentBatchPush");
        String firstPushId = Utils.getUniqueString(storeName + "_push");
        String secondPushId = Utils.getUniqueString(storeName + "_push");

        try (ControllerClient parentClient = new ControllerClient(clusterName, this.parentControllers.get(0).getControllerUrl())) {
            parentClient.createNewStore(storeName, "test", "\"string\"", "\"string\"");
            parentClient.updateStore(storeName, new UpdateStoreQueryParams().setStorageQuotaInByte(100L));

            // The first request must succeed and leaves a batch push in progress.
            TestUtils.assertCommand(parentClient.requestTopicForWrites(storeName, 1L, Version.PushType.BATCH, firstPushId, false, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L));

            // The second request uses another push id and must be refused.
            VersionCreationResponse secondResponse = parentClient.requestTopicForWrites(storeName, 1L, Version.PushType.BATCH, secondPushId, false, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
            Assert.assertTrue(secondResponse.isError());
            Assert.assertEquals(secondResponse.getErrorType(), ErrorType.CONCURRENT_BATCH_PUSH);
            Assert.assertEquals(secondResponse.getExceptionType(), ExceptionType.BAD_REQUEST);
        }
    }

    /**
     * Runs four consecutive push jobs against the same store and checks the per-region current version after each:
     * <ol>
     *   <li>regular push: every region reports version 1;</li>
     *   <li>targeted-region push without an explicit region list: only {@code DEFAULT_NATIVE_REPLICATION_SOURCE}
     *       reports version 2, all other regions stay on version 1;</li>
     *   <li>targeted-region push with region list "dc-0, dc-1": every region reports version 3;</li>
     *   <li>targeted-region push with region list "dc-1": "dc-1" reports version 4, all other regions stay on 3.</li>
     * </ol>
     *
     * @throws Exception if the shared test harness or any push job fails
     */
    @Test(timeOut = 120000)
    public void testTargetedRegionPushJob() throws Exception {
        motherOfAllTests("testTargetedRegionPushJob", updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1), 100, (controllerClient, clusterName, storeName, props, inputDir) -> {
            // Push 1: regular push. Each job lives in its own try-with-resources so it is closed exactly once,
            // whether the assertions pass or fail.
            try (VenicePushJob job = new VenicePushJob("Test push job 1", props)) {
                job.run();
                Assert.assertEquals(job.getKafkaUrl(), this.childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    for (int currentVersion : controllerClient.getStore(storeName).getStore().getColoToCurrentVersions().values()) {
                        Assert.assertEquals(currentVersion, 1);
                    }
                });
            }

            // Push 2: targeted-region push enabled, no region list configured.
            props.put("targeted.region.push.enabled", true);
            try (VenicePushJob job = new VenicePushJob("Test push job 2", props)) {
                job.run();
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    controllerClient.getStore(storeName).getStore().getColoToCurrentVersions().forEach((region, currentVersion) -> {
                        if (region.equals(DEFAULT_NATIVE_REPLICATION_SOURCE)) {
                            // Second push of the store, hence version 2 (a version number, not a region count).
                            Assert.assertEquals(currentVersion.intValue(), 2);
                        } else {
                            Assert.assertEquals(currentVersion.intValue(), 1);
                        }
                    });
                });
            }

            // Push 3: targeted-region push to both listed regions.
            props.setProperty("targeted.region.push.list", "dc-0, dc-1");
            try (VenicePushJob job = new VenicePushJob("Test push job 3", props)) {
                job.run();
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    for (int currentVersion : controllerClient.getStore(storeName).getStore().getColoToCurrentVersions().values()) {
                        Assert.assertEquals(currentVersion, 3);
                    }
                });
            }

            // Push 4: targeted-region push to "dc-1" only.
            props.setProperty("targeted.region.push.list", "dc-1");
            try (VenicePushJob job = new VenicePushJob("Test push job 4", props)) {
                job.run();
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    controllerClient.getStore(storeName).getStore().getColoToCurrentVersions().forEach((region, currentVersion) -> {
                        if (region.equals("dc-1")) {
                            Assert.assertEquals(currentVersion.intValue(), 4);
                        } else {
                            Assert.assertEquals(currentVersion.intValue(), 3);
                        }
                    });
                });
            }
        });
    }

    /**
     * Shared harness for the push-job tests in this class.
     *
     * <p>It writes an Avro input file into a fresh temp directory, creates a uniquely named store in
     * {@code CLUSTER_NAMES[0]}, waits until the controllers of the first two child data centers both report the
     * store with a storage quota of -1, waits for the store's system stores, and finally hands control to
     * {@code test}. The temp directory is deleted afterwards, whether the test passed or not.
     *
     * @param storeNamePrefix prefix passed to {@code Utils.getUniqueString} to build the store name
     * @param updateStoreParamsTransformer applied to the default store-update parameters (storage quota -1) before
     *        the store is created
     * @param recordCount forwarded to {@code TestWriteUtils.writeSimpleAvroFileWithUserSchema} (presumably the
     *        number of records to generate — confirm against that helper)
     * @param test the test body; receives the controller client, cluster name, store name, push-job properties
     *        and input directory
     * @throws Exception whatever the setup or the test body throws
     */
    private void motherOfAllTests(String storeNamePrefix, Function<UpdateStoreQueryParams, UpdateStoreQueryParams> updateStoreParamsTransformer, int recordCount, NativeReplicationTest test) throws Exception {
        String clusterName = CLUSTER_NAMES[0];
        File inputDir = TestWriteUtils.getTempDataDirectory();
        // All work after the directory exists happens inside the try, so the directory is removed on any failure.
        try {
            String controllerUrls = this.multiRegionMultiClusterWrapper.getControllerConnectString();
            String inputDirPath = "file:" + inputDir.getAbsolutePath();
            String storeName = Utils.getUniqueString(storeNamePrefix);
            Properties props = IntegrationTestPushUtils.defaultVPJProps(this.multiRegionMultiClusterWrapper, inputDirPath, storeName);
            props.put("send.control.messages.directly", true);
            UpdateStoreQueryParams updateStoreParams = updateStoreParamsTransformer.apply(new UpdateStoreQueryParams().setStorageQuotaInByte(-1L));
            Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, true, recordCount);

            IntegrationTestPushUtils.createStoreForJob(clusterName, recordSchema.getField("key").schema().toString(), recordSchema.getField("value").schema().toString(), props, updateStoreParams).close();

            // The two clients need distinct names: the store has to be visible through BOTH child controllers.
            this.childDatacenters.get(0).getClusters().get(clusterName).useControllerClient(dc0Client -> {
                this.childDatacenters.get(1).getClusters().get(clusterName).useControllerClient(dc1Client -> {
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        StoreInfo dc0Store = dc0Client.getStore(storeName).getStore();
                        Assert.assertNotNull(dc0Store);
                        Assert.assertEquals(dc0Store.getStorageQuotaInByte(), -1L);
                        StoreInfo dc1Store = dc1Client.getStore(storeName).getStore();
                        Assert.assertNotNull(dc1Store);
                        Assert.assertEquals(dc1Store.getStorageQuotaInByte(), -1L);
                    });
                });
            });

            makeSureSystemStoreIsPushed(clusterName, storeName);

            // try-with-resources closes the client even when the test body throws.
            try (ControllerClient controllerClient = ControllerClient.constructClusterControllerClient(clusterName, controllerUrls)) {
                test.run(controllerClient, clusterName, storeName, props, inputDir);
            }
        } finally {
            FileUtils.deleteDirectory(inputDir);
        }
    }

    /**
     * Waits up to one minute until, in every child data center, both the meta system store and the Da Vinci push
     * status system store of the given user store can be fetched without error and report a current version
     * greater than zero.
     *
     * @param str name of the cluster whose controller is queried in each child data center
     * @param str2 name of the user store whose system stores are checked
     */
    private void makeSureSystemStoreIsPushed(String str, String str2) {
        VeniceSystemStoreType[] requiredSystemStores = {VeniceSystemStoreType.META_STORE, VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE};
        TestUtils.waitForNonDeterministicAssertion(1L, TimeUnit.MINUTES, true, () -> {
            for (int dc = 0; dc < this.childDatacenters.size(); dc++) {
                // Lambdas can only capture effectively final locals, so copy the loop counter.
                final int dcIndex = dc;
                this.childDatacenters.get(dc).getClusters().get(str).useControllerClient(client -> {
                    for (VeniceSystemStoreType systemStoreType : requiredSystemStores) {
                        String systemStoreName = systemStoreType.getSystemStoreName(str2);
                        StoreResponse response = client.getStore(systemStoreName);
                        Assert.assertFalse(response.isError());
                        Assert.assertTrue(response.getStore().getCurrentVersion() > 0, systemStoreName + " is not ready for DC-" + dcIndex);
                    }
                });
            }
        });
    }

    /**
     * Requests an incremental-push topic for a store, creates a string/string writer on that topic and broadcasts
     * the start-of-incremental-push control message.
     *
     * @param controllerClient client used to request the topic
     * @param str name of the store to push to
     * @param veniceWriterFactory factory that creates the writer
     * @param str2 incremental push version passed to {@code broadcastStartOfIncrementalPush}
     * @return the open writer; the caller is responsible for closing it
     */
    private VeniceWriter<String, String, byte[]> startIncrementalPush(ControllerClient controllerClient, String str, VeniceWriterFactory veniceWriterFactory, String str2) {
        VersionCreationResponse topicResponse = controllerClient.requestTopicForWrites(str, 1024L, Version.PushType.INCREMENTAL, "test-incremental-push", true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
        Assert.assertFalse(topicResponse.isError());
        String topic = topicResponse.getKafkaTopic();
        Assert.assertNotNull(topic);

        // Keys and values are both serialized with the Avro "string" schema.
        VeniceWriterOptions writerOptions = new VeniceWriterOptions.Builder(topic)
                .setKeySerializer(new VeniceAvroKafkaSerializer("\"string\""))
                .setValueSerializer(new VeniceAvroKafkaSerializer("\"string\""))
                .build();
        VeniceWriter<String, String, byte[]> writer = veniceWriterFactory.createVeniceWriter(writerOptions);
        writer.broadcastStartOfIncrementalPush(str2, new HashMap<>());
        return writer;
    }

    /**
     * Reads keys "1" through {@code i} from a store and asserts that each value is non-null and that its string
     * form equals {@code str3} followed by the key number.
     *
     * @param str name of the store to read from
     * @param str2 Venice URL the client connects to
     * @param str3 expected value prefix
     * @param i number of keys to verify
     * @throws ExecutionException if a read fails
     * @throws InterruptedException if the thread is interrupted while waiting for a read
     */
    private void verifyVeniceStoreData(String str, String str2, String str3, int i) throws ExecutionException, InterruptedException {
        // The client is closed on both the success and the failure path.
        try (AvroGenericStoreClient<String, Object> storeClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(str).setVeniceURL(str2))) {
            for (int key = 1; key <= i; key++) {
                Object value = storeClient.get(Integer.toString(key)).get();
                Assert.assertNotNull(value, "Unexpected null value for key: " + key);
                Assert.assertEquals(value.toString(), str3 + key);
            }
        }
    }
}
