package com.linkedin.venice.endToEnd;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.client.AvroGenericDaVinciClient;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.DaVinciTestContext;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test for Da Vinci clients that are started with
 * {@code freeze.ingestion.if.ready.to.serve.or.local.data.exists} set to {@code true}.
 *
 * <p>The test pushes a batch of {@link #KEY_COUNT} records ({@code key -> key}) into a new store version,
 * then writes {@code key -> key * 1000} into the store's real-time topic and asserts that a Da Vinci client
 * never observes the real-time values, both on the first start and after the client is closed and re-created
 * with the same configuration.
 */
public class DaVinciLiveUpdateSuppressionTest {
    private static final Logger LOGGER = LogManager.getLogger(DaVinciLiveUpdateSuppressionTest.class);
    private static final int KEY_COUNT = 10;
    private static final int TEST_TIMEOUT = 120000;

    /** Avro schema used for both the keys and the values of the test store. */
    private static final String INT_SCHEMA = "\"int\"";
    /** Value schema id passed along with every record written by this test. */
    private static final int VALUE_SCHEMA_ID = 1;
    /** Batch records carry {@code key * 1}; real-time records carry {@code key * LIVE_UPDATE_FACTOR}. */
    private static final int BATCH_VALUE_FACTOR = 1;
    private static final int LIVE_UPDATE_FACTOR = 1000;
    /** How long the test keeps polling for live updates before concluding they were suppressed. */
    private static final long LIVE_UPDATE_WAIT_SECONDS = 5L;

    private VeniceClusterWrapper cluster;
    private D2Client d2Client;

    /** Starts the Venice cluster shared by all test methods and a D2 client pointed at its ZooKeeper. */
    @BeforeClass
    public void setUp() {
        Utils.thisIsLocalhost();
        Properties clusterProperties = new Properties();
        clusterProperties.put("server.promotion.to.leader.replica.delay.seconds", 1L);
        cluster = ServiceFactory.getVeniceCluster(1, 2, 1, 1, 100, false, false, clusterProperties);
        d2Client = new D2ClientBuilder()
                .setZkHosts(cluster.getZk().getAddress())
                .setZkSessionTimeout(3L, TimeUnit.SECONDS)
                .setZkStartupTimeout(3L, TimeUnit.SECONDS)
                .build();
        D2ClientUtils.startClient(d2Client);
    }

    /** Shuts down the D2 client (if it was created) and then the cluster. */
    @AfterClass
    public void cleanUp() {
        if (d2Client != null) {
            D2ClientUtils.shutdownClient(d2Client);
        }
        Utils.closeQuietlyWithErrorLogged(cluster);
    }

    /**
     * Runs after every test method and fails it if {@code AvroGenericDaVinciClient.getBackend()} does not throw
     * a {@link NullPointerException}, which this test treats as the sign that no backend was left behind.
     *
     * @param method the test method that just finished; its name is included in the failure message
     */
    @AfterMethod
    public void verifyPostConditions(Method method) {
        try {
            Assert.assertThrows(NullPointerException.class, AvroGenericDaVinciClient::getBackend);
        } catch (AssertionError e) {
            throw new AssertionError(method.getName() + " leaked DaVinciBackend.", e);
        }
    }

    /**
     * Verifies that real-time writes are not visible through a Da Vinci client that has ingestion freezing
     * enabled, on first start and again after the client has been closed and re-created.
     *
     * @param ingestionMode ingestion mode supplied by the data provider; {@code ISOLATED} adds the
     *                      ingestion-isolation properties to the Da Vinci backend configuration
     * @throws Exception if any cluster, writer or client operation fails
     */
    @Test(dataProvider = "Isolated-Ingestion", dataProviderClass = DataProviderUtils.class, timeOut = 2 * TEST_TIMEOUT)
    public void testLiveUpdateSuppression(IngestionMode ingestionMode) throws Exception {
        final String storeName = Utils.getUniqueString("store");
        cluster.useControllerClient(controllerClient -> {
            NewStoreResponse response =
                    controllerClient.createNewStore(storeName, getClass().getName(), INT_SCHEMA, INT_SCHEMA);
            if (response.isError()) {
                throw new VeniceException(response.getError());
            }
            TestUtils.createMetaSystemStore(controllerClient, storeName, Optional.of(LOGGER));
            // Configure hybrid settings so that the store also accepts real-time writes.
            controllerClient.updateStore(
                    storeName,
                    new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(10L));
        });

        String versionTopic = cluster.getNewVersion(storeName).getKafkaTopic();
        VeniceWriterFactory writerFactory = TestUtils.getVeniceWriterFactory(cluster.getKafka().getAddress());
        VeniceAvroKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(INT_SCHEMA);
        VeniceAvroKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(INT_SCHEMA);

        Map<String, Object> extraBackendConfig = IngestionMode.ISOLATED.equals(ingestionMode)
                ? TestUtils.getIngestionIsolationPropertyMap()
                : new HashMap<>();
        extraBackendConfig.put("freeze.ingestion.if.ready.to.serve.or.local.data.exists", true);

        // Batch push: key -> key, bracketed by start-of-push / end-of-push.
        try (VeniceWriter<Object, Object, byte[]> batchProducer =
                createWriter(writerFactory, versionTopic, keySerializer, valueSerializer)) {
            batchProducer.broadcastStartOfPush(Collections.emptyMap());
            writeAllKeys(batchProducer, BATCH_VALUE_FACTOR);
            batchProducer.broadcastEndOfPush(Collections.emptyMap());
        }

        // First start: subscribe, then produce real-time updates that must never become readable.
        // Resources are closed in reverse order: real-time writer, client, factory.
        DaVinciTestContext<Integer, Integer> firstContext = newDaVinciTestContext(storeName, extraBackendConfig);
        try (CachingDaVinciClientFactory ignoredFactory = firstContext.getDaVinciClientFactory();
                DaVinciClient<Integer, Integer> client = firstContext.getDaVinciClient();
                VeniceWriter<Object, Object, byte[]> realTimeProducer = createWriter(
                        writerFactory, Version.composeRealTimeTopic(storeName), keySerializer, valueSerializer)) {
            client.subscribe(Collections.singleton(0)).get();
            writeAllKeys(realTimeProducer, LIVE_UPDATE_FACTOR);
            assertLiveUpdatesSuppressed(client);
        }

        // Restart: a fresh client with the same configuration must still serve the batch values only.
        DaVinciTestContext<Integer, Integer> restartedContext = newDaVinciTestContext(storeName, extraBackendConfig);
        try (CachingDaVinciClientFactory ignoredFactory = restartedContext.getDaVinciClientFactory();
                DaVinciClient<Integer, Integer> restartedClient = restartedContext.getDaVinciClient()) {
            restartedClient.subscribeAll().get();
            for (int key = 0; key < KEY_COUNT; key++) {
                Assert.assertEquals(restartedClient.get(key).get().intValue(), key);
            }
            assertLiveUpdatesSuppressed(restartedClient);
        }
    }

    /** Builds a Da Vinci factory/client pair for {@code storeName} using the given extra backend config. */
    private DaVinciTestContext<Integer, Integer> newDaVinciTestContext(
            String storeName,
            Map<String, Object> extraBackendConfig) throws Exception {
        return ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries(
                d2Client,
                new MetricsRepository(),
                Optional.empty(),
                cluster.getZk().getAddress(),
                storeName,
                new DaVinciConfig(),
                extraBackendConfig);
    }

    /** Creates a writer for {@code topic} that uses the given key and value serializers. */
    private static VeniceWriter<Object, Object, byte[]> createWriter(
            VeniceWriterFactory writerFactory,
            String topic,
            VeniceAvroKafkaSerializer keySerializer,
            VeniceAvroKafkaSerializer valueSerializer) throws Exception {
        return writerFactory.createVeniceWriter(
                new VeniceWriterOptions.Builder(topic)
                        .setKeySerializer(keySerializer)
                        .setValueSerializer(valueSerializer)
                        .build());
    }

    /** Writes {@code key -> key * valueFactor} for every test key and blocks until each write completes. */
    private static void writeAllKeys(VeniceWriter<Object, Object, byte[]> writer, int valueFactor) throws Exception {
        Future<?>[] pendingWrites = new Future<?>[KEY_COUNT];
        for (int key = 0; key < KEY_COUNT; key++) {
            pendingWrites[key] = writer.put(key, key * valueFactor, VALUE_SCHEMA_ID);
        }
        for (Future<?> pendingWrite : pendingWrites) {
            pendingWrite.get();
        }
    }

    /**
     * Polls the client for {@link #LIVE_UPDATE_WAIT_SECONDS} seconds expecting the real-time values to show up.
     * The polling assertion is expected to fail; if it ever passes, live updates were ingested.
     *
     * @throws VeniceException if the real-time values became readable
     */
    private static void assertLiveUpdatesSuppressed(DaVinciClient<Integer, Integer> client) throws Exception {
        try {
            TestUtils.waitForNonDeterministicAssertion(LIVE_UPDATE_WAIT_SECONDS, TimeUnit.SECONDS, false, true, () -> {
                for (int key = 0; key < KEY_COUNT; key++) {
                    Assert.assertEquals(client.get(key).get().intValue(), key * LIVE_UPDATE_FACTOR);
                }
            });
        } catch (AssertionError expected) {
            // Expected: the real-time values never became visible within the wait window.
            return;
        }
        throw new VeniceException("Should not be able to read live updates.");
    }
}
