package com.linkedin.venice.endToEnd;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.samza.VeniceSystemProducer;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

/* loaded from: input_file:com/linkedin/venice/endToEnd/TestActiveActiveReplicationWithDownRegion.class */
public class TestActiveActiveReplicationWithDownRegion {
    private static final int TEST_TIMEOUT = 90000;
    private static final int RECORDS_TO_POPULATE = 4;
    private static final int NUMBER_OF_CHILD_DATACENTERS = 2;
    private static final int NUMBER_OF_CLUSTERS = 1;
    protected List<VeniceMultiClusterWrapper> childDatacenters;
    protected List<VeniceControllerWrapper> parentControllers;
    protected VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    private static final Logger LOGGER = LogManager.getLogger(TestActiveActiveReplicationWithDownRegion.class);
    private static final String[] CLUSTER_NAMES = (String[]) IntStream.range(0, 1).mapToObj(i -> {
        return "venice-cluster" + i;
    }).toArray(i2 -> {
        return new String[i2];
    });

    public Map<String, String> getExtraServerProperties() {
        return Collections.emptyMap();
    }

    @BeforeClass(alwaysRun = true)
    public void setUp() {
        Properties properties = new Properties();
        properties.put("kafka.admin.get.topic.config.max.retry.sec", 10L);
        properties.put("server.promotion.to.leader.replica.delay.seconds", 1L);
        properties.setProperty("rocksdb.plain.table.format.enabled", "false");
        properties.setProperty("server.database.checksum.verification.enabled", "true");
        properties.setProperty("server.database.sync.bytes.interval.for.deferred.write.mode", "300");
        properties.put("server.shared.kafka.producer.enabled", "true");
        properties.put("server.kafka.producer.pool.size.per.kafka.cluster", "2");
        properties.put("server.remote.ingestion.repair.sleep.interval.seconds", 5);
        Map<String, String> extraServerProperties = getExtraServerProperties();
        Objects.requireNonNull(properties);
        extraServerProperties.forEach((v1, v2) -> {
            r1.put(v1, v2);
        });
        Properties properties2 = new Properties();
        properties2.put("kafka.admin.get.topic.config.max.retry.sec", 10L);
        properties2.put("native.replication.source.fabric", "dc-0");
        properties2.put("parent.kafka.cluster.fabric.list", VeniceControllerWrapper.DEFAULT_PARENT_DATA_CENTER_REGION_NAME);
        this.multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(NUMBER_OF_CHILD_DATACENTERS, 1, 1, 1, NUMBER_OF_CHILD_DATACENTERS, 1, NUMBER_OF_CHILD_DATACENTERS, Optional.of(new VeniceProperties(properties2)), Optional.of(properties2), Optional.of(new VeniceProperties(properties)), false);
        this.childDatacenters = this.multiRegionMultiClusterWrapper.getChildRegions();
        this.parentControllers = this.multiRegionMultiClusterWrapper.getParentControllers();
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        this.multiRegionMultiClusterWrapper.close();
    }

    public void testDownedKafka() throws Exception {
        int port = this.multiRegionMultiClusterWrapper.getZkServerWrapper().getPort();
        int port2 = this.multiRegionMultiClusterWrapper.getChildRegions().get(0).getKafkaBrokerWrapper().getPort();
        int port3 = this.multiRegionMultiClusterWrapper.getChildRegions().get(1).getKafkaBrokerWrapper().getPort();
        LOGGER.info("zkPort: {}", Integer.valueOf(port));
        LOGGER.info("dc0Kafka: {}", Integer.valueOf(port2));
        LOGGER.info("dc1kafka: {}", Integer.valueOf(port3));
        String str = CLUSTER_NAMES[0];
        String uniqueString = Utils.getUniqueString("test-store");
        String controllerConnectString = this.multiRegionMultiClusterWrapper.getControllerConnectString();
        ControllerClient controllerClient = new ControllerClient(str, controllerConnectString);
        try {
            controllerClient.createNewStore(uniqueString, "owner", "\"int\"", "\"string\"");
            TestUtils.updateStoreToHybrid(uniqueString, controllerClient, Optional.of(true), Optional.of(true), Optional.of(false));
            controllerClient.emptyPush(uniqueString, Utils.getUniqueString("empty-hybrid-push"), 1L);
            controllerClient.close();
            for (int i = 0; i < NUMBER_OF_CHILD_DATACENTERS; i++) {
                controllerClient = new ControllerClient(str, this.childDatacenters.get(i).getLeaderController(str).getControllerUrl());
                try {
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                        StoreResponse store = controllerClient.getStore(uniqueString);
                        Assert.assertFalse(store.isError());
                        Assert.assertEquals(store.getStore().getCurrentVersion(), 1);
                    });
                    controllerClient.close();
                } finally {
                }
            }
            VeniceSystemProducer veniceSystemProducer = new VeniceSystemProducer(this.childDatacenters.get(0).getZkServerWrapper().getAddress(), this.childDatacenters.get(0).getZkServerWrapper().getAddress(), VeniceControllerWrapper.D2_SERVICE_NAME, uniqueString, Version.PushType.STREAM, Utils.getUniqueString("venice-push-id"), "dc-0", true, (VeniceSystemFactory) null, Optional.empty(), Optional.empty());
            veniceSystemProducer.start();
            VeniceSystemProducer veniceSystemProducer2 = new VeniceSystemProducer(this.childDatacenters.get(1).getZkServerWrapper().getAddress(), this.childDatacenters.get(1).getZkServerWrapper().getAddress(), VeniceControllerWrapper.D2_SERVICE_NAME, uniqueString, Version.PushType.STREAM, Utils.getUniqueString("venice-push-id"), "dc-1", true, (VeniceSystemFactory) null, Optional.empty(), Optional.empty());
            veniceSystemProducer2.start();
            VeniceSystemProducer veniceSystemProducer3 = new VeniceSystemProducer(this.childDatacenters.get(0).getZkServerWrapper().getAddress(), this.multiRegionMultiClusterWrapper.getZkServerWrapper().getAddress(), VeniceControllerWrapper.PARENT_D2_SERVICE_NAME, uniqueString, Version.PushType.BATCH, Utils.getUniqueString("venice-push-id"), "dc-0", true, (VeniceSystemFactory) null, Optional.empty(), Optional.empty());
            veniceSystemProducer3.start();
            for (int i2 = 0; i2 < RECORDS_TO_POPULATE; i2++) {
                veniceSystemProducer.send(uniqueString, new OutgoingMessageEnvelope(new SystemStream("venice", uniqueString), Integer.valueOf(i2), "value" + i2));
            }
            veniceSystemProducer.stop();
            for (int i3 = 0; i3 < RECORDS_TO_POPULATE; i3++) {
                veniceSystemProducer2.send(uniqueString, new OutgoingMessageEnvelope(new SystemStream("venice", uniqueString), Integer.valueOf(i3 + 10), "value1" + i3));
            }
            veniceSystemProducer2.stop();
            for (String str2 : CLUSTER_NAMES) {
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.childDatacenters.get(0).getClusters().get(str2).getRandomRouterURL()));
                try {
                    TestUtils.waitForNonDeterministicAssertion(80L, TimeUnit.SECONDS, () -> {
                        for (int i4 = 0; i4 < RECORDS_TO_POPULATE; i4++) {
                            Object obj = andStartGenericAvroClient.get(Integer.valueOf(i4)).get();
                            Assert.assertNotNull(obj, "Cluster:" + str2 + " didn't have key:" + i4);
                            Assert.assertEquals(obj.toString(), "value" + i4);
                        }
                    });
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                } catch (Throwable th) {
                    if (andStartGenericAvroClient != null) {
                        try {
                            andStartGenericAvroClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
            this.multiRegionMultiClusterWrapper.getChildRegions().get(1).getKafkaBrokerWrapper().close();
            ControllerClient controllerClient2 = new ControllerClient(str, controllerConnectString);
            for (int i4 = 0; i4 < RECORDS_TO_POPULATE; i4++) {
                try {
                    veniceSystemProducer3.send(uniqueString, new OutgoingMessageEnvelope(new SystemStream("venice", uniqueString), Integer.valueOf(i4), "value" + i4));
                } finally {
                    try {
                        controllerClient2.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            }
            controllerClient2.writeEndOfPush(uniqueString, NUMBER_OF_CHILD_DATACENTERS);
            controllerClient2.close();
            for (int i5 = 0; i5 < 1; i5++) {
                controllerClient = new ControllerClient(str, this.childDatacenters.get(i5).getLeaderController(str).getControllerUrl());
                try {
                    TestUtils.waitForNonDeterministicAssertion(6000L, TimeUnit.SECONDS, () -> {
                        StoreResponse store = controllerClient.getStore(uniqueString);
                        Assert.assertFalse(store.isError());
                        Assert.assertEquals(store.getStore().getCurrentVersion(), NUMBER_OF_CHILD_DATACENTERS);
                    });
                    controllerClient.close();
                } finally {
                    try {
                        controllerClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            }
        } finally {
        }
    }
}
