RocketMQ broker启动流程是什么

其他教程   发布日期:2023年10月17日   浏览次数:345

这篇文章主要介绍“RocketMQ broker启动流程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RocketMQ broker启动流程是什么”文章能帮助大家解决问题。

    1. 启动入口

    本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

    前面我们已经分析完了

    1. NameServer
    1. producer
    ,从本文开始,我们将分析
    1. Broker

    1. broker
    的启动类为
    1. org.apache.rocketmq.broker.BrokerStartup
    ,代码如下:
    1. public class BrokerStartup {
    2. ...
    3. public static void main(String[] args) {
    4. start(createBrokerController(args));
    5. }
    6. ...
    7. }

    1. main()
    方法中,仅有一行代码,这行代码包含了两个操作:
      1. createBrokerController(...)
      :创建
      1. BrokerController
      1. start(...)
      :启动
      1. Broker

    接下来我们就来分析这两个操作。

    2. 创建BrokerController

    创建

    1. BrokerController
    的方法为
    1. BrokerStartup#createBrokerController
    ,代码如下:
    1. /**
    2. * 创建 broker 的配置参数
    3. */
    4. public static BrokerController createBrokerController(String[] args) {
    5. ...
    6. try {
    7. //解析命令行参数
    8. Options options = ServerUtil.buildCommandlineOptions(new Options());
    9. commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
    10. new PosixParser());
    11. if (null == commandLine) {
    12. System.exit(-1);
    13. }
    14. // 处理配置
    15. final BrokerConfig brokerConfig = new BrokerConfig();
    16. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    17. final NettyClientConfig nettyClientConfig = new NettyClientConfig();
    18. // tls安全相关
    19. nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
    20. String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
    21. // 配置端口
    22. nettyServerConfig.setListenPort(10911);
    23. // 消息存储的配置
    24. final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    25. ...
    26. // 将命令行中的配置设置到brokerConfig对象中
    27. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
    28. // 检查环境变量:ROCKETMQ_HOME
    29. if (null == brokerConfig.getRocketmqHome()) {
    30. System.out.printf("Please set the %s variable in your environment to match
    31. the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
    32. System.exit(-2);
    33. }
    34. //省略一些配置
    35. ...
    36. // 创建 brokerController
    37. final BrokerController controller = new BrokerController(
    38. brokerConfig,
    39. nettyServerConfig,
    40. nettyClientConfig,
    41. messageStoreConfig);
    42. controller.getConfiguration().registerConfig(properties);
    43. // 初始化
    44. boolean initResult = controller.initialize();
    45. if (!initResult) {
    46. controller.shutdown();
    47. System.exit(-3);
    48. }
    49. // 关闭钩子,在关闭前处理一些操作
    50. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    51. private volatile boolean hasShutdown = false;
    52. private AtomicInteger shutdownTimes = new AtomicInteger(0);
    53. @Override
    54. public void run() {
    55. synchronized (this) {
    56. if (!this.hasShutdown) {
    57. ...
    58. // 这里会发一条注销消息给nameServer
    59. controller.shutdown();
    60. ...
    61. }
    62. }
    63. }
    64. }, "ShutdownHook"));
    65. return controller;
    66. } catch (Throwable e) {
    67. e.printStackTrace();
    68. System.exit(-1);
    69. }
    70. return null;
    71. }

    这个方法的代码有点长,但功能并不多,总的来说就三个功能:

    • 处理配置:主要是处理

      1. nettyServerConfig
      1. nettyClientConfig
      的配置,这块就是一些配置解析的操作,处理方式与
      1. NameServer
      很类似,这里就不多说了。
    • 创建及初始化

      1. controller
      :调用方法
      1. controller.initialize()
      ,这块内容我们后面分析。
    • 注册关闭钩子:调用

      1. Runtime.getRuntime().addShutdownHook(...)
      ,可以在jvm进程关闭前进行一些操作。

    2.1 controller实例化

    1. BrokerController
    的创建及初始化是在
    1. BrokerStartup#createBrokerController
    方法中进行,我们先来看看它的构造方法:
    1. public BrokerController(
    2. final BrokerConfig brokerConfig,
    3. final NettyServerConfig nettyServerConfig,
    4. final NettyClientConfig nettyClientConfig,
    5. final MessageStoreConfig messageStoreConfig
    6. ) {
    7. // 4个核心配置信息
    8. this.brokerConfig = brokerConfig;
    9. this.nettyServerConfig = nettyServerConfig;
    10. this.nettyClientConfig = nettyClientConfig;
    11. this.messageStoreConfig = messageStoreConfig;
    12. // 管理consumer消费消息的offset
    13. this.consumerOffsetManager = new ConsumerOffsetManager(this);
    14. // 管理topic配置
    15. this.topicConfigManager = new TopicConfigManager(this);
    16. // 处理 consumer 拉消息请求的
    17. this.pullMessageProcessor = new PullMessageProcessor(this);
    18. this.pullRequestHoldService = new PullRequestHoldService(this);
    19. // 消息送达的监听器
    20. this.messageArrivingListener
    21. = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    22. ...
    23. // 往外发消息的组件
    24. this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    25. ...
    26. }

    1. BrokerController
    的构造方法很长,基本都是一些赋值操作,代码中已列出关键项,这些包括:
    • 核心配置赋值:主要是

      1. brokerConfig
      /
      1. nettyServerConfig
      /
      1. nettyClientConfig
      /
      1. messageStoreConfig
      四个配置
      1. ConsumerOffsetManager
      :管理
      1. consumer
      消费消息位置的偏移量,偏移量表示消费者组消费该
      1. topic
      消息的位置,后面再消费时,就从该位置后消费,避免重复消费消息,也避免了漏消费消息。
      1. topicConfigManager
      1. topic
      配置管理器,就是用来管理
      1. topic
      配置的,如
      1. topic
      名称,
      1. topic
      队列数量
      1. pullMessageProcessor
      :消息处理器,用来处理消费者拉消息
      1. messageArrivingListener
      :消息送达的监听器,当生产者的消息送达时,由该监听器监听
      1. brokerOuterAPI
      :往外发消息的组件,如向
      1. NameServer
      发送注册/注销消息

    以上这些组件的用处,这里先混个脸熟,我们后面再分析。

    2.2 初始化controller

    我们再来看看初始化操作,方法为

    1. BrokerController#initialize
    1. public boolean initialize() throws CloneNotSupportedException {
    2. // 加载配置文件中的配置
    3. boolean result = this.topicConfigManager.load();
    4. result = result && this.consumerOffsetManager.load();
    5. result = result && this.subscriptionGroupManager.load();
    6. result = result && this.consumerFilterManager.load();
    7. if (result) {
    8. try {
    9. // 消息存储管理组件,管理磁盘上的消息
    10. this.messageStore =
    11. new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
    12. this.messageArrivingListener, this.brokerConfig);
    13. // 启用了DLeger,就创建DLeger相关组件
    14. if (messageStoreConfig.isEnableDLegerCommitLog()) {
    15. ...
    16. }
    17. // broker统计组件
    18. this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
    19. //load plugin
    20. MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig,
    21. brokerStatsManager, messageArrivingListener, brokerConfig);
    22. this.messageStore = MessageStoreFactory.build(context, this.messageStore);
    23. this.messageStore.getDispatcherList().addFirst(
    24. new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
    25. } catch (IOException e) {
    26. result = false;
    27. log.error("Failed to initialize", e);
    28. }
    29. }
    30. // 加载磁盘上的记录,如commitLog写入的位置、消费者主题/队列的信息
    31. result = result && this.messageStore.load();
    32. if (result) {
    33. // 处理 nettyServer
    34. this.remotingServer = new NettyRemotingServer(
    35. this.nettyServerConfig, this.clientHousekeepingService);
    36. NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
    37. fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
    38. this.fastRemotingServer = new NettyRemotingServer(
    39. fastConfig, this.clientHousekeepingService);
    40. // 创建线程池start... 这里会创建多种类型的线程池
    41. ...
    42. // 处理consumer pull操作的线程池
    43. this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
    44. this.brokerConfig.getPullMessageThreadPoolNums(),
    45. this.brokerConfig.getPullMessageThreadPoolNums(),
    46. 1000 * 60,
    47. TimeUnit.MILLISECONDS,
    48. this.pullThreadPoolQueue,
    49. new ThreadFactoryImpl("PullMessageThread_"));
    50. ...
    51. // 创建线程池end...
    52. // 注册处理器
    53. this.registerProcessor();
    54. // 启动定时任务start... 这里会启动好多的定时任务
    55. ...
    56. // 定时将consumer消费到的offset进行持久化操作,即将数据保存到磁盘上
    57. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    58. @Override
    59. public void run() {
    60. try {
    61. BrokerController.this.consumerOffsetManager.persist();
    62. } catch (Throwable e) {
    63. log.error("schedule persist consumerOffset error.", e);
    64. }
    65. }
    66. }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    67. ...
    68. // 启动定时任务end...
    69. ...
    70. // 开启 DLeger 的一些操作
    71. if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    72. ...
    73. }
    74. // 处理tls配置
    75. if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
    76. ...
    77. }
    78. // 初始化一些操作
    79. initialTransaction();
    80. initialAcl();
    81. initialRpcHooks();
    82. }
    83. return result;
    84. }

    这个还是很长,关键部分都做了注释,该方法所做的工作如下:

    • 加载配置文件中的配置

    • 赋值与初始化操作

    • 创建线程池

    • 注册处理器

    • 启动定时任务

    这里我们来看下注册处理器的操作

    1. this.registerProcessor()
    :

    2.2.1 注册处理器:BrokerController#registerProcessor

    1. this.registerProcessor()
    实际调用的方法是
    1. BrokerController#registerProcessor
    ,代码如下:
    1. public void registerProcessor() {
    2. /**
    3. * SendMessageProcessor
    4. */
    5. SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    6. sendProcessor.registerSendMessageHook(sendMessageHookList);
    7. sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    8. this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
    9. this.sendMessageExecutor);
    10. this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,
    11. this.sendMessageExecutor);
    12. this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,
    13. this.sendMessageExecutor);
    14. this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
    15. this.sendMessageExecutor);
    16. ...
    17. /**
    18. * PullMessageProcessor
    19. */
    20. this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
    21. this.pullMessageExecutor);
    22. this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    23. /**
    24. * ReplyMessageProcessor
    25. */
    26. ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
    27. replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
    28. ...
    29. }

    这个方法里注册了许许多多的处理器,这里仅列出了与消息相关的内容,如发送消息、回复消息、拉取消息等,后面在处理

    1. producer
    /
    1. consumer
    的消息时,就会用到这些处理器,这里先不展开分析。

    2.2.2 remotingServer注册处理器:NettyRemotingServer#registerProcessor

    我们来看下

    1. remotingServer
    注册处理器的操作,方法为
    1. NettyRemotingServer#registerProcessor
    1. public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    2. ...
    3. @Override
    4. public void registerProcessor(int requestCode, NettyRequestProcessor processor,
    5. ExecutorService executor) {
    6. ExecutorService executorThis = executor;
    7. if (null == executor) {
    8. executorThis = this.publicExecutor;
    9. }
    10. Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor,
    11. ExecutorService>(processor, executorThis);
    12. // 注册到processorTable 中
    13. this.processorTable.put(requestCode, pair);
    14. }
    15. ...
    16. }

    最终,这些处理器注册到了

    1. processorTable
    中,它是
    1. NettyRemotingAbstract
    的成员变量,定义如下:
    1. HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>

    这是一个

    1. hashMap
    的结构,
    1. key
    1. code
    1. value
    1. Pair
    ,该类中有两个成员变量:
    1. NettyRequestProcessor
    1. ExecutorService
    1. code
    1. NettyRequestProcessor
    的映射关系就是在
    1. hashMap
    里存储的。

    2.3 注册关闭钩子:

    1. Runtime.getRuntime().addShutdownHook(...)

    接着我们来看看注册关闭钩子的操作:

    1. // 关闭钩子,在关闭前处理一些操作
    2. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    3. private volatile boolean hasShutdown = false;
    4. private AtomicInteger shutdownTimes = new AtomicInteger(0);
    5. @Override
    6. public void run() {
    7. synchronized (this) {
    8. if (!this.hasShutdown) {
    9. ...
    10. // 这里会发一条注销消息给nameServer
    11. controller.shutdown();
    12. ...
    13. }
    14. }
    15. }
    16. }, "ShutdownHook"));

    跟进

    1. BrokerController#shutdown
    方法:
    1. public void shutdown() {
    2. // 调用各组件的shutdown方法
    3. ...
    4. // 发送注销消息到NameServer
    5. this.unregisterBrokerAll();
    6. ...
    7. // 持久化consumer的消费偏移量
    8. this.consumerOffsetManager.persist();
    9. // 又是调用各组件的shutdown方法
    10. ...

    这个方法里会调用各组件的

    1. shutdown()
    方法、发送注销消息给
    1. NameServer
    、持久化consumer的消费偏移量,这里我们主要看发送注销消息的方法
    1. BrokerController#unregisterBrokerAll
    :
    1. private void unregisterBrokerAll() {
    2. // 发送一条注销消息给nameServer
    3. this.brokerOuterAPI.unregisterBrokerAll(
    4. this.brokerConfig.getBrokerClusterName(),
    5. this.getBrokerAddr(),
    6. this.brokerConfig.getBrokerName(),
    7. this.brokerConfig.getBrokerId());
    8. }

    继续进入

    1. BrokerOuterAPI#unregisterBrokerAll
    1. public void unregisterBrokerAll(
    2. final String clusterName,
    3. final String brokerAddr,
    4. final String brokerName,
    5. final long brokerId
    6. ) {
    7. // 获取所有的 nameServer,遍历发送注销消息
    8. List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    9. if (nameServerAddressList != null) {
    10. for (String namesrvAddr : nameServerAddressList) {
    11. try {
    12. this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
    13. log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
    14. } catch (Exception e) {
    15. log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
    16. }
    17. }
    18. }
    19. }

    这个方法里,会获取到所有的

    1. nameServer
    ,然后逐个发送注销消息,继续进入
    1. BrokerOuterAPI#unregisterBroker
    方法:
    1. public void unregisterBroker(
    2. final String namesrvAddr,
    3. final String clusterName,
    4. final String brokerAddr,
    5. final String brokerName,
    6. final long brokerId
    7. ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    8. InterruptedException, MQBrokerException {
    9. UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
    10. requestHeader.setBrokerAddr(brokerAddr);
    11. requestHeader.setBrokerId(brokerId);
    12. requestHeader.setBrokerName(brokerName);
    13. requestHeader.setClusterName(clusterName);
    14. // 发送的注销消息:RequestCode.UNREGISTER_BROKER
    15. RemotingCommand request = RemotingCommand.createRequestCommand(
    16. c, requestHeader);
    17. RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
    18. assert response != null;
    19. switch (response.getCode()) {
    20. case ResponseCode.SUCCESS: {
    21. return;
    22. }
    23. default:
    24. break;
    25. }
    26. throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
    27. }

    最终调用的是

    1. RemotingClient#invokeSync
    进行消息发送,请求
    1. code
    1. RequestCode.UNREGISTER_BROKER
    ,这就与
    1. NameServer
    接收
    1. broker
    的注销消息对应上了。

    3. 启动Broker:start(...)

    我们再来看看

    1. Broker
    的启动流程,处理方法为
    1. BrokerController#start
    1. public void start() throws Exception {
    2. // 启动各组件
    3. // 启动消息存储相关组件
    4. if (this.messageStore != null) {
    5. this.messageStore.start();
    6. }
    7. // 启动 remotingServer,其实就是启动一个netty服务,用来接收producer传来的消息
    8. if (this.remotingServer != null) {
    9. this.remotingServer.start();
    10. }
    11. ...
    12. // broker对外发放消息的组件,向nameServer上报存活消息时使用了它,也是一个netty服务
    13. if (this.brokerOuterAPI != null) {
    14. this.brokerOuterAPI.start();
    15. }
    16. ...
    17. // broker 核心的心跳注册任务
    18. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    19. @Override
    20. public void run() {
    21. try {
    22. BrokerController.this.registerBrokerAll(true, false,
    23. brokerConfig.isForceRegister());
    24. } catch (Throwable e) {
    25. log.error("registerBrokerAll Exception", e);
    26. }
    27. }
    28. // brokerConfig.getRegisterNameServerPeriod() 值为 1000 * 30,最终计算得到默认30秒执行一次
    29. }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
    30. TimeUnit.MILLISECONDS);
    31. ...
    32. }

    这个方法主要就是启动各组件了,这里列出了几大重要组件的启动:

      1. messageStore
      :消息存储组件,在这个组件里,会启动消息存储相关的线程,如消息的投递操作、
      1. commitLog
      文件的
      1. flush
      操作、
      1. comsumeQueue
      文件的
      1. flush
      操作等
      1. remotingServer
      1. netty
      服务,用来接收请求消息,如
      1. producer
      发送过来的消息
      1. brokerOuterAPI
      :也是一个
      1. netty
      服务,用来对外发送消息,如向
      1. nameServer
      上报心跳消息
    • 启动定时任务:

      1. broker
      1. nameServer
      发送注册消息

    这里我们重点来看定时任务是如何发送心跳发送的。

    处理注册消息发送的时间间隔如下:

    1. Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

    这行代码看着长,但意思就一句话:时间间隔可以自行配置,但不能小于10s,不能大于60s,默认是30s.

    处理消息注册的方法为

    1. BrokerController#registerBrokerAll(...)
    ,代码如下:
    1. public synchronized void registerBrokerAll(final boolean checkOrderConfig,
    2. boolean oneway, boolean forceRegister) {
    3. TopicConfigSerializeWrapper topicConfigWrapper
    4. = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    5. // 处理topic相关配置
    6. if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
    7. || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
    8. ...
    9. }
    10. // 这里会判断是否需要进行注册
    11. if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
    12. this.getBrokerAddr(),
    13. this.brokerConfig.getBrokerName(),
    14. this.brokerConfig.getBrokerId(),
    15. this.brokerConfig.getRegisterBrokerTimeoutMills())) {
    16. // 进行注册操作
    17. doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    18. }
    19. }

    这个方法就是用来处理注册操作的,不过注册前会先验证下是否需要注册,验证是否需要注册的方法为

    1. BrokerController#needRegister
    , 代码如下:
    1. private boolean needRegister(final String clusterName,
    2. final String brokerAddr,
    3. final String brokerName,
    4. final long brokerId,
    5. final int timeoutMills) {
    6. TopicConfigSerializeWrapper topicConfigWrapper
    7. = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    8. // 判断是否需要进行注册
    9. List&lt;Boolean&gt; changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName,
    10. brokerId, topicConfigWrapper, timeoutMills);
    11. // 有一个发生了变化,就表示需要注册了
    12. boolean needRegister = false;
    13. for (Boolean changed : changeList) {
    14. &

    以上就是RocketMQ broker启动流程是什么的详细内容,更多关于RocketMQ broker启动流程是什么的资料请关注九品源码其它相关文章!