package com.linkedin.venice.controller;

import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/controller/TestIncrementalPush.class */
/**
 * Integration test that checks the offline push status reported by the leader controller
 * for incremental pushes, both on the version that received the incremental push and on a
 * version created afterwards.
 *
 * <p>One cluster is created per class; a fresh store with incremental push enabled is created
 * before each test method and deleted after it.
 */
public class TestIncrementalPush {
    private static final int PARTITION_SIZE = 1000;
    /** Upper bound for the whole test method, in milliseconds. */
    private static final long TEST_TIMEOUT_MS = 120_000L;
    /** How long to poll the controller for an expected push status, in seconds. */
    private static final long STATUS_WAIT_TIMEOUT_SECONDS = 30L;

    private VeniceClusterWrapper cluster;
    private String storeName;

    /** Starts the cluster shared by all test methods (3 servers, replication factor 3). */
    @BeforeClass(alwaysRun = true)
    public void setUpClass() {
        this.cluster = ServiceFactory.getVeniceCluster(
                new VeniceClusterCreateOptions.Builder()
                        .numberOfServers(3)
                        .replicationFactor(3)
                        .partitionSize(PARTITION_SIZE)
                        .build());
    }

    /** Shuts the shared cluster down. */
    @AfterClass(alwaysRun = true)
    public void tearDownClass() {
        // alwaysRun = true means this executes even when setUpClass() failed, in which case
        // the cluster was never assigned; guard so the original failure is not masked by an NPE.
        if (this.cluster != null) {
            this.cluster.close();
        }
    }

    /** Creates a uniquely named store and configures it for incremental push. */
    @BeforeMethod(alwaysRun = true)
    public void setUp() {
        this.storeName = Utils.getUniqueString("testIncPushStore");
        this.cluster.getNewStore(this.storeName);
        UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams();
        updateStoreQueryParams.setIncrementalPushEnabled(true)
                .setHybridRewindSeconds(1L)
                .setHybridOffsetLagThreshold(1L)
                .setStorageQuotaInByte(2000L)
                .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
                .setNumVersionsToPreserve(3);
        this.cluster.updateStore(this.storeName, updateStoreQueryParams);
    }

    /** Requests deletion of the store created for the test method that just ran. */
    @AfterMethod(alwaysRun = true)
    public void cleanUp() {
        // Same reasoning as tearDownClass(): nothing to clean up if the cluster never started.
        if (this.cluster == null) {
            return;
        }
        this.cluster.useControllerClient(controllerClient -> controllerClient.deleteStore(this.storeName));
    }

    /**
     * Scenario:
     * <ol>
     *   <li>Create a first version, broadcast start/end of push, then start and end
     *       incremental push {@code incPush1}; wait for
     *       {@link ExecutionStatus#END_OF_INCREMENTAL_PUSH_RECEIVED} on that topic.</li>
     *   <li>Create a second version, broadcast start/end of push, and assert the store now
     *       lists two versions.</li>
     *   <li>Wait until querying {@code incPush1} against the second version's topic also
     *       reports {@link ExecutionStatus#END_OF_INCREMENTAL_PUSH_RECEIVED}.</li>
     *   <li>Start incremental push {@code incPush2} on the second version and wait for
     *       {@link ExecutionStatus#START_OF_INCREMENTAL_PUSH_RECEIVED}.</li>
     * </ol>
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testGetOfflineStatusIncrementalPush() {
        // NOTE(review): the writers obtained below are never closed, matching the original
        // test; confirm whether VeniceWriter should be closed here.
        String firstIncPushVersion = "incPush1";
        String secondIncPushVersion = "incPush2";

        // Version 1: full push followed by a complete incremental push.
        VersionCreationResponse firstVersion = this.cluster.getNewVersion(this.storeName);
        Assert.assertFalse(firstVersion.isError());
        String firstTopic = firstVersion.getKafkaTopic();
        VeniceWriter<String, String, byte[]> firstWriter = this.cluster.getVeniceWriter(firstTopic);
        firstWriter.broadcastStartOfPush(new HashMap<>());
        firstWriter.broadcastEndOfPush(new HashMap<>());
        firstWriter.broadcastStartOfIncrementalPush(firstIncPushVersion, new HashMap<>());
        firstWriter.broadcastEndOfIncrementalPush(firstIncPushVersion, new HashMap<>());
        waitForIncrementalPushStatus(firstTopic, firstIncPushVersion, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);

        // Version 2: full push only. Fail fast if the version could not be created instead of
        // surfacing the problem later as an obscure writer or timeout error.
        VersionCreationResponse secondVersion = this.cluster.getNewVersion(this.storeName);
        Assert.assertFalse(secondVersion.isError());
        String secondTopic = secondVersion.getKafkaTopic();
        VeniceWriter<String, String, byte[]> secondWriter = this.cluster.getVeniceWriter(secondTopic);
        secondWriter.broadcastStartOfPush(new HashMap<>());
        secondWriter.broadcastEndOfPush(new HashMap<>());

        this.cluster.useControllerClient(controllerClient -> {
            StoreInfo storeInfo = Objects.requireNonNull(
                    TestUtils.assertCommand(controllerClient.getStore(this.storeName), "Store response should not be null").getStore(),
                    "Store should not be null");
            List<?> versions = Objects.requireNonNull(storeInfo.getVersions(), "Store versions should not be null");
            Assert.assertEquals(versions.size(), 2, "Store should have 2 versions");
        });

        // The first incremental push is queried against the second version's topic.
        waitForIncrementalPushStatus(secondTopic, firstIncPushVersion, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);

        // Only the start of the second incremental push is broadcast, so the status is
        // expected to stop at START_OF_INCREMENTAL_PUSH_RECEIVED.
        secondWriter.broadcastStartOfIncrementalPush(secondIncPushVersion, new HashMap<>());
        waitForIncrementalPushStatus(secondTopic, secondIncPushVersion, ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /**
     * Polls the leader controller until the offline push status for the given topic and
     * incremental push version equals {@code expectedStatus}, for at most
     * {@link #STATUS_WAIT_TIMEOUT_SECONDS} seconds.
     *
     * @param kafkaTopic version topic to query
     * @param incrementalPushVersion incremental push identifier passed to the status query
     * @param expectedStatus status the controller is expected to report eventually
     */
    private void waitForIncrementalPushStatus(
            String kafkaTopic,
            String incrementalPushVersion,
            ExecutionStatus expectedStatus) {
        TestUtils.waitForNonDeterministicCompletion(
                STATUS_WAIT_TIMEOUT_SECONDS,
                TimeUnit.SECONDS,
                // Compare from the expected constant so a null status is treated as
                // "not there yet" rather than throwing inside the polling condition.
                () -> expectedStatus.equals(
                        this.cluster.getLeaderVeniceController()
                                .getVeniceAdmin()
                                .getOffLinePushStatus(
                                        this.cluster.getClusterName(),
                                        kafkaTopic,
                                        Optional.of(incrementalPushVersion),
                                        (String) null)
                                .getExecutionStatus()));
    }
}
