package com.linkedin.venice.endToEnd;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.venice.AdminTool;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.StatTrackingStoreClient;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.DaVinciTestContext;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.system.store.MetaStoreDataType;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.Closeable;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.samza.system.SystemProducer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * End-to-end tests for migrating a store from one cluster to another through the admin tool.
 *
 * <p>A two-layer, multi-region, multi-cluster environment is started once for the whole class. Every test
 * creates a uniquely named hybrid store in the source cluster, pushes data to it, and then drives the
 * migration with {@link AdminTool} against the parent controller.
 */
@Test(singleThreaded = true)
public class TestStoreMigration {
    /** Per-test timeout, in milliseconds (TestNG {@code timeOut} unit). */
    private static final int TEST_TIMEOUT = 120000;
    private static final int RECORD_COUNT = 20;
    private static final String NEW_OWNER = "newtest@linkedin.com";
    private static final String FABRIC0 = "dc-0";
    // Passed through to AdminTool.abortMigration as its prompt overrides.
    private static final boolean[] ABORT_MIGRATION_PROMPTS_OVERRIDE = {false, true, true};

    private VeniceTwoLayerMultiRegionMultiClusterWrapper twoLayerMultiRegionMultiClusterWrapper;
    private VeniceMultiClusterWrapper multiClusterWrapper;
    private String srcClusterName;
    private String destClusterName;
    private String parentControllerUrl;
    private String childControllerUrl0;

    /**
     * Starts the test environment, picks the source/destination clusters (alphabetical order) and waits for
     * version 1 of each cluster's participant store push to complete.
     */
    @BeforeClass
    public void setUp() {
        Properties parentControllerProperties = new Properties();
        parentControllerProperties.setProperty(
                "topic.cleanup.sleep.interval.between.topic.list.fetch.ms", String.valueOf(Long.MAX_VALUE));
        parentControllerProperties.setProperty("offline.job.start.timeout.ms", "180000");

        Properties serverProperties = new Properties();
        serverProperties.put("server.promotion.to.leader.replica.delay.seconds", 1L);

        // NOTE(review): the numeric arguments are topology counts defined by ServiceFactory; confirm their
        // meaning against that factory method before changing them.
        this.twoLayerMultiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
                1, 2, 1, 1, 2, 1, 2,
                Optional.of(new VeniceProperties(parentControllerProperties)),
                Optional.empty(),
                Optional.of(new VeniceProperties(serverProperties)),
                false);
        this.multiClusterWrapper = this.twoLayerMultiRegionMultiClusterWrapper.getChildRegions().get(0);

        String[] clusterNames = this.multiClusterWrapper.getClusterNames();
        Arrays.sort(clusterNames);
        this.srcClusterName = clusterNames[0];
        this.destClusterName = clusterNames[1];
        this.parentControllerUrl = this.twoLayerMultiRegionMultiClusterWrapper.getControllerConnectString();
        this.childControllerUrl0 = this.multiClusterWrapper.getControllerConnectString();

        for (String clusterName : clusterNames) {
            String participantStoreTopic = Version.composeKafkaTopic(
                    VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName), 1);
            try (ControllerClient childControllerClient = new ControllerClient(clusterName, this.childControllerUrl0)) {
                TestUtils.waitForNonDeterministicPushCompletion(
                        participantStoreTopic, childControllerClient, 5L, TimeUnit.MINUTES);
            }
        }
    }

    /** Tears down the whole test environment; close failures are logged rather than thrown. */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(this.twoLayerMultiRegionMultiClusterWrapper);
    }

    /**
     * Migrates a store while a thin client is reading from it, verifies the client ends up pointing at the
     * destination D2 service, and then aborts the migration and verifies the store is back in the source
     * cluster only.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testStoreMigration() throws Exception {
        String storeName = Utils.getUniqueString("test");
        createAndPushStore(this.srcClusterName, storeName);

        String srcD2ServiceName = this.multiClusterWrapper.getClusterToD2().get(this.srcClusterName);
        String destD2ServiceName = this.multiClusterWrapper.getClusterToD2().get(this.destClusterName);
        D2Client d2Client = D2TestUtils.getAndStartD2Client(
                this.multiClusterWrapper.getClusters().get(this.srcClusterName).getZk().getAddress());
        try {
            try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
                    ClientConfig.defaultGenericClientConfig(storeName)
                            .setD2ServiceName(srcD2ServiceName)
                            .setD2Client(d2Client))) {
                readFromStore(client);
                startMigration(this.parentControllerUrl, storeName);
                completeMigration(this.parentControllerUrl, storeName);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    // Issue a read so the client has a chance to notice the migration and redirect.
                    readFromStore(client);
                    String innerClientDescription =
                            ((StatTrackingStoreClient<?, ?>) client).getInnerStoreClient().toString();
                    Assert.assertTrue(innerClientDescription.contains(destD2ServiceName));
                });
            }
        } finally {
            // The D2 client was created by this test, so this test is responsible for shutting it down.
            D2ClientUtils.shutdownClient(d2Client);
        }

        // Abort the migration through the parent controller and verify the rollback.
        try (ControllerClient srcParentControllerClient =
                     new ControllerClient(this.srcClusterName, this.parentControllerUrl);
             ControllerClient destParentControllerClient =
                     new ControllerClient(this.destClusterName, this.parentControllerUrl)) {
            abortMigration(this.parentControllerUrl, storeName, true);
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () ->
                    checkStatusAfterAbortMigration(srcParentControllerClient, destParentControllerClient, storeName));
        }
    }

    /**
     * Starts a migration, then runs a second push and an owner update against the source cluster, and
     * verifies that both clusters report the new version and the new owner.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testStoreMigrationWithNewPushesAndUpdates() throws Exception {
        String storeName = Utils.getUniqueString("testWithNewPushesAndUpdates");
        Properties pushJobProps = createAndPushStore(this.srcClusterName, storeName);

        try (ControllerClient srcParentControllerClient =
                     new ControllerClient(this.srcClusterName, this.parentControllerUrl);
             ControllerClient destParentControllerClient =
                     new ControllerClient(this.destClusterName, this.parentControllerUrl)) {
            startMigration(this.parentControllerUrl, storeName);
            // Only continue once the source store is flagged as migrating.
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () ->
                    Assert.assertTrue(srcParentControllerClient.getStore(storeName).getStore().isMigrating()));

            TestWriteUtils.runPushJob("Test push job 2", pushJobProps);
            srcParentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setOwner(NEW_OWNER));

            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                StoreInfo srcStore = srcParentControllerClient.getStore(storeName).getStore();
                StoreInfo destStore = destParentControllerClient.getStore(storeName).getStore();
                Assert.assertNotNull(srcStore);
                Assert.assertNotNull(destStore);
                Assert.assertEquals(srcStore.getLargestUsedVersionNumber(), 2);
                Assert.assertEquals(destStore.getLargestUsedVersionNumber(), 2);
                Assert.assertEquals(srcStore.getOwner(), NEW_OWNER);
                Assert.assertEquals(destStore.getOwner(), NEW_OWNER);
                Assert.assertEquals(srcStore.getVersions().get(1).getRmdVersionId(), 1);
                Assert.assertEquals(destStore.getVersions().get(1).getRmdVersionId(), 1);
            });
        }
    }

    /**
     * Verifies that the store's meta system store serves store properties keyed by the source cluster before
     * migration and keyed by the destination cluster after migration, and then ends the migration.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testStoreMigrationWithMetaSystemStore() throws Exception {
        String storeName = Utils.getUniqueString("testWithMetaSystemStore");
        createAndPushStore(this.srcClusterName, storeName);

        String metaSystemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
        waitForSystemStoreToBeReady(metaSystemStoreName);

        String srcD2ServiceName = this.multiClusterWrapper.getClusterToD2().get(this.srcClusterName);
        D2Client d2Client = D2TestUtils.getAndStartD2Client(
                this.multiClusterWrapper.getClusters().get(this.srcClusterName).getZk().getAddress());
        try {
            try (AvroSpecificStoreClient<StoreMetaKey, StoreMetaValue> metaStoreClient =
                         ClientFactory.getAndStartSpecificAvroClient(
                                 ClientConfig.defaultSpecificClientConfig(metaSystemStoreName, StoreMetaValue.class)
                                         .setD2ServiceName(srcD2ServiceName)
                                         .setD2Client(d2Client)
                                         .setStoreName(metaSystemStoreName))) {
                StoreMetaKey srcClusterKey = storePropertiesKey(storeName, this.srcClusterName);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    StoreMetaValue value = metaStoreClient.get(srcClusterKey).get();
                    Assert.assertTrue(value != null && value.storeProperties != null);
                });

                startMigration(this.parentControllerUrl, storeName);
                completeMigration(this.parentControllerUrl, storeName);

                StoreMetaKey destClusterKey = storePropertiesKey(storeName, this.destClusterName);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    try {
                        StoreMetaValue value = metaStoreClient.get(destClusterKey).get();
                        Assert.assertTrue(value != null && value.storeProperties != null);
                    } catch (Exception e) {
                        // Turn read failures into assertion failures, keeping the cause for debugging.
                        Assert.fail("Exception is not expected: " + e.getMessage(), e);
                    }
                });

                endMigration(this.parentControllerUrl, storeName);
            }
        } finally {
            // The D2 client was created by this test, so this test is responsible for shutting it down.
            D2ClientUtils.shutdownClient(d2Client);
        }
    }

    /**
     * Verifies that Da Vinci push status for the store can be read before and after migration, and that the
     * migrated store in the destination cluster still has both system stores enabled.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exception {
        String storeName = Utils.getUniqueString("testWithDaVinciPushStatusSystemStore");
        createAndPushStore(this.srcClusterName, storeName);

        String pushStatusSystemStoreName =
                VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName);
        waitForSystemStoreToBeReady(pushStatusSystemStoreName);

        VeniceProperties backendConfig = DaVinciTestContext
                .getDaVinciPropertyBuilder(this.multiClusterWrapper.getZkServerWrapper().getAddress())
                .put("push.status.store.enabled", true)
                .put("offline.push.monitor.davinci.push.status.scan.interval.in.seconds", 5)
                .build();
        D2Client d2Client = D2TestUtils.getAndStartD2Client(
                this.multiClusterWrapper.getClusters().get(this.srcClusterName).getZk().getAddress());
        PushStatusStoreReader pushStatusStoreReader = new PushStatusStoreReader(
                d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, TimeUnit.MINUTES.toSeconds(10L));

        try (DaVinciClient<Object, Object> daVinciClient = ServiceFactory.getGenericAvroDaVinciClient(
                storeName, this.multiClusterWrapper.getClusters().get(this.srcClusterName), new DaVinciConfig(),
                backendConfig)) {
            daVinciClient.subscribeAll().get();
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () ->
                    Assert.assertEquals(
                            pushStatusStoreReader.getPartitionStatus(storeName, 1, 0, Optional.empty()).size(), 1));

            startMigration(this.parentControllerUrl, storeName);
            completeMigration(this.parentControllerUrl, storeName);

            // The same push status must still be readable once the store lives in the destination cluster.
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () ->
                    Assert.assertEquals(
                            pushStatusStoreReader.getPartitionStatus(storeName, 1, 0, Optional.empty()).size(), 1));
        } finally {
            Utils.closeQuietlyWithErrorLogged(pushStatusStoreReader);
            D2ClientUtils.shutdownClient(d2Client);
        }

        // Verify that the migrated store's config is in place in the destination cluster.
        try (ControllerClient destParentControllerClient =
                     new ControllerClient(this.destClusterName, this.parentControllerUrl)) {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                StoreResponse storeResponse = destParentControllerClient.getStore(storeName);
                Assert.assertFalse(storeResponse.isError());
                Assert.assertTrue(storeResponse.getStore().isStoreMetaSystemStoreEnabled());
                Assert.assertTrue(storeResponse.getStore().isDaVinciPushStatusStoreEnabled());
            });
        }
    }

    /**
     * Waits (up to 30 seconds) until the given system store in the source cluster reports a current version
     * greater than zero on the child controller.
     *
     * @param systemStoreName fully qualified system store name
     */
    private void waitForSystemStoreToBeReady(String systemStoreName) {
        try (ControllerClient srcChildControllerClient =
                     new ControllerClient(this.srcClusterName, this.childControllerUrl0)) {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                StoreResponse storeResponse = srcChildControllerClient.getStore(systemStoreName);
                Assert.assertFalse(storeResponse.isError());
                Assert.assertTrue(
                        storeResponse.getStore().getCurrentVersion() > 0, systemStoreName + " is not ready");
            });
        }
    }

    /**
     * Builds the meta system store key used to look up the store properties of {@code storeName} as seen in
     * {@code clusterName}.
     */
    private StoreMetaKey storePropertiesKey(String storeName, String clusterName) {
        HashMap<String, String> keyParams = new HashMap<>();
        keyParams.put("KEY_STORE_NAME", storeName);
        keyParams.put("KEY_CLUSTER_NAME", clusterName);
        return MetaStoreDataType.STORE_PROPERTIES.getStoreMetaKey(keyParams);
    }

    /**
     * Creates a hybrid store in the given cluster, runs a batch push for it and then writes ten streaming
     * records.
     *
     * @param clusterName cluster in which the store is created
     * @param storeName name of the store to create
     * @return the push job properties, so callers can run further pushes against the same store
     * @throws VeniceException if the push job or the streaming writes fail with an {@link Exception}
     */
    private Properties createAndPushStore(String clusterName, String storeName) throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                this.twoLayerMultiRegionMultiClusterWrapper, "file:" + inputDir.getAbsolutePath(), storeName);
        pushJobProps.put("send.control.messages.directly", true);

        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, true, RECORD_COUNT);
        String keySchemaStr = recordSchema.getField("key").schema().toString();
        String valueSchemaStr = recordSchema.getField("value").schema().toString();
        UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                .setStorageQuotaInByte(-1L)
                .setHybridRewindSeconds(120000L)
                .setHybridOffsetLagThreshold(2L);
        // createStoreForJob returns a client that is only needed for the creation itself.
        IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, pushJobProps, storeParams)
                .close();

        // Make sure the store is visible on the child controller before pushing to it.
        try (ControllerClient childControllerClient = new ControllerClient(clusterName, this.childControllerUrl0)) {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () ->
                    Assert.assertNotNull(childControllerClient.getStore(storeName).getStore()));
        }

        SystemProducer streamingProducer = null;
        try (VenicePushJob pushJob = new VenicePushJob("Test push job", pushJobProps)) {
            pushJob.run();

            streamingProducer = IntegrationTestPushUtils.getSamzaProducer(
                    this.multiClusterWrapper.getClusters().get(clusterName), storeName, Version.PushType.STREAM);
            for (int i = 1; i <= 10; i++) {
                IntegrationTestPushUtils.sendStreamingRecord(streamingProducer, storeName, i);
            }
        } catch (Exception e) {
            throw new VeniceException(e);
        } finally {
            // Stop the producer even when a streaming write fails part-way through.
            if (streamingProducer != null) {
                streamingProducer.stop();
            }
        }
        return pushJobProps;
    }

    /** Invokes the admin tool's {@code --migrate-store} command from the source to the destination cluster. */
    private void startMigration(String controllerUrl, String storeName) throws Exception {
        String[] migrateStoreArgs = {
                "--migrate-store",
                "--url", controllerUrl,
                "--store", storeName,
                "--cluster-src", this.srcClusterName,
                "--cluster-dest", this.destClusterName};
        AdminTool.main(migrateStoreArgs);
    }

    /**
     * Repeatedly invokes the admin tool's {@code --complete-migration} command (for up to 60 seconds) until
     * cluster discovery reports the destination cluster for the store.
     */
    private void completeMigration(String controllerUrl, String storeName) {
        String[] completeMigrationArgs = {
                "--complete-migration",
                "--url", controllerUrl,
                "--store", storeName,
                "--cluster-src", this.srcClusterName,
                "--cluster-dest", this.destClusterName,
                "--fabric", FABRIC0};
        try (ControllerClient destControllerClient = new ControllerClient(this.destClusterName, controllerUrl)) {
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
                // The command is re-run on every attempt, followed by the discovery check.
                AdminTool.main(completeMigrationArgs);
                Assert.assertEquals(destControllerClient.discoverCluster(storeName).getCluster(), this.destClusterName);
            });
        }
    }

    /**
     * Invokes the admin tool's {@code --end-migration} command and waits until the store is gone from the
     * source cluster and no longer flagged as migrating or as a migration duplicate in the destination.
     */
    private void endMigration(String controllerUrl, String storeName) throws Exception {
        String[] endMigrationArgs = {
                "--end-migration",
                "--url", controllerUrl,
                "--store", storeName,
                "--cluster-src", this.srcClusterName,
                "--cluster-dest", this.destClusterName};
        AdminTool.main(endMigrationArgs);

        try (ControllerClient srcControllerClient = new ControllerClient(this.srcClusterName, controllerUrl);
             ControllerClient destControllerClient = new ControllerClient(this.destClusterName, controllerUrl)) {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                Assert.assertNull(srcControllerClient.getStore(storeName).getStore());

                StoreResponse destStoreResponse = destControllerClient.getStore(storeName);
                Assert.assertNotNull(destStoreResponse.getStore());
                Assert.assertFalse(destStoreResponse.getStore().isMigrating());
                Assert.assertFalse(destStoreResponse.getStore().isMigrationDuplicateStore());
            });
        }
    }

    /**
     * Issues a single read for a random key in {@code [1, RECORD_COUNT]}.
     *
     * <p>NOTE(review): the result returned by {@code get} is not awaited or checked, so a failed read is
     * not detected here; confirm whether that is intentional.
     */
    private void readFromStore(AvroGenericStoreClient<String, Object> avroGenericStoreClient) {
        int key = ThreadLocalRandom.current().nextInt(RECORD_COUNT) + 1;
        avroGenericStoreClient.get(Integer.toString(key));
    }

    /** Aborts the migration of {@code storeName} through the admin tool, using the canned prompt overrides. */
    private void abortMigration(String controllerUrl, String storeName, boolean force) {
        AdminTool.abortMigration(
                controllerUrl, storeName, this.srcClusterName, this.destClusterName, force,
                ABORT_MIGRATION_PROMPTS_OVERRIDE);
    }

    /**
     * Asserts the post-abort state: the store exists in the source cluster and is not migrating, is absent
     * from the destination cluster, and cluster discovery points back to the source cluster.
     */
    private void checkStatusAfterAbortMigration(
            ControllerClient srcControllerClient, ControllerClient destControllerClient, String storeName) {
        StoreResponse srcStoreResponse = srcControllerClient.getStore(storeName);
        Assert.assertNotNull(srcStoreResponse.getStore());
        Assert.assertFalse(srcStoreResponse.getStore().isMigrating());

        Assert.assertNull(destControllerClient.getStore(storeName).getStore());
        Assert.assertEquals(destControllerClient.discoverCluster(storeName).getCluster(), this.srcClusterName);
    }
}
