RocketMQ中NameServer的启动过程
在RocketMQ中,nameserver会负责管理broker和消息主题。本文会分析RocketMQ中nameserver的创建和启动过程。
在RocketMQ中,nameserver和broker都会依赖底层的Netty服务器实现,这部分内容被抽离了出来单独成文,所以本文不会重复介绍,请参考RocketMQ中的NettyRemotingServer一文。
启动器
nameserver是通过启动器来启动的。
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
public static void main(String[] args) {
System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/root/projects/rocketmq/rocketmq-4.9.4/rocketmqnamesrv");
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 创建控制器对象
NamesrvController controller = createNamesrvController(args);
// 启动控制器
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
在main方法中,设置了系统属性,指定了RocketMQ的home属性。然后在main0方法中,创建并启动了控制器对象,真正的启动操作被封装到启动器中。
创建启动器
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// 构建命令行选项对象
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 解析命令行参数
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 创建两个配置对象
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// Netty相关的配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 设置默认端口
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) { // 命令行中是否有-c选项
// 获取指定的外部配置文件位置
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
// 读取配置文件
properties.load(in);
// 将配置文件中的内容合并到两个配置对象中
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
// 设置配置文件位置
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) { // 命令行中是否有-p选项,只打印配置信息,不真正启动程序
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
// 打印配置信息
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
// 执行结束,退出程序
System.exit(0);
}
// 把命令行中的选项添加到配置对象中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 校验ROCKETMQ_HOME环境变量
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 日志相关配置
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 打印配置信息
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 创建NamesrvController对象
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
// 将外部配置文件中的配置保存到控制器中的配置对象的allConfigs集合中
controller.getConfiguration().registerConfig(properties);
return controller;
}
这里首先是解析了命令行参数和选项,以及解析指定的外部配置文件,然后将解析到的配置保存在配置对象中,最后创建NamesrvController控制器对象,并将配置对象设置到控制器对象中。
启动控制器
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化控制器
boolean initResult = controller.initialize();
// 初始化失败则退出程序
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册shutdown的钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
// 在JVM关闭之前,先关闭控制器
controller.shutdown();
return null;
}));
// 启动控制器
controller.start();
return controller;
}
先是调用了控制器的initialize方法来进行初始化,然后调用start方法来启动控制器。
控制器
初始化
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
public boolean initialize() {
// 加载KV配置并存储到管理器内部的configTable属性中
this.kvConfigManager.load();
// 创建基于Netty实现的server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 创建Netty远程通信执行器线程池,线程数量固定,默认为8
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册默认请求处理器
this.registerProcessor();
// 启动定时任务:定时对broker做心跳检查,稠次延迟5秒钟执行,此后每隔10秒执行一次,会清理掉下线的broker
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
// 定时定时任务:打印KV配置信息,首次启动延迟1分钟执行,此后每隔10分钟执行一次
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
return true;
}
该方法中做了下面几件事情:
- 加载键值配置;
- 创建Netty服务器;
- 创建业务线程池;
- 注册请求处理器;
- 定时扫描非正常的broker;
- 定时打印键值配置信息;
注册请求处理器
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
private void registerProcessor() {
// 默认是false
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
// 创建并注册默认请求处理器,并设置默认的处理请求的线程池
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
这里创建了默认的请求处理器DefaultRequestProcessor并设置到server中,Nameserver并没有为不同的请求码设置不同的处理器,而是使用同一个处理器。注册处理器时绑定的线程池就是在构造方法中创建的remotingExecutor属性,即Nameserver的业务线程池。
启动
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
public void start() throws Exception {
// 启动Netty服务器
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
这里主要就是在启动Netty服务器。