RocketMQ中的MQClientInstance的启动过程

在前面介绍创建过程时,讲到会创建Netty客户端、以及一些其他服务。那么在启动过程中,就要启动该客户端和这些服务,包括消息拉取、再均衡、消息推送服务;除此以外,还要启动一些定时任务,比如发送心跳给broker。本文就来分析一下具体的实现过程。


MQClientInstance

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public void start() throws MQClientException {

    synchronized (this) {
        // 根据状态执行不同的操作
        switch (this.serviceState) {
            // 刚创建就是这种状态
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                // 如果没有指定nameserver的地址,则获取
                if (null == this.clientConfig.getNamesrvAddr()) {
                    // 获取NameServer的地址
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                // 设置一些定时任务,比如发送心跳
                this.startScheduledTask();
                // Start pull service
                // 对于消费者,启动pullMessageService
                this.pullMessageService.start();
                // Start rebalance service
                // 对于消费者,启动rebalanceService
                this.rebalanceService.start();
                // Start push service
                // 启动消息推送服务
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

启动Netty客户端

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public void start() {
    // 启动Netty客户端
    this.remotingClient.start();
}

这里就一行代码,就是在启动Netty客户端。

v4.9.4
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@Override
public void start() {
    /*
     * 创建执行器组,主要用于执行下面设置的通道处理器;
     * 避免使用事件循环中的执行器从而影响网络IO的处理,因为它只有1个线程(参考上面的构造方法)
     */
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            // 默认4个线程
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // 初始化Netty客户端
    Bootstrap handler = this.bootstrap
            // 设置构造方法中创建的事件循环组,
        .group(this.eventLoopGroupWorker)
            // 设置通道类型为客户端通道
        .channel(NioSocketChannel.class)
            // 禁用nagle算法
        .option(ChannelOption.TCP_NODELAY, true)
            // 关闭内核的TCP保活机制
        .option(ChannelOption.SO_KEEPALIVE, false)
            // 设置连接超时时间
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            // 设置通道初始化器
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(
                        // 传入执行器组,对于每个通道处理器,会绑定一个执行器
                    defaultEventExecutorGroup,
                    new NettyEncoder(), // 编码器
                    new NettyDecoder(), // 解码器
                        // Netty自带的心跳管理器,用来检查远端是否存活
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    // 连接管理器,负责连接的激活、断开、超时和异常等事件
                    new NettyConnectManageHandler(),
                    // 服务请求处理器
                    new NettyClientHandler());
            }
        });
    // 设置一些选项
    if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
        log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
        // 发送缓冲区大小
        handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
    }
    if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
        log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
        // 接收缓冲区大小
        handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
    }
    if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
        log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
                nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
        handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
    }

    // 启动定时任务
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                // 扫描响应表,将超时的响应直接移除,并执行其回调
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        // 启动Netty的事件执行器
        this.nettyEventExecutor.start();
    }
}

主要是两个操作:

  • 创建Netty的事件执行器组,注意不要和构造方法中创建的公共线程池搞混了,前者是用在Netty中的,后者是用来处理RocketMQ中的业务;
  • 为Netty的客户端启动器设置属性;

注意,这里并没有通过启动器执行连接操作,而是要在createChannel方法中才会执行连接(后续会介绍)。

启动定时任务

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
private void startScheduledTask() {
    // 如果没有手动指定nameserver的地址
    if (null == this.clientConfig.getNamesrvAddr()) {
        // 定时任务:nameserver地址拉取任务
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    // 从nameserver地址服务器拉取最新的地址信息
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // 更新主题路由
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // 定时任务给所有broker发送心跳
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // 清除下线了的broker
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // 持久化消费位移
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // 调整push模式的消费线程池的线程数量
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

对于具体的每种任务的逻辑,后面单独写文章来分析。

ServiceThread

消息拉取服务和再均衡服务类都是ServiceThread的子类,其start方法会创建启动一个线程,具体的逻辑在子类实现的run方法中。

v4.9.4
java
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
public void start() {
    log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
    if (!started.compareAndSet(false, true)) {
        return;
    }
    stopped = false;
    // 创建并启动线程
    this.thread = new Thread(this, getServiceName());
    this.thread.setDaemon(isDaemon);
    // 启动线程后,会执行子类实现的run方法
    this.thread.start();
}

本文的主线是客户端实例的启动流程,所以暂时知道启动了这些服务即可。