package com.linkedin.venice.endToEnd;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.status.PushJobDetailsStatus;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.status.protocol.PushJobDetailsStatusTuple;
import com.linkedin.venice.status.protocol.PushJobStatusRecordKey;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/PushJobDetailsTest.class */
/**
 * End-to-end test of the push job details reporting path.
 *
 * <p>The tests run Venice push jobs with {@code push.job.status.upload.enable} turned on against a
 * two-layer multi-region cluster and then read the reported {@link PushJobDetails} back from the push
 * job details system store through a router of the child region.
 */
public class PushJobDetailsTest {
    /** Highest PushJobDetails schema version loaded from the test resources in {@link #setUp()}. */
    private static final int LATEST_SCHEMA_ID = 2;
    /** Partition count applied to the store used by the successful push test. */
    private static final int TEST_STORE_PARTITION_COUNT = 2;
    private static final String STORE_OWNER = "test-user";
    private static final String PUSH_JOB_ID = "test-push-job-details-job";
    private static final String UPLOAD_PUSH_JOB_STATUS_PROPERTY = "push.job.status.upload.enable";

    private final Map<Integer, Schema> schemaVersionMap = new HashMap<>();
    private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    private VeniceClusterWrapper childRegionClusterWrapper;
    private ControllerClient controllerClient;
    private ControllerClient parentControllerClient;
    private Schema recordSchema;
    private String inputDirPath;

    /**
     * Starts the multi-region cluster, waits for version 1 of the push job details store to finish
     * pushing, writes the Avro input file used by the push jobs and loads every PushJobDetails schema
     * version from {@code 1} to {@link #LATEST_SCHEMA_ID}.
     *
     * @throws IOException declared for the file and schema-resource helpers invoked here
     */
    @BeforeClass
    public void setUp() throws IOException {
        Properties serverProps = new Properties();
        serverProps.setProperty("rocksdb.plain.table.format.enabled", "false");
        serverProps.setProperty("server.database.checksum.verification.enabled", "true");
        serverProps.setProperty("server.database.sync.bytes.interval.for.deferred.write.mode", "300");

        Properties controllerProps = new Properties();
        controllerProps.setProperty("controller.push.job.status.store.cluster.name", "venice-cluster0");

        multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
                1,
                1,
                1,
                1,
                1,
                1,
                1,
                Optional.of(new VeniceProperties(controllerProps)),
                Optional.empty(),
                Optional.of(new VeniceProperties(serverProps)),
                false);

        String clusterName = multiRegionMultiClusterWrapper.getClusterNames()[0];
        VeniceMultiClusterWrapper childRegion = multiRegionMultiClusterWrapper.getChildRegions().get(0);
        childRegionClusterWrapper = childRegion.getClusters().get(clusterName);
        controllerClient = new ControllerClient(clusterName, childRegion.getControllerConnectString());
        parentControllerClient =
                new ControllerClient(clusterName, multiRegionMultiClusterWrapper.getControllerConnectString());

        // The tests read from the push job details store, so its first version must be pushed first.
        String pushJobDetailsTopic =
                Version.composeKafkaTopic(VeniceSystemStoreUtils.getPushJobDetailsStoreName(), 1);
        TestUtils.waitForNonDeterministicPushCompletion(pushJobDetailsTopic, controllerClient, 2L, TimeUnit.MINUTES);

        File inputDir = TestWriteUtils.getTempDataDirectory();
        inputDirPath = "file://" + inputDir.getAbsolutePath();
        recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir, false);

        for (int schemaId = 1; schemaId <= LATEST_SCHEMA_ID; schemaId++) {
            schemaVersionMap.put(
                    schemaId,
                    Utils.getSchemaFromResource("avro/PushJobDetails/v" + schemaId + "/PushJobDetails.avsc"));
        }
    }

    /** Closes both controller clients and then the cluster wrapper created in {@link #setUp()}. */
    @AfterClass
    public void cleanUp() {
        // The child-region client is created in setUp() as well and must be released with the others.
        Utils.closeQuietlyWithErrorLogged(controllerClient);
        Utils.closeQuietlyWithErrorLogged(parentControllerClient);
        Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
    }

    /**
     * Runs a push that is expected to succeed, then checks the reported push job details (cluster name,
     * status history, per-region status, size metrics and configs) and finally reads the pushed records
     * back from the store.
     */
    @Test(timeOut = 60000)
    public void testPushJobDetails() throws ExecutionException, InterruptedException, IOException {
        final String storeName = "test-push-store";
        createTestStore(storeName);
        parentControllerClient.updateStore(
                storeName,
                new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setPartitionCount(TEST_STORE_PARTITION_COUNT));

        try (VenicePushJob pushJob = new VenicePushJob(PUSH_JOB_ID, pushJobPropsWithStatusUpload(storeName))) {
            pushJob.run();
        }

        try (AvroSpecificStoreClient<PushJobStatusRecordKey, PushJobDetails> detailsClient =
                newPushJobDetailsClient()) {
            PushJobDetails details = awaitPushJobDetails(detailsClient, firstVersionKey(storeName));

            Assert.assertEquals(
                    details.clusterName.toString(),
                    childRegionClusterWrapper.getClusterName(),
                    "Unexpected cluster name from push job details");
            Assert.assertTrue(details.reportTimestamp > 0, "Push job details report timestamp is missing");

            List<Integer> expectedStatuses = Arrays.asList(
                    PushJobDetailsStatus.STARTED.getValue(),
                    PushJobDetailsStatus.TOPIC_CREATED.getValue(),
                    PushJobDetailsStatus.WRITE_COMPLETED.getValue(),
                    PushJobDetailsStatus.COMPLETED.getValue());
            Assert.assertEquals(
                    details.overallStatus.size(),
                    expectedStatuses.size(),
                    "Unexpected number of overall statuses in push job details");
            for (int i = 0; i < expectedStatuses.size(); i++) {
                PushJobDetailsStatusTuple reported = details.overallStatus.get(i);
                Assert.assertEquals(reported.status, expectedStatuses.get(i).intValue());
                Assert.assertTrue(reported.timestamp > 0, "Timestamp for status tuple is missing");
            }

            Assert.assertFalse(details.coloStatus.isEmpty(), "Region status shouldn't be empty");
            for (List<PushJobDetailsStatusTuple> regionHistory : details.coloStatus.values()) {
                PushJobDetailsStatusTuple latest = regionHistory.get(regionHistory.size() - 1);
                Assert.assertEquals(
                        latest.status,
                        PushJobDetailsStatus.COMPLETED.getValue(),
                        "Latest status for every region should be COMPLETED");
                Assert.assertTrue(latest.timestamp > 0, "Timestamp for region status tuple is missing");
            }

            Assert.assertTrue(details.jobDurationInMs > 0);
            Assert.assertTrue(details.totalNumberOfRecords > 0);
            Assert.assertTrue(details.totalKeyBytes > 0);
            Assert.assertTrue(details.totalRawValueBytes > 0);
            Assert.assertTrue(details.totalCompressedValueBytes > 0);
            Assert.assertNotNull(details.pushJobConfigs);
            Assert.assertFalse(details.pushJobConfigs.isEmpty());
            Assert.assertNotNull(details.producerConfigs);
            Assert.assertTrue(details.producerConfigs.isEmpty());
        }

        try (AvroGenericStoreClient<String, Object> storeClient = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName)
                        .setVeniceURL(childRegionClusterWrapper.getRandomRouterURL()))) {
            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                for (int i = 1; i < 100; i++) {
                    try {
                        String key = String.valueOf(i);
                        Object value = storeClient.get(key).get();
                        Assert.assertNotNull(value, "Key " + i + " should not be missing!");
                        Assert.assertEquals(value.toString(), "test_name_" + key);
                    } catch (Exception e) {
                        throw new VeniceException(e);
                    }
                }
            });
        }
    }

    /**
     * Runs a push against a store whose storage quota is set to 0 bytes, expects the push to throw a
     * {@link VeniceException} (presumably a quota violation -- confirm against VenicePushJob), and checks
     * that the reported details carry the {@code START_MAP_REDUCE_JOB} checkpoint and non-empty failure
     * details.
     */
    @Test(timeOut = 60000)
    public void testPushJobDetailsFailureTags() throws ExecutionException, InterruptedException {
        final String storeName = "test-push-failure-store";
        createTestStore(storeName);
        parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setStorageQuotaInByte(0L));

        try (VenicePushJob pushJob = new VenicePushJob(PUSH_JOB_ID, pushJobPropsWithStatusUpload(storeName))) {
            Assert.assertThrows(VeniceException.class, pushJob::run);
        }

        try (AvroSpecificStoreClient<PushJobStatusRecordKey, PushJobDetails> detailsClient =
                newPushJobDetailsClient()) {
            PushJobDetails details = awaitPushJobDetails(detailsClient, firstVersionKey(storeName));
            Assert.assertEquals(
                    details.pushJobLatestCheckpoint.intValue(),
                    VenicePushJob.PushJobCheckpoints.START_MAP_REDUCE_JOB.getValue(),
                    "Unexpected latest push job checkpoint reported");
            Assert.assertFalse(details.failureDetails.toString().isEmpty());
        }
    }

    /**
     * For every {@link ExecutionStatus} outside the excluded set, resolves the {@link PushJobDetailsStatus}
     * constant with the same name ({@code valueOf} throws if there is none) and asserts that the resolved
     * int values are pairwise distinct.
     */
    @Test(timeOut = 60000)
    public void testPushJobDetailsStatusEnums() {
        HashSet<ExecutionStatus> excludedStatuses = new HashSet<>(
                Arrays.asList(
                        ExecutionStatus.NEW,
                        ExecutionStatus.NOT_STARTED,
                        ExecutionStatus.PROGRESS,
                        ExecutionStatus.START_OF_BUFFER_REPLAY_RECEIVED,
                        ExecutionStatus.TOPIC_SWITCH_RECEIVED,
                        ExecutionStatus.DROPPED,
                        ExecutionStatus.WARNING,
                        ExecutionStatus.ARCHIVED,
                        ExecutionStatus.CATCH_UP_BASE_TOPIC_OFFSET_LAG,
                        ExecutionStatus.DATA_RECOVERY_COMPLETED));
        HashSet<Integer> seenValues = new HashSet<>();
        for (ExecutionStatus status : ExecutionStatus.values()) {
            if (excludedStatuses.contains(status)) {
                continue;
            }
            int value = PushJobDetailsStatus.valueOf(status.toString()).getValue();
            // add() returns false when the value was already present, i.e. when two statuses collide.
            Assert.assertTrue(seenValues.add(value), "Each PushJobDetailsStatus should have its own unique int value");
        }
    }

    /** Creates a store through the parent controller using the key/value schemas of the test input file. */
    private void createTestStore(String storeName) {
        String keySchema = recordSchema.getField("key").schema().toString();
        String valueSchema = recordSchema.getField("value").schema().toString();
        parentControllerClient.createNewStore(storeName, STORE_OWNER, keySchema, valueSchema);
    }

    /** Builds the default push job properties for the store with push job status upload enabled. */
    private Properties pushJobPropsWithStatusUpload(String storeName) {
        Properties props =
                IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
        props.setProperty(UPLOAD_PUSH_JOB_STATUS_PROPERTY, String.valueOf(true));
        return props;
    }

    /** Starts a specific-record client for the push job details store via a child-region router. */
    private AvroSpecificStoreClient<PushJobStatusRecordKey, PushJobDetails> newPushJobDetailsClient() {
        return ClientFactory.getAndStartSpecificAvroClient(
                ClientConfig
                        .defaultSpecificClientConfig(
                                VeniceSystemStoreUtils.getPushJobDetailsStoreName(),
                                PushJobDetails.class)
                        .setVeniceURL(childRegionClusterWrapper.getRandomRouterURL()));
    }

    /** Builds the push job details key for version 1 of the given store. */
    private static PushJobStatusRecordKey firstVersionKey(String storeName) {
        PushJobStatusRecordKey key = new PushJobStatusRecordKey();
        key.storeName = storeName;
        key.versionNumber = 1;
        return key;
    }

    /**
     * Polls for up to 30 seconds until a record exists for {@code key}, then reads and returns it.
     *
     * @throws ExecutionException if the final read fails
     * @throws InterruptedException if the final read is interrupted
     */
    private static PushJobDetails awaitPushJobDetails(
            AvroSpecificStoreClient<PushJobStatusRecordKey, PushJobDetails> client,
            PushJobStatusRecordKey key) throws ExecutionException, InterruptedException {
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            try {
                Assert.assertNotNull(client.get(key).get(), "RT writes are not reflected in store yet");
            } catch (Exception e) {
                Assert.fail("Unexpected exception thrown while reading from the venice store", e);
            }
        });
        return client.get(key).get();
    }
}
