RocketMQ中NameServer的启动过程

在RocketMQ中,nameserver会负责管理broker和消息主题。本文会分析RocketMQ中nameserver的创建和启动过程。


在RocketMQ中,nameserver和broker都会依赖底层的Netty服务器实现,这部分内容被抽离了出来单独成文,所以本文不会重复介绍,请参考RocketMQ中的NettyRemotingServer一文。

启动器

nameserver是通过启动器来启动的。

v4.9.4
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
public static void main(String[] args) {
    System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/root/projects/rocketmq/rocketmq-4.9.4/rocketmqnamesrv");
    main0(args);
}
public static NamesrvController main0(String[] args) {

    try {
        // 创建控制器对象
        NamesrvController controller = createNamesrvController(args);
        // 启动控制器
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

main方法中,设置了系统属性,指定了RocketMQ的home属性。然后在main0方法中,创建并启动了控制器对象,真正的启动操作被封装到启动器中。

创建启动器

v4.9.4
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();


    // 构建命令行选项对象
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    // 解析命令行参数
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    // 创建两个配置对象
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // Netty相关的配置
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // 设置默认端口
    nettyServerConfig.setListenPort(9876);
    if (commandLine.hasOption('c')) { // 命令行中是否有-c选项
        // 获取指定的外部配置文件位置
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            // 读取配置文件
            properties.load(in);
            // 将配置文件中的内容合并到两个配置对象中
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            // 设置配置文件位置
            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    if (commandLine.hasOption('p')) { // 命令行中是否有-p选项,只打印配置信息,不真正启动程序
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        // 打印配置信息
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        // 执行结束,退出程序
        System.exit(0);
    }

    // 把命令行中的选项添加到配置对象中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    // 校验ROCKETMQ_HOME环境变量
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // 日志相关配置
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    // 打印配置信息
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    // 创建NamesrvController对象
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    // 将外部配置文件中的配置保存到控制器中的配置对象的allConfigs集合中
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

这里首先是解析了命令行参数和选项,以及解析指定的外部配置文件,然后将解析到的配置保存在配置对象中,最后创建NamesrvController控制器对象,并将配置对象设置到控制器对象中。

启动控制器

v4.9.4
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // 初始化控制器
    boolean initResult = controller.initialize();
    // 初始化失败则退出程序
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    // 注册shutdown的钩子函数
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        // 在JVM关闭之前,先关闭控制器
        controller.shutdown();
        return null;
    }));

    // 启动控制器
    controller.start();

    return controller;
}

先是调用了控制器的initialize方法来进行初始化,然后调用start方法来启动控制器。

控制器

初始化

v4.9.4
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
public boolean initialize() {

    // 加载KV配置并存储到管理器内部的configTable属性中
    this.kvConfigManager.load();

    // 创建基于Netty实现的server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // 创建Netty远程通信执行器线程池,线程数量固定,默认为8
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    // 注册默认请求处理器
    this.registerProcessor();

    // 启动定时任务:定时对broker做心跳检查,稠次延迟5秒钟执行,此后每隔10秒执行一次,会清理掉下线的broker
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);

    // 定时定时任务:打印KV配置信息,首次启动延迟1分钟执行,此后每隔10分钟执行一次
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);


    return true;
}

该方法中做了下面几件事情:

  • 加载键值配置;
  • 创建Netty服务器;
  • 创建业务线程池;
  • 注册请求处理器;
  • 定时扫描非正常的broker;
  • 定时打印键值配置信息;

注册请求处理器

v4.9.4
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
private void registerProcessor() {
    // 默认是false
    if (namesrvConfig.isClusterTest()) {

        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {

        // 创建并注册默认请求处理器,并设置默认的处理请求的线程池
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
}

这里创建了默认的请求处理器DefaultRequestProcessor并设置到server中,Nameserver并没有为不同的请求码设置不同的处理器,而是使用同一个处理器。注册处理器时绑定的线程池就是在构造方法中创建的remotingExecutor属性,即Nameserver的业务线程池。

启动

v4.9.4
java
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
public void start() throws Exception {
    // 启动Netty服务器
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

这里主要就是在启动Netty服务器。