package com.linkedin.venice.helix;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.status.StatusMessageHandler;
import com.linkedin.venice.status.StoreStatusMessage;
import com.linkedin.venice.utils.MockTestStateModel;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/helix/TestHelixStatusMessageChannel.class */
public class TestHelixStatusMessageChannel {
    // Name of the Helix cluster that setUp() creates and cleanUp() drops around every test method.
    private static final String cluster = "UnitTestCluster";
    // Helix resource added to the cluster in setUp(); also used as the Kafka topic of the test messages.
    private static final String kafkaTopic = "test_resource_1";
    // Not referenced by name in this file; the tests pass the literal 0 as the partition id.
    private static final int partitionId = 0;
    // Port used to build the Helix node identifier of the default participant created in setUp().
    private static final int port = 4396;
    // Helix node identifier of the default participant; assigned in setUp().
    private String instanceId;
    // Execution status carried by every StoreStatusMessage the tests build.
    private ExecutionStatus status = ExecutionStatus.COMPLETED;
    // ZooKeeper server obtained from ServiceFactory in setUp() and closed in cleanUp().
    private ZkServerWrapper zkServerWrapper;
    // Address of zkServerWrapper, handed to the Helix admin, controller and participants.
    private String zkAddress;
    // Message channel built on top of the default participant ("manager").
    private HelixStatusMessageChannel channel;
    // Stats object shared by every HelixStatusMessageChannel created in this class.
    private HelixMessageChannelStats helixMessageChannelStats;
    // Default participant connected to the test cluster.
    private SafeHelixManager manager;
    // Helix admin client used to create, configure and drop clusters and resources.
    private HelixAdmin admin;
    // Standalone Helix controller started for the test cluster.
    private SafeHelixManager controller;
    // External-view repository built on the controller; tests poll it to wait for cluster state.
    private RoutingDataRepository routingDataRepository;
    // Timeout, in milliseconds, passed to TestUtils.waitForNonDeterministicCompletion.
    private static final long WAIT_ZK_TIME = 1000;

    /* loaded from: input_file:com/linkedin/venice/helix/TestHelixStatusMessageChannel$FailedTestStoreStatusMessageHandler.class */
    private static class FailedTestStoreStatusMessageHandler implements StatusMessageHandler<StoreStatusMessage> {
        private int errorReplyCount;
        private int errorReply = 0;

        public FailedTestStoreStatusMessageHandler(int i) {
            this.errorReplyCount = i;
        }

        public void handleMessage(StoreStatusMessage storeStatusMessage) {
            if (this.errorReply == this.errorReplyCount) {
                return;
            }
            this.errorReply++;
            throw new VeniceException("Failed to handle message. ErrorReply #" + this.errorReply);
        }
    }

    /* loaded from: input_file:com/linkedin/venice/helix/TestHelixStatusMessageChannel$TimeoutTestStoreStatusMessageHandler.class */
    private static class TimeoutTestStoreStatusMessageHandler implements StatusMessageHandler<StoreStatusMessage> {
        private int timeOutReplyCount;
        private int timeOutReply = 0;

        public TimeoutTestStoreStatusMessageHandler(int i) {
            this.timeOutReplyCount = i;
        }

        public void handleMessage(StoreStatusMessage storeStatusMessage) {
            if (this.timeOutReply == this.timeOutReplyCount) {
                return;
            }
            this.timeOutReply++;
            try {
                Thread.sleep(1300L);
            } catch (InterruptedException e) {
            }
        }
    }

    /**
     * Builds a fresh environment for every test method: a ZooKeeper server, a Helix cluster with
     * participant auto-join enabled and one resource using the mock state model, a standalone
     * controller, one connected participant with its message channel, and a routing data repository
     * backed by the controller.
     *
     * @throws Exception if any part of the environment fails to start
     */
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
        this.zkServerWrapper = ServiceFactory.getZkServer();
        this.zkAddress = this.zkServerWrapper.getAddress();

        // Create the cluster and allow participants to join without being registered up front.
        this.admin = new ZKHelixAdmin(this.zkAddress);
        this.admin.addCluster(cluster);
        HelixConfigScope clusterScope =
                new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(cluster).build();
        Map<String, String> clusterConfig = new HashMap<>();
        clusterConfig.put("allowParticipantAutoJoin", String.valueOf(true));
        this.admin.setConfig(clusterScope, clusterConfig);

        // Register the mock state model and a single-partition resource, then assign one replica.
        this.admin.addStateModelDef(cluster, "MockTestStateModel", MockTestStateModel.getDefinition());
        this.admin.addResource(cluster, kafkaTopic, 1, "MockTestStateModel", IdealState.RebalanceMode.FULL_AUTO.toString());
        this.admin.rebalance(cluster, kafkaTopic, 1);

        this.controller = new SafeHelixManager(
                HelixControllerMain.startHelixController(this.zkAddress, cluster, "UnitTestController", "STANDALONE"));
        this.controller.connect();

        this.instanceId = Utils.getHelixNodeIdentifier(Utils.getHostName(), port);
        this.manager = TestUtils.getParticipant(cluster, this.instanceId, this.zkAddress, port, "MockTestStateModel");
        this.manager.connect();

        this.helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), cluster);
        this.channel = new HelixStatusMessageChannel(this.manager, this.helixMessageChannelStats);
        this.routingDataRepository = new HelixExternalViewRepository(this.controller);
        this.routingDataRepository.refresh();
    }

    /**
     * Releases everything created by {@code setUp()}: disconnects the participant and the
     * controller, drops the cluster, closes the admin client and shuts down the ZooKeeper server.
     *
     * <p>Because this method is marked {@code alwaysRun}, it may execute after a {@code setUp()} that
     * failed part-way, so every resource is null-checked. The steps are chained with
     * {@code finally} so that a failure in one step does not prevent the later resources, in
     * particular the ZooKeeper server, from being released.
     */
    @AfterMethod(alwaysRun = true)
    public void cleanUp() {
        try {
            if (this.manager != null) {
                this.manager.disconnect();
            }
        } finally {
            try {
                if (this.controller != null) {
                    this.controller.disconnect();
                }
            } finally {
                try {
                    if (this.admin != null) {
                        try {
                            this.admin.dropCluster(cluster);
                        } finally {
                            this.admin.close();
                        }
                    }
                } finally {
                    if (this.zkServerWrapper != null) {
                        this.zkServerWrapper.close();
                    }
                }
            }
        }
    }

    /**
     * Converts the given Venice message into a Helix message and asserts that the message id, the
     * recorded Venice message class name and every field match; then converts the Helix message
     * back and asserts it equals the original.
     *
     * @param storeStatusMessage the Venice message to round-trip through the channel
     */
    private void compareConversion(StoreStatusMessage storeStatusMessage) {
        Message helixMessage = this.channel.convertVeniceMessageToHelixMessage(storeStatusMessage);

        Assert.assertEquals(storeStatusMessage.getMessageId(), helixMessage.getMsgId(), "Message Ids are different.");
        Assert.assertEquals(
                StoreStatusMessage.class.getName(),
                helixMessage.getRecord().getSimpleField("veniceMessageClass"),
                "Class names are different.");

        // Every field of the Venice message must be present, unchanged, in the Helix record.
        Map<?, ?> helixFields = helixMessage.getRecord().getMapField("veniceMessageFields");
        for (Map.Entry<?, ?> veniceField : storeStatusMessage.getFields().entrySet()) {
            Assert.assertEquals(
                    (String) veniceField.getValue(),
                    (String) helixFields.get(veniceField.getKey()),
                    "Message fields are different.");
        }

        Assert.assertEquals(
                storeStatusMessage,
                this.channel.convertHelixMessageToVeniceMessage(helixMessage),
                "Message fields are different. Convert it failed,");
    }

    /**
     * Checks the Venice/Helix message conversion for a freshly built message, again after an offset
     * is set, and once more after a description is set.
     */
    @Test
    public void testConvertBetweenVeniceMessageAndHelixMessage() throws ClassNotFoundException {
        StoreStatusMessage veniceMessage = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
        compareConversion(veniceMessage);

        // The same message instance is mutated between checks.
        veniceMessage.setOffset(10L);
        compareConversion(veniceMessage);

        veniceMessage.setDescription("Sample Description ");
        compareConversion(veniceMessage);
    }

    /**
     * Verifies that a registered handler can be looked up by message class, and that looking it up
     * after un-registering it fails with a {@link VeniceException}.
     */
    @Test
    public void testRegisterHandler() {
        StoreStatusMessageHandler handler = new StoreStatusMessageHandler();

        this.channel.registerHandler(StoreStatusMessage.class, handler);
        Assert.assertEquals(handler, this.channel.getHandler(StoreStatusMessage.class), "Can not get correct handler.Register is failed.");

        this.channel.unRegisterHandler(StoreStatusMessage.class, handler);
        boolean lookupRejected = false;
        try {
            this.channel.getHandler(StoreStatusMessage.class);
        } catch (VeniceException expected) {
            // Expected: no handler is registered for this message class any more.
            lookupRejected = true;
        }
        if (!lookupRejected) {
            Assert.fail("Handler should be un-register before.");
        }
    }

    /**
     * Sends a message from the participant channel to the controller and verifies that the handler
     * registered on the controller side recorded a message with the same id and fields.
     */
    @Test
    public void testSendMessage() throws IOException, InterruptedException {
        StoreStatusMessageHandler controllerHandler = new StoreStatusMessageHandler();
        getControllerChannel(controllerHandler);

        StoreStatusMessage sent = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
        this.channel.sendToController(sent);

        // The handler is queried by the Kafka topic of the message that was sent.
        StoreStatusMessage received = controllerHandler.getStatus(sent.getKafkaTopic());
        Assert.assertNotNull(received, "Message is not received.");
        Assert.assertEquals(sent.getMessageId(), received.getMessageId(), "Message is not received correctly. Id is wrong.");
        Assert.assertEquals(sent.getFields(), received.getFields(), "Message is not received correctly. Fields are wrong");
    }

    /**
     * Creates a message channel on top of the test controller and registers the given handler for
     * {@link StoreStatusMessage}.
     *
     * @param statusMessageHandler handler to register on the controller-side channel
     * @return the newly created controller-side channel
     */
    private HelixStatusMessageChannel getControllerChannel(StatusMessageHandler<StoreStatusMessage> statusMessageHandler) {
        HelixStatusMessageChannel controllerChannel =
                new HelixStatusMessageChannel(this.controller, this.helixMessageChannelStats);
        controllerChannel.registerHandler(StoreStatusMessage.class, statusMessageHandler);
        return controllerChannel;
    }

    /**
     * Expects a {@link VeniceException} when the controller-side handler throws once and the
     * message is sent with a retry argument of 0.
     */
    @Test(expectedExceptions = {VeniceException.class})
    public void testSendMessageFailed() throws IOException, InterruptedException {
        // Controller handler fails exactly once.
        getControllerChannel(new FailedTestStoreStatusMessageHandler(1));
        StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
        this.channel.sendToController(message, 0, 0L);
        Assert.fail("Sending should be failed, because we thrown an exception during handing message.");
    }

    /**
     * Expects a {@link VeniceException} when the controller-side handler is set to fail five times
     * while the message is sent with a retry argument of 2.
     */
    @Test(expectedExceptions = {VeniceException.class})
    public void testSendMessageRetryFailed() throws IOException, InterruptedException {
        // The handler's failure budget (5) is larger than the retry argument used for the send (2).
        getControllerChannel(new FailedTestStoreStatusMessageHandler(5));
        StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
        this.channel.sendToController(message, 2, 0L);
        Assert.fail("Sending should be failed, because after retrying 2 times, handling message is stil failed.");
    }

    /**
     * Expects the send to succeed when the controller-side handler fails twice and the message is
     * sent with a retry argument of 2.
     */
    @Test
    public void testSendMessageRetrySuccessful() throws IOException, InterruptedException {
        getControllerChannel(new FailedTestStoreStatusMessageHandler(2));
        try {
            StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
            this.channel.sendToController(message, 2, 0L);
        } catch (VeniceException sendFailure) {
            Assert.fail("Sending should be successful after retrying 2 times", sendFailure);
        }
    }

    /**
     * Expects a {@link VeniceException} when the controller-side handler delays its single reply
     * and the message is sent with a retry argument of 0.
     */
    @Test(expectedExceptions = {VeniceException.class})
    public void testSendMessageTimeout() throws IOException, InterruptedException {
        // Controller handler sleeps on its first (and only) invocation.
        getControllerChannel(new TimeoutTestStoreStatusMessageHandler(1));
        StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
        this.channel.sendToController(message, 0, 0L);
        Assert.fail("Sending should be failed, because timeout");
    }

    /**
     * Expects the send to succeed when the controller-side handler delays only its first reply and
     * the message is sent with a retry argument of 1.
     */
    @Test
    public void testSendMessageHandleTimeout() throws IOException, InterruptedException {
        getControllerChannel(new TimeoutTestStoreStatusMessageHandler(1));
        try {
            StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
            this.channel.sendToController(message, 1, 0L);
        } catch (VeniceException sendFailure) {
            Assert.fail("Sending should be successful after retry 1 times", sendFailure);
        }
    }

    /**
     * Sends a message from the controller to the storage nodes of the test resource after a second
     * participant has joined and both replicas are ready to serve. Both participants use a handler
     * that delays its first reply, and the send is issued with a retry argument of 1; the send is
     * expected to succeed.
     *
     * <p>The second participant is disconnected in a {@code finally} block that starts right after
     * it connects, so it is released even when the rebalance or the readiness wait fails.
     *
     * @throws Exception if the second participant cannot be created or connected
     */
    @Test
    public void testSendMessageToStorageNodes() throws Exception {
        this.channel.registerHandler(StoreStatusMessage.class, new TimeoutTestStoreStatusMessageHandler(1));

        String secondInstanceId = Utils.getHelixNodeIdentifier(Utils.getHostName(), 4397);
        SafeHelixManager secondParticipant =
                TestUtils.getParticipant(cluster, secondInstanceId, this.zkAddress, 4397, "MockTestStateModel");
        secondParticipant.connect();
        try {
            HelixStatusMessageChannel secondChannel =
                    new HelixStatusMessageChannel(secondParticipant, this.helixMessageChannelStats);
            secondChannel.registerHandler(StoreStatusMessage.class, new TimeoutTestStoreStatusMessageHandler(1));

            // Ask for two replicas and wait until both instances are reported ready to serve.
            this.admin.rebalance(cluster, kafkaTopic, 2);
            TestUtils.waitForNonDeterministicCompletion(WAIT_ZK_TIME, TimeUnit.MILLISECONDS, () -> {
                return this.routingDataRepository.containsKafkaTopic(kafkaTopic)
                        && this.routingDataRepository.getReadyToServeInstances(kafkaTopic, 0).size() == 2;
            });

            try {
                HelixStatusMessageChannel controllerChannel =
                        getControllerChannel(new TimeoutTestStoreStatusMessageHandler(1));
                StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
                controllerChannel.sendToStorageNodes(cluster, message, kafkaTopic, 1);
            } catch (VeniceException sendFailure) {
                Assert.fail("Sending should be successful after retry 1 times", sendFailure);
            }
        } finally {
            secondParticipant.disconnect();
        }
    }

    /**
     * Verifies that a message sent from the controller to storage nodes also reaches a live
     * participant that has joined the cluster without the resource being rebalanced onto it.
     *
     * <p>The second participant is disconnected in a {@code finally} block that starts right after
     * it connects, so it is released even when the live-instance wait fails.
     *
     * @throws Exception if the second participant cannot be created or connected
     */
    @Test
    public void testSendMessageToAllLiveInstances() throws Exception {
        this.channel.registerHandler(StoreStatusMessage.class, new TimeoutTestStoreStatusMessageHandler(0));

        String secondInstanceId = Utils.getHelixNodeIdentifier(Utils.getHostName(), 4397);
        SafeHelixManager secondParticipant =
                TestUtils.getParticipant(cluster, secondInstanceId, this.zkAddress, 4397, "MockTestStateModel");
        secondParticipant.connect();
        try {
            HelixStatusMessageChannel secondChannel =
                    new HelixStatusMessageChannel(secondParticipant, this.helixMessageChannelStats);
            // One-element array so the handler lambda can record that it was invoked.
            boolean[] delivered = {false};
            secondChannel.registerHandler(StoreStatusMessage.class, receivedMessage -> {
                delivered[0] = true;
            });

            TestUtils.waitForNonDeterministicCompletion(WAIT_ZK_TIME, TimeUnit.MILLISECONDS, () -> {
                return this.routingDataRepository.isLiveInstance(Utils.getHelixNodeIdentifier(Utils.getHostName(), 4397));
            });

            try {
                HelixStatusMessageChannel controllerChannel =
                        getControllerChannel(new TimeoutTestStoreStatusMessageHandler(0));
                StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
                controllerChannel.sendToStorageNodes(cluster, message, kafkaTopic, 0);
                Assert.assertTrue(delivered[0], "We should send message to all live instance regardless it's assigned to resource or not.");
            } catch (VeniceException sendFailure) {
                Assert.fail("Sending should be successful after retry 0 times", sendFailure);
            }
        } finally {
            secondParticipant.disconnect();
        }
    }

    /**
     * Expects sending to storage nodes to fail with a {@link VeniceException} when neither
     * participant has a handler registered for {@link StoreStatusMessage}.
     *
     * <p>The second participant is disconnected in a {@code finally} block that starts right after
     * it connects, so it is released even when the rebalance or the readiness wait fails.
     *
     * @throws Exception if the second participant cannot be created or connected
     */
    @Test
    public void testSendMessageToNodeWithoutRegisteringHandler() throws Exception {
        String secondInstanceId = Utils.getHelixNodeIdentifier(Utils.getHostName(), 4397);
        SafeHelixManager secondParticipant =
                TestUtils.getParticipant(cluster, secondInstanceId, this.zkAddress, 4397, "MockTestStateModel");
        secondParticipant.connect();
        try {
            // Ask for two replicas and wait until both instances are reported ready to serve.
            this.admin.rebalance(cluster, kafkaTopic, 2);
            TestUtils.waitForNonDeterministicCompletion(WAIT_ZK_TIME, TimeUnit.MILLISECONDS, () -> {
                return this.routingDataRepository.containsKafkaTopic(kafkaTopic)
                        && this.routingDataRepository.getReadyToServeInstances(kafkaTopic, 0).size() == 2;
            });

            try {
                HelixStatusMessageChannel controllerChannel =
                        getControllerChannel(new TimeoutTestStoreStatusMessageHandler(1));
                StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
                controllerChannel.sendToStorageNodes(cluster, message, kafkaTopic, 1);
                Assert.fail("Sending should be failed, because storage node have not processed this message.");
            } catch (VeniceException expected) {
                // Expected: no storage node handled the message.
            }
        } finally {
            secondParticipant.disconnect();
        }
    }

    /**
     * Expects sending to storage nodes to fail with a {@link VeniceException} when the resource
     * name passed to the channel is not the resource created in {@code setUp()}.
     */
    @Test(groups = {"flaky"})
    public void testSendMessageBelongToWrongResourceToStorageNodes() {
        // Wait until the real resource has at least one ready-to-serve instance.
        TestUtils.waitForNonDeterministicCompletion(
                WAIT_ZK_TIME,
                TimeUnit.MILLISECONDS,
                () -> this.routingDataRepository.containsKafkaTopic(kafkaTopic)
                        && this.routingDataRepository.getReadyToServeInstances(kafkaTopic, 0).size() > 0);

        // The same handler instance is registered on both the participant and the controller channel.
        TimeoutTestStoreStatusMessageHandler sharedHandler = new TimeoutTestStoreStatusMessageHandler(1);
        this.channel.registerHandler(StoreStatusMessage.class, sharedHandler);
        HelixStatusMessageChannel controllerChannel = getControllerChannel(sharedHandler);

        try {
            // NOTE(review): the message topic and the resource name are spelled differently
            // ("kafak" vs "kafka"); both literals are kept exactly as they were.
            StoreStatusMessage message = new StoreStatusMessage("wrong kafak topic", 0, this.instanceId, this.status);
            controllerChannel.sendToStorageNodes(cluster, message, "wrong kafka topic", 1);
            Assert.fail("Sending should be failed due to wrong resource name");
        } catch (VeniceException expected) {
            // Expected: the resource name does not exist in the cluster.
        }
    }

    /**
     * Creates a second Helix cluster, connects a participant to it, and verifies that a message
     * sent from that participant's channel to the storage nodes of the default test cluster is
     * received by the default participant's handler.
     *
     * <p>The extra participant is disconnected in a single {@code finally} block regardless of the
     * outcome of the send.
     *
     * @throws Exception if the extra participant cannot be created or connected
     */
    @Test
    public void testSendMessageCrossingClusters() throws Exception {
        String otherCluster = "testSendMessageCrossingClusters";

        // Prepare the second cluster the same way setUp() prepares the default one, minus the resource.
        this.admin.addCluster(otherCluster);
        HelixConfigScope otherClusterScope =
                new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(otherCluster).build();
        Map<String, String> otherClusterConfig = new HashMap<>();
        otherClusterConfig.put("allowParticipantAutoJoin", String.valueOf(true));
        this.admin.setConfig(otherClusterScope, otherClusterConfig);
        this.admin.addStateModelDef(otherCluster, "MockTestStateModel", MockTestStateModel.getDefinition());

        // One-element array so the handler lambda can record that it was invoked.
        final boolean[] delivered = new boolean[1];
        this.channel.registerHandler(StoreStatusMessage.class, receivedMessage -> {
            delivered[0] = true;
        });

        String otherInstanceId = Utils.getHelixNodeIdentifier(Utils.getHostName(), 4406);
        SafeHelixManager otherParticipant =
                TestUtils.getParticipant(otherCluster, otherInstanceId, this.zkAddress, 4406, "MockTestStateModel");
        otherParticipant.connect();
        try {
            HelixStatusMessageChannel otherChannel =
                    new HelixStatusMessageChannel(otherParticipant, this.helixMessageChannelStats);
            StoreStatusMessage message = new StoreStatusMessage(kafkaTopic, 0, this.instanceId, this.status);
            // The target cluster is the default test cluster, not the sender's own cluster.
            otherChannel.sendToStorageNodes(cluster, message, kafkaTopic, 1);
            Assert.assertTrue(delivered[0], "Storage node in another cluster should receive the message.");
        } catch (VeniceException sendFailure) {
            Assert.fail("Sending should be successful after retry 1 times", sendFailure);
        } finally {
            otherParticipant.disconnect();
        }
    }
}
