RocketMQ中消费者的创建和启动过程

在RocketMQ中,支持两种消息消费模式,即PULL(拉模式)和PUSH(推模式),PUSH模式的效果类似broker将消息推送到消费者客户端,不过这是一种伪推送,是基于对PULL模式的封装来实现的。本文先来梳理一下消费者的创建和启动过程,从而能够更好理解消息的拉取过程。


RocketMQ中有两种消费模式,自然也有两种消费者客户端。

在RocketMQ4.6版本,引入了DefaultLitePullConsumer,而将原有的DefaultMQPullConsumer类标记为废弃。

PUSH模式消费者

创建过程

DefaultMQPushConsumer作为面向用户的门面类,提供了非常多的构造方法,下面仅给出主要的构造方法的源码。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    // 创建消费者impl对象
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    if (enableMsgTrace) {
        try {
            // 创建异步跟踪分发器
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
            dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
                new ConsumeMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

在该构造器中,主要是创建了DefaultMQPushConsumerImpl对象。和生产者一样,xxx是面向用户的类,而xxxImpl是RocketMQ内部的实现,前者算是后者的门面。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
    this.defaultMQPushConsumer = defaultMQPushConsumer;
    this.rpcHook = rpcHook;
    // 消费者状态错误时采用定时任务执行拉取请求的时间间隔
    this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}

启动过程

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@Override
public void start() throws MQClientException {
    // 设置消费组
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    // 启动impl对象
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            // 启动跟踪分发器
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

进一步调用DefaultMQPushConsumerImplstart方法。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public synchronized void start() throws MQClientException {
    // 根据当前的服务状态执行不同的分支
    switch (this.serviceState) {
        // 服务刚刚创建,那么启动服务
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            // 首先修改服务状态为启动失败,下面启动成功后会修改状态值
            this.serviceState = ServiceState.START_FAILED;

            // 检查配置
            this.checkConfig();

            // 拷贝主题订阅关系
            this.copySubscription();

            // 如果是集群消费模式
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            // 获取或创建客户端对象
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            // 设置rebalanceImpl的一些属性,RebalanceImpl主要是负责再均衡
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); // 设置消费组
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); // 设置消息模型
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            // 创建消息拉取核心对象
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            // 设置过滤消息的钩子函数
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            // 选择使用哪种offsetStore,用于实现消费者的消费位移量offset的管理
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                // 根据消息模型来选择
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING: // 广播模式
                        /*
                         LocalFileOffsetStore:顾名思义,就是将消费进度存储在 Consumer 本地,Consumer 会在磁盘上生成文件以保存进度。
                         */
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING: // 集群模式(默认),
                        /*
                         RemoteBrokerOffsetStore:将消费进度保存在远端的 Broker。
                         */
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                // 设置offsetStore
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            // 加载消费位移
            this.offsetStore.load();

            // 创建不同类型的消费服务,根据messageListener的类型来判断
            // 顺序消费
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            }
            // 并发消费
            else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            // 启动消费服务
            this.consumeMessageService.start();

            // 注册消费组到客户端的消费者表
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // 启动客户端
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            // 设置状态为运行中
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // 更新主题订阅信息,下面再均衡时会用到
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    /*
     * 随机选择一个broker,发送检查客户端tag配置的请求
     * 主要是检测broker是否支持SQL92类型的tag过滤以及tag的语法是否正确
     */
    this.mQClientFactory.checkClientInBroker();
    /*
     * 立即给所有broker发送心跳信息
     * 尽管客户端内部会有定时任务也会向所有的broker发送心跳,但是存在延迟,所以这里立即发送一次
     */
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 立即执行一次再均衡操作,不然当前消费者不会收到任何的消息
    this.mQClientFactory.rebalanceImmediately();
}

该方法的核心步骤如下:

  • 获取或创建MQClientInstance对象;
  • 设置再均衡属性;
  • 创建消息拉取核心对象;
  • 根据消费模型(广播还是集群),创建不同的位移管理器并加载位移;
  • 根据消息监听器的类型,创建不同的消息消费服务并启动;
  • 注册消费组到客户端中;
  • 启动客户端:请参考RocketMQ中的MQClientInstance的启动过程一文;
  • 更新主题订阅消息;
  • 给broker发送心跳信息;
  • 立即执行一次再均衡;

注册消费组

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (null == group || null == consumer) {
        return false;
    }

    // 将消费组和消费者对象添加到缓存中
    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
        log.warn("the consumer group[" + group + "] exist already.");
        return false;
    }

    return true;
}

在该方法中,会添加消费组和消费者的映射关系到consumerTable这个map中。

PULL模式消费者

创建过程

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.consumerGroup = consumerGroup;
    this.enableStreamRequestType = true;
    defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}

同样的思路,创建门面类封装的具体实现类DefaultLitePullConsumerImpl的对象。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
    this.defaultLitePullConsumer = defaultLitePullConsumer;
    this.rpcHook = rpcHook;
    // 创建调度任务线程池
    this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
        this.defaultLitePullConsumer.getPullThreadNums(),
        new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
    );
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MonitorMessageQueueChangeThread");
        }
    });
    this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}

启动过程

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@Override
public void start() throws MQClientException {
    setTraceDispatcher();
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultLitePullConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

调用DefaultLitePullConsumerImpl类中的start方法。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultLitePullConsumer.changeInstanceNameToPID();
            }

            // 创建客户端对象
            initMQClientFactory();

            // 初始化再均衡服务
            initRebalanceImpl();

            // 创建消息拉取服务
            initPullAPIWrapper();

            // 初始化消费位移管理器
            initOffsetStore();

            // 启动客户端对象
            mQClientFactory.start();

            // 启动调度任务
            startScheduleTask();

            this.serviceState = ServiceState.RUNNING;

            log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());

            // 启动后操作
            operateAfterRunning();

            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}

该方法的步骤如下,其实大体上和push模式的消费者都是差不多的。

  • 创建和启动客户端对象,不再赘述;
  • 初始化再均衡服务;
  • 创建消息拉取API对象;
  • 初始化消费位移管理器;
  • 启动调度任务;
  • 执行启动后的操作;

创建和启动客户端对象

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
private void initMQClientFactory() throws MQClientException {
    // 创建客户端对象
    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
    // 注册消费者
    boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
    if (!registerOK) {
        this.serviceState = ServiceState.CREATE_JUST;

        throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
            null);
    }
}

通过客户端管理器来创建MQClientInstance对象,然后将当前消费者注册到客户端对象中。

初始化再均衡服务

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
private void initRebalanceImpl() {
    this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
    this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
}

创建消息拉取API对象

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
private void initPullAPIWrapper() {
    this.pullAPIWrapper = new PullAPIWrapper(
        mQClientFactory,
        this.defaultLitePullConsumer.getConsumerGroup(), isUnitMode());
    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
}

在该方法中创建了消息拉取API对象PullAPIWrapper,并注册过滤消息的钩子函数列表。

创建位移管理器

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
private void initOffsetStore() throws MQClientException {
    if (this.defaultLitePullConsumer.getOffsetStore() != null) {
        this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
    } else {
        // 根据不同的消费模型,创建不同的位移管理器
        switch (this.defaultLitePullConsumer.getMessageModel()) {
            case BROADCASTING:
                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
                break;
            case CLUSTERING:
                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
                break;
            default:
                break;
        }
        this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
    }
    this.offsetStore.load();
}

和上面介绍的push消费模型类似,这里根据是广播还是集群模型,创建不同的消费位移管理器。

启动定时任务

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
private void startScheduleTask() {
    scheduledExecutorService.scheduleAtFixedRate(
        new Runnable() {
            @Override
            public void run() {
                try {
                    fetchTopicMessageQueuesAndCompare();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
                }
            }
        }, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
}

该任务会获取主题的队列并和本地的缓存数据进行比较。