package com.linkedin.venice.endToEnd;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.File;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.class */
/**
 * Integration test for incremental pushes against a store that has both native replication and
 * active/active replication enabled.
 *
 * <p>{@link #setUp()} starts one parent region plus {@value #NUMBER_OF_CHILD_DATACENTERS} child
 * regions. The single test then runs a batch push followed by two incremental pushes that use
 * different source fabrics, and verifies the data through
 * {@code NativeReplicationTestUtils.verifyIncrementalPushData} after each incremental push.
 */
public class TestActiveActiveReplicationForIncPush {
    private static final Logger LOGGER = LogManager.getLogger(TestActiveActiveReplicationForIncPush.class);
    /** Per-test timeout in milliseconds (3 minutes). */
    private static final int TEST_TIMEOUT = 180000;
    private static final int NUMBER_OF_CHILD_DATACENTERS = 3;
    private static final int NUMBER_OF_CLUSTERS = 1;
    private String[] clusterNames;
    private String parentRegionName;
    private String[] dcNames;
    private List<VeniceMultiClusterWrapper> childDatacenters;
    private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    PubSubBrokerWrapper veniceParentDefaultKafka;

    /**
     * Starts the multi-region test environment and caches the cluster names, region names, child
     * region wrappers and the parent Kafka broker for use by the test.
     */
    @BeforeClass(alwaysRun = true)
    public void setUp() {
        Properties serverProperties = new Properties();
        serverProperties.put("server.promotion.to.leader.replica.delay.seconds", 1L);
        serverProperties.setProperty("rocksdb.plain.table.format.enabled", "false");
        serverProperties.setProperty("server.database.checksum.verification.enabled", "true");
        serverProperties.setProperty("server.database.sync.bytes.interval.for.deferred.write.mode", "300");
        serverProperties.put("server.shared.kafka.producer.enabled", "true");
        serverProperties.put("server.kafka.producer.pool.size.per.kafka.cluster", "1");

        Properties controllerProperties = new Properties();
        controllerProperties.put("controller.auto.materialize.davinci.push.status.system.store", "true");

        // The same controller properties are passed in both Optional slots, exactly as before.
        this.multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
                NUMBER_OF_CHILD_DATACENTERS,
                NUMBER_OF_CLUSTERS,
                1,
                1,
                2,
                1,
                2,
                Optional.of(new VeniceProperties(controllerProperties)),
                Optional.of(controllerProperties),
                Optional.of(new VeniceProperties(serverProperties)),
                false);
        this.childDatacenters = this.multiRegionMultiClusterWrapper.getChildRegions();
        this.clusterNames = this.multiRegionMultiClusterWrapper.getClusterNames();
        this.parentRegionName = this.multiRegionMultiClusterWrapper.getParentRegionName();
        this.dcNames = this.multiRegionMultiClusterWrapper.getChildRegionNames().toArray(new String[0]);
        this.veniceParentDefaultKafka = this.multiRegionMultiClusterWrapper.getParentKafkaBrokerWrapper();
    }

    /** Shuts down the multi-region test environment, if it was started. */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        // alwaysRun = true means this executes even when setUp() failed before assigning the wrapper;
        // guard against the NPE so the original setUp failure is not masked.
        if (this.multiRegionMultiClusterWrapper != null) {
            this.multiRegionMultiClusterWrapper.close();
        }
    }

    /**
     * Runs a batch push, then an incremental push with source fabric {@code dcNames[2]}, then an
     * incremental push with source fabric {@code dcNames[1]}, asserting after each push that the job
     * wrote to the expected region's Kafka broker and that the data checks pass.
     *
     * @throws Exception if any push job, controller command or verification step fails
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testAAReplicationForIncrementalPushToRT() throws Exception {
        String clusterName = this.clusterNames[0];
        File batchInputDir = TestWriteUtils.getTempDataDirectory();
        File incPushFromDc2InputDir = TestWriteUtils.getTempDataDirectory();
        File incPushFromDc1InputDir = TestWriteUtils.getTempDataDirectory();
        String parentControllerUrls = this.multiRegionMultiClusterWrapper.getControllerConnectString();
        String batchInputDirPath = "file:" + batchInputDir.getAbsolutePath();
        String incPushFromDc2InputDirPath = "file:" + incPushFromDc2InputDir.getAbsolutePath();
        String incPushFromDc1InputDirPath = "file:" + incPushFromDc1InputDir.getAbsolutePath();
        Function<Integer, String> childControllerUrl =
                dcIndex -> this.childDatacenters.get(dcIndex).getControllerConnectString();

        // One distinctly named client per region, all released by try-with-resources even when an
        // assertion fails part-way through the test.
        try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerUrls);
                ControllerClient dc0ControllerClient = new ControllerClient(clusterName, childControllerUrl.apply(0));
                ControllerClient dc1ControllerClient = new ControllerClient(clusterName, childControllerUrl.apply(1));
                ControllerClient dc2ControllerClient = new ControllerClient(clusterName, childControllerUrl.apply(2))) {
            String storeName = Utils.getUniqueString("store");

            Properties batchPushProps =
                    IntegrationTestPushUtils.defaultVPJProps(this.multiRegionMultiClusterWrapper, batchInputDirPath, storeName);
            batchPushProps.put("send.control.messages.directly", true);
            Properties incPushFromDc2Props = IntegrationTestPushUtils
                    .defaultVPJProps(this.multiRegionMultiClusterWrapper, incPushFromDc2InputDirPath, storeName);
            incPushFromDc2Props.put("send.control.messages.directly", true);
            Properties incPushFromDc1Props = IntegrationTestPushUtils
                    .defaultVPJProps(this.multiRegionMultiClusterWrapper, incPushFromDc1InputDirPath, storeName);
            incPushFromDc1Props.put("send.control.messages.directly", true);

            Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(batchInputDir, true, 100);
            String keySchemaStr = recordSchema.getField("key").schema().toString();
            String valueSchemaStr = recordSchema.getField("value").schema().toString();

            // First incremental push uses dcNames[2] as its source fabric.
            incPushFromDc2Props.setProperty("incremental.push", "true");
            incPushFromDc2Props.put("source.grid.fabric", this.dcNames[2]);
            TestWriteUtils.writeSimpleAvroFileWithUserSchema2(incPushFromDc2InputDir);

            // Second incremental push uses dcNames[1] as its source fabric.
            incPushFromDc1Props.setProperty("incremental.push", "true");
            incPushFromDc1Props.put("source.grid.fabric", this.dcNames[1]);
            TestWriteUtils.writeSimpleAvroFileWithUserSchema3(incPushFromDc1InputDir);

            TestUtils.assertCommand(
                    parentControllerClient.createNewStore(storeName, "owner", keySchemaStr, valueSchemaStr));
            UpdateStoreQueryParams hybridAndNativeReplicationParams = new UpdateStoreQueryParams()
                    .setStorageQuotaInByte(-1L)
                    .setPartitionCount(1)
                    .setHybridOffsetLagThreshold(90000L)
                    .setHybridRewindSeconds(2L)
                    .setIncrementalPushEnabled(true)
                    .setNativeReplicationEnabled(true)
                    .setNativeReplicationSourceFabric("dc-2");
            TestUtils.assertCommand(parentControllerClient.updateStore(storeName, hybridAndNativeReplicationParams));

            UpdateStoreQueryParams enableActiveActiveParams =
                    new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true);

            // Log every broker address so a failed URL assertion below can be mapped to a region.
            for (int i = 0; i < NUMBER_OF_CHILD_DATACENTERS; i++) {
                LOGGER.info(
                        "KafkaURL {}:{}",
                        this.dcNames[i],
                        this.childDatacenters.get(i).getKafkaBrokerWrapper().getAddress());
            }
            LOGGER.info("KafkaURL {}:{}", this.parentRegionName, this.veniceParentDefaultKafka.getAddress());

            TestWriteUtils.updateStore(storeName, parentControllerClient, enableActiveActiveParams);

            // Check the parent and each of the three child regions exactly once.
            TestUtils.verifyDCConfigNativeAndActiveRepl(
                    storeName,
                    true,
                    true,
                    new ControllerClient[] { parentControllerClient, dc0ControllerClient, dc1ControllerClient,
                            dc2ControllerClient });

            String dc1KafkaUrl = this.childDatacenters.get(1).getKafkaBrokerWrapper().getAddress();
            String dc2KafkaUrl = this.childDatacenters.get(2).getKafkaBrokerWrapper().getAddress();

            try (VenicePushJob batchPushJob =
                    new VenicePushJob("Test push job batch with NR + A/A all fabrics", batchPushProps)) {
                batchPushJob.run();
                // The store's native replication source fabric was set to "dc-2" above.
                Assert.assertEquals(batchPushJob.getKafkaUrl(), dc2KafkaUrl);
            }

            try (VenicePushJob incPushFromDc2Job =
                    new VenicePushJob("Test push job incremental with NR + A/A from dc-2", incPushFromDc2Props)) {
                incPushFromDc2Job.run();
                Assert.assertEquals(incPushFromDc2Job.getKafkaUrl(), dc2KafkaUrl);
            }

            for (int i = 0; i < this.childDatacenters.size(); i++) {
                Assert.assertTrue(
                        this.childDatacenters.get(i)
                                .getRandomController()
                                .getVeniceAdmin()
                                .getStore(clusterName, storeName)
                                .getVersion(1)
                                .isPresent(),
                        "Version 1 is not present for DC: " + this.dcNames[i]);
            }
            NativeReplicationTestUtils.verifyIncrementalPushData(this.childDatacenters, clusterName, storeName, 150, 2);

            try (VenicePushJob incPushFromDc1Job =
                    new VenicePushJob("Test push job incremental with NR + A/A from dc-1", incPushFromDc1Props)) {
                incPushFromDc1Job.run();
                Assert.assertEquals(incPushFromDc1Job.getKafkaUrl(), dc1KafkaUrl);
            }
            // NOTE(review): the trailing 3 pairs with the earlier call's 2; presumably it identifies the
            // third input file rather than the region count - confirm against NativeReplicationTestUtils.
            NativeReplicationTestUtils.verifyIncrementalPushData(this.childDatacenters, clusterName, storeName, 200, 3);
        }
    }
}
