RocketMQ中的MQClientInstance的启动过程
在前面介绍创建过程时,讲到会创建Netty客户端、以及一些其他服务。那么在启动过程中,就要启动该客户端和这些服务,包括消息拉取、再均衡、消息推送服务;除此以外,还要启动一些定时任务,比如发送心跳给broker。本文就来分析一下具体的实现过程。
MQClientInstance
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public void start() throws MQClientException {
synchronized (this) {
// 根据状态执行不同的操作
switch (this.serviceState) {
// 刚创建就是这种状态
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
// 如果没有指定nameserver的地址,则获取
if (null == this.clientConfig.getNamesrvAddr()) {
// 获取NameServer的地址
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
// 设置一些定时任务,比如发送心跳
this.startScheduledTask();
// Start pull service
// 对于消费者,启动pullMessageService
this.pullMessageService.start();
// Start rebalance service
// 对于消费者,启动rebalanceService
this.rebalanceService.start();
// Start push service
// 启动消息推送服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
启动Netty客户端
java
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public void start() {
// 启动Netty客户端
this.remotingClient.start();
}
这里就一行代码,就是在启动Netty客户端。
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@Override
public void start() {
/*
* 创建执行器组,主要用于执行下面设置的通道处理器;
* 避免使用事件循环中的执行器从而影响网络IO的处理,因为它只有1个线程(参考上面的构造方法)
*/
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
// 默认4个线程
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
// 初始化Netty客户端
Bootstrap handler = this.bootstrap
// 设置构造方法中创建的事件循环组,
.group(this.eventLoopGroupWorker)
// 设置通道类型为客户端通道
.channel(NioSocketChannel.class)
// 禁用nagle算法
.option(ChannelOption.TCP_NODELAY, true)
// 关闭内核的TCP保活机制
.option(ChannelOption.SO_KEEPALIVE, false)
// 设置连接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
// 设置通道初始化器
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
// 传入执行器组,对于每个通道处理器,会绑定一个执行器
defaultEventExecutorGroup,
new NettyEncoder(), // 编码器
new NettyDecoder(), // 解码器
// Netty自带的心跳管理器,用来检查远端是否存活
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// 连接管理器,负责连接的激活、断开、超时和异常等事件
new NettyConnectManageHandler(),
// 服务请求处理器
new NettyClientHandler());
}
});
// 设置一些选项
if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
// 发送缓冲区大小
handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
}
if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
// 接收缓冲区大小
handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
}
if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
}
// 启动定时任务
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 扫描响应表,将超时的响应直接移除,并执行其回调
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
// 启动Netty的事件执行器
this.nettyEventExecutor.start();
}
}
主要是两个操作:
- 创建Netty的事件执行器组,注意不要和构造方法中创建的公共线程池搞混了,前者是用在Netty中的,后者是用来处理RocketMQ中的业务;
- 为Netty的客户端启动器设置属性;
注意,这里并没有通过启动器执行连接操作,而是要在createChannel方法中才会执行连接(后续会介绍)。
启动定时任务
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
private void startScheduledTask() {
// 如果没有手动指定nameserver的地址
if (null == this.clientConfig.getNamesrvAddr()) {
// 定时任务:nameserver地址拉取任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 从nameserver地址服务器拉取最新的地址信息
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 更新主题路由
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 定时任务给所有broker发送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 清除下线了的broker
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 持久化消费位移
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 调整push模式的消费线程池的线程数量
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
对于具体的每种任务的逻辑,后面单独写文章来分析。
ServiceThread
消息拉取服务和再均衡服务类都是ServiceThread的子类,其start方法会创建启动一个线程,具体的逻辑在子类实现的run方法中。
java
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
// 创建并启动线程
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
// 启动线程后,会执行子类实现的run方法
this.thread.start();
}
本文的主线是客户端实例的启动流程,所以暂时知道启动了这些服务即可。