package com.linkedin.venice.endToEnd;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerCreateOptions;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.participant.protocol.ParticipantMessageKey;
import com.linkedin.venice.participant.protocol.ParticipantMessageValue;
import com.linkedin.venice.participant.protocol.enums.ParticipantMessageType;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.Metric;
import java.io.Closeable;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

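/**
 * End-to-end tests for the participant message store: kill-push-job requests are issued through the
 * parent controller, and the tests check that the corresponding kill message can be read from the
 * participant store and that the affected push job is reported with ERROR status.
 */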
public class ParticipantStoreTest {
    /** Logger for this test class; also used by the nested {@code TestListener}. */
    private static final Logger LOGGER = LogManager.getLogger(ParticipantStoreTest.class);
    /** Cluster wrapper obtained from {@code ServiceFactory} in {@code setUp()} and closed in {@code cleanUp()}. */
    private VeniceClusterWrapper venice;
    /** Controller built in {@code setUp()} with the cluster's controllers registered as its child controllers. */
    private VeniceControllerWrapper parentController;
    /** ZooKeeper server handed to the parent controller's create options. */
    private ZkServerWrapper parentZk;
    /** Controller client returned by {@code venice.getControllerClient()}; used to query job status and store info. */
    private ControllerClient controllerClient;
    /** Controller client pointed at {@code parentController}'s URL; used to create stores/versions and kill pushes. */
    private ControllerClient parentControllerClient;
    /** Name of the participant store for the cluster, resolved via {@code VeniceSystemStoreUtils}. */
    private String participantMessageStoreName;
    /** Server added to the cluster in {@code setUp()}; stopped and restarted by {@code testKillWhenVersionIsOnline()}. */
    private VeniceServerWrapper veniceServerWrapper;
    /** D2 client started against the cluster's ZooKeeper address; shut down in {@code cleanUp()}. */
    private D2Client d2Client;

    /* loaded from: input_file:com/linkedin/venice/endToEnd/ParticipantStoreTest$TestListener.class */
    /**
     * {@link StoreDataChangedListener} that does nothing but count the callbacks it receives.
     * Each callback type has its own atomic counter, exposed through the matching getter.
     */
    static class TestListener implements StoreDataChangedListener {
        AtomicInteger creationCount = new AtomicInteger();
        AtomicInteger changeCount = new AtomicInteger();
        AtomicInteger deletionCount = new AtomicInteger();

        /** Counts one store-created notification; the store itself is not inspected. */
        public void handleStoreCreated(Store store) {
            creationCount.incrementAndGet();
        }

        /** Counts one store-deleted notification; the argument is not inspected. */
        public void handleStoreDeleted(String storeName) {
            deletionCount.incrementAndGet();
        }

        /** Logs the changed store and counts one store-changed notification. */
        public void handleStoreChanged(Store store) {
            LOGGER.info("Received handleStoreChanged: {}", store);
            changeCount.incrementAndGet();
        }

        /** @return number of store-created notifications received so far */
        public int getCreationCount() {
            return creationCount.get();
        }

        /** @return number of store-changed notifications received so far */
        public int getChangeCount() {
            return changeCount.get();
        }

        /** @return number of store-deleted notifications received so far */
        public int getDeletionCount() {
            return deletionCount.get();
        }
    }

    /**
     * Builds the shared test topology once per class: a Venice cluster, a D2 client, one additional
     * server, and a parent controller with its own ZooKeeper server. It then blocks (up to two
     * minutes) until version 1 of the participant store has finished its push.
     */
    @BeforeClass
    public void setUp() {
        // These settings are handed both to the cluster factory and to the parent controller below.
        Properties sharedProps = new Properties();
        sharedProps.setProperty("participant.message.store.enabled", "true");
        sharedProps.setProperty("admin.helix.messaging.channel.enabled", "false");
        sharedProps.setProperty(
            "topic.cleanup.sleep.interval.between.topic.list.fetch.ms",
            Long.toString(Long.MAX_VALUE));
        this.venice = ServiceFactory.getVeniceCluster(1, 0, 1, 1, 100000, false, false, sharedProps);
        this.d2Client = D2TestUtils.getAndStartD2Client(this.venice.getZk().getAddress());

        // The extra server gets a D2-backed client config plus a short participant-message consumption delay.
        Properties serverClientConfigProps = new Properties();
        serverClientConfigProps.put(
            VeniceServerWrapper.CLIENT_CONFIG_FOR_CONSUMER,
            ClientConfig.defaultGenericClientConfig("")
                .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
                .setD2Client(this.d2Client));
        Properties serverOverrideProps = new Properties();
        serverOverrideProps.setProperty("participant.message.consumption.delay.ms", String.valueOf(100L));
        this.veniceServerWrapper = this.venice.addVeniceServer(serverClientConfigProps, serverOverrideProps);

        // Parent controller: uses its own ZooKeeper server and treats the cluster's controllers as children.
        this.parentZk = ServiceFactory.getZkServer();
        VeniceControllerCreateOptions parentOptions =
            new VeniceControllerCreateOptions.Builder(this.venice.getClusterName(), this.parentZk, this.venice.getKafka())
                .childControllers(
                    (VeniceControllerWrapper[]) this.venice.getVeniceControllers().toArray(new VeniceControllerWrapper[0]))
                .extraProperties(sharedProps)
                .build();
        this.parentController = ServiceFactory.getVeniceController(parentOptions);

        this.participantMessageStoreName =
            VeniceSystemStoreUtils.getParticipantStoreNameForCluster(this.venice.getClusterName());
        this.controllerClient = this.venice.getControllerClient();
        this.parentControllerClient =
            new ControllerClient(this.venice.getClusterName(), this.parentController.getControllerUrl());

        // The tests read from the participant store, so wait for its first version before running any of them.
        String participantStoreFirstVersionTopic = Version.composeKafkaTopic(this.participantMessageStoreName, 1);
        TestUtils.waitForNonDeterministicPushCompletion(
            participantStoreFirstVersionTopic, this.controllerClient, 2L, TimeUnit.MINUTES);
    }

    /**
     * Releases everything created in {@code setUp()}: the two controller clients and the parent
     * controller first, then the D2 client (if one was created), then the cluster and the parent
     * ZooKeeper server.
     */
    @AfterClass
    public void cleanUp() {
        Closeable[] controllerSideResources = {
            this.controllerClient, this.parentControllerClient, this.parentController
        };
        // One call per resource, in the same order as before.
        for (Closeable resource : controllerSideResources) {
            Utils.closeQuietlyWithErrorLogged(resource);
        }
        if (this.d2Client != null) {
            D2ClientUtils.shutdownClient(this.d2Client);
        }
        IOUtils.closeQuietly(this.venice);
        IOUtils.closeQuietly(this.parentZk);
    }

    /**
     * Creates a new store version, kills its push through the parent controller, and checks that
     * the kill message is readable from the participant store, that the job status turns to ERROR,
     * and that the related server metrics were updated.
     *
     * NOTE(review): this method carries no {@code @Test} annotation in this file, so TestNG will
     * presumably not run it -- confirm whether that is intentional before enabling it.
     */
    public void testParticipantStoreKill() {
        VersionCreationResponse versionResponse = getNewStoreVersion(this.parentControllerClient, true);
        Assert.assertFalse(versionResponse.isError());
        String topic = versionResponse.getKafkaTopic();

        // The push must be visible as STARTED before it can be killed.
        TestUtils.waitForNonDeterministicAssertion(
            30L,
            TimeUnit.SECONDS,
            true,
            () -> Assert.assertEquals(
                this.controllerClient.queryJobStatus(topic).getStatus(), ExecutionStatus.STARTED.toString()));

        String taskMetricPrefix = "." + this.venice.getClusterName() + "-participant_store_consumption_task";
        String killedJobsMetric = taskMetricPrefix + "--killed_push_jobs.Count";

        // Nothing has been killed yet, so the counter on the first server is expected to be zero.
        Map<String, ? extends Metric> metricsBeforeKill =
            this.venice.getVeniceServers().iterator().next().getMetricsRepository().metrics();
        Assert.assertEquals(Double.valueOf(metricsBeforeKill.get(killedJobsMetric).value()), Double.valueOf(0.0d));

        Assert.assertFalse(this.parentControllerClient.killOfflinePushJob(topic).isError());
        verifyKillMessageInParticipantStore(topic, true);
        TestUtils.waitForNonDeterministicAssertion(
            30L,
            TimeUnit.SECONDS,
            true,
            () -> Assert.assertEquals(
                this.controllerClient.queryJobStatus(topic).getStatus(), ExecutionStatus.ERROR.toString()));

        String requestKeyCountSuffix =
            VeniceSystemStoreUtils.getParticipantStoreNameForCluster(this.venice.getClusterName())
                + "--success_request_key_count.Avg";
        Map<String, ? extends Metric> metricsAfterKill =
            this.venice.getVeniceServers().iterator().next().getMetricsRepository().metrics();
        Assert.assertEquals(Double.valueOf(metricsAfterKill.get(killedJobsMetric).value()), Double.valueOf(1.0d));
        Assert.assertTrue(metricsAfterKill.get(taskMetricPrefix + "--kill_push_job_latency.Avg").value() > 0.0d);
        Assert.assertTrue(metricsAfterKill.get("." + requestKeyCountSuffix).value() > 0.0d);
        Assert.assertTrue(metricsAfterKill.get(".venice-client_" + requestKeyCountSuffix).value() > 0.0d);
    }

    /**
     * Kills a freshly started push, waits for the kill message and the ERROR status, restarts every
     * router in the cluster, and then checks that the participant store can still be queried
     * through a router.
     */
    @Test(timeOut = 60000)
    public void testParticipantStoreThrottlerRestartRouter() {
        VersionCreationResponse versionResponse = getNewStoreVersion(this.parentControllerClient, true);
        Assert.assertFalse(versionResponse.isError());
        String kafkaTopic = versionResponse.getKafkaTopic();
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            Assert.assertEquals(this.controllerClient.queryJobStatus(kafkaTopic).getStatus(), ExecutionStatus.STARTED.toString());
        });
        Assert.assertFalse(this.parentControllerClient.killOfflinePushJob(kafkaTopic).isError());
        verifyKillMessageInParticipantStore(kafkaTopic, true);
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            Assert.assertEquals(this.controllerClient.queryJobStatus(kafkaTopic).getStatus(), ExecutionStatus.ERROR.toString());
        });

        // Bounce every router; the query below must still succeed afterwards.
        for (VeniceRouterWrapper router : this.venice.getVeniceRouters()) {
            this.venice.stopVeniceRouter(router.getPort());
            this.venice.restartVeniceRouter(router.getPort());
        }

        ParticipantMessageKey killMessageKey = new ParticipantMessageKey();
        killMessageKey.resourceName = kafkaTopic;
        killMessageKey.messageType = ParticipantMessageType.KILL_PUSH_JOB.getValue();
        // try-with-resources closes the client on every exit path, including a failed assertion.
        try (AvroSpecificStoreClient<ParticipantMessageKey, ParticipantMessageValue> client =
            ClientFactory.getAndStartSpecificAvroClient(
                ClientConfig.defaultSpecificClientConfig(this.participantMessageStoreName, ParticipantMessageValue.class)
                    .setVeniceURL(this.venice.getRandomRouterURL()))) {
            try {
                client.get(killMessageKey).get();
            } catch (Exception e) {
                // Keep the original exception as the cause so the failure report shows why the query failed.
                Assert.fail("Should be able to query participant store successfully", e);
            }
        }
    }

    /**
     * Exercises kill requests against a version that is already ONLINE.
     *
     * <ol>
     *   <li>Create a store and push a first version until it is ONLINE, then send a kill for it.</li>
     *   <li>Start a second version, kill it, and verify its kill message and ERROR status.</li>
     *   <li>Verify that no kill message is present for the first (ONLINE) version.</li>
     *   <li>Stop and restart the extra server and wait until the first version is fully served again.</li>
     *   <li>Push a third version to completion, delete the first version, and verify that a kill
     *       message for the first version is now present.</li>
     * </ol>
     */
    @Test(timeOut = 60000)
    public void testKillWhenVersionIsOnline() {
        String storeName = Utils.getUniqueString("testKillWhenVersionIsOnline");
        VersionCreationResponse onlineVersionResponse = getNewStoreVersion(this.parentControllerClient, storeName, true);
        // Fail here with the controller's message instead of later with a null topic.
        Assert.assertFalse(onlineVersionResponse.isError(), "Controller error: " + onlineVersionResponse.getError());
        String onlineTopic = onlineVersionResponse.getKafkaTopic();
        this.parentControllerClient.writeEndOfPush(storeName, onlineVersionResponse.getVersion());
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            StoreInfo store = this.controllerClient.getStore(storeName).getStore();
            Assert.assertTrue(store.getVersions().iterator().hasNext() && ((Version) store.getVersions().iterator().next()).getStatus().equals(VersionStatus.ONLINE), "Waiting for a version to become online");
        });

        // Kill request against the ONLINE version; its response is not asserted, and the check
        // further down expects no kill message for this topic.
        this.parentControllerClient.killOfflinePushJob(onlineTopic);

        VersionCreationResponse killedVersionResponse = getNewStoreVersion(this.parentControllerClient, storeName, false);
        Assert.assertFalse(killedVersionResponse.isError(), "Controller error: " + killedVersionResponse.getError());
        String killedTopic = killedVersionResponse.getKafkaTopic();
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            Assert.assertEquals(this.controllerClient.queryJobStatus(killedTopic).getStatus(), ExecutionStatus.STARTED.toString());
        });
        Assert.assertFalse(this.parentControllerClient.killOfflinePushJob(killedTopic).isError());
        verifyKillMessageInParticipantStore(killedTopic, true);
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            Assert.assertEquals(this.controllerClient.queryJobStatus(killedTopic).getStatus(), ExecutionStatus.ERROR.toString());
        });
        verifyKillMessageInParticipantStore(onlineTopic, false);

        // Bounce the extra server: wait until the router no longer lists the topic, then bring it back.
        this.venice.stopVeniceServer(this.veniceServerWrapper.getPort());
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, true, () -> {
            Assert.assertFalse(this.venice.getRandomVeniceRouter().getRoutingDataRepository().containsKafkaTopic(onlineTopic));
        });
        this.venice.restartVeniceServer(this.veniceServerWrapper.getPort());
        int expectedReplicas = onlineVersionResponse.getReplicas();
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
            for (int partition = 0; partition < onlineVersionResponse.getPartitions(); partition++) {
                Assert.assertEquals(this.venice.getRandomVeniceRouter().getRoutingDataRepository().getReadyToServeInstances(onlineTopic, partition).size(), expectedReplicas, "Not all replicas are ONLINE yet");
            }
        });

        VersionCreationResponse replacementVersionResponse = getNewStoreVersion(this.parentControllerClient, storeName, false);
        Assert.assertFalse(replacementVersionResponse.isError(), "Controller error: " + replacementVersionResponse.getError());
        this.parentControllerClient.writeEndOfPush(storeName, replacementVersionResponse.getVersion());
        TestUtils.waitForNonDeterministicPushCompletion(replacementVersionResponse.getKafkaTopic(), this.parentControllerClient, 30L, TimeUnit.SECONDS);

        // After the first version is deleted, a kill message for its topic is expected to be present.
        this.parentControllerClient.deleteOldVersion(storeName, Version.parseVersionFromKafkaTopicName(onlineTopic));
        verifyKillMessageInParticipantStore(onlineTopic, true);
    }

    /**
     * Polls the participant store through a router (for up to 30 seconds) until the KILL_PUSH_JOB
     * entry for the given resource is in the expected state.
     *
     * @param str resource name (Kafka topic) used as the participant message key
     * @param z   {@code true} to require that a kill message is present, {@code false} to require
     *            that none is present
     */
    private void verifyKillMessageInParticipantStore(String str, boolean z) {
        ParticipantMessageKey killMessageKey = new ParticipantMessageKey();
        killMessageKey.resourceName = str;
        killMessageKey.messageType = ParticipantMessageType.KILL_PUSH_JOB.getValue();
        // try-with-resources closes the client whether polling succeeds or times out.
        try (AvroSpecificStoreClient<ParticipantMessageKey, ParticipantMessageValue> client =
            ClientFactory.getAndStartSpecificAvroClient(
                ClientConfig.defaultSpecificClientConfig(this.participantMessageStoreName, ParticipantMessageValue.class)
                    .setVeniceURL(this.venice.getRandomRouterURL()))) {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                try {
                    if (z) {
                        Assert.assertNotNull(client.get(killMessageKey).get());
                    } else {
                        Assert.assertNull(client.get(killMessageKey).get());
                    }
                } catch (Exception e) {
                    // Assertion failures are Errors and pass straight through; only query failures land here.
                    // Report them with context and the original cause rather than a bare failure.
                    Assert.fail("Failed to query participant store for kill message of " + str, e);
                }
            });
        }
    }

    /**
     * Optionally creates the store, then requests a new BATCH version topic for it.
     *
     * <p>Both steps go through the supplied {@code controllerClient}, so store creation and the
     * topic request always hit the same controller. Store creation must succeed; otherwise the
     * test fails immediately.
     *
     * @param controllerClient client used for store creation and for the topic request
     * @param str              name of the store
     * @param z                {@code true} to create the store (string key/value schemas) first
     * @return the controller's response to the topic request; callers check it for errors
     */
    private VersionCreationResponse getNewStoreVersion(ControllerClient controllerClient, String str, boolean z) {
        if (z) {
            Assert.assertFalse(
                controllerClient.createNewStore(str, "test-user", "\"string\"", "\"string\"").isError(),
                "Failed to create store " + str);
        }
        return controllerClient.requestTopicForWrites(str, 1024L, Version.PushType.BATCH, Version.guidBasedDummyPushId(), true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
    }

    /**
     * Convenience overload that generates a unique store name with the "test-kill" prefix and
     * delegates to the three-argument variant.
     *
     * @param controllerClient client forwarded to the three-argument variant
     * @param z                forwarded unchanged; {@code true} to create the store first
     * @return the version creation response from the delegate
     */
    private VersionCreationResponse getNewStoreVersion(ControllerClient controllerClient, boolean z) {
        String generatedStoreName = Utils.getUniqueString("test-kill");
        return getNewStoreVersion(controllerClient, generatedStoreName, z);
    }
}
