package com.linkedin.venice.endToEnd;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.AdminTool;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/DaVinciClusterAgnosticTest.class */
/**
 * End-to-end tests that run Da Vinci clients against a multi-cluster Venice deployment and rely on
 * the D2 cluster-discovery service ({@link VeniceRouterWrapper#CLUSTER_DISCOVERY_D2_SERVICE_NAME})
 * rather than on a fixed cluster name.
 *
 * <p>The deployment is created once per class in {@link #setUp()} and torn down in
 * {@link #cleanUp()}. Every closeable resource opened by a test is released with
 * try-with-resources (or {@code finally}) so that a failing assertion does not leak clients into
 * subsequent tests.
 */
public class DaVinciClusterAgnosticTest {
    private static final String INT_KEY_SCHEMA = "\"int\"";
    private static final String INT_VALUE_SCHEMA = "\"int\"";
    public static final String RECORD_VALUE_SCHEMA = "{  \"namespace\": \"example.avro\",    \"type\": \"record\",     \"name\": \"TestRecord\",       \"fields\": [                  {\"name\": \"field1\", \"type\": \"int\"}    ]  } ";
    public static final String NEW_RECORD_VALUE_SCHEMA = "{  \"namespace\": \"example.avro\",    \"type\": \"record\",     \"name\": \"TestRecord\",       \"fields\": [                  {\"name\": \"field1\", \"type\": \"int\"},         {\"name\": \"field2\", \"type\": \"int\", \"default\": 0}  ]  } ";
    private static final String FABRIC = "dc-0";

    private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    private VeniceMultiClusterWrapper multiClusterVenice;
    private String[] clusterNames;
    private String parentControllerURLs;

    /**
     * Starts the multi-region/multi-cluster wrapper, records the first child region, its cluster
     * names and the comma-joined parent controller URLs, then blocks until version 1 of every
     * cluster's participant store has finished its push (up to five minutes per cluster).
     */
    @BeforeClass
    public void setUp() {
        Utils.thisIsLocalhost();
        // NOTE(review): the numeric arguments are positional sizing parameters of the wrapper factory;
        // their individual meanings are not visible from this file - confirm against ServiceFactory.
        this.multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(1, 2, 1, 1, 3, 1, 3, Optional.of(new VeniceProperties(Collections.singletonMap("offline.job.start.timeout.ms", "180000"))), Optional.empty(), Optional.empty(), false);
        this.multiClusterVenice = this.multiRegionMultiClusterWrapper.getChildRegions().get(0);
        this.clusterNames = this.multiClusterVenice.getClusterNames();
        this.parentControllerURLs = this.multiRegionMultiClusterWrapper.getParentControllers()
                .stream()
                .map(controller -> controller.getControllerUrl())
                .collect(Collectors.joining(","));

        for (String clusterName : this.clusterNames) {
            try (ControllerClient childControllerClient =
                    new ControllerClient(clusterName, this.multiClusterVenice.getControllerConnectString())) {
                String participantStoreTopic = Version.composeKafkaTopic(
                        VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName), 1);
                TestUtils.waitForNonDeterministicPushCompletion(
                        participantStoreTopic, childControllerClient, 5L, TimeUnit.MINUTES);
            }
        }
    }

    /** Closes the multi-region wrapper created in {@link #setUp()}, logging instead of throwing on failure. */
    @AfterClass
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(this.multiRegionMultiClusterWrapper);
    }

    /**
     * Creates one int-to-int store per cluster (each populated with that cluster's index as the
     * value), reads all of them through a single Da Vinci client factory, and then verifies that the
     * Da Vinci clients observe:
     * <ol>
     *   <li>a new version pushed to the first store, and</li>
     *   <li>a new version pushed to the last store after it was migrated to the first cluster.</li>
     * </ol>
     *
     * @throws Exception if any controller, admin-tool or client operation fails
     */
    @Test(timeOut = 180000)
    public void testMultiClusterDaVinci() throws Exception {
        Assert.assertTrue(this.clusterNames.length > 1, "Insufficient clusters for this test to be meaningful");
        final int keyCount = 10;
        List<String> storeNames = new ArrayList<>();

        for (int index = 0; index < this.clusterNames.length; index++) {
            final int value = index; // lambdas may only capture effectively final locals
            String clusterName = this.clusterNames[index];
            try (ControllerClient parentControllerClient = new ControllerClient(clusterName, this.parentControllerURLs)) {
                String storeName = Utils.getUniqueString("test-store");
                storeNames.add(storeName);
                Assert.assertFalse(
                        parentControllerClient.createNewStore(storeName, "venice-test", INT_KEY_SCHEMA, INT_VALUE_SCHEMA).isError());
                String topic = TestUtils.createVersionWithBatchData(
                        parentControllerClient,
                        storeName,
                        INT_KEY_SCHEMA,
                        INT_VALUE_SCHEMA,
                        IntStream.range(0, keyCount).mapToObj(key -> new AbstractMap.SimpleEntry<>(key, value)))
                        .getKafkaTopic();
                TestUtils.waitForNonDeterministicPushCompletion(topic, parentControllerClient, 30L, TimeUnit.SECONDS);
                makeSureSystemStoresAreOnline(parentControllerClient, storeName);
                this.multiClusterVenice.getClusters().get(clusterName).refreshAllRouterMetaData();
            }
        }

        VeniceProperties backendConfig = buildDaVinciBackendConfig();
        DaVinciConfig daVinciConfig = new DaVinciConfig();
        D2Client d2Client = D2TestUtils.getAndStartD2Client(this.multiClusterVenice.getZkServerWrapper().getAddress());
        try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
                d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, new MetricsRepository(), backendConfig)) {
            List<DaVinciClient<Integer, Object>> clients = new ArrayList<>();
            for (int index = 0; index < storeNames.size(); index++) {
                DaVinciClient<Integer, Object> client =
                        factory.getAndStartGenericAvroClient(storeNames.get(index), daVinciConfig);
                client.subscribeAll().get();
                assertAllKeysHaveValue(client, keyCount, index);
                clients.add(client);
            }

            // A new version of the first store must become visible to its already-subscribed client.
            pushIntBatchAndWait(this.clusterNames[0], storeNames.get(0), keyCount, 1000);
            TestUtils.waitForNonDeterministicAssertion(120L, TimeUnit.SECONDS, true,
                    () -> assertAllKeysHaveValue(clients.get(0), keyCount, 1000));

            // Migrate the last store into the first cluster, then push to it through the destination cluster.
            final int lastIndex = storeNames.size() - 1;
            String migratedStoreName = storeNames.get(lastIndex);
            String srcCluster = this.clusterNames[lastIndex];
            String destCluster = this.clusterNames[0];
            migrateStore(migratedStoreName, srcCluster, destCluster);

            pushIntBatchAndWait(destCluster, migratedStoreName, keyCount, 999);
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true,
                    () -> assertAllKeysHaveValue(clients.get(lastIndex), keyCount, 999));
        } finally {
            D2ClientUtils.shutdownClient(d2Client);
        }
    }

    /**
     * Drives a store migration through the admin tool: starts it, repeatedly issues
     * {@code --complete-migration} until cluster discovery reports the destination cluster,
     * pauses, and finally issues {@code --end-migration}.
     *
     * @param storeName   store to migrate
     * @param srcCluster  cluster the store currently lives in
     * @param destCluster cluster the store should end up in
     * @throws Exception if an admin-tool invocation fails or discovery never switches within 60 seconds
     */
    private void migrateStore(String storeName, String srcCluster, String destCluster) throws Exception {
        String[] startMigrationArgs = {"--migrate-store", "--url", this.parentControllerURLs, "--store", storeName, "--cluster-src", srcCluster, "--cluster-dest", destCluster};
        String[] completeMigrationArgs = {"--complete-migration", "--url", this.parentControllerURLs, "--store", storeName, "--cluster-src", srcCluster, "--cluster-dest", destCluster, "--fabric", FABRIC};
        String[] endMigrationArgs = {"--end-migration", "--url", this.parentControllerURLs, "--store", storeName, "--cluster-src", srcCluster, "--cluster-dest", destCluster};

        AdminTool.main(startMigrationArgs);

        // The controller client is only needed while polling discovery, so it is scoped to that step.
        try (ControllerClient srcControllerClient = new ControllerClient(srcCluster, this.parentControllerURLs)) {
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                AdminTool.main(completeMigrationArgs);
                Assert.assertEquals(srcControllerClient.discoverCluster(storeName).getCluster(), destCluster);
            });
        }

        // NOTE(review): fixed 10 s pause before ending the migration; presumably it lets clients
        // pick up the new discovery result - confirm the intent before shortening it.
        Utils.sleep(10000L);
        AdminTool.main(endMigrationArgs);
    }

    /**
     * Pushes version 1 of a record-valued store, reads it through a Da Vinci client, then registers
     * a superset value schema, pushes version 2 with it, and verifies that the same client
     * eventually serves the new records (including the newly added {@code field2}).
     *
     * @throws Exception if any controller or client operation fails
     */
    @Test(timeOut = 60000)
    public void testDaVinciVersionSwap() throws Exception {
        final int keyCount = 10;
        String clusterName = this.clusterNames[0];
        String storeName = Utils.getUniqueString("test-version-swap");
        GenericData.Record firstRecord = new GenericData.Record(Schema.parse(RECORD_VALUE_SCHEMA));
        firstRecord.put("field1", 1);

        try (ControllerClient parentControllerClient = new ControllerClient(clusterName, this.parentControllerURLs)) {
            Assert.assertFalse(
                    parentControllerClient.createNewStore(storeName, "venice-test", INT_KEY_SCHEMA, RECORD_VALUE_SCHEMA).isError());
            String firstTopic = TestUtils.createVersionWithBatchData(
                    parentControllerClient,
                    storeName,
                    INT_KEY_SCHEMA,
                    RECORD_VALUE_SCHEMA,
                    IntStream.range(0, keyCount).mapToObj(key -> new AbstractMap.SimpleEntry<>(key, firstRecord)),
                    1).getKafkaTopic();
            TestUtils.waitForNonDeterministicPushCompletion(firstTopic, parentControllerClient, 30L, TimeUnit.SECONDS);
            makeSureSystemStoresAreOnline(parentControllerClient, storeName);
            this.multiClusterVenice.getClusters().get(clusterName).refreshAllRouterMetaData();

            VeniceProperties backendConfig = buildDaVinciBackendConfig();
            D2Client d2Client = D2TestUtils.getAndStartD2Client(this.multiClusterVenice.getZkServerWrapper().getAddress());
            try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
                    d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, new MetricsRepository(), backendConfig)) {
                DaVinciClient<Integer, GenericData.Record> client =
                        factory.getAndStartGenericAvroClient(storeName, new DaVinciConfig());
                client.subscribeAll().get();
                for (int key = 0; key < keyCount; key++) {
                    Assert.assertEquals(client.get(key).get().get("field1"), Integer.valueOf(1));
                }

                Assert.assertFalse(parentControllerClient.addValueSchema(storeName, NEW_RECORD_VALUE_SCHEMA).isError());
                GenericData.Record secondRecord = new GenericData.Record(Schema.parse(NEW_RECORD_VALUE_SCHEMA));
                secondRecord.put("field1", 2);
                secondRecord.put("field2", 2);
                // The push is not awaited explicitly; the assertion below polls until the swap is visible.
                TestUtils.createVersionWithBatchData(
                        parentControllerClient,
                        storeName,
                        INT_KEY_SCHEMA,
                        NEW_RECORD_VALUE_SCHEMA,
                        IntStream.range(0, keyCount).mapToObj(key -> new AbstractMap.SimpleEntry<>(key, secondRecord)),
                        2);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    for (int key = 0; key < keyCount; key++) {
                        GenericData.Record current = client.get(key).get();
                        Assert.assertEquals(current.get("field1"), Integer.valueOf(2));
                        Assert.assertEquals(current.get("field2"), Integer.valueOf(2));
                    }
                });
            } finally {
                // Mirrors testMultiClusterDaVinci: the D2 client is owned by the test and must be shut down.
                D2ClientUtils.shutdownClient(d2Client);
            }
        }
    }

    /**
     * Waits (30 seconds each) for version 1 of the store's meta system store and Da Vinci push
     * status system store to finish their pushes.
     *
     * @param controllerClient client used to poll push status
     * @param str              name of the user store whose system stores are awaited
     */
    private void makeSureSystemStoresAreOnline(ControllerClient controllerClient, String str) {
        String metaStoreTopic =
                Version.composeKafkaTopic(VeniceSystemStoreType.META_STORE.getSystemStoreName(str), 1);
        TestUtils.waitForNonDeterministicPushCompletion(metaStoreTopic, controllerClient, 30L, TimeUnit.SECONDS);

        String pushStatusStoreTopic =
                Version.composeKafkaTopic(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(str), 1);
        TestUtils.waitForNonDeterministicPushCompletion(pushStatusStoreTopic, controllerClient, 30L, TimeUnit.SECONDS);
    }

    /** Builds the Da Vinci backend configuration shared by the tests, rooted at a fresh temp data directory. */
    private static VeniceProperties buildDaVinciBackendConfig() {
        return new PropertyBuilder()
                .put("data.base.path", Utils.getTempDataDirectory().getAbsolutePath())
                .put("persistence.type", PersistenceType.ROCKS_DB)
                .put("client.use.system.store.repository", true)
                .put("client.system.store.repository.refresh.interval.seconds", 1)
                .build();
    }

    /**
     * Pushes a new version of an int-to-int store in which keys {@code [0, keyCount)} all map to
     * {@code value}, and waits up to 60 seconds for the push to complete.
     */
    private void pushIntBatchAndWait(String clusterName, String storeName, int keyCount, int value) throws Exception {
        try (ControllerClient parentControllerClient = new ControllerClient(clusterName, this.parentControllerURLs)) {
            String topic = TestUtils.createVersionWithBatchData(
                    parentControllerClient,
                    storeName,
                    INT_KEY_SCHEMA,
                    INT_VALUE_SCHEMA,
                    IntStream.range(0, keyCount).mapToObj(key -> new AbstractMap.SimpleEntry<>(key, value)))
                    .getKafkaTopic();
            TestUtils.waitForNonDeterministicPushCompletion(topic, parentControllerClient, 60L, TimeUnit.SECONDS);
        }
    }

    /** Asserts that the client returns {@code expectedValue} for every key in {@code [0, keyCount)}. */
    private static void assertAllKeysHaveValue(DaVinciClient<Integer, Object> client, int keyCount, int expectedValue)
            throws ExecutionException, InterruptedException {
        for (int key = 0; key < keyCount; key++) {
            Assert.assertEquals(client.get(key).get(), Integer.valueOf(expectedValue));
        }
    }
}
