package org.apache.rocketmq.broker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/* loaded from: input_file:org/apache/rocketmq/broker/BrokerController.class */
public class BrokerController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqBroker");
    private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger("RocketmqProtection");
    private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger("RocketmqWaterMark");
    private final BrokerConfig brokerConfig;
    private final NettyServerConfig nettyServerConfig;
    private final NettyClientConfig nettyClientConfig;
    private final MessageStoreConfig messageStoreConfig;
    private final BrokerOuterAPI brokerOuterAPI;
    private final BlockingQueue<Runnable> sendThreadPoolQueue;
    private final BlockingQueue<Runnable> pullThreadPoolQueue;
    private final BlockingQueue<Runnable> queryThreadPoolQueue;
    private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
    private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
    private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
    private final BrokerStatsManager brokerStatsManager;
    private MessageStore messageStore;
    private RemotingServer remotingServer;
    private RemotingServer fastRemotingServer;
    private ExecutorService sendMessageExecutor;
    private ExecutorService pullMessageExecutor;
    private ExecutorService queryMessageExecutor;
    private ExecutorService adminBrokerExecutor;
    private ExecutorService clientManageExecutor;
    private ExecutorService heartbeatExecutor;
    private ExecutorService consumerManageExecutor;
    private BrokerStats brokerStats;
    private InetSocketAddress storeHost;
    private BrokerFastFailure brokerFastFailure;
    private Configuration configuration;
    private FileWatchService fileWatchService;
    private TransactionalMessageCheckService transactionalMessageCheckService;
    private TransactionalMessageService transactionalMessageService;
    private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerControllerScheduledThread"));
    private final List<SendMessageHook> sendMessageHookList = new ArrayList();
    private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList();
    private boolean updateMasterHAServerAddrPeriodically = false;
    private final ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(this);
    private TopicConfigManager topicConfigManager = new TopicConfigManager(this);
    private final PullMessageProcessor pullMessageProcessor = new PullMessageProcessor(this);
    private final PullRequestHoldService pullRequestHoldService = new PullRequestHoldService(this);
    private final MessageArrivingListener messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    private final ConsumerIdsChangeListener consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    private final ConsumerManager consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    private final ConsumerFilterManager consumerFilterManager = new ConsumerFilterManager(this);
    private final ProducerManager producerManager = new ProducerManager();
    private final ClientHousekeepingService clientHousekeepingService = new ClientHousekeepingService(this);
    private final Broker2Client broker2Client = new Broker2Client(this);
    private final SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(this);
    private final FilterServerManager filterServerManager = new FilterServerManager(this);
    private final SlaveSynchronize slaveSynchronize = new SlaveSynchronize(this);

    public BrokerController(BrokerConfig brokerConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, MessageStoreConfig messageStoreConfig) {
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        this.sendThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.queryThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        setStoreHost(new InetSocketAddress(getBrokerConfig().getBrokerIP1(), getNettyServerConfig().getListenPort()));
        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(log, BrokerPathConfigHelper.getBrokerConfigPath(), new Object[]{this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig});
    }

    public BrokerConfig getBrokerConfig() {
        return this.brokerConfig;
    }

    public NettyServerConfig getNettyServerConfig() {
        return this.nettyServerConfig;
    }

    public BlockingQueue<Runnable> getPullThreadPoolQueue() {
        return this.pullThreadPoolQueue;
    }

    public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
        return this.queryThreadPoolQueue;
    }

    public boolean initialize() throws CloneNotSupportedException {
        boolean z = ((this.topicConfigManager.load() && this.consumerOffsetManager.load()) && this.subscriptionGroupManager.load()) && this.consumerFilterManager.load();
        if (z) {
            try {
                this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
                this.brokerStats = new BrokerStats(this.messageStore);
                this.messageStore = MessageStoreFactory.build(new MessageStorePluginContext(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig), this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                z = false;
                log.error("Failed to initialize", e);
            }
        }
        boolean z2 = z && this.messageStore.load();
        if (z2) {
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig nettyServerConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            nettyServerConfig.setListenPort(this.nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(nettyServerConfig, this.clientHousekeepingService);
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 60000L, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, (ThreadFactory) new ThreadFactoryImpl("SendMessageThread_"));
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 60000L, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, (ThreadFactory) new ThreadFactoryImpl("PullMessageThread_"));
            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), 60000L, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, (ThreadFactory) new ThreadFactoryImpl("QueryMessageThread_"));
            this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));
            this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.getClientManageThreadPoolNums(), 60000L, TimeUnit.MILLISECONDS, this.clientManagerThreadPoolQueue, (ThreadFactory) new ThreadFactoryImpl("ClientManageThread_"));
            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(), this.brokerConfig.getHeartbeatThreadPoolNums(), 60000L, TimeUnit.MILLISECONDS, this.heartbeatThreadPoolQueue, (ThreadFactory) new ThreadFactoryImpl("HeartbeatThread_", true));
            this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));
            registerProcessor();
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable th) {
                        BrokerController.log.error("schedule record error.", th);
                    }
                }
            }, UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(), 86400000L, TimeUnit.MILLISECONDS);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable th) {
                        BrokerController.log.error("schedule persist consumerOffset error.", th);
                    }
                }
            }, 10000L, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable th) {
                        BrokerController.log.error("schedule persist consumer filter error.", th);
                    }
                }
            }, 10000L, 10000L, TimeUnit.MILLISECONDS);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.4
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable th) {
                        BrokerController.log.error("protectBroker error.", th);
                    }
                }
            }, 3L, 3L, TimeUnit.MINUTES);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable th) {
                        BrokerController.log.error("printWaterMark error.", th);
                    }
                }
            }, 10L, 1L, TimeUnit.SECONDS);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.6
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        BrokerController.log.info("dispatch behind commit log {} bytes", Long.valueOf(BrokerController.this.getMessageStore().dispatchBehindBytes()));
                    } catch (Throwable th) {
                        BrokerController.log.error("schedule dispatchBehindBytes error.", th);
                    }
                }
            }, 10000L, 60000L, TimeUnit.MILLISECONDS);
            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.7
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable th) {
                            BrokerController.log.error("ScheduledTask fetchNameServerAddr exception", th);
                        }
                    }
                }, 10000L, 120000L, TimeUnit.MILLISECONDS);
            }
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() == null || this.messageStoreConfig.getHaMasterAddress().length() < 6) {
                    this.updateMasterHAServerAddrPeriodically = true;
                } else {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                }
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.8
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable th) {
                            BrokerController.log.error("ScheduledTask syncAll slave exception", th);
                        }
                    }
                }, 10000L, 60000L, TimeUnit.MILLISECONDS);
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.9
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable th) {
                            BrokerController.log.error("schedule printMasterAndSlaveDiff error.", th);
                        }
                    }
                }, 10000L, 60000L, TimeUnit.MILLISECONDS);
            }
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                try {
                    this.fileWatchService = new FileWatchService(new String[]{TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath}, new FileWatchService.Listener() { // from class: org.apache.rocketmq.broker.BrokerController.10
                        boolean certChanged;
                        boolean keyChanged = false;

                        public void onChanged(String str) {
                            if (str.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                BrokerController.log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (str.equals(TlsSystemConfig.tlsServerCertPath)) {
                                this.certChanged = true;
                            }
                            if (str.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                this.keyChanged = true;
                            }
                            if (this.certChanged && this.keyChanged) {
                                BrokerController.log.info("The certificate and private key changed, reload the ssl context");
                                this.keyChanged = false;
                                this.certChanged = false;
                                reloadServerSslContext();
                            }
                        }

                        private void reloadServerSslContext() {
                            BrokerController.this.remotingServer.loadSslContext();
                            BrokerController.this.fastRemotingServer.loadSslContext();
                        }
                    });
                } catch (Exception e2) {
                    log.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }
            initialTransaction();
        }
        return z2;
    }

    private void initialTransaction() {
        this.transactionalMessageService = (TransactionalMessageService) ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
        if (null == this.transactionalMessageService) {
            this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, getMessageStore()));
            log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
        }
        this.transactionalMessageCheckListener = (AbstractTransactionalMessageCheckListener) ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
        if (null == this.transactionalMessageCheckListener) {
            this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
            log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
        }
        this.transactionalMessageCheckListener.setBrokerController(this);
        this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
    }

    public void registerProcessor() {
        SendMessageProcessor sendMessageProcessor = new SendMessageProcessor(this);
        sendMessageProcessor.registerSendMessageHook(this.sendMessageHookList);
        sendMessageProcessor.registerConsumeMessageHook(this.consumeMessageHookList);
        this.remotingServer.registerProcessor(10, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(310, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(320, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(36, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(10, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(310, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(320, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(36, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(11, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(this.consumeMessageHookList);
        QueryMessageProcessor queryMessageProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(12, queryMessageProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(33, queryMessageProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(12, queryMessageProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(33, queryMessageProcessor, this.queryMessageExecutor);
        ClientManageProcessor clientManageProcessor = new ClientManageProcessor(this);
        this.remotingServer.registerProcessor(34, clientManageProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(35, clientManageProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(46, clientManageProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(34, clientManageProcessor, this.heartbeatExecutor);
        this.fastRemotingServer.registerProcessor(35, clientManageProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(46, clientManageProcessor, this.clientManageExecutor);
        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
        this.remotingServer.registerProcessor(38, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(15, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(14, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(38, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(15, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(14, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(37, new EndTransactionProcessor(this), this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(37, new EndTransactionProcessor(this), this.sendMessageExecutor);
        AdminBrokerProcessor adminBrokerProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminBrokerProcessor, this.adminBrokerExecutor);
        this.fastRemotingServer.registerDefaultProcessor(adminBrokerProcessor, this.adminBrokerExecutor);
    }

    public BrokerStats getBrokerStats() {
        return this.brokerStats;
    }

    public void setBrokerStats(BrokerStats brokerStats) {
        this.brokerStats = brokerStats;
    }

    public void protectBroker() {
        if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
            for (Map.Entry entry : this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet()) {
                long j = ((MomentStatsItem) entry.getValue()).getValue().get();
                if (j > this.brokerConfig.getConsumerFallbehindThreshold()) {
                    String str = ((MomentStatsItem) entry.getValue()).getStatsKey().split("@")[2];
                    LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", str, Long.valueOf(j));
                    this.subscriptionGroupManager.disableConsume(str);
                }
            }
        }
    }

    public long headSlowTimeMills(BlockingQueue<Runnable> blockingQueue) {
        long j = 0;
        Runnable peek = blockingQueue.peek();
        if (peek != null) {
            RequestTask castRunnable = BrokerFastFailure.castRunnable(peek);
            j = castRunnable == null ? 0L : this.messageStore.now() - castRunnable.getCreateTimestamp();
        }
        if (j < 0) {
            j = 0;
        }
        return j;
    }

    public long headSlowTimeMills4SendThreadPoolQueue() {
        return headSlowTimeMills(this.sendThreadPoolQueue);
    }

    public long headSlowTimeMills4PullThreadPoolQueue() {
        return headSlowTimeMills(this.pullThreadPoolQueue);
    }

    public long headSlowTimeMills4QueryThreadPoolQueue() {
        return headSlowTimeMills(this.queryThreadPoolQueue);
    }

    public void printWaterMark() {
        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", Integer.valueOf(this.sendThreadPoolQueue.size()), Long.valueOf(headSlowTimeMills4SendThreadPoolQueue()));
        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", Integer.valueOf(this.pullThreadPoolQueue.size()), Long.valueOf(headSlowTimeMills4PullThreadPoolQueue()));
        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", Integer.valueOf(this.queryThreadPoolQueue.size()), Long.valueOf(headSlowTimeMills4QueryThreadPoolQueue()));
    }

    public MessageStore getMessageStore() {
        return this.messageStore;
    }

    public void setMessageStore(MessageStore messageStore) {
        this.messageStore = messageStore;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void printMasterAndSlaveDiff() {
        log.info("Slave fall behind master: {} bytes", Long.valueOf(this.messageStore.slaveFallBehindMuch()));
    }

    public Broker2Client getBroker2Client() {
        return this.broker2Client;
    }

    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    public ConsumerFilterManager getConsumerFilterManager() {
        return this.consumerFilterManager;
    }

    public ConsumerOffsetManager getConsumerOffsetManager() {
        return this.consumerOffsetManager;
    }

    public MessageStoreConfig getMessageStoreConfig() {
        return this.messageStoreConfig;
    }

    public ProducerManager getProducerManager() {
        return this.producerManager;
    }

    public void setFastRemotingServer(RemotingServer remotingServer) {
        this.fastRemotingServer = remotingServer;
    }

    public PullMessageProcessor getPullMessageProcessor() {
        return this.pullMessageProcessor;
    }

    public PullRequestHoldService getPullRequestHoldService() {
        return this.pullRequestHoldService;
    }

    public SubscriptionGroupManager getSubscriptionGroupManager() {
        return this.subscriptionGroupManager;
    }

    public void shutdown() {
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.shutdown();
        }
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.shutdown();
        }
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.shutdown();
        }
        if (this.remotingServer != null) {
            this.remotingServer.shutdown();
        }
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.shutdown();
        }
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
        if (this.messageStore != null) {
            this.messageStore.shutdown();
        }
        this.scheduledExecutorService.shutdown();
        try {
            this.scheduledExecutorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
        unregisterBrokerAll();
        if (this.sendMessageExecutor != null) {
            this.sendMessageExecutor.shutdown();
        }
        if (this.pullMessageExecutor != null) {
            this.pullMessageExecutor.shutdown();
        }
        if (this.adminBrokerExecutor != null) {
            this.adminBrokerExecutor.shutdown();
        }
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.shutdown();
        }
        this.consumerOffsetManager.persist();
        if (this.filterServerManager != null) {
            this.filterServerManager.shutdown();
        }
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.shutdown();
        }
        if (this.consumerFilterManager != null) {
            this.consumerFilterManager.persist();
        }
        if (this.clientManageExecutor != null) {
            this.clientManageExecutor.shutdown();
        }
        if (this.queryMessageExecutor != null) {
            this.queryMessageExecutor.shutdown();
        }
        if (this.consumerManageExecutor != null) {
            this.consumerManageExecutor.shutdown();
        }
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }

    private void unregisterBrokerAll() {
        this.brokerOuterAPI.unregisterBrokerAll(this.brokerConfig.getBrokerClusterName(), getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId());
    }

    public String getBrokerAddr() {
        return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
    }

    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }
        registerBrokerAll(true, false, true);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.broker.BrokerController.11
            @Override // java.lang.Runnable
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, BrokerController.this.brokerConfig.isForceRegister());
                } catch (Throwable th) {
                    BrokerController.log.error("registerBrokerAll Exception", th);
                }
            }
        }, 10000L, Math.max(10000, Math.min(this.brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole() || this.transactionalMessageCheckService == null) {
            return;
        }
        log.info("Start transaction service!");
        this.transactionalMessageCheckService.start();
    }

    public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
        TopicConfig topicConfig2 = topicConfig;
        if (!PermName.isWriteable(getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(getBrokerConfig().getBrokerPermission())) {
            topicConfig2 = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission());
        }
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.put(topicConfig.getTopicName(), topicConfig2);
        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
        topicConfigSerializeWrapper.setDataVersion(dataVersion);
        topicConfigSerializeWrapper.setTopicConfigTable(concurrentHashMap);
        doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
    }

    public synchronized void registerBrokerAll(boolean z, boolean z2, boolean z3) {
        TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper = getTopicConfigManager().buildTopicConfigSerializeWrapper();
        if (!PermName.isWriteable(getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            for (TopicConfig topicConfig : buildTopicConfigSerializeWrapper.getTopicConfigTable().values()) {
                concurrentHashMap.put(topicConfig.getTopicName(), new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()));
            }
            buildTopicConfigSerializeWrapper.setTopicConfigTable(concurrentHashMap);
        }
        if (z3 || needRegister(this.brokerConfig.getBrokerClusterName(), getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            doRegisterBrokerAll(z, z2, buildTopicConfigSerializeWrapper);
        }
    }

    private void doRegisterBrokerAll(boolean z, boolean z2, TopicConfigSerializeWrapper topicConfigSerializeWrapper) {
        RegisterBrokerResult registerBrokerResult;
        List<RegisterBrokerResult> registerBrokerAll = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(), getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), getHAServerAddr(), topicConfigSerializeWrapper, this.filterServerManager.buildNewFilterServerList(), z2, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister());
        if (registerBrokerAll.size() <= 0 || (registerBrokerResult = registerBrokerAll.get(0)) == null) {
            return;
        }
        if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
            this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
        }
        this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
        if (z) {
            getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
        }
    }

    private boolean needRegister(String str, String str2, String str3, long j, int i) {
        boolean z = false;
        Iterator<Boolean> it = this.brokerOuterAPI.needRegister(str, str2, str3, j, getTopicConfigManager().buildTopicConfigSerializeWrapper(), i).iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().booleanValue()) {
                z = true;
                break;
            }
        }
        return z;
    }

    public TopicConfigManager getTopicConfigManager() {
        return this.topicConfigManager;
    }

    public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
        this.topicConfigManager = topicConfigManager;
    }

    public String getHAServerAddr() {
        return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
    }

    public RebalanceLockManager getRebalanceLockManager() {
        return this.rebalanceLockManager;
    }

    public SlaveSynchronize getSlaveSynchronize() {
        return this.slaveSynchronize;
    }

    public ExecutorService getPullMessageExecutor() {
        return this.pullMessageExecutor;
    }

    public void setPullMessageExecutor(ExecutorService executorService) {
        this.pullMessageExecutor = executorService;
    }

    public BlockingQueue<Runnable> getSendThreadPoolQueue() {
        return this.sendThreadPoolQueue;
    }

    public FilterServerManager getFilterServerManager() {
        return this.filterServerManager;
    }

    public BrokerStatsManager getBrokerStatsManager() {
        return this.brokerStatsManager;
    }

    public List<SendMessageHook> getSendMessageHookList() {
        return this.sendMessageHookList;
    }

    public void registerSendMessageHook(SendMessageHook sendMessageHook) {
        this.sendMessageHookList.add(sendMessageHook);
        log.info("register SendMessageHook Hook, {}", sendMessageHook.hookName());
    }

    public List<ConsumeMessageHook> getConsumeMessageHookList() {
        return this.consumeMessageHookList;
    }

    public void registerConsumeMessageHook(ConsumeMessageHook consumeMessageHook) {
        this.consumeMessageHookList.add(consumeMessageHook);
        log.info("register ConsumeMessageHook Hook, {}", consumeMessageHook.hookName());
    }

    public void registerServerRPCHook(RPCHook rPCHook) {
        getRemotingServer().registerRPCHook(rPCHook);
    }

    public RemotingServer getRemotingServer() {
        return this.remotingServer;
    }

    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    public void registerClientRPCHook(RPCHook rPCHook) {
        getBrokerOuterAPI().registerRPCHook(rPCHook);
    }

    public BrokerOuterAPI getBrokerOuterAPI() {
        return this.brokerOuterAPI;
    }

    public InetSocketAddress getStoreHost() {
        return this.storeHost;
    }

    public void setStoreHost(InetSocketAddress inetSocketAddress) {
        this.storeHost = inetSocketAddress;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
        return this.heartbeatThreadPoolQueue;
    }

    public TransactionalMessageCheckService getTransactionalMessageCheckService() {
        return this.transactionalMessageCheckService;
    }

    public void setTransactionalMessageCheckService(TransactionalMessageCheckService transactionalMessageCheckService) {
        this.transactionalMessageCheckService = transactionalMessageCheckService;
    }

    public TransactionalMessageService getTransactionalMessageService() {
        return this.transactionalMessageService;
    }

    public void setTransactionalMessageService(TransactionalMessageService transactionalMessageService) {
        this.transactionalMessageService = transactionalMessageService;
    }

    public AbstractTransactionalMessageCheckListener getTransactionalMessageCheckListener() {
        return this.transactionalMessageCheckListener;
    }

    public void setTransactionalMessageCheckListener(AbstractTransactionalMessageCheckListener abstractTransactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = abstractTransactionalMessageCheckListener;
    }
}
