Creating and Starting the Broker in RocketMQ

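In RocketMQ, the broker provides the message storage service. Producers sending messages and consumers consuming them both interact with the broker. This article analyzes how the broker is created and started.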


Launcher

v4.9.4
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
public static void main(String[] args) {
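    // Hard-coded ROCKETMQ_HOME for local debugging; adjust or remove this line for your environment
    System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/root/projects/rocketmq/rocketmq-4.9.4/rocketmqnamesrv");
    // Create the controller and start it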
    start(createBrokerController(args));
}

The main method performs two steps:

  • create the controller;
  • start the controller;

Creating the Controller

v4.9.4
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
public static BrokerController createBrokerController(String[] args) {
    // Store the RocketMQ version in a system property
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    try {
        //PackageConflictDetect.detectFastjson();
        // Build the command-line options
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        // Parse the command-line arguments
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        // Exit if parsing failed
        if (null == commandLine) {
            System.exit(-1);
        }

        /*
         * Create the configuration objects
         */

        // Main broker configuration
        final BrokerConfig brokerConfig = new BrokerConfig();
        // Netty server configuration
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // Netty client configuration
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

        // TLS configuration
        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // Default listen port of the broker server
        nettyServerConfig.setListenPort(10911);
        // Message store configuration
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        // If the broker role is SLAVE
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            // Lower the maximum ratio of messages served from memory by 10
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        // Handle the '-c' option
        if (commandLine.hasOption('c')) {
            // Path of the external configuration file
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                // Read the file contents
                properties.load(in);

                properties2SystemEnv(properties);
                // Apply to the main broker configuration
                MixAll.properties2Object(properties, brokerConfig);
                // Apply to the Netty server configuration
                MixAll.properties2Object(properties, nettyServerConfig);
                // Apply to the Netty client configuration
                MixAll.properties2Object(properties, nettyClientConfig);
                // Apply to the message store configuration
                MixAll.properties2Object(properties, messageStoreConfig);

                // Remember the configuration file path
                BrokerPathConfigHelper.setBrokerConfigPath(file);
                in.close();
            }
        }

        // Apply the command-line settings to the main configuration; they override values from the file
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

        // Check that ROCKETMQ_HOME is set
        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            // Without ROCKETMQ_HOME the process exits with an error code
            System.exit(-2);
        }

        // Validate the nameserver addresses
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                // Multiple addresses are separated by semicolons
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    // Convert the string to a SocketAddress; a malformed address throws
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }

        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                // A master always uses brokerId 0
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                // A slave must have a brokerId greater than 0
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }

                break;
            default:
                break;
        }

        // With DLedger enabled, brokerId is set to -1
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }

        // The HA listen port is the main listen port + 1, i.e. 10912 by default
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
        // Logging configuration
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        // Broker log directory (empty by default)
        System.setProperty("brokerLogDir", "");
        // Whether to separate log directories when several brokers run on the same machine; false by default
        if (brokerConfig.isIsolateLogEnable()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
        }
        if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
        }
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        // If the command line contains the "-p" option, print all configuration items and exit
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            // Print the configuration
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) { // "-m" option: print only the important configuration items and exit
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            // Print the configuration
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        // Log the current broker configuration
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);

        // Create the controller with the parsed configuration
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        // Save the settings from the external configuration file into the allConfigs property of Configuration
        controller.getConfiguration().registerConfig(properties);

        // Initialize the controller
        boolean initResult = controller.initialize();
        // Exit if initialization failed
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // Register a JVM shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        // Shut down the controller
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        // Return the controller
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

The core of this method is creating the BrokerController object and calling its initialize method.
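
The order in which settings are applied in createBrokerController matters: values from the '-c' file are applied first and command-line values afterwards, so the command line wins. Below is a minimal sketch of that precedence. `DemoBrokerConfig` and `applyProperties` are hypothetical stand-ins written for illustration; they only imitate the idea behind MixAll.properties2Object (copying properties onto matching setters) and are not RocketMQ's implementation.

```java
import java.lang.reflect.Method;
import java.util.Properties;

final class ConfigPrecedenceSketch {

    /** Hypothetical stand-in for BrokerConfig with just two properties. */
    static final class DemoBrokerConfig {
        private String brokerName = "DEFAULT_BROKER";
        private long brokerId = 0L;

        public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
        public void setBrokerId(long brokerId) { this.brokerId = brokerId; }
        public String describe() { return brokerName + "/" + brokerId; }
    }

    /** Copies each property onto the matching single-argument setter; unknown keys are ignored. */
    static void applyProperties(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setBrokerName -> brokerName
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue;
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == String.class) {
                    m.invoke(target, value);
                } else if (type == long.class) {
                    m.invoke(target, Long.parseLong(value));
                } else if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value));
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot apply " + key, e);
            }
        }
    }

    /** Applies "file" settings first, then "command-line" settings, and reports the result. */
    static String demo() {
        Properties fromFile = new Properties();
        fromFile.setProperty("brokerName", "broker-a");
        fromFile.setProperty("brokerId", "1");

        Properties fromCommandLine = new Properties();
        fromCommandLine.setProperty("brokerName", "broker-cli");

        DemoBrokerConfig config = new DemoBrokerConfig();
        applyProperties(fromFile, config);        // first: external configuration file
        applyProperties(fromCommandLine, config); // second: command line overrides the file
        return config.describe();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the name set on the "command line" replaces the one from the "file", while brokerId, which only the file sets, is kept.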

Starting the Controller

v4.9.4
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
public static BrokerController start(BrokerController controller) {
    try {

        // Start the controller
        controller.start();

        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

        if (null != controller.getBrokerConfig().getNamesrvAddr()) {
            tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
        }

        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

This method mainly calls the controller's start method.

BrokerController

Creation

v4.9.4
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    // Broker configuration
    this.brokerConfig = brokerConfig;
    // Netty server configuration
    this.nettyServerConfig = nettyServerConfig;
    // Netty client configuration
    this.nettyClientConfig = nettyClientConfig;
    // Message store configuration
    this.messageStoreConfig = messageStoreConfig;
    // Consumer offset manager; maintains consumption progress
    this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
    // Topic configuration manager
    this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
    // Pull-message processor
    this.pullMessageProcessor = new PullMessageProcessor(this);
    // Service that holds (suspends) pull requests
    this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
    // Listener notified when a message arrives
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    // Listener for consumer ID changes
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    // Consumer manager
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    // Consumer filter manager
    this.consumerFilterManager = new ConsumerFilterManager(this);
    // Producer manager
    this.producerManager = new ProducerManager();
    // Housekeeping service for client connections
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    // Handles requests sent from the broker to clients
    this.broker2Client = new Broker2Client(this);
    // Subscription group manager
    this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
    // API for the broker's outbound calls
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    // Filter server manager
    this.filterServerManager = new FilterServerManager(this);

    // Used by slaves to synchronize from the master
    this.slaveSynchronize = new SlaveSynchronize(this);

    /*
     * Initialize the blocking queues; each is handed to a different thread pool
     */

    // Queue for send-message requests from producers
    this.sendThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getSendThreadPoolQueueCapacity());
    this.putThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getPutThreadPoolQueueCapacity());
    // Queue for pull-message requests from consumers
    this.pullThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getPullThreadPoolQueueCapacity());
    // Queue for reply-message requests; used by the request-reply feature introduced in RocketMQ 4.7.0
    this.replyThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
    // Queue for query requests
    this.queryThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
    // Queue for the client manager
    this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
    // Queue for the consumer manager
    this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
    // Queue for heartbeat handling
    this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
    // Queue for transactional message handling
    this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

    // Broker statistics manager
    this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());

    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

    // Fast-failure service
    this.brokerFastFailure = new BrokerFastFailure(this);
    // Create the configuration holder
    this.configuration = new Configuration(
        log,
        BrokerPathConfigHelper.getBrokerConfigPath(),
        this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
    );
}

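This constructor creates and wires up many components, which fall roughly into these groups:

  • managers;
  • services;
  • listeners;
  • blocking queues;

There is no need to analyze all of these components at once; each is better examined where it is actually used.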
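
The blocking queues created in the constructor are bounded, and each is later handed to a thread pool. The following sketch uses only JDK classes to show what a bounded queue implies for a fixed-size pool: once the worker is busy and the queue is full, further submissions are rejected. The sizes (one thread, capacity one) are chosen purely for illustration and are not RocketMQ's defaults.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolSketch {

    /** Returns true if the third task is rejected by a 1-thread pool with a 1-slot queue. */
    static boolean thirdTaskRejected() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1);
        // Same core and maximum size, like the fixed-size pools built in initialize()
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the task running until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        boolean rejected = false;
        try {
            pool.execute(blocked); // taken directly by the single worker thread
            pool.execute(blocked); // waits in the queue
            pool.execute(blocked); // worker busy and queue full: rejected
        } catch (RejectedExecutionException e) {
            rejected = true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```

Bounding the queue trades unbounded memory growth for explicit back-pressure, which is why the queue capacities are exposed as broker configuration items.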

Initialization

v4.9.4
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
public boolean initialize() throws CloneNotSupportedException {
    // Load several JSON files from disk; they live in the config subdirectory of storePathRootDir
    // (storePathRootDir defaults to ${user.home}/store)

    // Topic configuration: ${storePathRootDir}/config/topics.json
    boolean result = this.topicConfigManager.load();

    // Consumer offsets: ${storePathRootDir}/config/consumerOffset.json
    result = result && this.consumerOffsetManager.load();
    // Subscription groups: ${storePathRootDir}/config/subscriptionGroup.json
    result = result && this.subscriptionGroupManager.load();
    // Consumer filters: ${storePathRootDir}/config/consumerFilter.json
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            // Create the message store
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            // If DLedger is enabled
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                // Create the DLedger role-change handler
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            // Create the broker statistics object
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            // Create the message store plugin context
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            // Prepend a commit-log dispatcher that computes bloom-filter bitmaps for consumer filtering
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }

    // Load the message store's files through the message store
    result = result && this.messageStore.load();

    if (result) {
        // Create the Netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        // Clone the Netty server configuration
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        // Listen on the main port minus 2, i.e. 10911 - 2 = 10909
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        // Create the fast server; the "fast channel" handles every client request except message pulls
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        /*
         * Create the thread pools
         */

        // Thread pool for send-message requests
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));

        // Thread pool for handling the futures returned by asynchronous message puts
        this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPutMessageFutureThreadPoolNums(),
            this.brokerConfig.getPutMessageFutureThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.putThreadPoolQueue,
            new ThreadFactoryImpl("PutMessageThread_"));

        // Thread pool for pull-message requests
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));

        // Thread pool for reply-message requests
        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.replyThreadPoolQueue,
            new ThreadFactoryImpl("ProcessReplyMessageThread_"));

        // Thread pool for query requests
        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));

        // Thread pool for admin functions; it serves the default processor
        this.adminBrokerExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                "AdminBrokerThread_"));

        // Thread pool for the client manager
        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));

        // Thread pool for heartbeat handling
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));

        // Thread pool for transactional message handling
        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));

        // Thread pool for consumer management
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                "ConsumerManageThread_"));

        // Register the processors and their thread pools
        this.registerProcessor();

        /*
         * Schedule a series of periodic tasks
         */

        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        // Every 24 hours, record the previous day's produced and consumed message counts
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                BrokerController.this.getBrokerStats().record();
            } catch (Throwable e) {
                log.error("schedule record error.", e);
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        // Persist consumer offsets to consumerOffset.json
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                BrokerController.this.consumerOffsetManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // Persist consumer filter data to consumerFilter.json
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                BrokerController.this.consumerFilterManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumer filter error.", e);
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

        // Check consumer progress every 3 minutes to protect the broker from consumers that lag too far behind
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                BrokerController.this.protectBroker();
            } catch (Throwable e) {
                log.error("protectBroker error.", e);
            }
        }, 3, 3, TimeUnit.MINUTES);

        // Log the size of the send, pull, query, and end-transaction thread pool queues and how long each queue's head element has waited
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                BrokerController.this.printWaterMark();
            } catch (Throwable e) {
                log.error("printWaterMark error.", e);
            }
        }, 10, 1, TimeUnit.SECONDS);

        // Log the number of bytes already stored in the commit log but not yet dispatched to the consume queues
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
            } catch (Throwable e) {
                log.error("schedule dispatchBehindBytes error.", e);
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        if (this.brokerConfig.getNamesrvAddr() != null) {
            // Update the nameserver address list
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                try {
                    BrokerController.this.brokerOuterAPI.updateNameServerAddressList(BrokerController.this.brokerConfig.getNamesrvAddr());
                } catch (Throwable e) {
                    log.error("ScheduledTask updateNameServerAddr exception", e);
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            // Fetch the latest nameserver addresses from the address server and update them
            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                try {
                    BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                } catch (Throwable e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        // If DLedger is not enabled
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            // If this node is a slave
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    // Use the configured master HA address
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else { // This node is a master
                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        // Log the difference between master and slave
                        BrokerController.this.printMasterAndSlaveDiff();
                    } catch (Throwable e) {
                        log.error("schedule printMasterAndSlaveDiff error.", e);
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        // Initialize the transactional message services
        initialTransaction();
        // Initialize the ACL (access control) services
        initialAcl();
        // Register the RPC hooks
        initialRpcHooks();
    }
    return result;
}

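This method is very long; it can be divided into the following parts:

  • load the persisted configuration files (topics, consumer offsets, subscription groups, consumer filters);
  • create the message store components;
  • create the Netty servers, one for the normal channel and one for the fast channel;
  • create the thread pools;
  • register the processors with their thread pools;
  • schedule a series of periodic tasks;
  • initialize the transactional message services;
  • initialize the ACL services;
  • register the RPC hooks;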
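
One of the periodic tasks runs once a day, with an initial delay computed as "next morning minus now" and a period of 24 hours. The sketch below shows one way to compute such a delay with java.time. It assumes that the target time is the next local midnight, which is what the name UtilAll.computeNextMorningTimeMillis suggests; `millisUntilNextMidnight` is a hypothetical helper, not a RocketMQ method.

```java
import java.time.Duration;
import java.time.LocalDateTime;

final class DailyScheduleSketch {

    /** Milliseconds from the given local time until the next 00:00:00 (assumed target). */
    static long millisUntilNextMidnight(LocalDateTime now) {
        LocalDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay();
        return Duration.between(now, nextMidnight).toMillis();
    }

    public static void main(String[] args) {
        long initialDelay = millisUntilNextMidnight(LocalDateTime.now());
        long period = 1000L * 60 * 60 * 24; // 24 hours, as in initialize()
        System.out.println("initialDelay=" + initialDelay + " ms, period=" + period + " ms");
    }
}
```

These two values would then be passed to scheduleAtFixedRate with TimeUnit.MILLISECONDS, as the scheduling code in initialize does.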

This article covers only a few of the core steps; the rest is better analyzed topic by topic.

In RocketMQ, both the nameserver and the broker rely on the underlying Netty server implementation. That part is covered in a separate article, "NettyRemotingServer in RocketMQ", so it is not repeated here.
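
The code shown so far derives two more ports from the main listen port: the HA port (listen port + 1, set in createBrokerController) and the fast server port (listen port - 2, set in initialize). A small sketch of that arithmetic, with hypothetical helper names:

```java
final class BrokerPortsSketch {

    /** HA listen port as set in createBrokerController: main port + 1. */
    static int haPort(int listenPort) {
        return listenPort + 1;
    }

    /** Fast server port as set in initialize(): main port - 2. */
    static int fastPort(int listenPort) {
        return listenPort - 2;
    }

    public static void main(String[] args) {
        int listenPort = 10911; // default set in createBrokerController
        System.out.println("main=" + listenPort
            + " ha=" + haPort(listenPort)
            + " fast=" + fastPort(listenPort));
        // prints "main=10911 ha=10912 fast=10909"
    }
}
```

With the default listen port, a single broker therefore occupies 10909, 10911, and 10912, which is worth keeping in mind when running several brokers on one machine.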

Registering Processors and Thread Pools

v4.9.4
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
public void registerProcessor() {

    // Register processors and thread pools for handling the different request types

    /**
     * SendMessageProcessor
     */
    // Send-message processor
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    // Register the hook lists
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    // Register the send processor and its executor for the request codes it handles
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    /**
     * PullMessageProcessor
     */
    // Pull requests are not registered on fastRemotingServer, which is meant for non-pull requests
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    /**
     * ReplyMessageProcessor
     */
    ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
    replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

    /**
     * QueryMessageProcessor
     */
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    /**
     * ClientManageProcessor
     */
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    /**
     * ConsumerManageProcessor
     */
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    /**
     * EndTransactionProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);

    /**
     * Default
     */
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

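This method creates several processors and registers each, together with its thread pool, on both the normal server and the fast server (the pull-message processor is registered only on the normal server). When a Netty server receives a request, it looks up the processor and thread pool by request code and hands the request to them.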
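
The lookup described above can be pictured as a table from request code to a (processor, executor) pair, with a default entry for codes that have no explicit registration. The sketch below is a simplified model of that idea, not NettyRemotingServer's code: processors are plain functions on strings, and the request code values are placeholders chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class ProcessorTableSketch {

    /** A processor together with the thread pool that runs it. */
    static final class Entry {
        final Function<String, String> processor;
        final ExecutorService executor;

        Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>();
    private Entry defaultEntry;

    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        table.put(requestCode, new Entry(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, ExecutorService executor) {
        defaultEntry = new Entry(processor, executor);
    }

    /** Finds the entry for the code (or the default) and runs the processor on its executor. */
    String dispatch(int requestCode, String body) {
        Entry entry = table.getOrDefault(requestCode, defaultEntry);
        if (entry == null) {
            throw new IllegalStateException("no processor for request code " + requestCode);
        }
        try {
            return entry.executor.submit(() -> entry.processor.apply(body)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static String demo() {
        final int sendMessageCode = 1; // placeholder value, for illustration only
        ExecutorService sendPool = Executors.newSingleThreadExecutor();
        ExecutorService adminPool = Executors.newSingleThreadExecutor();
        try {
            ProcessorTableSketch server = new ProcessorTableSketch();
            server.registerProcessor(sendMessageCode, body -> "send:" + body, sendPool);
            server.registerDefaultProcessor(body -> "admin:" + body, adminPool);
            // A registered code uses its own processor; an unknown code falls back to the default
            return server.dispatch(sendMessageCode, "a") + "," + server.dispatch(999, "b");
        } finally {
            sendPool.shutdown();
            adminPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Pairing each processor with its own executor keeps slow request types from starving others, which matches the separate send, pull, query, and heartbeat pools created in initialize.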

Registering RPC Hooks

v4.9.4
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
private void initialRpcHooks() {

    // Load the RPCHook implementations through the SPI mechanism
    List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
    // Nothing configured: return
    if (rpcHooks == null || rpcHooks.isEmpty()) {
        return;
    }
    // Register every RPCHook
    for (RPCHook rpcHook: rpcHooks) {
        // Perform the registration
        this.registerServerRPCHook(rpcHook);
    }
}
public void registerServerRPCHook(RPCHook rpcHook) {
    getRemotingServer().registerRPCHook(rpcHook);
    this.fastRemotingServer.registerRPCHook(rpcHook);
}

The hooks are registered on both the normal server and the fast server.
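
To make the role of a hook concrete, here is a minimal sketch of a server that calls its registered hooks before handling a request and after producing the response. The `Hook` interface is a simplified, hypothetical version of RPCHook that uses strings instead of RemotingCommand objects, and `DemoServer` is an illustrative stand-in for the two Netty servers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RpcHookSketch {

    /** Hypothetical, simplified hook interface; the real RPCHook works on RemotingCommand. */
    interface Hook {
        void doBeforeRequest(String remoteAddr, String request);

        void doAfterResponse(String remoteAddr, String request, String response);
    }

    /** Illustrative server that invokes every registered hook around request handling. */
    static final class DemoServer {
        private final List<Hook> hooks = new ArrayList<>();

        void registerRPCHook(Hook hook) {
            hooks.add(hook);
        }

        String process(String remoteAddr, String request) {
            for (Hook hook : hooks) {
                hook.doBeforeRequest(remoteAddr, request);
            }
            String response = "pong"; // stand-in for the real processor's result
            for (Hook hook : hooks) {
                hook.doAfterResponse(remoteAddr, request, response);
            }
            return response;
        }
    }

    /** Registers one hook on two servers, as registerServerRPCHook does, and records the calls. */
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Hook audit = new Hook() {
            @Override
            public void doBeforeRequest(String remoteAddr, String request) {
                log.add("before:" + request);
            }

            @Override
            public void doAfterResponse(String remoteAddr, String request, String response) {
                log.add("after:" + response);
            }
        };

        DemoServer normalServer = new DemoServer();
        DemoServer fastServer = new DemoServer();
        for (DemoServer server : Arrays.asList(normalServer, fastServer)) {
            server.registerRPCHook(audit);
        }

        normalServer.process("127.0.0.1:50000", "ping");
        fastServer.process("127.0.0.1:50000", "ping");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Registering the same hook on both servers means a cross-cutting concern such as auditing or authentication sees every request regardless of which port the client used.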

Startup

v4.9.4
java
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
public void start() throws Exception {

    // Call start on each component

    // Start the message store
    if (this.messageStore != null) {
        this.messageStore.start();
    }

    // Start the Netty server
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }

    // Start the fast Netty server
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }

    // Start the file watch service
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    // Start the broker's outer API, used for outbound calls
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }

    // Start the service that holds long-polling pull requests
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }

    // Start the housekeeping service for client connections
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }

    // Start the filter server manager
    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }

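    // If DLedger is not enabled (the default)
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // Unless this node is a slave, start the transactional message check service
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        // If this node is a slave, schedule periodic synchronization from the master
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        // Force registration of this broker with all nameservers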
        this.registerBrokerAll(true, false, true);
    }

    // Periodic heartbeat task
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // Register with the nameserver
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
        // The period is clamped to between 10000 and 60000 milliseconds
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

    // Start the broker statistics service
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }

    // Start the broker fast-failure service
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }


}

In summary, this method:

  • starts the underlying Netty servers;
  • starts the various services;
  • schedules the broker heartbeat, i.e. periodic registration with the nameserver;
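
The heartbeat period is not taken from the configuration as-is: the expression Math.max(10000, Math.min(period, 60000)) clamps it into the range of 10 to 60 seconds. The sketch below isolates that expression; `effectivePeriodMillis` is a hypothetical helper name used only here.

```java
final class RegisterPeriodSketch {

    /** Clamps the configured registration period into [10000, 60000] milliseconds. */
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(5_000L));   // prints 10000 (raised to the minimum)
        System.out.println(effectivePeriodMillis(30_000L));  // prints 30000 (already in range)
        System.out.println(effectivePeriodMillis(120_000L)); // prints 60000 (lowered to the maximum)
    }
}
```

Whatever value registerNameServerPeriod is set to, the broker therefore re-registers with the nameserver at least once a minute and at most once every ten seconds.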

Summary

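In RocketMQ, the broker is created and started through the broker controller, which encapsulates the related services, processors, and thread pools. This article only walks through creation, initialization, and startup at a high level and does not go into the internals of the individual services; those are better covered in separate articles than mixed into the main flow.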