package com.linkedin.venice.endToEnd;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ReadyForDataRecoveryResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.Closeable;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemProducer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/DataRecoveryTest.class */
public class DataRecoveryTest {
    private static final long TEST_TIMEOUT = 120000;
    private static final int NUMBER_OF_CHILD_DATACENTERS = 2;
    private static final int NUMBER_OF_CLUSTERS = 1;
    private static final String[] CLUSTER_NAMES = (String[]) IntStream.range(0, 1).mapToObj(i -> {
        return "venice-cluster" + i;
    }).toArray(i2 -> {
        return new String[i2];
    });
    private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    private List<VeniceMultiClusterWrapper> childDatacenters;
    private List<VeniceControllerWrapper> parentControllers;
    private String clusterName;
    private PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    @BeforeClass(alwaysRun = true)
    public void setUp() {
        Utils.thisIsLocalhost();
        Properties properties = new Properties();
        properties.put("server.promotion.to.leader.replica.delay.seconds", 1L);
        properties.setProperty("rocksdb.plain.table.format.enabled", "false");
        properties.setProperty("server.database.checksum.verification.enabled", "true");
        properties.setProperty("server.database.sync.bytes.interval.for.deferred.write.mode", "300");
        properties.put("server.shared.kafka.producer.enabled", "true");
        properties.put("server.kafka.producer.pool.size.per.kafka.cluster", "2");
        Properties properties2 = new Properties();
        properties2.put("native.replication.source.fabric", VeniceControllerWrapper.DEFAULT_PARENT_DATA_CENTER_REGION_NAME);
        properties2.put("parent.kafka.cluster.fabric.list", VeniceControllerWrapper.DEFAULT_PARENT_DATA_CENTER_REGION_NAME);
        properties2.put("allow.cluster.wipe", "true");
        properties2.put("topic.cleanup.sleep.interval.between.topic.list.fetch.ms", "1000");
        properties2.put("min.number.of.unused.kafka.topics.to.preserve", "0");
        this.multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(NUMBER_OF_CHILD_DATACENTERS, 1, 1, 1, NUMBER_OF_CHILD_DATACENTERS, 1, NUMBER_OF_CHILD_DATACENTERS, Optional.of(new VeniceProperties(properties2)), Optional.of(properties2), Optional.of(new VeniceProperties(properties)), false);
        this.childDatacenters = this.multiRegionMultiClusterWrapper.getChildRegions();
        this.parentControllers = this.multiRegionMultiClusterWrapper.getParentControllers();
        this.clusterName = CLUSTER_NAMES[0];
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.multiRegionMultiClusterWrapper});
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testStartDataRecoveryAPIs() {
        String uniqueString = Utils.getUniqueString("dataRecovery-store");
        ControllerClient controllerClient = new ControllerClient(this.clusterName, this.multiRegionMultiClusterWrapper.getControllerConnectString());
        try {
            ControllerClient controllerClient2 = new ControllerClient(this.clusterName, this.childDatacenters.get(0).getControllerConnectString());
            try {
                controllerClient = new ControllerClient(this.clusterName, this.childDatacenters.get(1).getControllerConnectString());
                try {
                    TestUtils.createAndVerifyStoreInAllRegions(uniqueString, controllerClient, Arrays.asList(controllerClient2, controllerClient));
                    Assert.assertFalse(controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L).setHybridDataReplicationPolicy(DataReplicationPolicy.AGGREGATE).setNativeReplicationEnabled(true).setPartitionCount(1)).isError());
                    TestUtils.verifyDCConfigNativeAndActiveRepl(uniqueString, true, false, new ControllerClient[]{controllerClient2, controllerClient});
                    Assert.assertFalse(controllerClient.emptyPush(uniqueString, "empty-push-" + System.currentTimeMillis(), 1000L).isError());
                    TestUtils.waitForNonDeterministicPushCompletion(Version.composeKafkaTopic(uniqueString, 1), controllerClient, 30L, TimeUnit.SECONDS);
                    ReadyForDataRecoveryResponse isStoreVersionReadyForDataRecovery = controllerClient.isStoreVersionReadyForDataRecovery("dc-0", "dc-1", uniqueString, 1, Optional.empty());
                    Assert.assertFalse(isStoreVersionReadyForDataRecovery.isError());
                    Assert.assertFalse(isStoreVersionReadyForDataRecovery.isReady());
                    Assert.assertFalse(controllerClient.prepareDataRecovery("dc-0", "dc-1", uniqueString, 1, Optional.empty()).isError());
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        ReadyForDataRecoveryResponse isStoreVersionReadyForDataRecovery2 = controllerClient.isStoreVersionReadyForDataRecovery("dc-0", "dc-1", uniqueString, 1, Optional.empty());
                        Assert.assertTrue(isStoreVersionReadyForDataRecovery2.isReady(), isStoreVersionReadyForDataRecovery2.getReason());
                    });
                    Assert.assertFalse(controllerClient.dataRecovery("dc-0", "dc-1", uniqueString, 1, false, true, Optional.empty()).isError());
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        Admin veniceAdmin = this.childDatacenters.get(1).getLeaderController(this.clusterName).getVeniceAdmin();
                        Assert.assertTrue(veniceAdmin.getStore(this.clusterName, uniqueString).containsVersion(1));
                        Assert.assertTrue(veniceAdmin.getTopicManager().containsTopic(this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(uniqueString, 1))));
                    });
                    controllerClient.close();
                    controllerClient2.close();
                    controllerClient.close();
                } finally {
                    try {
                        controllerClient.close();
                    } catch (Throwable th) {
                        th.addSuppressed(th);
                    }
                }
            } finally {
            }
        } catch (Throwable th2) {
            throw th2;
        }
    }

    @Test(timeOut = TEST_TIMEOUT)
    public void testBatchOnlyDataRecovery() throws Exception {
        String uniqueString = Utils.getUniqueString("dataRecovery-store-batch");
        ControllerClient controllerClient = new ControllerClient(this.clusterName, this.multiRegionMultiClusterWrapper.getControllerConnectString());
        try {
            ControllerClient controllerClient2 = new ControllerClient(this.clusterName, this.childDatacenters.get(0).getControllerConnectString());
            try {
                controllerClient = new ControllerClient(this.clusterName, this.childDatacenters.get(1).getControllerConnectString());
                try {
                    TestUtils.createAndVerifyStoreInAllRegions(uniqueString, controllerClient, Arrays.asList(controllerClient2, controllerClient));
                    Assert.assertFalse(controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setNativeReplicationEnabled(true).setPartitionCount(1)).isError());
                    TestUtils.verifyDCConfigNativeAndActiveRepl(uniqueString, true, false, new ControllerClient[]{controllerClient2, controllerClient});
                    VersionCreationResponse requestTopicForWrites = controllerClient.requestTopicForWrites(uniqueString, 1024L, Version.PushType.BATCH, Version.guidBasedDummyPushId(), true, false, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
                    Assert.assertFalse(requestTopicForWrites.isError());
                    TestUtils.writeBatchData(requestTopicForWrites, "\"string\"", "\"string\"", IntStream.range(0, 10).mapToObj(i -> {
                        return new AbstractMap.SimpleEntry(String.valueOf(i), String.valueOf(i));
                    }), 1);
                    TestUtils.waitForNonDeterministicPushCompletion(requestTopicForWrites.getKafkaTopic(), controllerClient, 60L, TimeUnit.SECONDS);
                    Assert.assertFalse(controllerClient.prepareDataRecovery("dc-0", "dc-1", uniqueString, 1, Optional.empty()).isError());
                    TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                        ReadyForDataRecoveryResponse isStoreVersionReadyForDataRecovery = controllerClient.isStoreVersionReadyForDataRecovery("dc-0", "dc-1", uniqueString, 1, Optional.empty());
                        Assert.assertTrue(isStoreVersionReadyForDataRecovery.isReady(), isStoreVersionReadyForDataRecovery.getReason());
                    });
                    Assert.assertFalse(controllerClient.dataRecovery("dc-0", "dc-1", uniqueString, 1, false, true, Optional.empty()).isError());
                    TestUtils.waitForNonDeterministicPushCompletion(requestTopicForWrites.getKafkaTopic(), controllerClient, 60L, TimeUnit.SECONDS);
                    AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.childDatacenters.get(1).getClusters().get(this.clusterName).getRandomRouterURL()));
                    for (int i2 = 0; i2 < 10; i2++) {
                        try {
                            Object obj = andStartGenericAvroClient.get(String.valueOf(i2)).get();
                            Assert.assertNotNull(obj, "Batch data should be consumed already in data center dc-1");
                            Assert.assertEquals(obj.toString(), String.valueOf(i2));
                        } catch (Throwable th) {
                            if (andStartGenericAvroClient != null) {
                                try {
                                    andStartGenericAvroClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    }
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    controllerClient.close();
                    controllerClient2.close();
                    controllerClient.close();
                } finally {
                    try {
                        controllerClient.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } finally {
            }
        } catch (Throwable th4) {
            throw th4;
        }
    }

    @Test(timeOut = 240000)
    public void testHybridAADataRecovery() throws Exception {
        String uniqueString = Utils.getUniqueString("dataRecovery-store-hybrid-AA");
        ControllerClient controllerClient = new ControllerClient(this.clusterName, this.multiRegionMultiClusterWrapper.getControllerConnectString());
        try {
            ControllerClient controllerClient2 = new ControllerClient(this.clusterName, this.childDatacenters.get(0).getControllerConnectString());
            try {
                controllerClient = new ControllerClient(this.clusterName, this.childDatacenters.get(1).getControllerConnectString());
                try {
                    TestUtils.createAndVerifyStoreInAllRegions(uniqueString, controllerClient, Arrays.asList(controllerClient2, controllerClient));
                    Assert.assertFalse(controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(2L).setHybridDataReplicationPolicy(DataReplicationPolicy.ACTIVE_ACTIVE).setNativeReplicationEnabled(true).setActiveActiveReplicationEnabled(true).setPartitionCount(1)).isError());
                    TestUtils.verifyDCConfigNativeAndActiveRepl(uniqueString, true, true, new ControllerClient[]{controllerClient2, controllerClient});
                    Assert.assertFalse(controllerClient.emptyPush(uniqueString, "empty-push-" + System.currentTimeMillis(), 1000L).isError());
                    String composeKafkaTopic = Version.composeKafkaTopic(uniqueString, 1);
                    TestUtils.waitForNonDeterministicPushCompletion(composeKafkaTopic, controllerClient, 60L, TimeUnit.SECONDS);
                    HashMap hashMap = new HashMap();
                    hashMap.put("systems.venice.push.type", Version.PushType.STREAM.toString());
                    hashMap.put("systems.venice.store", uniqueString);
                    hashMap.put("systems.venice.aggregate", "false");
                    hashMap.put("venice.child.d2.zk.hosts", this.childDatacenters.get(0).getZkServerWrapper().getAddress());
                    hashMap.put("venice.child.controller.d2.service", VeniceControllerWrapper.D2_SERVICE_NAME);
                    hashMap.put("venice.parent.d2.zk.hosts", this.parentControllers.get(0).getZkAddress());
                    hashMap.put("venice.parent.controller.d2.service", VeniceControllerWrapper.PARENT_D2_SERVICE_NAME);
                    hashMap.put("deployment.id", Utils.getUniqueString("venice-push-id"));
                    hashMap.put("ssl.enabled", "false");
                    SystemProducer producer = new VeniceSystemFactory().getProducer("venice", new MapConfig(hashMap), (MetricsRegistry) null);
                    producer.start();
                    for (int i = 0; i < 10; i++) {
                        IntegrationTestPushUtils.sendStreamingRecordWithKeyPrefix(producer, uniqueString, "dc-0_", i);
                    }
                    Assert.assertFalse(controllerClient.prepareDataRecovery("dc-0", "dc-1", uniqueString, 1, Optional.empty()).isError());
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        ReadyForDataRecoveryResponse isStoreVersionReadyForDataRecovery = controllerClient.isStoreVersionReadyForDataRecovery("dc-0", "dc-1", uniqueString, 1, Optional.empty());
                        Assert.assertTrue(isStoreVersionReadyForDataRecovery.isReady(), isStoreVersionReadyForDataRecovery.getReason());
                    });
                    Assert.assertFalse(controllerClient.dataRecovery("dc-0", "dc-1", uniqueString, 1, false, true, Optional.empty()).isError());
                    TestUtils.waitForNonDeterministicPushCompletion(composeKafkaTopic, controllerClient, 60L, TimeUnit.SECONDS);
                    AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.childDatacenters.get(1).getClusters().get(this.clusterName).getRandomRouterURL()));
                    for (int i2 = 0; i2 < 10; i2++) {
                        try {
                            String valueOf = String.valueOf(i2);
                            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
                                Object obj = andStartGenericAvroClient.get("dc-0_" + valueOf).get();
                                Assert.assertNotNull(obj, "Batch data should be consumed already in data center dc-1");
                                Assert.assertEquals(obj.toString(), "stream_" + valueOf);
                            });
                        } catch (Throwable th) {
                            if (andStartGenericAvroClient != null) {
                                try {
                                    andStartGenericAvroClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    }
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    controllerClient.close();
                    controllerClient2.close();
                    controllerClient.close();
                } finally {
                    try {
                        controllerClient.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } finally {
            }
        } catch (Throwable th4) {
            throw th4;
        }
    }
}
