RocketMQ中Broker的创建和启动过程
在RocketMQ中,broker提供消息存储服务。生产者生成消息和消费者消费消息都要与broker进行交互。本文先来分析一下broker的创建和启动过程。
启动器
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
/**
 * Entry point of the broker process: creates a {@code BrokerController} from the
 * command-line arguments and starts it.
 *
 * @param args command-line arguments forwarded to {@code createBrokerController}
 */
public static void main(String[] args) {
    // Local-development fallback only. The hard-coded path must not clobber a home
    // directory that was already supplied via -D system property or the environment
    // variable that createBrokerController tells users to set.
    if (System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY) == null
        && System.getenv(MixAll.ROCKETMQ_HOME_ENV) == null) {
        System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/root/projects/rocketmq/rocketmq-4.9.4/rocketmqnamesrv");
    }
    // Create the controller, then start it
    start(createBrokerController(args));
}
主方法中有两个步骤:
- 创建控制器;
- 启动控制器;
创建控制器
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
/**
 * Builds and initializes a {@link BrokerController} from the command-line arguments.
 *
 * <p>The method parses the command line, creates the broker / Netty server / Netty client /
 * message-store config objects, overlays values from the optional {@code -c} properties file
 * and from the command line, validates the RocketMQ home, the name server addresses and the
 * broker id, configures logback, constructs the controller, calls {@code initialize()} on it
 * and finally registers a JVM shutdown hook that shuts the controller down.
 *
 * <p>Every validation failure, a failed {@code initialize()} and any thrown {@code Throwable}
 * end the process through {@code System.exit}.
 *
 * @param args raw command-line arguments
 * @return the initialized controller; the trailing {@code return null} is only reached after
 *         {@code System.exit(-1)} in the catch block
 */
public static BrokerController createBrokerController(String[] args) {
    // Publish the current RocketMQ version as a system property
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    try {
        // Build the command-line option set and parse the arguments
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        // Parsing failed: exit
        if (null == commandLine) {
            System.exit(-1);
        }

        /*
         * Create the configuration objects
         */
        // Main broker configuration
        final BrokerConfig brokerConfig = new BrokerConfig();
        // Netty server-side configuration
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // Netty client-side configuration
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
        // TLS: taken from the TLS_ENABLE system property, defaulting to "TLS mode is ENFORCING"
        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // Default listen port of the broker's Netty server
        nettyServerConfig.setListenPort(10911);
        // Message store configuration
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        // For a SLAVE broker, lower the max ratio of messages served from memory by 10
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        // Handle the '-c' option: an external properties file
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                // try-with-resources guarantees the stream is closed even if load() throws
                try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
                    properties = new Properties();
                    properties.load(in);
                }
                properties2SystemEnv(properties);
                // Apply the file's properties to each config object
                MixAll.properties2Object(properties, brokerConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                MixAll.properties2Object(properties, nettyClientConfig);
                MixAll.properties2Object(properties, messageStoreConfig);
                // Remember where the broker config file lives
                BrokerPathConfigHelper.setBrokerConfigPath(file);
            }
        }

        // Command-line values are applied to the main broker config after the file values
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

        // The RocketMQ home directory is mandatory; exit if it is missing
        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // Validate the name server address list
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                // Multiple addresses are separated by semicolons
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    // Conversion to a socket address acts as the validity check
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }

        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                // Masters always use the master broker id
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                // Slaves must be configured with a broker id greater than 0
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }
                break;
            default:
                break;
        }

        // With the DLedger commit log enabled the broker id is reset to -1
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }

        // HA listen port = main listen port + 1 (10912 with the default port set above)
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

        // Logging setup (logback)
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        // brokerLogDir is empty unless log isolation is enabled
        System.setProperty("brokerLogDir", "");
        // Isolated logs: <brokerName>_<brokerId>
        if (brokerConfig.isIsolateLogEnable()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
        }
        // Isolated logs with DLedger: <brokerName>_<dLegerSelfId>
        if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
        }
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        if (commandLine.hasOption('p')) {
            // '-p': print all configuration to the console logger, then exit
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            // '-m': same as '-p' but through the three-argument print variant, then exit
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        // Log the effective configuration of this broker
        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);

        // Create the controller with the resolved configuration
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        // (hands the properties loaded from the external file to the controller's Configuration)
        controller.getConfiguration().registerConfig(properties);

        // Initialize the controller; on failure shut it down and exit
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // Register a JVM shutdown hook that shuts the controller down at most once
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
该方法的核心步骤是创建控制器BrokerController对象并调用控制器的initialize方法。
启动控制器
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
/**
 * Starts the given controller and reports a boot-success message both to the broker
 * log and to standard output.
 *
 * @param controller the controller to start
 * @return the same controller once started; if starting throws, the stack trace is printed
 *         and {@code System.exit(-1)} is called
 */
public static BrokerController start(BrokerController controller) {
    try {
        controller.start();

        // Assemble the success banner: broker name, address and serialize type
        StringBuilder banner = new StringBuilder("The broker[")
            .append(controller.getBrokerConfig().getBrokerName())
            .append(", ")
            .append(controller.getBrokerAddr())
            .append("] boot success. serializeType=")
            .append(RemotingCommand.getSerializeTypeConfigInThisServer());

        // Mention the name server only when one is configured
        String nameServer = controller.getBrokerConfig().getNamesrvAddr();
        if (nameServer != null) {
            banner.append(" and name server is ").append(nameServer);
        }

        String tip = banner.toString();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
该方法主要就是调用控制器的start方法。
BrokerController
创建过程
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
/**
 * Creates the controller and wires up its collaborators: managers, services, listeners,
 * the bounded queues later handed to the thread pools, and the Configuration holder.
 * Nothing is started here. Several collaborators receive {@code this} while the controller
 * is still being constructed, so the assignment order below matters.
 *
 * @param brokerConfig       main broker configuration
 * @param nettyServerConfig  Netty server-side configuration
 * @param nettyClientConfig  Netty client-side configuration
 * @param messageStoreConfig message store configuration
 */
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// Broker configuration
this.brokerConfig = brokerConfig;
// Netty server-side configuration
this.nettyServerConfig = nettyServerConfig;
// Netty client-side configuration
this.nettyClientConfig = nettyClientConfig;
// Message store configuration
this.messageStoreConfig = messageStoreConfig;
// Consumer offset manager (LMQ variant when LMQ is enabled)
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
// Topic config manager (LMQ variant when LMQ is enabled)
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
// Processor for pull-message requests
this.pullMessageProcessor = new PullMessageProcessor(this);
// Service that holds (suspends) pull requests (LMQ variant when LMQ is enabled)
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
// Message-arriving listener, built on top of the pull-request hold service
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
// Listener for consumer-id changes
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
// Consumer manager
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
// Consumer filter manager
this.consumerFilterManager = new ConsumerFilterManager(this);
// Producer manager
this.producerManager = new ProducerManager();
// Client housekeeping service (the original notes call it the client connection heartbeat service)
this.clientHousekeepingService = new ClientHousekeepingService(this);
// Broker-to-client requests
this.broker2Client = new Broker2Client(this);
// Subscription group manager (LMQ variant when LMQ is enabled)
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
// Outbound API of the broker, created from the Netty client config
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
// Filter server manager
this.filterServerManager = new FilterServerManager(this);
// Slave synchronization helper
this.slaveSynchronize = new SlaveSynchronize(this);
/*
 * Bounded blocking queues; initialize() hands them to the corresponding thread pools
 */
// Queue for send-message requests from producers
this.sendThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getSendThreadPoolQueueCapacity());
// Queue for the put-message-future executor
this.putThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getPutThreadPoolQueueCapacity());
// Queue for pull-message requests from consumers
this.pullThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getPullThreadPoolQueueCapacity());
// Queue for reply-message requests
// NOTE(review): the original notes say this serves the request-reply feature introduced in RocketMQ 4.7.0 — confirm
this.replyThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
// Queue for query requests
this.queryThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
// Queue for client-management requests
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
// Queue for consumer-management requests
// NOTE(review): initialize() builds consumerManageExecutor with Executors.newFixedThreadPool and does not pass this queue
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
// Queue for heartbeat requests
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
// Queue for end-transaction requests
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
// Broker stats manager (LMQ variant when LMQ is enabled)
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
// Store host = brokerIP1 + the Netty server listen port
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
// Fast-failure service
this.brokerFastFailure = new BrokerFastFailure(this);
// Configuration holder covering all four config objects and the broker config file path
this.configuration = new Configuration(
log,
BrokerPathConfigHelper.getBrokerConfigPath(),
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
该构造方法创建并设置了很多组件,总体上可以分为下面几种:
- xxx管理器;
- xxx服务;
- xxx监听器;
- xxx阻塞队列;
没有必要一次性分析这么多组件(反正也记不住),更合适的做法是在后续遇到某个组件时再专门对它进行分析。
初始化过程
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
/**
 * Initializes the controller: loads persisted config via the four managers, creates and loads
 * the message store, and, only if everything so far succeeded, creates the two Netty servers
 * and the thread pools, registers the request processors, schedules the periodic tasks and
 * initializes transaction, ACL and RPC-hook support.
 *
 * @return {@code true} if every load step succeeded, {@code false} otherwise
 * @throws CloneNotSupportedException if cloning the Netty server config fails
 */
public boolean initialize() throws CloneNotSupportedException {
// Load persisted JSON state through the managers.
// NOTE(review): the file paths in the comments below come from the original notes
// (config sub-directory of storePathRootDir); they are not visible in this code — confirm.
// Topic configuration (${ROCKET_HOME}/store/config/topics.json)
boolean result = this.topicConfigManager.load();
// Consumer offsets (${ROCKET_HOME}/store/config/consumerOffset.json)
result = result && this.consumerOffsetManager.load();
// Subscription groups (${ROCKET_HOME}/store/config/subscriptionGroup.json)
result = result && this.subscriptionGroupManager.load();
// Consumer filters (${ROCKET_HOME}/store/config/consumerFilter.json)
result = result && this.consumerFilterManager.load();
if (result) {
try {
// Create the message store
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// When the DLedger commit log is enabled
if (messageStoreConfig.isEnableDLegerCommitLog()) {
// Register a role-change handler with the DLedger leader elector
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
// Broker statistics object backed by the message store
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
// Build the plugin context and let the factory wrap/replace the message store
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
// Put a CommitLogDispatcherCalcBitMap at the head of the dispatcher list
// (per the original notes: a consume filter based on a bloom filter — confirm)
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
// Load the message store's own files (short-circuits when an earlier step failed)
result = result && this.messageStore.load();
if (result) {
// Create the main Netty server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
// Work on a copy of the Netty server config
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
// The fast server listens on the main port minus 2, e.g. 10911 - 2 = 10909
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
// Create the "fast" server; registerProcessor() registers everything on it except PULL_MESSAGE
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
/*
 * Create the thread pools
 */
// Pool for send-message requests
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
// Pool for put-message futures
this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.putThreadPoolQueue,
new ThreadFactoryImpl("PutMessageThread_"));
// Pool for pull-message requests
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
// Pool for reply-message requests
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
// Pool for query requests
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
// Pool for admin requests; registerProcessor() uses it for the default processor
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
// Pool for client-management requests
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
// Pool for heartbeat requests
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
// Pool for end-transaction requests
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
// Pool for consumer-management requests
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
// Register the processors together with their thread pools
this.registerProcessor();
/*
 * Schedule the periodic background tasks
 */
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
// Every 24 hours, first run at the computed "next morning" time: record broker stats
// (per the original notes: prints yesterday's produced/consumed message counts)
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
// Persist consumer offsets after 10 s, then every flushConsumerOffsetInterval ms
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// Persist consumer filter data every 10 s
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
// Run protectBroker() every 3 minutes (per the original notes: checks consumer progress)
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}, 3, 3, TimeUnit.MINUTES);
// Run printWaterMark() every second after a 10 s delay
// (per the original notes: logs size and head-wait time of the send/pull/query/end-transaction queues)
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}, 10, 1, TimeUnit.SECONDS);
// Every minute, log how many commit-log bytes the dispatch is behind
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
if (this.brokerConfig.getNamesrvAddr() != null) {
// A name server address is configured: apply it now and re-apply it every 2 minutes
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.brokerOuterAPI.updateNameServerAddressList(BrokerController.this.brokerConfig.getNamesrvAddr());
} catch (Throwable e) {
log.error("ScheduledTask updateNameServerAddr exception", e);
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
// Otherwise, if enabled, fetch the name server address from the address server every 2 minutes
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// Only when the DLedger commit log is not enabled
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// This node is a slave
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
// An HA master address is configured: use it and disable periodic updates
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else { // this node is a master
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
// Log the difference between master and slave every minute
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
// Initialize transactional-message support
initialTransaction();
// Initialize ACL support
initialAcl();
// Register RPC hooks
initialRpcHooks();
}
return result;
}
该方法非常长,主要可以分为下面几部分:
- 加载一些消息相关的数据文件;
- 创建消息存储相关组件;
- 创建Netty服务器,包括普通通道和快速通道两种服务器;
- 创建各种线程池;
- 注册处理器和线程池;
- 启动一系列的定时任务;
- 初始化事务消息相关服务;
- 初始化权限相关服务;
- 注册RPC钩子函数;
本文只会挑选一些核心的步骤来介绍,其他部分应该按照专题来分析。
在RocketMQ中,nameserver和broker都会依赖底层的Netty服务器实现,这部分内容被抽离了出来单独成文,所以本文不会重复介绍,请参考RocketMQ中的NettyRemotingServer一文。
注册处理器和线程池
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
/**
 * Registers every request processor, paired with the thread pool that runs it, on the
 * main remoting server and on the fast remoting server. PULL_MESSAGE is the only request
 * code that is registered on the main server alone. The admin processor is installed as
 * the default processor of both servers.
 */
public void registerProcessor() {
    /*
     * SendMessageProcessor
     */
    final SendMessageProcessor sendMsgProcessor = new SendMessageProcessor(this);
    // Attach the hook lists
    sendMsgProcessor.registerSendMessageHook(sendMessageHookList);
    sendMsgProcessor.registerConsumeMessageHook(consumeMessageHookList);
    // Request codes served by the send processor
    final int[] sendCodes = {
        RequestCode.SEND_MESSAGE,
        RequestCode.SEND_MESSAGE_V2,
        RequestCode.SEND_BATCH_MESSAGE,
        RequestCode.CONSUMER_SEND_MSG_BACK
    };
    for (int code : sendCodes) {
        this.remotingServer.registerProcessor(code, sendMsgProcessor, this.sendMessageExecutor);
    }
    for (int code : sendCodes) {
        this.fastRemotingServer.registerProcessor(code, sendMsgProcessor, this.sendMessageExecutor);
    }

    /*
     * PullMessageProcessor: main server only, never the fast server
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    /*
     * ReplyMessageProcessor
     */
    final ReplyMessageProcessor replyProcessor = new ReplyMessageProcessor(this);
    replyProcessor.registerSendMessageHook(sendMessageHookList);
    final int[] replyCodes = {RequestCode.SEND_REPLY_MESSAGE, RequestCode.SEND_REPLY_MESSAGE_V2};
    for (int code : replyCodes) {
        this.remotingServer.registerProcessor(code, replyProcessor, replyMessageExecutor);
    }
    for (int code : replyCodes) {
        this.fastRemotingServer.registerProcessor(code, replyProcessor, replyMessageExecutor);
    }

    /*
     * QueryMessageProcessor
     */
    final NettyRequestProcessor queryMsgProcessor = new QueryMessageProcessor(this);
    final int[] queryCodes = {RequestCode.QUERY_MESSAGE, RequestCode.VIEW_MESSAGE_BY_ID};
    for (int code : queryCodes) {
        this.remotingServer.registerProcessor(code, queryMsgProcessor, this.queryMessageExecutor);
    }
    for (int code : queryCodes) {
        this.fastRemotingServer.registerProcessor(code, queryMsgProcessor, this.queryMessageExecutor);
    }

    /*
     * ClientManageProcessor: heartbeats run on their own pool, the rest on the client-manage pool
     */
    final ClientManageProcessor clientManageProcessor = new ClientManageProcessor(this);
    final int[] clientManageCodes = {RequestCode.UNREGISTER_CLIENT, RequestCode.CHECK_CLIENT_CONFIG};
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
    for (int code : clientManageCodes) {
        this.remotingServer.registerProcessor(code, clientManageProcessor, this.clientManageExecutor);
    }
    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
    for (int code : clientManageCodes) {
        this.fastRemotingServer.registerProcessor(code, clientManageProcessor, this.clientManageExecutor);
    }

    /*
     * ConsumerManageProcessor
     */
    final ConsumerManageProcessor consumerMgmtProcessor = new ConsumerManageProcessor(this);
    final int[] consumerManageCodes = {
        RequestCode.GET_CONSUMER_LIST_BY_GROUP,
        RequestCode.UPDATE_CONSUMER_OFFSET,
        RequestCode.QUERY_CONSUMER_OFFSET
    };
    for (int code : consumerManageCodes) {
        this.remotingServer.registerProcessor(code, consumerMgmtProcessor, this.consumerManageExecutor);
    }
    for (int code : consumerManageCodes) {
        this.fastRemotingServer.registerProcessor(code, consumerMgmtProcessor, this.consumerManageExecutor);
    }

    /*
     * EndTransactionProcessor: each server gets its own processor instance
     */
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);

    /*
     * Default processor
     */
    final AdminBrokerProcessor defaultProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(defaultProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(defaultProcessor, this.adminBrokerExecutor);
}
该方法创建了多个处理器,并将其和相应的线程池分别注册到普通服务器和快速服务器中,当Netty的服务器收到请求后,会根据请求码找到对应的处理器和线程池来处理。
注册RPC钩子函数
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
/**
 * Loads the {@code RPCHook} implementations through {@code ServiceProvider} and registers
 * each of them via {@code registerServerRPCHook}. Does nothing when none are found.
 */
private void initialRpcHooks() {
    List<RPCHook> discoveredHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
    // Register only when at least one hook was discovered
    if (discoveredHooks != null && !discoveredHooks.isEmpty()) {
        discoveredHooks.forEach(this::registerServerRPCHook);
    }
}
/**
 * Registers the given RPC hook on both the main remoting server and the fast remoting server.
 *
 * @param rpcHook the hook to register
 */
public void registerServerRPCHook(RPCHook rpcHook) {
    // Main server first, then the fast server
    this.getRemotingServer().registerRPCHook(rpcHook);
    this.fastRemotingServer.registerRPCHook(rpcHook);
}
分别为普通服务器和快速服务器注册RPC钩子函数。
启动过程
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
/**
 * Starts the controller's components in a fixed order: message store, the two Netty
 * servers, file watch service, outer API, pull-request hold service, client housekeeping
 * service and filter server manager. Without DLedger it then starts the role-dependent
 * processing, sets up slave synchronization and registers the broker with the name servers.
 * Finally it schedules the periodic name server registration and starts the stats manager
 * and the fast-failure service. Components that are {@code null} are skipped.
 *
 * @throws Exception if any component fails to start
 */
public void start() throws Exception {
    // Message store
    if (null != this.messageStore) {
        this.messageStore.start();
    }

    // Main Netty server
    if (null != this.remotingServer) {
        this.remotingServer.start();
    }

    // Fast Netty server
    if (null != this.fastRemotingServer) {
        this.fastRemotingServer.start();
    }

    // File watch service
    if (null != this.fileWatchService) {
        this.fileWatchService.start();
    }

    // Outbound API of the broker
    if (null != this.brokerOuterAPI) {
        this.brokerOuterAPI.start();
    }

    // Service holding pull requests
    if (null != this.pullRequestHoldService) {
        this.pullRequestHoldService.start();
    }

    // Client housekeeping service
    if (null != this.clientHousekeepingService) {
        this.clientHousekeepingService.start();
    }

    // Filter server manager
    if (null != this.filterServerManager) {
        this.filterServerManager.start();
    }

    // Only when the DLedger commit log is not enabled
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // Role-dependent processing for this broker
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        // Slave synchronization handling for this role
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        // Force-register this broker with the name servers
        this.registerBrokerAll(true, false, true);
    }

    // Periodic registration with the name servers; the period is clamped to 10000..60000 ms
    final long registerPeriodMillis =
        Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000));
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }, 1000 * 10, registerPeriodMillis, TimeUnit.MILLISECONDS);

    // Broker stats manager
    if (null != this.brokerStatsManager) {
        this.brokerStatsManager.start();
    }

    // Fast-failure service
    if (null != this.brokerFastFailure) {
        this.brokerFastFailure.start();
    }
}
该方法的操作可以总结如下:
- 启动底层的Netty服务器;
- 启动各种服务;
- 启动broker心跳相关的服务,即定时向nameserver执行注册;
总结
RocketMQ中broker的创建和启动是通过broker控制器(BrokerController)来实现的,控制器内部封装了各种相关的服务、处理器和线程池。当然,本文只是很浅显地分析了创建、初始化和启动这三个过程,没有涉及各种服务内部的逻辑,因为这些内容更适合单独成文,而不应该和主线混杂在一起。