package com.linkedin.venice.endToEnd;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.DaVinciTestContext;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.Closeable;
import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/PushStatusStoreTest.class */
public class PushStatusStoreTest {
    private static final Logger LOGGER = LogManager.getLogger(PushStatusStoreTest.class);
    private static final int TEST_TIMEOUT_MS = 60000;
    private static final int NUMBER_OF_SERVERS = 2;
    private static final int PARTITION_COUNT = 2;
    private static final int REPLICATION_FACTOR = 2;
    private VeniceClusterWrapper cluster;
    private ControllerClient controllerClient;
    private D2Client d2Client;
    private PushStatusStoreReader reader;
    private String storeName;
    private PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    @BeforeClass
    public void setUp() {
        Properties properties = new Properties();
        properties.setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(1L));
        properties.setProperty("controller.server.incremental.push.use.push.status.store", String.valueOf(true));
        Utils.thisIsLocalhost();
        this.cluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, 10000, false, false, properties);
        this.controllerClient = this.cluster.getControllerClient();
        this.d2Client = D2TestUtils.getAndStartD2Client(this.cluster.getZk().getAddress());
        this.reader = new PushStatusStoreReader(this.d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, TimeUnit.MINUTES.toSeconds(10L));
    }

    @AfterClass
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.reader});
        D2ClientUtils.shutdownClient(this.d2Client);
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.controllerClient});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.cluster});
    }

    @BeforeMethod
    public void setUpStore() {
        this.storeName = Utils.getUniqueString("store");
        TestUtils.assertCommand(this.controllerClient.createNewStore(this.storeName, "test", "\"int\"", "\"string\""));
        this.cluster.createMetaSystemStore(this.storeName);
        TestUtils.assertCommand(this.controllerClient.updateStore(this.storeName, new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setPartitionCount(2).setAmplificationFactor(1).setIncrementalPushEnabled(true)));
        String systemStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(this.storeName);
        VersionCreationResponse emptyPush = this.controllerClient.emptyPush(systemStoreName, "test_da_vinci_push_status_system_store_push_1", 10000L);
        Assert.assertFalse(emptyPush.isError(), "New version creation for Da Vinci push status system store: " + systemStoreName + " should success, but got error: " + emptyPush.getError());
        TestUtils.waitForNonDeterministicPushCompletion(emptyPush.getKafkaTopic(), this.controllerClient, 30L, TimeUnit.SECONDS);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 120000)
    public void testKafkaPushJob(boolean z) throws Exception {
        Properties vPJProperties = getVPJProperties();
        runVPJ(vPJProperties, 1, this.cluster);
        Map ingestionIsolationPropertyMap = z ? TestUtils.getIngestionIsolationPropertyMap() : new HashMap();
        ingestionIsolationPropertyMap.put("client.use.system.store.repository", true);
        ingestionIsolationPropertyMap.put("client.system.store.repository.refresh.interval.seconds", 10);
        ingestionIsolationPropertyMap.put("push.status.store.enabled", true);
        DaVinciClient genericAvroDaVinciClientWithRetries = ServiceFactory.getGenericAvroDaVinciClientWithRetries(this.storeName, this.cluster.getZk().getAddress(), new DaVinciConfig(), ingestionIsolationPropertyMap);
        try {
            genericAvroDaVinciClientWithRetries.subscribeAll().get();
            runVPJ(vPJProperties, 2, this.cluster);
            TestUtils.waitForNonDeterministicAssertion(60000L, TimeUnit.MILLISECONDS, () -> {
                Assert.assertEquals(this.reader.getPartitionStatus(this.storeName, 2, 0, Optional.empty()).size(), 1);
            });
            if (genericAvroDaVinciClientWithRetries != null) {
                genericAvroDaVinciClientWithRetries.close();
            }
            Admin veniceAdmin = this.cluster.getVeniceControllers().get(0).getVeniceAdmin();
            String composeKafkaTopic = Version.composeKafkaTopic(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(this.storeName), 1);
            Assert.assertTrue(veniceAdmin.isResourceStillAlive(composeKafkaTopic));
            Assert.assertFalse(veniceAdmin.isTopicTruncated(composeKafkaTopic));
            TestUtils.assertCommand(this.controllerClient.disableAndDeleteStore(this.storeName));
            PubSubTopic topic = this.pubSubTopicRepository.getTopic(composeKafkaTopic);
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                Assert.assertFalse(veniceAdmin.isResourceStillAlive(composeKafkaTopic));
                Assert.assertTrue(!veniceAdmin.getTopicManager().containsTopic(topic) || veniceAdmin.isTopicTruncated(composeKafkaTopic));
            });
        } catch (Throwable th) {
            if (genericAvroDaVinciClientWithRetries != null) {
                try {
                    genericAvroDaVinciClientWithRetries.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testIncrementalPush() throws Exception {
        VeniceProperties build = getBackendConfigBuilder().build();
        runVPJ(getVPJProperties(), 1, this.cluster);
        DaVinciClient genericAvroDaVinciClient = ServiceFactory.getGenericAvroDaVinciClient(this.storeName, this.cluster, new DaVinciConfig(), build);
        try {
            genericAvroDaVinciClient.subscribeAll().get();
            Properties vPJProperties = getVPJProperties();
            vPJProperties.setProperty("incremental.push", "true");
            runVPJ(vPJProperties, 1, this.cluster);
            Assert.assertEquals(genericAvroDaVinciClient.get(1).get().toString(), "name 1");
            if (genericAvroDaVinciClient != null) {
                genericAvroDaVinciClient.close();
            }
        } catch (Throwable th) {
            if (genericAvroDaVinciClient != null) {
                try {
                    genericAvroDaVinciClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testIncrementalPushStatusStoredInPushStatusStore() throws Exception {
        runVPJ(getVPJProperties(), 1, this.cluster);
        AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setD2Client(this.d2Client).setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME));
        try {
            Properties vPJProperties = getVPJProperties();
            vPJProperties.setProperty("incremental.push", "true");
            long currentTimeMillis = System.currentTimeMillis();
            VenicePushJob venicePushJob = new VenicePushJob(Utils.getUniqueString("batch-job-1"), vPJProperties);
            try {
                venicePushJob.run();
                this.cluster.waitVersion(this.storeName, 1, this.controllerClient);
                LOGGER.info("**TIME** VPJ1 takes " + (System.currentTimeMillis() - currentTimeMillis));
                Assert.assertEquals(andStartGenericAvroClient.get(1).get().toString(), "name 1");
                Optional incrementalPushVersion = venicePushJob.getIncrementalPushVersion();
                for (int i = 0; i < 2; i++) {
                    Map partitionStatus = this.reader.getPartitionStatus(this.storeName, 1, i, incrementalPushVersion, Optional.of("SERVER_SIDE_INCREMENTAL_PUSH_STATUS"));
                    Assert.assertNotNull(partitionStatus);
                    Assert.assertEquals(partitionStatus.size(), 2);
                    Iterator it = partitionStatus.values().iterator();
                    while (it.hasNext()) {
                        Assert.assertTrue(ExecutionStatus.isIncrementalPushStatus(((Integer) it.next()).intValue()));
                    }
                }
                venicePushJob.close();
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (andStartGenericAvroClient != null) {
                try {
                    andStartGenericAvroClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testIncrementalPushStatusReadingFromPushStatusStoreInController() throws Exception {
        Properties vPJProperties = getVPJProperties();
        runVPJ(vPJProperties, 1, this.cluster);
        AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setD2Client(this.d2Client).setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME));
        try {
            vPJProperties.setProperty("incremental.push", "true");
            long currentTimeMillis = System.currentTimeMillis();
            VenicePushJob venicePushJob = new VenicePushJob(Utils.getUniqueString("batch-job-1"), vPJProperties);
            try {
                venicePushJob.run();
                this.cluster.waitVersion(this.storeName, 1, this.controllerClient);
                LOGGER.info("**TIME** VPJ1 takes " + (System.currentTimeMillis() - currentTimeMillis));
                Assert.assertEquals(andStartGenericAvroClient.get(1).get().toString(), "name 1");
                Optional incrementalPushVersion = venicePushJob.getIncrementalPushVersion();
                Map partitionStatuses = this.reader.getPartitionStatuses(this.storeName, 1, (String) incrementalPushVersion.get(), 2);
                Assert.assertNotNull(partitionStatuses, "Server incremental push status cannot be null");
                Assert.assertEquals(partitionStatuses.size(), 2, "Incremental push status of some partitions is missing");
                for (int i = 0; i < 2; i++) {
                    Map map = (Map) partitionStatuses.get(Integer.valueOf(i));
                    Assert.assertNotNull(map, "Push status of a partition is missing");
                    Iterator it = map.values().iterator();
                    while (it.hasNext()) {
                        Assert.assertEquals(((Integer) it.next()).intValue(), ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED.getValue());
                    }
                }
                Assert.assertEquals(this.controllerClient.queryJobStatus(venicePushJob.getTopicToMonitor(), Optional.of("randomIPVersion")).getStatus(), ExecutionStatus.NOT_CREATED.name());
                Assert.assertEquals(this.controllerClient.queryJobStatus(venicePushJob.getTopicToMonitor(), venicePushJob.getIncrementalPushVersion()).getStatus(), ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED.name());
                PushStatusStoreRecordDeleter pushStatusStoreRecordDeleter = new PushStatusStoreRecordDeleter(this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().getVeniceWriterFactory());
                pushStatusStoreRecordDeleter.deletePartitionIncrementalPushStatus(this.storeName, 1, (String) incrementalPushVersion.get(), 1).get();
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertEquals(this.controllerClient.queryJobStatus(venicePushJob.getTopicToMonitor(), venicePushJob.getIncrementalPushVersion()).getStatus(), ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED.name());
                });
                pushStatusStoreRecordDeleter.deletePartitionIncrementalPushStatus(this.storeName, 1, (String) incrementalPushVersion.get(), 0).get();
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertEquals(this.controllerClient.queryJobStatus(venicePushJob.getTopicToMonitor(), venicePushJob.getIncrementalPushVersion()).getStatus(), ExecutionStatus.NOT_CREATED.name());
                });
                venicePushJob.close();
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (andStartGenericAvroClient != null) {
                try {
                    andStartGenericAvroClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testAutomaticPurge() throws Exception {
        VeniceProperties build = getBackendConfigBuilder().build();
        Properties vPJProperties = getVPJProperties();
        runVPJ(vPJProperties, 1, this.cluster);
        DaVinciClient genericAvroDaVinciClient = ServiceFactory.getGenericAvroDaVinciClient(this.storeName, this.cluster, new DaVinciConfig(), build);
        try {
            genericAvroDaVinciClient.subscribeAll().get();
            TestUtils.waitForNonDeterministicAssertion(60000L, TimeUnit.MILLISECONDS, () -> {
                Assert.assertEquals(this.reader.getPartitionStatus(this.storeName, 1, 0, Optional.empty()).size(), 1);
            });
            runVPJ(vPJProperties, 2, this.cluster);
            runVPJ(vPJProperties, 3, this.cluster);
            TestUtils.waitForNonDeterministicAssertion(60000L, TimeUnit.MILLISECONDS, () -> {
                Assert.assertEquals(this.reader.getPartitionStatus(this.storeName, 1, 0, Optional.empty()).size(), 0);
            });
            if (genericAvroDaVinciClient != null) {
                genericAvroDaVinciClient.close();
            }
        } catch (Throwable th) {
            if (genericAvroDaVinciClient != null) {
                try {
                    genericAvroDaVinciClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private PropertyBuilder getBackendConfigBuilder() {
        return DaVinciTestContext.getDaVinciPropertyBuilder(this.cluster.getZk().getAddress()).put("push.status.store.enabled", true);
    }

    private Properties getVPJProperties() throws Exception {
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        String str = "file://" + tempDataDirectory.getAbsolutePath();
        TestWriteUtils.writeSimpleAvroFileWithIntToStringSchema(tempDataDirectory, true);
        return IntegrationTestPushUtils.defaultVPJProps(this.cluster, str, this.storeName);
    }

    private void runVPJ(Properties properties, int i, VeniceClusterWrapper veniceClusterWrapper) {
        long currentTimeMillis = System.currentTimeMillis();
        VenicePushJob venicePushJob = new VenicePushJob(Utils.getUniqueString("batch-job-" + i), properties);
        try {
            venicePushJob.run();
            veniceClusterWrapper.waitVersion((String) properties.get("venice.store.name"), i, this.controllerClient);
            LOGGER.info("**TIME** VPJ" + i + " takes " + (System.currentTimeMillis() - currentTimeMillis));
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
