RocketMQ broker启动流程是什么

其他教程   发布日期:2023年10月17日   浏览次数:239

这篇文章主要介绍“RocketMQ broker启动流程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RocketMQ broker启动流程是什么”文章能帮助大家解决问题。

    1. 启动入口

    本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

    前面我们已经分析完了

    NameServer
    producer
    ,从本文开始,我们将分析
    Broker

    broker
    的启动类为
    org.apache.rocketmq.broker.BrokerStartup
    ,代码如下:
    public class BrokerStartup {
        ...
        public static void main(String[] args) {
            start(createBrokerController(args));
        }
        ...
    }

    main()
    方法中,仅有一行代码,这行代码包含了两个操作:
    • createBrokerController(...)
      :创建
      BrokerController
    • start(...)
      :启动
      Broker

    接下来我们就来分析这两个操作。

    2. 创建BrokerController

    创建

    BrokerController
    的方法为
    BrokerStartup#createBrokerController
    ,代码如下:
    /**
     * 创建 broker 的配置参数
     */
    public static BrokerController createBrokerController(String[] args) {
        ...
        try {
            //解析命令行参数
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }
            // 处理配置
            final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
            // tls安全相关
            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            // 配置端口
            nettyServerConfig.setListenPort(10911);
            // 消息存储的配置
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
            ...
            // 将命令行中的配置设置到brokerConfig对象中
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
            // 检查环境变量:ROCKETMQ_HOME
            if (null == brokerConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match 
                    the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }
            //省略一些配置
            ...
            // 创建 brokerController
            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            controller.getConfiguration().registerConfig(properties);
            // 初始化
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
            // 关闭钩子,在关闭前处理一些操作
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);
                @Override
                public void run() {
                    synchronized (this) {
                        if (!this.hasShutdown) {
                            ...
                            // 这里会发一条注销消息给nameServer
                            controller.shutdown();
                            ...
                        }
                    }
                }
            }, "ShutdownHook"));
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }

    这个方法的代码有点长,但功能并不多,总的来说就三个功能:

    • 处理配置:主要是处理

      nettyServerConfig
      nettyClientConfig
      的配置,这块就是一些配置解析的操作,处理方式与
      NameServer
      很类似,这里就不多说了。
    • 创建及初始化

      controller
      :调用方法
      controller.initialize()
      ,这块内容我们后面分析。
    • 注册关闭钩子:调用

      Runtime.getRuntime().addShutdownHook(...)
      ,可以在jvm进程关闭前进行一些操作。

    2.1 controller实例化

    BrokerController
    的创建及初始化是在
    BrokerStartup#createBrokerController
    方法中进行,我们先来看看它的构造方法:
    public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        // 4个核心配置信息
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        // 管理consumer消费消息的offset
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        // 管理topic配置
        this.topicConfigManager = new TopicConfigManager(this);
        // 处理 consumer 拉消息请求的
        this.pullMessageProcessor = new PullMessageProcessor(this);
        this.pullRequestHoldService = new PullRequestHoldService(this);
        // 消息送达的监听器
        this.messageArrivingListener 
            = new NotifyMessageArrivingListener(this.pullRequestHoldService);
        ...
        // 往外发消息的组件
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        ...
    }

    BrokerController
    的构造方法很长,基本都是一些赋值操作,代码中已列出关键项,这些包括:
    • 核心配置赋值:主要是

      brokerConfig
      /
      nettyServerConfig
      /
      nettyClientConfig
      /
      messageStoreConfig
      四个配置
    • ConsumerOffsetManager
      :管理
      consumer
      消费消息位置的偏移量,偏移量表示消费者组消费该
      topic
      消息的位置,后面再消费时,就从该位置后消费,避免重复消费消息,也避免了漏消费消息。
    • topicConfigManager
      topic
      配置管理器,就是用来管理
      topic
      配置的,如
      topic
      名称,
      topic
      队列数量
    • pullMessageProcessor
      :消息处理器,用来处理消费者拉消息
    • messageArrivingListener
      :消息送达的监听器,当生产者的消息送达时,由该监听器监听
    • brokerOuterAPI
      :往外发消息的组件,如向
      NameServer
      发送注册/注销消息

    以上这些组件的用处,这里先混个脸熟,我们后面再分析。

    2.2 初始化controller

    我们再来看看初始化操作,方法为

    BrokerController#initialize
    public boolean initialize() throws CloneNotSupportedException {
        // 加载配置文件中的配置
        boolean result = this.topicConfigManager.load();
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();
        if (result) {
            try {
                // 消息存储管理组件,管理磁盘上的消息
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
                        this.messageArrivingListener, this.brokerConfig);
                // 启用了DLeger,就创建DLeger相关组件
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    ...
                }
                // broker统计组件
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, 
                    brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(
                    new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }
        // 加载磁盘上的记录,如commitLog写入的位置、消费者主题/队列的信息
        result = result && this.messageStore.load();
        if (result) {
            // 处理 nettyServer
            this.remotingServer = new NettyRemotingServer(
                this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(
                fastConfig, this.clientHousekeepingService);
            // 创建线程池start... 这里会创建多种类型的线程池
            ...
            // 处理consumer pull操作的线程池
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));
            ...
            // 创建线程池end...
            // 注册处理器
            this.registerProcessor();
            // 启动定时任务start... 这里会启动好多的定时任务
            ...
            // 定时将consumer消费到的offset进行持久化操作,即将数据保存到磁盘上
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
            ...
            // 启动定时任务end...
            ...
            // 开启 DLeger 的一些操作
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                ...
            }
            // 处理tls配置
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                ...
            }
            // 初始化一些操作
            initialTransaction();
            initialAcl();
            initialRpcHooks();
        }
        return result;
    }

    这个还是很长,关键部分都做了注释,该方法所做的工作如下:

    • 加载配置文件中的配置

    • 赋值与初始化操作

    • 创建线程池

    • 注册处理器

    • 启动定时任务

    这里我们来看下注册处理器的操作

    this.registerProcessor()
    :

    2.2.1 注册处理器:BrokerController#registerProcessor

    this.registerProcessor()
    实际调用的方法是
    BrokerController#registerProcessor
    ,代码如下:
    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, 
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,  
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, 
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, 
            this.sendMessageExecutor);
        ...
        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, 
            this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
        /**
            * ReplyMessageProcessor
            */
        ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
        replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
        ...
    }

    这个方法里注册了许许多多的处理器,这里仅列出了与消息相关的内容,如发送消息、回复消息、拉取消息等,后面在处理

    producer
    /
    consumer
    的消息时,就会用到这些处理器,这里先不展开分析。

    2.2.2 remotingServer注册处理器:NettyRemotingServer#registerProcessor

    我们来看下

    remotingServer
    注册处理器的操作,方法为
    NettyRemotingServer#registerProcessor
    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
        ...
        @Override
        public void registerProcessor(int requestCode, NettyRequestProcessor processor, 
                ExecutorService executor) {
            ExecutorService executorThis = executor;
            if (null == executor) {
                executorThis = this.publicExecutor;
            }
            Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, 
                    ExecutorService>(processor, executorThis);
            // 注册到processorTable 中
            this.processorTable.put(requestCode, pair);
        }
        ...
    }

    最终,这些处理器注册到了

    processorTable
    中,它是
    NettyRemotingAbstract
    的成员变量,定义如下:
    HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>

    这是一个

    hashMap
    的结构,
    key
    code
    value
    Pair
    ,该类中有两个成员变量:
    NettyRequestProcessor
    ExecutorService
    code
    NettyRequestProcessor
    的映射关系就是在
    hashMap
    里存储的。

    2.3 注册关闭钩子:

    Runtime.getRuntime().addShutdownHook(...)

    接着我们来看看注册关闭钩子的操作:

    // 关闭钩子,在关闭前处理一些操作
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        private volatile boolean hasShutdown = false;
        private AtomicInteger shutdownTimes = new AtomicInteger(0);
        @Override
        public void run() {
            synchronized (this) {
                if (!this.hasShutdown) {
                    ...
                    // 这里会发一条注销消息给nameServer
                    controller.shutdown();
                    ...
                }
            }
        }
    }, "ShutdownHook"));

    跟进

    BrokerController#shutdown
    方法:
    public void shutdown() {
        // 调用各组件的shutdown方法
        ...
        // 发送注销消息到NameServer
        this.unregisterBrokerAll();
        ...
        // 持久化consumer的消费偏移量
        this.consumerOffsetManager.persist();
        // 又是调用各组件的shutdown方法
        ...

    这个方法里会调用各组件的

    shutdown()
    方法、发送注销消息给
    NameServer
    、持久化consumer的消费偏移量,这里我们主要看发送注销消息的方法
    BrokerController#unregisterBrokerAll
    :
    private void unregisterBrokerAll() {
        // 发送一条注销消息给nameServer
        this.brokerOuterAPI.unregisterBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId());
    }

    继续进入

    BrokerOuterAPI#unregisterBrokerAll
    public void unregisterBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId
    ) {
        // 获取所有的 nameServer,遍历发送注销消息
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            for (String namesrvAddr : nameServerAddressList) {
                try {
                    this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
                    log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
                } catch (Exception e) {
                    log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
                }
            }
        }
    }

    这个方法里,会获取到所有的

    nameServer
    ,然后逐个发送注销消息,继续进入
    BrokerOuterAPI#unregisterBroker
    方法:
    public void unregisterBroker(
        final String namesrvAddr,
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId
    ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, 
            InterruptedException, MQBrokerException {
        UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        // 发送的注销消息:RequestCode.UNREGISTER_BROKER
        RemotingCommand request = RemotingCommand.createRequestCommand(
                c, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
    }

    最终调用的是

    RemotingClient#invokeSync
    进行消息发送,请求
    code
    RequestCode.UNREGISTER_BROKER
    ,这就与
    NameServer
    接收
    broker
    的注销消息对应上了。

    3. 启动Broker:start(...)

    我们再来看看

    Broker
    的启动流程,处理方法为
    BrokerController#start
    public void start() throws Exception {
        // 启动各组件
        // 启动消息存储相关组件
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        // 启动 remotingServer,其实就是启动一个netty服务,用来接收producer传来的消息
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        ...
        // broker对外发放消息的组件,向nameServer上报存活消息时使用了它,也是一个netty服务
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        ...
        // broker 核心的心跳注册任务
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, 
                        brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
            // brokerConfig.getRegisterNameServerPeriod() 值为 1000 * 30,最终计算得到默认30秒执行一次
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
                TimeUnit.MILLISECONDS);
        ...
    }

    这个方法主要就是启动各组件了,这里列出了几大重要组件的启动:

    • messageStore
      :消息存储组件,在这个组件里,会启动消息存储相关的线程,如消息的投递操作、
      commitLog
      文件的
      flush
      操作、
      comsumeQueue
      文件的
      flush
      操作等
    • remotingServer
      netty
      服务,用来接收请求消息,如
      producer
      发送过来的消息
    • brokerOuterAPI
      :也是一个
      netty
      服务,用来对外发送消息,如向
      nameServer
      上报心跳消息
    • 启动定时任务:

      broker
      nameServer
      发送注册消息

    这里我们重点来看定时任务是如何发送心跳发送的。

    处理注册消息发送的时间间隔如下:

    Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

    这行代码看着长,但意思就一句话:时间间隔可以自行配置,但不能小于10s,不能大于60s,默认是30s.

    处理消息注册的方法为

    BrokerController#registerBrokerAll(...)
    ,代码如下:
    public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
            boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper topicConfigWrapper 
                = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        // 处理topic相关配置
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ...
        }
        // 这里会判断是否需要进行注册
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            // 进行注册操作    
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

    这个方法就是用来处理注册操作的,不过注册前会先验证下是否需要注册,验证是否需要注册的方法为

    BrokerController#needRegister
    , 代码如下:
    private boolean needRegister(final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final int timeoutMills) {
        TopicConfigSerializeWrapper topicConfigWrapper 
            = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        // 判断是否需要进行注册
        List&lt;Boolean&gt; changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, 
            brokerId, topicConfigWrapper, timeoutMills);
        // 有一个发生了变化,就表示需要注册了    
        boolean needRegister = false;
        for (Boolean changed : changeList) {
         &						
    						
    						
    						
    						
    						
    						
    					

    以上就是RocketMQ broker启动流程是什么的详细内容,更多关于RocketMQ broker启动流程是什么的资料请关注九品源码其它相关文章!