这篇文章主要介绍“RocketMQ broker启动流程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RocketMQ broker启动流程是什么”文章能帮助大家解决问题。
1. 启动入口
本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
前面我们已经分析完了
NameServer
和producer
,从本文开始,我们将分析Broker
。
broker
的启动类为org.apache.rocketmq.broker.BrokerStartup
,代码如下:public class BrokerStartup {
...
public static void main(String[] args) {
start(createBrokerController(args));
}
...
}
在
main()
方法中,仅有一行代码,这行代码包含了两个操作:createBrokerController(...)
:创建BrokerController
start(...)
:启动Broker
接下来我们就来分析这两个操作。
2. 创建BrokerController
创建
BrokerController
的方法为BrokerStartup#createBrokerController
,代码如下:/**
* 创建 broker 的配置参数
*/
public static BrokerController createBrokerController(String[] args) {
...
try {
//解析命令行参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
// 处理配置
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// tls安全相关
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// 配置端口
nettyServerConfig.setListenPort(10911);
// 消息存储的配置
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
...
// 将命令行中的配置设置到brokerConfig对象中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
// 检查环境变量:ROCKETMQ_HOME
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match
the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//省略一些配置
...
// 创建 brokerController
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
controller.getConfiguration().registerConfig(properties);
// 初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 关闭钩子,在关闭前处理一些操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
...
// 这里会发一条注销消息给nameServer
controller.shutdown();
...
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
这个方法的代码有点长,但功能并不多,总的来说就三个功能:
处理配置:主要是处理
nettyServerConfig
与nettyClientConfig
的配置,这块就是一些配置解析的操作,处理方式与NameServer
很类似,这里就不多说了。创建及初始化
controller
:调用方法controller.initialize()
,这块内容我们后面分析。注册关闭钩子:调用
Runtime.getRuntime().addShutdownHook(...)
,可以在jvm进程关闭前进行一些操作。2.1 controller实例化
BrokerController
的创建及初始化是在BrokerStartup#createBrokerController
方法中进行,我们先来看看它的构造方法:public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// 4个核心配置信息
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
// 管理consumer消费消息的offset
this.consumerOffsetManager = new ConsumerOffsetManager(this);
// 管理topic配置
this.topicConfigManager = new TopicConfigManager(this);
// 处理 consumer 拉消息请求的
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
// 消息送达的监听器
this.messageArrivingListener
= new NotifyMessageArrivingListener(this.pullRequestHoldService);
...
// 往外发消息的组件
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
...
}
BrokerController
的构造方法很长,基本都是一些赋值操作,代码中已列出关键项,这些包括:核心配置赋值:主要是
brokerConfig
/nettyServerConfig
/nettyClientConfig
/messageStoreConfig
四个配置ConsumerOffsetManager
:管理consumer
消费消息位置的偏移量,偏移量表示消费者组消费该topic
消息的位置,后面再消费时,就从该位置后消费,避免重复消费消息,也避免了漏消费消息。topicConfigManager
:topic
配置管理器,就是用来管理topic
配置的,如topic
名称,topic
队列数量pullMessageProcessor
:消息处理器,用来处理消费者拉消息messageArrivingListener
:消息送达的监听器,当生产者的消息送达时,由该监听器监听brokerOuterAPI
:往外发消息的组件,如向NameServer
发送注册/注销消息以上这些组件的用处,这里先混个脸熟,我们后面再分析。
2.2 初始化controller
我们再来看看初始化操作,方法为
BrokerController#initialize
:public boolean initialize() throws CloneNotSupportedException {
// 加载配置文件中的配置
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
if (result) {
try {
// 消息存储管理组件,管理磁盘上的消息
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig);
// 启用了DLeger,就创建DLeger相关组件
if (messageStoreConfig.isEnableDLegerCommitLog()) {
...
}
// broker统计组件
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig,
brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(
new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
// 加载磁盘上的记录,如commitLog写入的位置、消费者主题/队列的信息
result = result && this.messageStore.load();
if (result) {
// 处理 nettyServer
this.remotingServer = new NettyRemotingServer(
this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(
fastConfig, this.clientHousekeepingService);
// 创建线程池start... 这里会创建多种类型的线程池
...
// 处理consumer pull操作的线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
...
// 创建线程池end...
// 注册处理器
this.registerProcessor();
// 启动定时任务start... 这里会启动好多的定时任务
...
// 定时将consumer消费到的offset进行持久化操作,即将数据保存到磁盘上
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
...
// 启动定时任务end...
...
// 开启 DLeger 的一些操作
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
...
}
// 处理tls配置
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
...
}
// 初始化一些操作
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}
这个还是很长,关键部分都做了注释,该方法所做的工作如下:
加载配置文件中的配置
赋值与初始化操作
创建线程池
注册处理器
启动定时任务
这里我们来看下注册处理器的操作
this.registerProcessor()
:2.2.1 注册处理器:BrokerController#registerProcessor
this.registerProcessor()
实际调用的方法是BrokerController#registerProcessor
,代码如下:public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
this.sendMessageExecutor);
...
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* ReplyMessageProcessor
*/
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
...
}
这个方法里注册了许许多多的处理器,这里仅列出了与消息相关的内容,如发送消息、回复消息、拉取消息等,后面在处理
producer
/consumer
的消息时,就会用到这些处理器,这里先不展开分析。2.2.2 remotingServer注册处理器:NettyRemotingServer#registerProcessor
我们来看下
remotingServer
注册处理器的操作,方法为NettyRemotingServer#registerProcessor
:public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
...
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor,
ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor,
ExecutorService>(processor, executorThis);
// 注册到processorTable 中
this.processorTable.put(requestCode, pair);
}
...
}
最终,这些处理器注册到了
processorTable
中,它是NettyRemotingAbstract
的成员变量,定义如下:HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>
这是一个
hashMap
的结构,key
为code
,value
为Pair
,该类中有两个成员变量:NettyRequestProcessor
、ExecutorService
,code
与NettyRequestProcessor
的映射关系就是在hashMap
里存储的。2.3 注册关闭钩子:
Runtime.getRuntime().addShutdownHook(...)
接着我们来看看注册关闭钩子的操作:
// 关闭钩子,在关闭前处理一些操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
...
// 这里会发一条注销消息给nameServer
controller.shutdown();
...
}
}
}
}, "ShutdownHook"));
跟进
BrokerController#shutdown
方法:public void shutdown() {
// 调用各组件的shutdown方法
...
// 发送注销消息到NameServer
this.unregisterBrokerAll();
...
// 持久化consumer的消费偏移量
this.consumerOffsetManager.persist();
// 又是调用各组件的shutdown方法
...
这个方法里会调用各组件的
shutdown()
方法、发送注销消息给NameServer
、持久化consumer的消费偏移量,这里我们主要看发送注销消息的方法BrokerController#unregisterBrokerAll
:private void unregisterBrokerAll() {
// 发送一条注销消息给nameServer
this.brokerOuterAPI.unregisterBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId());
}
继续进入
BrokerOuterAPI#unregisterBrokerAll
:public void unregisterBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) {
// 获取所有的 nameServer,遍历发送注销消息
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
for (String namesrvAddr : nameServerAddressList) {
try {
this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
} catch (Exception e) {
log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
}
}
}
}
这个方法里,会获取到所有的
nameServer
,然后逐个发送注销消息,继续进入BrokerOuterAPI#unregisterBroker
方法:public void unregisterBroker(
final String namesrvAddr,
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException {
UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
// 发送的注销消息:RequestCode.UNREGISTER_BROKER
RemotingCommand request = RemotingCommand.createRequestCommand(
c, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
最终调用的是
RemotingClient#invokeSync
进行消息发送,请求code
是RequestCode.UNREGISTER_BROKER
,这就与NameServer
接收broker
的注销消息对应上了。3. 启动Broker:start(...)
我们再来看看
Broker
的启动流程,处理方法为BrokerController#start
:public void start() throws Exception {
// 启动各组件
// 启动消息存储相关组件
if (this.messageStore != null) {
this.messageStore.start();
}
// 启动 remotingServer,其实就是启动一个netty服务,用来接收producer传来的消息
if (this.remotingServer != null) {
this.remotingServer.start();
}
...
// broker对外发放消息的组件,向nameServer上报存活消息时使用了它,也是一个netty服务
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
...
// broker 核心的心跳注册任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false,
brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
// brokerConfig.getRegisterNameServerPeriod() 值为 1000 * 30,最终计算得到默认30秒执行一次
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
TimeUnit.MILLISECONDS);
...
}
这个方法主要就是启动各组件了,这里列出了几大重要组件的启动:
messageStore
:消息存储组件,在这个组件里,会启动消息存储相关的线程,如消息的投递操作、commitLog
文件的flush
操作、comsumeQueue
文件的flush
操作等remotingServer
:netty
服务,用来接收请求消息,如producer
发送过来的消息brokerOuterAPI
:也是一个netty
服务,用来对外发送消息,如向nameServer
上报心跳消息启动定时任务:
broker
向nameServer
发送注册消息这里我们重点来看定时任务是如何发送心跳发送的。
处理注册消息发送的时间间隔如下:
Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)
这行代码看着长,但意思就一句话:时间间隔可以自行配置,但不能小于10s,不能大于60s,默认是30s.
处理消息注册的方法为
BrokerController#registerBrokerAll(...)
,代码如下:public synchronized void registerBrokerAll(final boolean checkOrderConfig,
boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper
= this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 处理topic相关配置
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
...
}
// 这里会判断是否需要进行注册
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 进行注册操作
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
这个方法就是用来处理注册操作的,不过注册前会先验证下是否需要注册,验证是否需要注册的方法为
BrokerController#needRegister
, 代码如下:private boolean needRegister(final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final int timeoutMills) {
TopicConfigSerializeWrapper topicConfigWrapper
= this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 判断是否需要进行注册
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName,
brokerId, topicConfigWrapper, timeoutMills);
// 有一个发生了变化,就表示需要注册了
boolean needRegister = false;
for (Boolean changed : changeList) {
&
以上就是RocketMQ broker启动流程是什么的详细内容,更多关于RocketMQ broker启动流程是什么的资料请关注九品源码其它相关文章!