package com.linkedin.venice.restart;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/venice/restart/TestRestartController.class */
public class TestRestartController {
    private static final int OPERATION_TIMEOUT_MS = 3000;
    private VeniceClusterWrapper cluster;

    @BeforeMethod
    public void setUp() {
        this.cluster = ServiceFactory.getVeniceCluster(2, 1, 1);
    }

    @AfterMethod
    public void cleanUp() {
        this.cluster.close();
    }

    @Test(timeOut = 60000)
    public void testLeaderControllerFailover() {
        String uniqueString = Utils.getUniqueString("testLeaderControllerFailover");
        this.cluster.getNewStore(uniqueString);
        VersionCreationResponse newVersion = this.cluster.getNewVersion(uniqueString);
        Assert.assertFalse(newVersion.isError());
        String kafkaTopic = newVersion.getKafkaTopic();
        int version = newVersion.getVersion();
        VeniceWriter<String, String, byte[]> veniceWriter = this.cluster.getVeniceWriter(kafkaTopic);
        ControllerClient constructClusterControllerClient = ControllerClient.constructClusterControllerClient(this.cluster.getClusterName(), this.cluster.getAllControllersURLs());
        Assert.assertEquals(constructClusterControllerClient.queryJobStatus(kafkaTopic).getStatus(), ExecutionStatus.STARTED.toString());
        veniceWriter.broadcastStartOfPush(new HashMap());
        veniceWriter.put("1", "1", 1);
        int stopLeaderVeniceController = this.cluster.stopLeaderVeniceController();
        veniceWriter.put("2", "2", 1);
        veniceWriter.broadcastEndOfPush(new HashMap());
        TestUtils.waitForNonDeterministicPushCompletion(kafkaTopic, constructClusterControllerClient, 3000L, TimeUnit.MILLISECONDS);
        VersionCreationResponse createNewVersionWithRetry = createNewVersionWithRetry(uniqueString);
        Assert.assertFalse(createNewVersionWithRetry.isError());
        Assert.assertEquals(createNewVersionWithRetry.getVersion(), version + 1);
        String kafkaTopic2 = createNewVersionWithRetry.getKafkaTopic();
        Assert.assertEquals(constructClusterControllerClient.queryJobStatus(kafkaTopic2).getStatus(), ExecutionStatus.STARTED.toString());
        this.cluster.restartVeniceController(stopLeaderVeniceController);
        TestUtils.waitForNonDeterministicAssertion(3000L, TimeUnit.MILLISECONDS, false, true, () -> {
            Assert.assertEquals(this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().getHelixVeniceClusterResources(this.cluster.getClusterName()).getRoutersClusterManager().getLiveRoutersCount(), 1);
        });
        TestUtils.waitForNonDeterministicAssertion(3000L, TimeUnit.MILLISECONDS, () -> {
            Assert.assertEquals(constructClusterControllerClient.queryJobStatus(kafkaTopic2).getStatus(), ExecutionStatus.STARTED.toString());
        });
        this.cluster.getVeniceWriter(kafkaTopic2).broadcastEndOfPush(new HashMap());
        TestUtils.waitForNonDeterministicPushCompletion(kafkaTopic2, constructClusterControllerClient, 3000L, TimeUnit.MILLISECONDS);
        VersionCreationResponse createNewVersionWithRetry2 = createNewVersionWithRetry(uniqueString);
        Assert.assertFalse(createNewVersionWithRetry2.isError());
        Assert.assertEquals(createNewVersionWithRetry2.getVersion(), version + 2);
        String kafkaTopic3 = createNewVersionWithRetry2.getKafkaTopic();
        Assert.assertEquals(constructClusterControllerClient.queryJobStatus(kafkaTopic3).getStatus(), ExecutionStatus.STARTED.toString());
        this.cluster.getVeniceWriter(kafkaTopic3).broadcastEndOfPush(new HashMap());
        TestUtils.waitForNonDeterministicPushCompletion(kafkaTopic3, constructClusterControllerClient, 3000L, TimeUnit.MILLISECONDS);
    }

    @Test(timeOut = 60000)
    public void testControllerRestartFetchesLastSuccessfulPushDuration() {
        String uniqueString = Utils.getUniqueString("testControllerRestartFetchesLastSuccessfulPushDuration");
        this.cluster.getNewStore(uniqueString);
        VersionCreationResponse newVersion = this.cluster.getNewVersion(uniqueString);
        Assert.assertFalse(newVersion.isError());
        String kafkaTopic = newVersion.getKafkaTopic();
        VeniceWriter<String, String, byte[]> veniceWriter = this.cluster.getVeniceWriter(kafkaTopic);
        ControllerClient constructClusterControllerClient = ControllerClient.constructClusterControllerClient(this.cluster.getClusterName(), this.cluster.getAllControllersURLs());
        Assert.assertEquals(constructClusterControllerClient.queryJobStatus(kafkaTopic).getStatus(), ExecutionStatus.STARTED.toString());
        veniceWriter.broadcastStartOfPush(new HashMap());
        veniceWriter.put("1", "1", 1);
        veniceWriter.put("2", "2", 1);
        veniceWriter.broadcastEndOfPush(new HashMap());
        TestUtils.waitForNonDeterministicPushCompletion(kafkaTopic, constructClusterControllerClient, 3000L, TimeUnit.MILLISECONDS);
        double value = this.cluster.getLeaderVeniceController().getMetricRepository().getMetric("." + uniqueString + "--successful_push_duration_sec_gauge.Gauge").value();
        int port = this.cluster.getLeaderVeniceController().getPort();
        int i = 0;
        Iterator<VeniceControllerWrapper> it = this.cluster.getVeniceControllers().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            VeniceControllerWrapper next = it.next();
            if (next.getPort() != port) {
                i = next.getPort();
                break;
            }
        }
        this.cluster.stopLeaderVeniceController();
        VeniceControllerWrapper leaderVeniceController = this.cluster.getLeaderVeniceController();
        Assert.assertEquals(leaderVeniceController.getPort(), i);
        Assert.assertEquals(Double.valueOf(value), Double.valueOf(leaderVeniceController.getMetricRepository().getMetric("." + uniqueString + "--successful_push_duration_sec_gauge.Gauge").value()));
        this.cluster.restartVeniceController(port);
        VeniceControllerWrapper leaderVeniceController2 = this.cluster.getLeaderVeniceController();
        Assert.assertEquals(leaderVeniceController2.getPort(), i);
        Assert.assertEquals(Double.valueOf(value), Double.valueOf(leaderVeniceController2.getMetricRepository().getMetric("." + uniqueString + "--successful_push_duration_sec_gauge.Gauge").value()));
    }

    private VersionCreationResponse createNewVersionWithRetry(String str) {
        VersionCreationResponse newVersion;
        try {
            newVersion = this.cluster.getNewVersion(str);
        } catch (VeniceException e) {
            this.cluster.getLeaderVeniceController();
            newVersion = this.cluster.getNewVersion(str);
        }
        return newVersion;
    }
}
