在RocketMQ中,支持两种消息消费模式,即PULL(拉模式)和PUSH(推模式),PUSH模式的效果类似broker将消息推送到消费者客户端,不过这是一种伪推送,是基于对PULL模式的封装来实现的。本文先来梳理一下消费者的创建和启动过程,从而能够更好理解消息的拉取过程。
RocketMQ中有两种消费模式,自然也有两种消费者客户端。
在RocketMQ4.6版本,引入了DefaultLitePullConsumer,而将原有的DefaultMQPullConsumer类标记为废弃。
PUSH模式消费者
创建过程
DefaultMQPushConsumer作为面向用户的门面类,提供了非常多的构造方法,下面仅给出主要的构造方法的源码。
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
// 创建消费者impl对象
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
// 创建异步跟踪分发器
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
在该构造器中,主要是创建了DefaultMQPushConsumerImpl对象。和生产者一样,xxx是面向用户的类,而xxxImpl是RocketMQ内部的实现,前者算是后者的门面。
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
// 消费者状态错误时采用定时任务执行拉取请求的时间间隔
this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}
启动过程
@Override
public void start() throws MQClientException {
// 设置消费组
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// 启动impl对象
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
// 启动跟踪分发器
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
进一步调用DefaultMQPushConsumerImpl的start方法。
public synchronized void start() throws MQClientException {
// 根据当前的服务状态执行不同的分支
switch (this.serviceState) {
// 服务刚刚创建,那么启动服务
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
// 首先修改服务状态为启动失败,下面启动成功后会修改状态值
this.serviceState = ServiceState.START_FAILED;
// 检查配置
this.checkConfig();
// 拷贝主题订阅关系
this.copySubscription();
// 如果是集群消费模式
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 获取或创建客户端对象
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 设置rebalanceImpl的一些属性,RebalanceImpl主要是负责再均衡
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); // 设置消费组
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); // 设置消息模型
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 创建消息拉取核心对象
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 设置过滤消息的钩子函数
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 选择使用哪种offsetStore,用于实现消费者的消费位移量offset的管理
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 根据消息模型来选择
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // 广播模式
/*
LocalFileOffsetStore:顾名思义,就是将消费进度存储在 Consumer 本地,Consumer 会在磁盘上生成文件以保存进度。
*/
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING: // 集群模式(默认),
/*
RemoteBrokerOffsetStore:将消费进度保存在远端的 Broker。
*/
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
// 设置offsetStore
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加载消费位移
this.offsetStore.load();
// 创建不同类型的消费服务,根据messageListener的类型来判断
// 顺序消费
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
// 并发消费
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 启动消费服务
this.consumeMessageService.start();
// 注册消费组到客户端的消费者表
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 启动客户端
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
// 设置状态为运行中
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 更新主题订阅信息,下面再均衡时会用到
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
/*
* 随机选择一个broker,发送检查客户端tag配置的请求
* 主要是检测broker是否支持SQL92类型的tag过滤以及tag的语法是否正确
*/
this.mQClientFactory.checkClientInBroker();
/*
* 立即给所有broker发送心跳信息
* 尽管客户端内部会有定时任务也会向所有的broker发送心跳,但是存在延迟,所以这里立即发送一次
*/
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 立即执行一次再均衡操作,不然当前消费者不会收到任何的消息
this.mQClientFactory.rebalanceImmediately();
}
该方法的核心步骤如下:
- 获取或创建MQClientInstance对象;
- 设置再均衡属性;
- 创建消息拉取核心对象;
- 根据消费模型(广播还是集群),创建不同的位移管理器并加载位移;
- 根据消息监听器的类型,创建不同的消息消费服务并启动;
- 注册消费组到客户端中;
- 启动客户端:请参考RocketMQ中的MQClientInstance的启动过程一文;
- 更新主题订阅消息;
- 给broker发送心跳信息;
- 立即执行一次再均衡;
注册消费组
public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {
if (null == group || null == consumer) {
return false;
}
// 将消费组和消费者对象添加到缓存中
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {
log.warn("the consumer group[" + group + "] exist already.");
return false;
}
return true;
}
在该方法中,会添加消费组和消费者的映射关系到consumerTable这个map中。
PULL模式消费者
创建过程
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
同样的思路,创建门面类封装的具体实现类DefaultLitePullConsumerImpl的对象。
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
// 创建调度任务线程池
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MonitorMessageQueueChangeThread");
}
});
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
启动过程
@Override
public void start() throws MQClientException {
setTraceDispatcher();
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultLitePullConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
调用DefaultLitePullConsumerImpl类中的start方法。
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultLitePullConsumer.changeInstanceNameToPID();
}
// 创建客户端对象
initMQClientFactory();
// 初始化再均衡服务
initRebalanceImpl();
// 创建消息拉取服务
initPullAPIWrapper();
// 初始化消费位移管理器
initOffsetStore();
// 启动客户端对象
mQClientFactory.start();
// 启动调度任务
startScheduleTask();
this.serviceState = ServiceState.RUNNING;
log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
// 启动后操作
operateAfterRunning();
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
该方法的步骤如下,其实大体上和push模式的消费者都是差不多的。
- 创建和启动客户端对象,不再赘述;
- 初始化再均衡服务;
- 创建消息拉取API对象;
- 初始化消费位移管理器;
- 启动调度任务;
- 执行启动后的操作;
创建和启动客户端对象
private void initMQClientFactory() throws MQClientException {
// 创建客户端对象
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
// 注册消费者
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
}
通过客户端管理器来创建MQClientInstance对象,然后将当前消费者注册到客户端对象中。
初始化再均衡服务
private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
private void initRebalanceImpl() {
this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
}
创建消息拉取API对象
private void initPullAPIWrapper() {
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultLitePullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
}
在该方法中创建了消息拉取API对象PullAPIWrapper,并注册过滤消息的钩子函数列表。
创建位移管理器
private void initOffsetStore() throws MQClientException {
if (this.defaultLitePullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
} else {
// 根据不同的消费模型,创建不同的位移管理器
switch (this.defaultLitePullConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
}
和上面介绍的push消费模型类似,这里根据是广播还是集群模型,创建不同的消费位移管理器。
启动定时任务
private void startScheduleTask() {
scheduledExecutorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
try {
fetchTopicMessageQueuesAndCompare();
} catch (Exception e) {
log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
}
}
}, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
}
该任务会获取主题的队列并和本地的缓存数据进行比较。