package com.linkedin.venice.endToEnd;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskBackdoor;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.helix.HelixBaseRoutingRepository;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.samza.VeniceObjectWithTimestamp;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.samza.VeniceSystemProducer;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.MockCircularTime;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.class */
public class ActiveActiveReplicationForHybridTest {
    private static final int TEST_TIMEOUT = 300000;
    private static final int PUSH_TIMEOUT = 150000;
    protected static final int NUMBER_OF_CHILD_DATACENTERS = 3;
    protected static final int NUMBER_OF_CLUSTERS = 1;
    static final String[] CLUSTER_NAMES = (String[]) IntStream.range(0, 1).mapToObj(i -> {
        return "venice-cluster" + i;
    }).toArray(i2 -> {
        return new String[i2];
    });
    protected List<VeniceMultiClusterWrapper> childDatacenters;
    protected List<VeniceControllerWrapper> parentControllers;
    protected VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    private D2Client d2ClientForDC0Region;
    private Properties serverProperties;
    private ControllerClient parentControllerClient;
    private ControllerClient dc0Client;
    private ControllerClient dc1Client;
    private ControllerClient dc2Client;
    private List<ControllerClient> dcControllerClientList;

    @BeforeClass(alwaysRun = true)
    public void setUp() {
        this.serverProperties = new Properties();
        this.serverProperties.put("server.promotion.to.leader.replica.delay.seconds", 1L);
        this.serverProperties.put("rocksdb.plain.table.format.enabled", false);
        this.serverProperties.put("server.database.checksum.verification.enabled", true);
        this.serverProperties.put("server.database.sync.bytes.interval.for.deferred.write.mode", "300");
        this.serverProperties.put("server.shared.kafka.producer.enabled", true);
        this.serverProperties.put("server.kafka.producer.pool.size.per.kafka.cluster", "2");
        Properties properties = new Properties();
        properties.put("native.replication.source.fabric", "dc-0");
        properties.put("parent.kafka.cluster.fabric.list", VeniceControllerWrapper.DEFAULT_PARENT_DATA_CENTER_REGION_NAME);
        this.multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(3, 1, 1, 1, 2, 1, 2, Optional.of(new VeniceProperties(properties)), Optional.of(properties), Optional.of(new VeniceProperties(this.serverProperties)), false);
        this.childDatacenters = this.multiRegionMultiClusterWrapper.getChildRegions();
        this.parentControllers = this.multiRegionMultiClusterWrapper.getParentControllers();
        this.d2ClientForDC0Region = new D2ClientBuilder().setZkHosts(this.childDatacenters.get(0).getZkServerWrapper().getAddress()).setZkSessionTimeout(3L, TimeUnit.SECONDS).setZkStartupTimeout(3L, TimeUnit.SECONDS).build();
        D2ClientUtils.startClient(this.d2ClientForDC0Region);
        String str = CLUSTER_NAMES[0];
        this.parentControllerClient = new ControllerClient(str, (String) this.parentControllers.stream().map((v0) -> {
            return v0.getControllerUrl();
        }).collect(Collectors.joining(",")));
        this.dc0Client = new ControllerClient(str, this.childDatacenters.get(0).getControllerConnectString());
        this.dc1Client = new ControllerClient(str, this.childDatacenters.get(1).getControllerConnectString());
        this.dc2Client = new ControllerClient(str, this.childDatacenters.get(2).getControllerConnectString());
        this.dcControllerClientList = Arrays.asList(this.dc0Client, this.dc1Client, this.dc2Client);
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        if (this.d2ClientForDC0Region != null) {
            D2ClientUtils.shutdownClient(this.d2ClientForDC0Region);
        }
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.parentControllerClient});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.dc0Client});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.dc1Client});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.dc2Client});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.multiRegionMultiClusterWrapper});
    }

    @Test(timeOut = 300000)
    public void testEnableActiveActiveReplicationForCluster() {
        String uniqueString = Utils.getUniqueString("test-batch-store");
        String uniqueString2 = Utils.getUniqueString("test-hybrid-agg-store");
        String uniqueString3 = Utils.getUniqueString("test-hybrid-non-agg-store");
        String uniqueString4 = Utils.getUniqueString("test-incremental-push-store");
        try {
            TestUtils.createAndVerifyStoreInAllRegions(uniqueString, this.parentControllerClient, this.dcControllerClientList);
            TestUtils.createAndVerifyStoreInAllRegions(uniqueString2, this.parentControllerClient, this.dcControllerClientList);
            TestUtils.createAndVerifyStoreInAllRegions(uniqueString3, this.parentControllerClient, this.dcControllerClientList);
            TestUtils.createAndVerifyStoreInAllRegions(uniqueString4, this.parentControllerClient, this.dcControllerClientList);
            TestUtils.assertCommand(this.parentControllerClient.updateStore(uniqueString2, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L).setHybridDataReplicationPolicy(DataReplicationPolicy.AGGREGATE)));
            TestUtils.assertCommand(this.parentControllerClient.updateStore(uniqueString3, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L)));
            TestUtils.assertCommand(this.parentControllerClient.updateStore(uniqueString4, new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));
            TestUtils.assertCommand(this.parentControllerClient.configureActiveActiveReplicationForCluster(true, VeniceUserStoreType.BATCH_ONLY.toString(), Optional.empty()));
            verifyDCConfigAARepl(this.parentControllerClient, uniqueString, false, false, true);
            verifyDCConfigAARepl(this.dc0Client, uniqueString, false, false, true);
            verifyDCConfigAARepl(this.dc1Client, uniqueString, false, false, true);
            verifyDCConfigAARepl(this.dc2Client, uniqueString, false, false, true);
            TestUtils.assertCommand(TestUtils.assertCommand(this.parentControllerClient.configureActiveActiveReplicationForCluster(false, VeniceUserStoreType.BATCH_ONLY.toString(), Optional.of("dc-parent-0.parent,dc-0"))));
            verifyDCConfigAARepl(this.parentControllerClient, uniqueString, false, true, false);
            verifyDCConfigAARepl(this.dc0Client, uniqueString, false, true, false);
            verifyDCConfigAARepl(this.dc1Client, uniqueString, false, true, true);
            verifyDCConfigAARepl(this.dc2Client, uniqueString, false, true, true);
            TestUtils.assertCommand(this.parentControllerClient.configureActiveActiveReplicationForCluster(true, VeniceUserStoreType.HYBRID_ONLY.toString(), Optional.empty()));
            verifyDCConfigAARepl(this.parentControllerClient, uniqueString2, true, false, false);
            verifyDCConfigAARepl(this.dc0Client, uniqueString2, true, false, false);
            verifyDCConfigAARepl(this.dc1Client, uniqueString2, true, false, false);
            verifyDCConfigAARepl(this.dc2Client, uniqueString2, true, false, false);
            verifyDCConfigAARepl(this.parentControllerClient, uniqueString3, true, false, true);
            verifyDCConfigAARepl(this.dc0Client, uniqueString3, true, false, true);
            verifyDCConfigAARepl(this.dc1Client, uniqueString3, true, false, true);
            verifyDCConfigAARepl(this.dc2Client, uniqueString3, true, false, true);
            TestUtils.assertCommand(this.parentControllerClient.configureActiveActiveReplicationForCluster(false, VeniceUserStoreType.HYBRID_ONLY.toString(), Optional.empty()));
            verifyDCConfigAARepl(this.parentControllerClient, uniqueString3, true, true, false);
            verifyDCConfigAARepl(this.dc0Client, uniqueString3, true, true, false);
            verifyDCConfigAARepl(this.dc1Client, uniqueString3, true, true, false);
            verifyDCConfigAARepl(this.dc2Client, uniqueString3, true, true, false);
            TestUtils.assertCommand(this.parentControllerClient.configureActiveActiveReplicationForCluster(true, VeniceUserStoreType.INCREMENTAL_PUSH.toString(), Optional.empty()));
            verifyDCConfigAARepl(this.parentControllerClient, uniqueString4, false, false, true);
            verifyDCConfigAARepl(this.dc0Client, uniqueString4, false, false, true);
            verifyDCConfigAARepl(this.dc1Client, uniqueString4, false, false, true);
            verifyDCConfigAARepl(this.dc2Client, uniqueString4, false, false, true);
            deleteStores(uniqueString, uniqueString2, uniqueString3, uniqueString4);
        } catch (Throwable th) {
            deleteStores(uniqueString, uniqueString2, uniqueString3, uniqueString4);
            throw th;
        }
    }

    @Test(timeOut = 300000)
    public void testEnableNRisRequiredBeforeEnablingAA() {
        String uniqueString = Utils.getUniqueString("test-store");
        String uniqueString2 = Utils.getUniqueString("test-store");
        try {
            TestUtils.assertCommand(this.parentControllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\""));
            try {
                TestUtils.updateStoreToHybrid(uniqueString, this.parentControllerClient, Optional.of(false), Optional.of(true), Optional.of(false));
                Assert.fail("The update store command should not have succeeded since AA cannot be enabled without enabling NR.");
            } catch (AssertionError e) {
                Assert.assertTrue(e.getMessage().contains("Http Status 400"));
            }
            TestUtils.updateStoreToHybrid(uniqueString, this.parentControllerClient, Optional.of(true), Optional.of(true), Optional.of(false));
            TestUtils.assertCommand(this.parentControllerClient.createNewStore(uniqueString2, "owner", "\"string\"", "\"string\""));
            TestUtils.updateStoreToHybrid(uniqueString2, this.parentControllerClient, Optional.of(true), Optional.of(false), Optional.of(false));
            TestUtils.updateStoreToHybrid(uniqueString2, this.parentControllerClient, Optional.empty(), Optional.of(true), Optional.of(false));
            try {
                TestUtils.updateStoreToHybrid(uniqueString2, this.parentControllerClient, Optional.of(false), Optional.of(true), Optional.of(false));
                Assert.fail("The update store command should not have succeeded since AA cannot be enabled without enabling NR.");
            } catch (AssertionError e2) {
                Assert.assertTrue(e2.getMessage().contains("Http Status 400"));
            }
            deleteStores(uniqueString, uniqueString2);
        } catch (Throwable th) {
            deleteStores(uniqueString, uniqueString2);
            throw th;
        }
    }

    @Test(timeOut = 300000, dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testAAReplicationCanConsumeFromAllRegions(boolean z, boolean z2) throws InterruptedException, ExecutionException {
        String str = CLUSTER_NAMES[0];
        String uniqueString = Utils.getUniqueString("test-store");
        try {
            TestUtils.assertCommand(this.parentControllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\""));
            TestUtils.updateStoreToHybrid(uniqueString, this.parentControllerClient, Optional.of(true), Optional.of(true), Optional.of(Boolean.valueOf(z)));
            JobStatusQueryResponse assertCommand = TestUtils.assertCommand(this.parentControllerClient.sendEmptyPushAndWait(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L, 150000L));
            Assert.assertTrue(assertCommand instanceof JobStatusQueryResponse);
            int version = assertCommand.getVersion();
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                Iterator<ControllerClient> it = this.dcControllerClientList.iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(TestUtils.assertCommand(it.next().getStore(uniqueString)).getStore().getCurrentVersion(), version);
                }
            });
            if (z2) {
                String composeKafkaTopic = Version.composeKafkaTopic(uniqueString, version);
                Iterator<VeniceMultiClusterWrapper> it = this.multiRegionMultiClusterWrapper.getChildRegions().iterator();
                while (it.hasNext()) {
                    Iterator<VeniceServerWrapper> it2 = it.next().getClusters().get(str).getVeniceServers().iterator();
                    while (it2.hasNext()) {
                        StoreIngestionTaskBackdoor.setPurgeTransientRecordBuffer(it2.next(), composeKafkaTopic, false);
                    }
                }
            }
            HashMap hashMap = new HashMap(3);
            int i = 10;
            for (int i2 = 0; i2 < 3; i2++) {
                try {
                    String str2 = "dc-" + i2 + "_key_";
                    VeniceMultiClusterWrapper veniceMultiClusterWrapper = this.childDatacenters.get(i2);
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put("systems.venice.push.type", Version.PushType.STREAM.toString());
                    hashMap2.put("systems.venice.store", uniqueString);
                    hashMap2.put("systems.venice.aggregate", "false");
                    hashMap2.put("venice.child.d2.zk.hosts", veniceMultiClusterWrapper.getZkServerWrapper().getAddress());
                    hashMap2.put("venice.child.controller.d2.service", VeniceControllerWrapper.D2_SERVICE_NAME);
                    hashMap2.put("venice.parent.d2.zk.hosts", this.multiRegionMultiClusterWrapper.getZkServerWrapper().getAddress());
                    hashMap2.put("venice.parent.controller.d2.service", VeniceControllerWrapper.PARENT_D2_SERVICE_NAME);
                    hashMap2.put("deployment.id", Utils.getUniqueString("venice-push-id"));
                    hashMap2.put("ssl.enabled", "false");
                    VeniceSystemProducer closableProducer = new VeniceSystemFactory().getClosableProducer("venice", new MapConfig(hashMap2), (MetricsRegistry) null);
                    closableProducer.start();
                    hashMap.put(veniceMultiClusterWrapper, closableProducer);
                    for (int i3 = 0; i3 < 10; i3++) {
                        IntegrationTestPushUtils.sendStreamingRecordWithKeyPrefix(closableProducer, uniqueString, str2, i3);
                    }
                } catch (Throwable th) {
                    Iterator it3 = hashMap.values().iterator();
                    while (it3.hasNext()) {
                        Utils.closeQuietlyWithErrorLogged(new Closeable[]{(VeniceSystemProducer) it3.next()});
                    }
                    throw th;
                }
            }
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.childDatacenters.get(0).getClusters().get(str).getRandomRouterURL()));
            try {
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    for (int i4 = 0; i4 < 3; i4++) {
                        String str3 = "dc-" + i4 + "_key_";
                        for (int i5 = 0; i5 < i; i5++) {
                            String str4 = "stream_" + i5;
                            Object obj = andStartGenericAvroClient.get(str3 + i5).get();
                            if (obj == null) {
                                Assert.fail("Servers in dc-0 haven't consumed real-time data from region dc-" + i4 + " for key: " + str3 + i5);
                            } else {
                                Assert.assertEquals(obj.toString(), str4, "Servers in dc-0 contain corrupted data sent from region dc-" + i4);
                            }
                        }
                    }
                });
                for (int i4 = 0; i4 < 3; i4++) {
                    String str3 = "dc-" + i4 + "_key_";
                    IntegrationTestPushUtils.sendStreamingDeleteRecord((SystemProducer) hashMap.get(this.childDatacenters.get(i4)), uniqueString, str3 + (10 - 1));
                    IntegrationTestPushUtils.sendStreamingDeleteRecord((SystemProducer) hashMap.get(this.childDatacenters.get(i4)), uniqueString, str3 + 10);
                }
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    for (int i5 = 0; i5 < 3; i5++) {
                        String str4 = "dc-" + i5 + "_key_";
                        Assert.assertNull(andStartGenericAvroClient.get(str4 + (i - 1)).get(), "Servers in dc-0 haven't consumed real-time data from region dc-" + i5);
                        Assert.assertNull(andStartGenericAvroClient.get(str4 + i).get(), "Servers in dc-0 haven't consumed real-time data from region dc-" + i5);
                    }
                });
                for (int i5 = 0; i5 < 3; i5++) {
                    IntegrationTestPushUtils.sendStreamingRecordWithKeyPrefix((SystemProducer) hashMap.get(this.childDatacenters.get(i5)), uniqueString, "dc-" + i5 + "_key_", 10);
                }
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    for (int i6 = 0; i6 < 3; i6++) {
                        String str4 = "dc-" + i6 + "_key_";
                        Assert.assertNull(andStartGenericAvroClient.get(str4 + (i - 1)).get(), "Servers in dc-0 haven't consumed real-time data from region dc-" + i6);
                        String str5 = "stream_" + i;
                        Object obj = andStartGenericAvroClient.get(str4 + i).get();
                        if (obj == null) {
                            Assert.fail("Servers in dc-0 haven't consumed real-time data from region dc-" + i6);
                        } else {
                            Assert.assertEquals(obj.toString(), str5, "Servers in dc-0 contain corrupted data sent from region dc-" + i6);
                        }
                    }
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                Iterator it4 = hashMap.values().iterator();
                while (it4.hasNext()) {
                    Utils.closeQuietlyWithErrorLogged(new Closeable[]{(VeniceSystemProducer) it4.next()});
                }
                CachingDaVinciClientFactory cachingDaVinciClientFactory = new CachingDaVinciClientFactory(this.d2ClientForDC0Region, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, new MetricsRepository(), new PropertyBuilder().put("data.base.path", Utils.getTempDataDirectory().getAbsolutePath()).put("persistence.type", PersistenceType.ROCKS_DB).build());
                try {
                    DaVinciClient andStartGenericAvroClient2 = cachingDaVinciClientFactory.getAndStartGenericAvroClient(uniqueString, new DaVinciConfig());
                    try {
                        andStartGenericAvroClient2.subscribeAll().get();
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
                            for (int i6 = 0; i6 < 3; i6++) {
                                String str4 = "dc-" + i6 + "_key_";
                                Assert.assertNull(andStartGenericAvroClient2.get(str4 + (i - 1)).get(), "DaVinci clients in dc-0 haven't consumed real-time data from region dc-" + i6);
                                String str5 = "stream_" + i;
                                Object obj = andStartGenericAvroClient2.get(str4 + i).get();
                                if (obj == null) {
                                    Assert.fail("DaVinci clients in dc-0 haven't consumed real-time data from region dc-" + i6);
                                } else {
                                    Assert.assertEquals(obj.toString(), str5, "DaVinci clients in dc-0 contain corrupted data sent from region dc-" + i6);
                                }
                            }
                        });
                        if (andStartGenericAvroClient2 != null) {
                            andStartGenericAvroClient2.close();
                        }
                        cachingDaVinciClientFactory.close();
                        deleteStores(uniqueString);
                    } catch (Throwable th2) {
                        if (andStartGenericAvroClient2 != null) {
                            try {
                                andStartGenericAvroClient2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        }
                        throw th2;
                    }
                } finally {
                }
            } catch (Throwable th4) {
                if (andStartGenericAvroClient != null) {
                    try {
                        andStartGenericAvroClient.close();
                    } catch (Throwable th5) {
                        th4.addSuppressed(th5);
                    }
                }
                throw th4;
            }
        } catch (Throwable th6) {
            deleteStores(uniqueString);
            throw th6;
        }
    }

    @Test(timeOut = 300000)
    public void controllerClientCanGetStoreReplicationMetadataSchema() {
        String uniqueString = Utils.getUniqueString("test-store");
        try {
            TestUtils.assertCommand(this.parentControllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\""));
            TestUtils.updateStoreToHybrid(uniqueString, this.parentControllerClient, Optional.of(true), Optional.of(true), Optional.of(false));
            TestUtils.assertCommand(this.parentControllerClient.emptyPush(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L));
            MultiSchemaResponse assertCommand = TestUtils.assertCommand(this.parentControllerClient.getAllReplicationMetadataSchemas(uniqueString));
            Assert.assertEquals(assertCommand.getSchemas()[0].getSchemaStr(), "{\"type\":\"record\",\"name\":\"string_MetadataRecord\",\"namespace\":\"com.linkedin.venice\",\"fields\":[{\"name\":\"timestamp\",\"type\":[\"long\"],\"doc\":\"timestamp when the full record was last updated\",\"default\":0},{\"name\":\"replication_checkpoint_vector\",\"type\":{\"type\":\"array\",\"items\":\"long\"},\"doc\":\"high watermark remote checkpoints which touched this record\",\"default\":[]}]}");
            Assert.assertEquals(assertCommand.getSchemas()[0].getRmdValueSchemaId(), 1);
            Assert.assertEquals(assertCommand.getSchemas()[0].getId(), 1);
            deleteStores(uniqueString);
        } catch (Throwable th) {
            deleteStores(uniqueString);
            throw th;
        }
    }

    @Test(timeOut = 300000, dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testAAReplicationCanResolveConflicts(boolean z, boolean z2) {
        String str = CLUSTER_NAMES[0];
        String uniqueString = Utils.getUniqueString("test-store");
        try {
            TestUtils.assertCommand(this.parentControllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\""));
            TestUtils.updateStoreToHybrid(uniqueString, this.parentControllerClient, Optional.of(true), Optional.of(true), Optional.of(Boolean.valueOf(z2)));
            TestUtils.assertCommand(this.parentControllerClient.sendEmptyPushAndWait(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L, 150000L));
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(TestUtils.assertCommand(this.dc0Client.getStore(uniqueString)).getStore().getCurrentVersion(), 1);
            });
            LinkedList linkedList = new LinkedList();
            long currentTimeMillis = System.currentTimeMillis();
            if (!z) {
                linkedList.add(Long.valueOf(currentTimeMillis));
                linkedList.add(Long.valueOf(currentTimeMillis));
            }
            linkedList.add(Long.valueOf(currentTimeMillis));
            linkedList.add(Long.valueOf(currentTimeMillis - 10));
            linkedList.add(Long.valueOf(currentTimeMillis));
            MockCircularTime mockCircularTime = new MockCircularTime(linkedList);
            String str2 = "key1";
            String str3 = "value1";
            String str4 = "key2";
            VeniceMultiClusterWrapper veniceMultiClusterWrapper = this.childDatacenters.get(0);
            VeniceSystemProducer veniceSystemProducer = new VeniceSystemProducer(veniceMultiClusterWrapper.getZkServerWrapper().getAddress(), veniceMultiClusterWrapper.getZkServerWrapper().getAddress(), VeniceControllerWrapper.D2_SERVICE_NAME, uniqueString, Version.PushType.STREAM, Utils.getUniqueString("venice-push-id"), "dc-0", true, (VeniceSystemFactory) null, Optional.empty(), Optional.empty(), mockCircularTime);
            try {
                veniceSystemProducer.start();
                OutgoingMessageEnvelope outgoingMessageEnvelope = new OutgoingMessageEnvelope(new SystemStream("venice", uniqueString), "key1", z ? new VeniceObjectWithTimestamp("value1", mockCircularTime.getMilliseconds()) : "value1");
                veniceSystemProducer.send(uniqueString, outgoingMessageEnvelope);
                veniceSystemProducer.send(uniqueString, new OutgoingMessageEnvelope(new SystemStream("venice", uniqueString), "key1", z ? new VeniceObjectWithTimestamp("value2", mockCircularTime.getMilliseconds()) : "value2"));
                veniceSystemProducer.send(uniqueString, outgoingMessageEnvelope);
                veniceSystemProducer.send(uniqueString, new OutgoingMessageEnvelope(new SystemStream("venice", uniqueString), "key2", z ? new VeniceObjectWithTimestamp("value1", mockCircularTime.getMilliseconds()) : "value1"));
                veniceSystemProducer.close();
                String randomRouterURL = veniceMultiClusterWrapper.getClusters().get(str).getRandomRouterURL();
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(randomRouterURL));
                try {
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                        Object obj = andStartGenericAvroClient.get(str4).get();
                        Assert.assertNotNull(obj);
                        Assert.assertEquals(obj.toString(), str3);
                        Object obj2 = andStartGenericAvroClient.get(str2).get();
                        Assert.assertNotNull(obj2);
                        Assert.assertEquals(obj2.toString(), str3, "DCR is not working properly");
                    });
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    LinkedList linkedList2 = new LinkedList();
                    if (!z) {
                        linkedList2.add(Long.valueOf(currentTimeMillis));
                        linkedList2.add(Long.valueOf(currentTimeMillis));
                    }
                    linkedList2.add(Long.valueOf(currentTimeMillis - 5));
                    linkedList2.add(Long.valueOf(currentTimeMillis));
                    MockCircularTime mockCircularTime2 = new MockCircularTime(linkedList2);
                    String str5 = "key3";
                    veniceSystemProducer = new VeniceSystemProducer(veniceMultiClusterWrapper.getZkServerWrapper().getAddress(), this.childDatacenters.get(1).getZkServerWrapper().getAddress(), VeniceControllerWrapper.D2_SERVICE_NAME, uniqueString, Version.PushType.STREAM, Utils.getUniqueString("venice-push-id"), "dc-1", true, (VeniceSystemFactory) null, Optional.empty(), Optional.empty(), mockCircularTime2);
                    try {
                        veniceSystemProducer.start();
                        veniceSystemProducer.send(uniqueString, new OutgoingMessageEnvelope(new SystemStream("venice", uniqueString), "key1", z ? new VeniceObjectWithTimestamp("value3", mockCircularTime2.getMilliseconds()) : "value3"));
                        veniceSystemProducer.send(uniqueString, new OutgoingMessageEnvelope(new SystemStream("venice", uniqueString), "key3", z ? new VeniceObjectWithTimestamp("value1", mockCircularTime2.getMilliseconds()) : "value1"));
                        veniceSystemProducer.close();
                        andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(randomRouterURL));
                        try {
                            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                                Object obj = andStartGenericAvroClient.get(str5).get();
                                Assert.assertNotNull(obj);
                                Assert.assertEquals(obj.toString(), str3);
                                Object obj2 = andStartGenericAvroClient.get(str2).get();
                                Assert.assertNotNull(obj2);
                                Assert.assertEquals(obj2.toString(), str3, "DCR is not working properly");
                            });
                            if (andStartGenericAvroClient != null) {
                                andStartGenericAvroClient.close();
                            }
                            deleteStores(uniqueString);
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            deleteStores(uniqueString);
            throw th;
        }
    }

    @Test(timeOut = 300000)
    public void testHelixReplicationFactorConfigChange() {
        String str = CLUSTER_NAMES[0];
        String uniqueString = Utils.getUniqueString("test-store");
        VeniceClusterWrapper veniceClusterWrapper = this.childDatacenters.get(0).getClusters().get(str);
        try {
            TestUtils.assertCommand(this.parentControllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\""));
            TestUtils.updateStoreToHybrid(uniqueString, this.parentControllerClient, Optional.of(true), Optional.of(true), Optional.of(true));
            JobStatusQueryResponse assertCommand = TestUtils.assertCommand(this.parentControllerClient.sendEmptyPushAndWait(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L, 150000L));
            Assert.assertTrue(assertCommand instanceof JobStatusQueryResponse);
            String composeKafkaTopic = Version.composeKafkaTopic(uniqueString, assertCommand.getVersion());
            HelixBaseRoutingRepository routingDataRepository = veniceClusterWrapper.getRandomVeniceRouter().getRoutingDataRepository();
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(TestUtils.assertCommand(this.dc0Client.getStore(uniqueString)).getStore().getCurrentVersion(), 1);
                Assert.assertTrue(routingDataRepository.getReadyToServeInstances(composeKafkaTopic, 0).size() < 3);
            });
            VeniceServerWrapper veniceServerWrapper = null;
            HelixAdmin helixAdmin = null;
            try {
                veniceServerWrapper = veniceClusterWrapper.addVeniceServer(new Properties(), this.serverProperties);
                helixAdmin = new ZKHelixAdmin(veniceClusterWrapper.getZk().getAddress());
                IdealState resourceIdealState = helixAdmin.getResourceIdealState(str, composeKafkaTopic);
                resourceIdealState.setReplicas("3");
                helixAdmin.setResourceIdealState(str, composeKafkaTopic, resourceIdealState);
                HelixBaseRoutingRepository routingDataRepository2 = veniceClusterWrapper.getRandomVeniceRouter().getRoutingDataRepository();
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    Assert.assertEquals(routingDataRepository2.getReadyToServeInstances(composeKafkaTopic, 0).size(), 3);
                });
                if (veniceServerWrapper != null) {
                    veniceClusterWrapper.removeVeniceServer(veniceServerWrapper.getPort());
                }
                if (helixAdmin != null) {
                    helixAdmin.close();
                }
                deleteStores(uniqueString);
            } catch (Throwable th) {
                if (veniceServerWrapper != null) {
                    veniceClusterWrapper.removeVeniceServer(veniceServerWrapper.getPort());
                }
                if (helixAdmin != null) {
                    helixAdmin.close();
                }
                throw th;
            }
        } catch (Throwable th2) {
            deleteStores(uniqueString);
            throw th2;
        }
    }

    public static void verifyDCConfigAARepl(ControllerClient controllerClient, String str, boolean z, boolean z2, boolean z3) {
        TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
            StoreInfo store = TestUtils.assertCommand(controllerClient.getStore(str)).getStore();
            Assert.assertEquals(store.isActiveActiveReplicationEnabled(), z3, "The active active replication config does not match.");
            if (!z || z2 == z3) {
                return;
            }
            HybridStoreConfig hybridStoreConfig = store.getHybridStoreConfig();
            Assert.assertNotNull(hybridStoreConfig);
            Assert.assertEquals(hybridStoreConfig.getDataReplicationPolicy(), DataReplicationPolicy.NON_AGGREGATE, "The active active replication policy does not match.");
        });
    }

    private void deleteStores(String... strArr) {
        CompletableFuture.runAsync(() -> {
            try {
                for (String str : strArr) {
                    this.parentControllerClient.disableAndDeleteStore(str);
                }
            } catch (Exception e) {
            }
        });
    }
}
