package com.linkedin.venice.testStatusMessage.integration;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixStatusMessageChannel;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.integration.utils.DelayedZkClientUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.status.StatusMessageHandler;
import com.linkedin.venice.status.StoreStatusMessage;
import com.linkedin.venice.utils.MockTestStateModel;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.helix.HelixAdmin;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/venice/testStatusMessage/integration/SendStatusMessageIntegrationTest.class */
public class SendStatusMessageIntegrationTest {
    private ZkServerWrapper zkServerWrapper;
    private String zkAddress;
    private HelixAdmin admin;
    private SafeHelixManager controller;
    private HelixMessageChannelStats helixMessageChannelStats;
    private String cluster = Utils.getUniqueString("sendStatusMessage");
    private ArrayList<SafeHelixManager> participants = new ArrayList<>();

    @BeforeClass(alwaysRun = true)
    public void setUp() throws Exception {
        this.zkServerWrapper = ServiceFactory.getZkServer();
        this.zkAddress = this.zkServerWrapper.getAddress();
        this.admin = new ZKHelixAdmin(this.zkAddress);
        this.admin.addCluster(this.cluster);
        HelixConfigScope build = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(this.cluster).build();
        HashMap hashMap = new HashMap();
        hashMap.put("allowParticipantAutoJoin", String.valueOf(true));
        this.admin.setConfig(build, hashMap);
        this.admin.addStateModelDef(this.cluster, "MockTestStateModel", MockTestStateModel.getDefinition());
        this.helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), this.cluster);
        this.controller = new SafeHelixManager(HelixControllerMain.startHelixController(this.zkAddress, this.cluster, "integrationController", "STANDALONE"));
        this.controller.connect();
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        this.controller.disconnect();
        this.zkServerWrapper.close();
    }

    private HelixStatusMessageChannel getParticipantDelayedChannel(int i, long j, long j2) throws Exception {
        DelayedZkClientUtils.startDelayingSocketIoForNewZkClients(j, j2);
        SafeHelixManager participant = TestUtils.getParticipant(this.cluster, Utils.getHelixNodeIdentifier(Utils.getHostName(), i), this.zkAddress, i, "MockTestStateModel");
        participant.connect();
        DelayedZkClientUtils.stopDelayingSocketIoForNewZkClients();
        this.participants.add(participant);
        return new HelixStatusMessageChannel(participant, this.helixMessageChannelStats);
    }

    private void cleanUpParticipants() {
        Iterator<SafeHelixManager> it = this.participants.iterator();
        while (it.hasNext()) {
            it.next().disconnect();
        }
    }

    @Test(groups = {"flaky"}, timeOut = 60000)
    public void testReceiveMessageFromThreeParticipants() throws Exception {
        final HelixStatusMessageChannel participantDelayedChannel = getParticipantDelayedChannel(50123, 10L, 2L);
        final HelixStatusMessageChannel participantDelayedChannel2 = getParticipantDelayedChannel(50223, 12L, 3L);
        HelixStatusMessageChannel participantDelayedChannel3 = getParticipantDelayedChannel(50323, 50L, 50L);
        try {
            HelixStatusMessageChannel helixStatusMessageChannel = new HelixStatusMessageChannel(this.controller, this.helixMessageChannelStats);
            final LinkedList linkedList = new LinkedList();
            helixStatusMessageChannel.registerHandler(StoreStatusMessage.class, new StatusMessageHandler<StoreStatusMessage>() { // from class: com.linkedin.venice.testStatusMessage.integration.SendStatusMessageIntegrationTest.1
                public void handleMessage(StoreStatusMessage storeStatusMessage) {
                    linkedList.add(storeStatusMessage);
                }
            });
            final StoreStatusMessage storeStatusMessage = new StoreStatusMessage("test", 0, "test", ExecutionStatus.STARTED);
            final StoreStatusMessage storeStatusMessage2 = new StoreStatusMessage("test", 1, "test", ExecutionStatus.STARTED);
            final StoreStatusMessage storeStatusMessage3 = new StoreStatusMessage("test", 0, "test", ExecutionStatus.COMPLETED);
            StoreStatusMessage storeStatusMessage4 = new StoreStatusMessage("test", 2, "test", ExecutionStatus.STARTED);
            Thread thread = new Thread(new Runnable() { // from class: com.linkedin.venice.testStatusMessage.integration.SendStatusMessageIntegrationTest.2
                @Override // java.lang.Runnable
                public void run() {
                    participantDelayedChannel.sendToController(storeStatusMessage);
                    participantDelayedChannel.sendToController(storeStatusMessage3);
                }
            });
            thread.start();
            Thread thread2 = new Thread(new Runnable() { // from class: com.linkedin.venice.testStatusMessage.integration.SendStatusMessageIntegrationTest.3
                @Override // java.lang.Runnable
                public void run() {
                    participantDelayedChannel2.sendToController(storeStatusMessage2);
                }
            });
            do {
            } while (!thread.isAlive());
            thread2.start();
            thread.join();
            thread2.join();
            Assert.assertEquals(linkedList.size(), 3);
            Assert.assertEquals(linkedList.get(0), storeStatusMessage, "The first message should come from channel 1");
            Assert.assertEquals(linkedList.get(1), storeStatusMessage2, "The second message should come from channel 2");
            Assert.assertEquals(linkedList.get(2), storeStatusMessage3, "The third message should come from channel 1");
            try {
                participantDelayedChannel3.sendToController(storeStatusMessage4);
                Assert.fail("Channel 3 has 150ms latency, sending and receive a helix message spend at least 150*4+150*2+150*2=1200ms, but the timeout we setup for message channel is 1000ms, so sending should be timeout.");
            } catch (VeniceException e) {
            }
        } finally {
            cleanUpParticipants();
        }
    }
}
