package com.linkedin.venice.helix;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pushmonitor.KillOfflinePushMessage;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.status.StatusMessage;
import com.linkedin.venice.status.StatusMessageChannel;
import com.linkedin.venice.status.StatusMessageHandler;
import com.linkedin.venice.status.StoreStatusMessage;
import com.linkedin.venice.utils.Utils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * {@link StatusMessageChannel} implementation that transports Venice {@link StatusMessage}s through the
 * Helix {@link ClusterMessagingService}.
 *
 * <p>Each Venice message is wrapped in a Helix {@link Message} of type {@link #HELIX_MESSAGE_TYPE}: the
 * concrete Venice message class name is stored in the simple field {@link #VENICE_MESSAGE_CLASS} and the
 * message fields in the map field {@link #VENICE_MESSAGE_FIELD}. On the receiving side the message is
 * converted back and dispatched to the handler registered for its class.
 */
public class HelixStatusMessageChannel implements StatusMessageChannel {
    private static final Logger LOGGER = LogManager.getLogger(HelixStatusMessageChannel.class);

    /** Timeout handed to {@code sendAndWait} when none is given (presumably milliseconds; confirm against Helix). */
    public static final int DEFAULT_SEND_MESSAGE_TIME_OUT = 1000;
    public static final String HELIX_MESSAGE_TYPE = "control_message";
    public static final String VENICE_MESSAGE_CLASS = "veniceMessageClass";
    public static final String VENICE_MESSAGE_FIELD = "veniceMessageFields";

    /** Target session id set on every outgoing message. */
    private static final String ANY_SESSION_ID = "*";
    /** Key in a reply's result map whose value is parsed as the success flag. */
    private static final String RESULT_SUCCESS_KEY = "SUCCESS";
    /** Key in a reply's result map that is logged when the controller reports a failure. */
    private static final String RESULT_ERROR_INFO_KEY = "ERRORINFO";

    private final ClusterMessagingService messageService;
    /** Registered handlers keyed by the fully qualified class name of the message type they handle. */
    private final Map<String, StatusMessageHandler<?>> handlers;
    private final int sendMessageTimeOut;
    private final HelixMessageChannelStats stats;

    /**
     * Callback passed to {@code sendAndWait}. It remembers whether {@link #onTimeOut()} was invoked and
     * records a stat for every reply it is notified about.
     */
    public class ControlMessageCallback extends AsyncCallback {
        private boolean isTimeOut = false;

        private ControlMessageCallback() {
        }

        public void onTimeOut() {
            this.isTimeOut = true;
        }

        public void onReplyMessage(Message message) {
            HelixStatusMessageChannel.this.stats.recordOnReplyFromStorageNodesCount();
        }
    }

    /** Factory registered with the messaging service; creates handlers for {@link #HELIX_MESSAGE_TYPE} messages only. */
    private class HelixStatusMessageHandleFactory implements MessageHandlerFactory {
        private HelixStatusMessageHandleFactory() {
        }

        /**
         * @throws VeniceException if the message type is anything other than {@link #HELIX_MESSAGE_TYPE}
         */
        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            // Constant-first comparison so that a null message type is reported as an unexpected type
            // instead of failing with a NullPointerException.
            if (HelixStatusMessageChannel.HELIX_MESSAGE_TYPE.equals(message.getMsgType())) {
                return new HelixStatusMessageHandler(message, notificationContext);
            }
            throw new VeniceException("Unexpected message type:" + message.getMsgType() + " for message:" + message.getMsgId());
        }

        public String getMessageType() {
            return HelixStatusMessageChannel.HELIX_MESSAGE_TYPE;
        }

        public void reset() {
            // Nothing to reset: the factory holds no state of its own.
        }
    }

    /** Converts an incoming Helix message into a Venice message and hands it to the registered handler. */
    private class HelixStatusMessageHandler extends MessageHandler {
        public HelixStatusMessageHandler(Message message, NotificationContext notificationContext) {
            super(message, notificationContext);
        }

        /**
         * Dispatches the converted message to the handler registered for its class.
         *
         * @return a result marked successful when the handler returns normally
         */
        public HelixTaskResult handleMessage() {
            StatusMessage veniceMessage = HelixStatusMessageChannel.this.convertHelixMessageToVeniceMessage(this._message);
            HelixTaskResult result = new HelixTaskResult();
            try {
                HelixStatusMessageChannel.this.getHandler(veniceMessage.getClass()).handleMessage(veniceMessage);
                result.setSuccess(true);
                return result;
            } catch (Exception e) {
                // Log with the Venice payload for diagnosis, then rethrow the original exception unchanged.
                HelixStatusMessageChannel.LOGGER.error("Handle message {} failed. Venice message content: {}.", this._message.getId(), veniceMessage, e);
                throw e;
            }
        }

        public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            HelixStatusMessageChannel.LOGGER.error("Message handling pipeline met error for message: {}.", this._message.getMsgId(), exc);
        }
    }

    /**
     * Creates a channel that uses {@link #DEFAULT_SEND_MESSAGE_TIME_OUT} as the send timeout.
     */
    public HelixStatusMessageChannel(SafeHelixManager safeHelixManager, HelixMessageChannelStats helixMessageChannelStats) {
        this(safeHelixManager, helixMessageChannelStats, DEFAULT_SEND_MESSAGE_TIME_OUT);
    }

    /**
     * Creates a channel on top of the manager's messaging service and registers a handler factory for
     * {@link #HELIX_MESSAGE_TYPE} with it.
     *
     * @param safeHelixManager manager whose messaging service is used for sending and receiving
     * @param helixMessageChannelStats stats sink for storage-node messaging metrics
     * @param sendMessageTimeOut timeout passed to every {@code sendAndWait} call
     */
    public HelixStatusMessageChannel(SafeHelixManager safeHelixManager, HelixMessageChannelStats helixMessageChannelStats, int sendMessageTimeOut) {
        this.handlers = new ConcurrentHashMap<>();
        this.messageService = safeHelixManager.getMessagingService();
        this.stats = helixMessageChannelStats;
        this.sendMessageTimeOut = sendMessageTimeOut;
        this.messageService.registerMessageHandlerFactory(HELIX_MESSAGE_TYPE, new HelixStatusMessageHandleFactory());
    }

    /**
     * Sends the message to the controller, making at most {@code retryCount + 1} attempts.
     *
     * <p>Before every attempt after the first, the method sleeps for {@code retryDurationMs} and assigns a
     * freshly generated message id to the Helix message.
     *
     * @param statusMessage message to send
     * @param retryCount number of additional attempts allowed after the first one
     * @param retryDurationMs time to wait before each retry, in milliseconds
     * @throws VeniceException if no attempt was acknowledged as successful
     */
    @Override
    public void sendToController(StatusMessage statusMessage, int retryCount, long retryDurationMs) {
        Message helixMessage = convertVeniceMessageToHelixMessage(statusMessage);
        helixMessage.setTgtSessionId(ANY_SESSION_ID);
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
        criteria.setSessionSpecific(false);

        boolean succeeded = false;
        int attempt = 0;
        while (!succeeded && attempt <= retryCount) {
            attempt++;
            if (attempt > 1) {
                LOGGER.info("Wait {}ms to retry.", retryDurationMs);
                Utils.sleep(retryDurationMs);
                LOGGER.info("Attempt #{}: Sending message to controller.", attempt);
                helixMessage.setMsgId(StatusMessage.generateMessageId());
            }
            succeeded = trySendToController(criteria, helixMessage, statusMessage);
        }
        if (succeeded) {
            return;
        }
        String errorMessage = "Error: After attempting " + attempt + " times, sending is still failed.";
        LOGGER.error(errorMessage);
        throw new VeniceException(errorMessage);
    }

    /**
     * Performs a single send to the controller. Every failure mode (no recipient, timeout, negative
     * reply, exception) is logged and reported as {@code false} so that the caller can decide to retry.
     */
    private boolean trySendToController(Criteria criteria, Message helixMessage, StatusMessage statusMessage) {
        try {
            ControlMessageCallback callback = new ControlMessageCallback();
            int sentCount = this.messageService.sendAndWait(criteria, helixMessage, callback, this.sendMessageTimeOut);
            if (sentCount == 0) {
                LOGGER.error("No controller could be found to send messages {}.", statusMessage.getMessageId());
                return false;
            }
            if (callback.isTimeOut) {
                LOGGER.error("Error: Can not send message to controller. Sending is time out.");
                return false;
            }
            // Only the first reply is inspected.
            Message reply = callback.getMessageReplied().get(0);
            if (isProcessedSuccessfully(reply)) {
                return true;
            }
            LOGGER.error("Error: controller can not handle this message correctly. {}", reply.getResultMap().get(RESULT_ERROR_INFO_KEY));
            return false;
        } catch (Exception e) {
            LOGGER.error("Error: Can not send message to controller.", e);
            return false;
        }
    }

    /** Sends the message to the controller once, without retries. */
    @Override
    public void sendToController(StatusMessage statusMessage) {
        sendToController(statusMessage, 0, 0L);
    }

    /**
     * Sends the message to all live participant instances of the given cluster and waits for replies.
     *
     * @param clusterName cluster whose live instances are targeted
     * @param statusMessage message to send
     * @param resourceName resource name; only used in the timeout error message
     * @param retryCount forwarded unchanged as the last argument of {@code sendAndWait}
     * @throws VeniceException if the callback timed out, or if any received reply is not marked successful
     */
    @Override
    public void sendToStorageNodes(String clusterName, StatusMessage statusMessage, String resourceName, int retryCount) {
        this.stats.recordToStorageNodesInvokeCount();
        Message helixMessage = convertVeniceMessageToHelixMessage(statusMessage);
        helixMessage.setTgtSessionId(ANY_SESSION_ID);
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setDataSource(Criteria.DataSource.LIVEINSTANCES);
        criteria.setInstanceName("%");
        criteria.setClusterName(clusterName);

        ControlMessageCallback callback = new ControlMessageCallback();
        int sentCount = this.messageService.sendAndWait(criteria, helixMessage, callback, this.sendMessageTimeOut, retryCount);
        if (sentCount == 0) {
            LOGGER.error("No storage node is found to send message to. Message: {}", statusMessage);
        } else {
            LOGGER.info("Sent {} messages to storage nodes. Message: {}.", sentCount, statusMessage);
        }
        this.stats.recordToStorageNodesMessageCount(sentCount);
        this.stats.recordMissedStorageNodesReplyCount(sentCount - callback.getMessageReplied().size());

        if (callback.isTimeOut) {
            String errorMessage = "Sending messages to storage node is time out. Resource:" + resourceName + ". Message sent:" + sentCount + ". Message replied:" + callback.getMessageReplied().size();
            LOGGER.error(errorMessage);
            throw new VeniceException(errorMessage);
        }

        // Inspect every reply (no early exit) so that each failing instance gets logged.
        boolean allSucceeded = true;
        for (Message reply : callback.getMessageReplied()) {
            if (!isProcessedSuccessfully(reply)) {
                LOGGER.error("Message is not processed successfully by instance: {}.", reply.getMsgSrc());
                allSucceeded = false;
            }
        }
        if (!allSucceeded) {
            throw new VeniceException("Some of storage node did not process message successfully. Message:" + statusMessage);
        }
        LOGGER.info("{} messages have been send and processed. Message: {}.", sentCount, statusMessage);
    }

    /** Returns whether the reply's result map flags the message as successfully processed. */
    private static boolean isProcessedSuccessfully(Message reply) {
        return Boolean.parseBoolean(reply.getResultMap().get(RESULT_SUCCESS_KEY));
    }

    /**
     * Registers the handler for the given message type.
     *
     * @throws VeniceException if a handler is already registered for that type
     */
    @Override
    public <T extends StatusMessage> void registerHandler(Class<T> cls, StatusMessageHandler<T> statusMessageHandler) {
        // putIfAbsent performs the existence check and the insertion as one atomic step on the backing
        // ConcurrentHashMap, so two concurrent registrations can no longer overwrite each other.
        if (this.handlers.putIfAbsent(cls.getName(), statusMessageHandler) != null) {
            throw new VeniceException("Handler already exists for message type:" + cls.getName());
        }
    }

    /**
     * Removes the handler registered for the given message type. If nothing is registered for the type,
     * this only logs and returns.
     *
     * @throws VeniceException if the given handler is not equal to the registered one
     */
    @Override
    public <T extends StatusMessage> void unRegisterHandler(Class<T> cls, StatusMessageHandler<T> statusMessageHandler) {
        String messageType = cls.getName();
        // Single read: the decision below is made on one consistent snapshot of the mapping.
        StatusMessageHandler<?> registered = this.handlers.get(messageType);
        if (registered == null) {
            LOGGER.info("Can not find any handler for given message type: {}.", cls.toGenericString());
            return;
        }
        if (!statusMessageHandler.equals(registered)) {
            throw new VeniceException("Handler is different from the registered one. Message type:" + messageType);
        }
        // Conditional remove: only drops the mapping if it still points at the handler that was checked.
        this.handlers.remove(messageType, registered);
    }

    /**
     * Rebuilds the Venice message carried by a Helix message, based on the class name stored in
     * {@link #VENICE_MESSAGE_CLASS}.
     *
     * @throws VeniceException if the stored class name is not one of the supported message classes
     */
    protected StatusMessage convertHelixMessageToVeniceMessage(Message message) {
        String messageClassName = message.getRecord().getSimpleField(VENICE_MESSAGE_CLASS);
        Map<String, String> fields = message.getRecord().getMapField(VENICE_MESSAGE_FIELD);
        if (StoreStatusMessage.class.getName().equals(messageClassName)) {
            return new StoreStatusMessage(fields);
        }
        if (KillOfflinePushMessage.class.getName().equals(messageClassName)) {
            return new KillOfflinePushMessage(fields);
        }
        throw new VeniceException("Message handler not implemented yet for class." + messageClassName);
    }

    /**
     * Wraps a Venice message in a Helix message of type {@link #HELIX_MESSAGE_TYPE}, reusing the Venice
     * message id and storing the message's fields and class name in the Helix record.
     */
    protected Message convertVeniceMessageToHelixMessage(StatusMessage statusMessage) {
        Message helixMessage = new Message(HELIX_MESSAGE_TYPE, statusMessage.getMessageId());
        helixMessage.getRecord().setMapField(VENICE_MESSAGE_FIELD, statusMessage.getFields());
        helixMessage.getRecord().setSimpleField(VENICE_MESSAGE_CLASS, statusMessage.getClass().getName());
        return helixMessage;
    }

    /**
     * Looks up the handler registered for the given message type.
     *
     * @throws VeniceException if no handler is registered for that type
     */
    protected <T extends StatusMessage> StatusMessageHandler getHandler(Class<T> cls) {
        StatusMessageHandler<?> handler = this.handlers.get(cls.getName());
        if (handler == null) {
            throw new VeniceException("No handler for this type of message:" + cls.getName());
        }
        return handler;
    }
}
