package com.linkedin.venice.endToEnd;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.StorageClass;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.samza.system.SystemProducer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.class */
public class DaVinciClientMemoryLimitTest {
    private static final int TEST_TIMEOUT = 120000;
    private VeniceClusterWrapper venice;
    private D2Client d2Client;

    @BeforeClass
    public void setUp() {
        Utils.thisIsLocalhost();
        Properties properties = new Properties();
        properties.put("server.promotion.to.leader.replica.delay.seconds", 10L);
        properties.put("offline.push.monitor.davinci.push.status.scan.no.davinci.status.report.retry.max.attempts", 5);
        this.venice = ServiceFactory.getVeniceCluster(1, 2, 1, 1, 100, false, false, properties);
        this.d2Client = new D2ClientBuilder().setZkHosts(this.venice.getZk().getAddress()).setZkSessionTimeout(3L, TimeUnit.SECONDS).setZkStartupTimeout(3L, TimeUnit.SECONDS).build();
        D2ClientUtils.startClient(this.d2Client);
    }

    @AfterClass
    public void cleanUp() {
        if (this.d2Client != null) {
            D2ClientUtils.shutdownClient(this.d2Client);
        }
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.venice});
    }

    private VeniceProperties getDaVinciBackendConfig(boolean z) {
        String absolutePath = Utils.getTempDataDirectory().getAbsolutePath();
        PropertyBuilder propertyBuilder = new PropertyBuilder();
        propertyBuilder.put("client.use.system.store.repository", true).put("client.system.store.repository.refresh.interval.seconds", 1).put("data.base.path", absolutePath).put("persistence.type", PersistenceType.ROCKS_DB).put("push.status.store.enabled", true).put("r2d2Client.zkHosts", this.venice.getZk().getAddress()).put("cluster.discovery.d2.service", VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME).put("rocksdb.memtable.size.in.bytes", "2MB").put("rocksdb.total.memtable.usage.cap.in.bytes", "10MB");
        if (z) {
            propertyBuilder.put("server.ingestion.mode", IngestionMode.ISOLATED);
            propertyBuilder.put("server.ingestion.isolation.application.port", Integer.valueOf(Utils.getFreePort()));
            propertyBuilder.put("server.ingestion.isolation.service.port", Integer.valueOf(Utils.getFreePort()));
            propertyBuilder.put("server.forked.process.jvm.arg.list", "-Xms256M;-Xmx256M");
            propertyBuilder.put("ingestion.memory.limit", "296MB");
        } else {
            propertyBuilder.put("ingestion.memory.limit", "30MB");
        }
        return propertyBuilder.build();
    }

    @Test(timeOut = 120000, dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testDaVinciMemoryLimitShouldFailLargeDataPush(boolean z) throws Exception {
        String uniqueString = Utils.getUniqueString("davinci_memory_limit_test");
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        String str = "file://" + tempDataDirectory.getAbsolutePath();
        Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory, true, 100, 100);
        Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(this.venice, str, uniqueString);
        ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(this.venice.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.venice.getRandomRouterURL()));
            try {
                this.venice.createMetaSystemStore(uniqueString);
                this.venice.createPushStatusSystemStore(uniqueString);
                StoreResponse store = createStoreForJob.getStore(uniqueString);
                Assert.assertFalse(store.isError(), "Store response receives an error: " + store.getError());
                Assert.assertTrue(store.getStore().isDaVinciPushStatusStoreEnabled());
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                    for (int i = 1; i <= 100; i++) {
                        try {
                            Assert.assertNotNull(andStartGenericAvroClient.get(Integer.toString(i)).get(), "Key " + i + " should not be missing!");
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    }
                });
                CachingDaVinciClientFactory cachingDaVinciClientFactory = new CachingDaVinciClientFactory(this.d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, new MetricsRepository(), getDaVinciBackendConfig(z));
                try {
                    DaVinciClient genericAvroClient = cachingDaVinciClientFactory.getGenericAvroClient(uniqueString, new DaVinciConfig().setIsolated(true).setStorageClass(StorageClass.MEMORY_BACKED_BY_DISK));
                    genericAvroClient.start();
                    genericAvroClient.subscribeAll().get(30L, TimeUnit.SECONDS);
                    for (int i = 1; i <= 100; i++) {
                        Assert.assertNotNull(genericAvroClient.get(Integer.toString(i)).get(), "Key " + i + " should not be missing!");
                    }
                    File tempDataDirectory2 = TestWriteUtils.getTempDataDirectory();
                    String str2 = "file://" + tempDataDirectory2.getAbsolutePath();
                    TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory2, true, 1000, 100000);
                    Properties defaultVPJProps2 = IntegrationTestPushUtils.defaultVPJProps(this.venice, str2, uniqueString);
                    Assert.assertTrue(Assert.expectThrows(VeniceException.class, () -> {
                        IntegrationTestPushUtils.runVPJ(defaultVPJProps2, 2, createStoreForJob);
                    }).getMessage().contains("Found a failed partition replica in Da Vinci"));
                    cachingDaVinciClientFactory.close();
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    if (createStoreForJob != null) {
                        createStoreForJob.close();
                    }
                } catch (Throwable th) {
                    try {
                        cachingDaVinciClientFactory.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createStoreForJob != null) {
                try {
                    createStoreForJob.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test(timeOut = 120000, dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testDaVinciMemoryLimitShouldFailLargeDataPushAndResumeHybridStore(boolean z) throws Exception {
        String uniqueString = Utils.getUniqueString("davinci_memory_limit_test_batch_only");
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        String str = "file://" + tempDataDirectory.getAbsolutePath();
        Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory, true, 100, 100);
        Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(this.venice, str, uniqueString);
        ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(this.venice.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.venice.getRandomRouterURL()));
            try {
                this.venice.createMetaSystemStore(uniqueString);
                this.venice.createPushStatusSystemStore(uniqueString);
                StoreResponse store = createStoreForJob.getStore(uniqueString);
                Assert.assertFalse(store.isError(), "Store response receives an error: " + store.getError());
                Assert.assertTrue(store.getStore().isDaVinciPushStatusStoreEnabled());
                String uniqueString2 = Utils.getUniqueString("davinci_memory_limit_test_hybrid");
                NewStoreResponse createNewStore = createStoreForJob.createNewStore(uniqueString2, "test_owner", "\"string\"", "\"string\"");
                Assert.assertFalse(createNewStore.isError(), "Received error when creating a store: " + createNewStore.getError());
                ControllerResponse updateStore = createStoreForJob.updateStore(uniqueString2, new UpdateStoreQueryParams().setHybridRewindSeconds(60L).setHybridOffsetLagThreshold(1L));
                Assert.assertFalse(updateStore.isError(), "Received error when converting a hybrid store: " + updateStore.getError());
                this.venice.createMetaSystemStore(uniqueString2);
                this.venice.createPushStatusSystemStore(uniqueString2);
                ControllerResponse sendEmptyPushAndWait = createStoreForJob.sendEmptyPushAndWait(uniqueString2, "test_hybrid_push_v1", 104857600L, 30000L);
                Assert.assertFalse(sendEmptyPushAndWait.isError(), "Failed to empty push to the hybrid store: " + sendEmptyPushAndWait.getError());
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                    for (int i = 1; i <= 100; i++) {
                        try {
                            Assert.assertNotNull(andStartGenericAvroClient.get(Integer.toString(i)).get(), "Key " + i + " should not be missing!");
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    }
                });
                CachingDaVinciClientFactory cachingDaVinciClientFactory = new CachingDaVinciClientFactory(this.d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, new MetricsRepository(), getDaVinciBackendConfig(z));
                try {
                    DaVinciClient genericAvroClient = cachingDaVinciClientFactory.getGenericAvroClient(uniqueString, new DaVinciConfig().setStorageClass(StorageClass.MEMORY_BACKED_BY_DISK));
                    genericAvroClient.start();
                    genericAvroClient.subscribeAll().get(30L, TimeUnit.SECONDS);
                    for (int i = 1; i <= 100; i++) {
                        Assert.assertNotNull(genericAvroClient.get(Integer.toString(i)).get(), "Key " + i + " should not be missing!");
                    }
                    DaVinciClient genericAvroClient2 = cachingDaVinciClientFactory.getGenericAvroClient(uniqueString2, new DaVinciConfig().setStorageClass(StorageClass.MEMORY_BACKED_BY_DISK));
                    genericAvroClient2.start();
                    genericAvroClient2.subscribeAll().get(30L, TimeUnit.SECONDS);
                    SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(this.venice, uniqueString2, Version.PushType.STREAM, new Pair[0]);
                    int i2 = 0;
                    while (i2 <= 100) {
                        IntegrationTestPushUtils.sendCustomSizeStreamingRecord(samzaProducer, uniqueString2, i2, 1000);
                        i2++;
                    }
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        for (int i3 = 0; i3 <= 100; i3++) {
                            try {
                                Assert.assertNotNull(genericAvroClient2.get(Integer.toString(i3)).get(), "Value for key: " + i3 + " shouldn't be null");
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        }
                    });
                    File tempDataDirectory2 = TestWriteUtils.getTempDataDirectory();
                    String str2 = "file://" + tempDataDirectory2.getAbsolutePath();
                    TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory2, true, 1000, 100000);
                    Properties defaultVPJProps2 = IntegrationTestPushUtils.defaultVPJProps(this.venice, str2, uniqueString);
                    Assert.assertTrue(Assert.expectThrows(VeniceException.class, () -> {
                        IntegrationTestPushUtils.runVPJ(defaultVPJProps2, 2, createStoreForJob);
                    }).getMessage().contains("Found a failed partition replica in Da Vinci"));
                    while (i2 < 200) {
                        IntegrationTestPushUtils.sendCustomSizeStreamingRecord(samzaProducer, uniqueString2, i2, 1000);
                        i2++;
                    }
                    TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                        for (int i3 = 0; i3 < 200; i3++) {
                            try {
                                Assert.assertNotNull(genericAvroClient2.get(Integer.toString(i3)).get(), "Value for key: " + i3 + " shouldn't be null");
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        }
                    });
                    cachingDaVinciClientFactory.close();
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    if (createStoreForJob != null) {
                        createStoreForJob.close();
                    }
                } catch (Throwable th) {
                    try {
                        cachingDaVinciClientFactory.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createStoreForJob != null) {
                try {
                    createStoreForJob.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test(timeOut = 120000, dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
    public void testHybridStoreHittingMemoryLimiterShouldResumeAfterFreeUpResource(boolean z) throws Exception {
        String uniqueString = Utils.getUniqueString("davinci_memory_limit_test_batch_only");
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        String str = "file://" + tempDataDirectory.getAbsolutePath();
        Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory, true, 190, 100000);
        Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(this.venice, str, uniqueString);
        ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(this.venice.getClusterName(), writeSimpleAvroFileWithUserSchema, defaultVPJProps);
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.venice.getRandomRouterURL()));
            try {
                this.venice.createMetaSystemStore(uniqueString);
                this.venice.createPushStatusSystemStore(uniqueString);
                StoreResponse store = createStoreForJob.getStore(uniqueString);
                Assert.assertFalse(store.isError(), "Store response receives an error: " + store.getError());
                Assert.assertTrue(store.getStore().isDaVinciPushStatusStoreEnabled());
                String uniqueString2 = Utils.getUniqueString("davinci_memory_limit_test_hybrid");
                NewStoreResponse createNewStore = createStoreForJob.createNewStore(uniqueString2, "test_owner", "\"string\"", "\"string\"");
                Assert.assertFalse(createNewStore.isError(), "Received error when creating a store: " + createNewStore.getError());
                ControllerResponse updateStore = createStoreForJob.updateStore(uniqueString2, new UpdateStoreQueryParams().setHybridRewindSeconds(60L).setHybridOffsetLagThreshold(1L));
                Assert.assertFalse(updateStore.isError(), "Received error when converting a hybrid store: " + updateStore.getError());
                this.venice.createMetaSystemStore(uniqueString2);
                this.venice.createPushStatusSystemStore(uniqueString2);
                ControllerResponse sendEmptyPushAndWait = createStoreForJob.sendEmptyPushAndWait(uniqueString2, "test_hybrid_push_v1", 104857600L, 30000L);
                Assert.assertFalse(sendEmptyPushAndWait.isError(), "Failed to empty push to the hybrid store: " + sendEmptyPushAndWait.getError());
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, createStoreForJob);
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                    for (int i = 1; i <= 150; i++) {
                        try {
                            Assert.assertNotNull(andStartGenericAvroClient.get(Integer.toString(i)).get(), "Key " + i + " should not be missing!");
                        } catch (Exception e) {
                            throw new VeniceException(e);
                        }
                    }
                });
                VeniceProperties daVinciBackendConfig = getDaVinciBackendConfig(z);
                MetricsRepository metricsRepository = new MetricsRepository();
                CachingDaVinciClientFactory cachingDaVinciClientFactory = new CachingDaVinciClientFactory(this.d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, metricsRepository, daVinciBackendConfig);
                try {
                    DaVinciClient genericAvroClient = cachingDaVinciClientFactory.getGenericAvroClient(uniqueString, new DaVinciConfig().setStorageClass(StorageClass.MEMORY_BACKED_BY_DISK));
                    genericAvroClient.start();
                    genericAvroClient.subscribeAll().get(30L, TimeUnit.SECONDS);
                    for (int i = 1; i <= 150; i++) {
                        Assert.assertNotNull(genericAvroClient.get(Integer.toString(i)).get(), "Key " + i + " should not be missing!");
                    }
                    DaVinciClient genericAvroClient2 = cachingDaVinciClientFactory.getGenericAvroClient(uniqueString2, new DaVinciConfig().setStorageClass(StorageClass.MEMORY_BACKED_BY_DISK));
                    genericAvroClient2.start();
                    genericAvroClient2.subscribeAll().get(30L, TimeUnit.SECONDS);
                    SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(this.venice, uniqueString2, Version.PushType.STREAM, new Pair[0]);
                    for (int i2 = 0; i2 < 100; i2++) {
                        IntegrationTestPushUtils.sendCustomSizeStreamingRecord(samzaProducer, uniqueString2, i2, 100000);
                    }
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        Assert.assertEquals(Double.valueOf(((Metric) metricsRepository.metrics().get("." + uniqueString2 + "--ingestion_stuck_by_memory_constraint.Gauge")).value()), Double.valueOf(1.0d));
                        Assert.assertEquals(Double.valueOf(((Metric) metricsRepository.metrics().get(".total--ingestion_stuck_by_memory_constraint.Gauge")).value()), Double.valueOf(1.0d));
                        Assert.assertEquals(Double.valueOf(((Metric) metricsRepository.metrics().get("." + uniqueString + "--ingestion_stuck_by_memory_constraint.Gauge")).value()), Double.valueOf(0.0d));
                    });
                    genericAvroClient.unsubscribeAll();
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        Assert.assertEquals(Double.valueOf(((Metric) metricsRepository.metrics().get("." + uniqueString2 + "--ingestion_stuck_by_memory_constraint.Gauge")).value()), Double.valueOf(0.0d));
                        Assert.assertEquals(Double.valueOf(((Metric) metricsRepository.metrics().get(".total--ingestion_stuck_by_memory_constraint.Gauge")).value()), Double.valueOf(0.0d));
                        Assert.assertEquals(Double.valueOf(((Metric) metricsRepository.metrics().get("." + uniqueString + "--ingestion_stuck_by_memory_constraint.Gauge")).value()), Double.valueOf(0.0d));
                    });
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        for (int i3 = 0; i3 < 100; i3++) {
                            try {
                                Assert.assertNotNull(genericAvroClient2.get(Integer.toString(i3)).get(), "Value for key: " + i3 + " shouldn't be null");
                            } catch (Exception e) {
                                throw new VeniceException(e);
                            }
                        }
                    });
                    cachingDaVinciClientFactory.close();
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    if (createStoreForJob != null) {
                        createStoreForJob.close();
                    }
                } catch (Throwable th) {
                    try {
                        cachingDaVinciClientFactory.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createStoreForJob != null) {
                try {
                    createStoreForJob.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }
}
