RocketMQ中的生产者的创建和启动过程

RocketMQ中的生产者是消息的发送方,本文先来了解一下生产者的创建和启动过程,后续也能够更好地理解消息的发送过程。


例子

先来通过一个例子找到主线,下面的例子是官方源码提供的demo。

v4.9.4
java
example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.1.100:9876");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                    // 设置为延迟消息,参数是延迟级别
                    msg.setDelayTimeLevel(3);

                    // 发送消息
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

本文先暂不关注消息发送,仅关注生产者客户端的创建和启动。

DefaultMQProducer

创建过程

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public DefaultMQProducer(final String producerGroup) {
    this(null, producerGroup, null);
}
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

这里创建了DefaultMQProducerImpl实例,这个类型和DefaultMQProducer有什么区别?前者是RocketMQ中的内部实现,而后者是用来屏蔽实现细节的,可以把后者看成是前者的门面。

启动过程

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@Override
public void start() throws MQClientException {
    // 设置命名空间和生产者组
    this.setProducerGroup(withNamespace(this.producerGroup));
    // 启动生产者实现对象
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) { // 默认是null
        try {
            // 启动消息轨迹跟踪服务
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

先是设置了生产者组,再调用内部实现对象的start方法。

这里调用的withNamespace方法在RocketMQ的源码中经常出现,主要作用是对资源名称前面加上命名空间的名称。

DefaultMQProducerImpl

创建过程

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;

    // 异步发送消息的任务队列
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    // 异步发送消息的线程池
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            // 核心线程数和最大线程数都是当前机器可用的线程数
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}

在该构造器中,创建了用于异步发送的任务队列和线程池。

启动过程

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public void start() throws MQClientException {
    this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
    // 根据当前服务的状态,执行不同的分支
    switch (this.serviceState) {
        // 服务刚刚创建,那么需要启动服务
        case CREATE_JUST:
            // 设置状态,下面启动成功再修改为RUNNING
            this.serviceState = ServiceState.START_FAILED;

            // 检查生产者的配置信息
            this.checkConfig();

            // 生成instanceName
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // 获取客户端实例
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // 向MQClientInstance中的producerTable注册producer
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // 添加一个默认主题
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                // 启动客户端,调用到MQClientInstance中的start方法
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 修改服务状态为正在运行
            this.serviceState = ServiceState.RUNNING;
            break;

        // 如果服务状态是其他,则抛出异常,说明start方法仅能启动一次。
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // 发送心跳消息给所有broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    // 启动定时任务为:移除超时的请求,并执行异常回调
    RequestFutureHolder.getInstance().startScheduledTask(this);

}

该方法的步骤如下:

  • 检查生产者的配置信息;
  • 生成实例名称;
  • 获取MQClientInstance对象;
  • 注册生产者;
  • 启动MQClientInstance对象,请参考RocketMQ中MQClientInstance的启动过程一文。
  • 发送心跳信息给broker;
  • 启动超时任务:移除超时的请求;

生成实例名称

如果生产者所在组不是“CLIENT_INNER_PRODUCER”,则会修改客户端的实例名称。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        // 替换instanceName,pid加上时间
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}

实例名称会参与客户端id的创建。

获取MQClientInstance对象

获取或创建MQClientIntance对象,内部会判断客户端的id,如果id相同,则会返回同一个客户端;如果还没创建过,则新建对象并加入缓存中。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
    return getOrCreateMQClientInstance(clientConfig, null);
}

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // 生成clientID,组成形式为:IP@instanceName
    String clientId = clientConfig.buildMQClientId();
    // 从客户端的缓存中查找对应id的客户端实例
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        // 利用客户端id创建新的客户端对象
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        // 加入缓存,这里使用的是ConcurrentHashMap,所以不可能有两个id的客户端对象被添加进入缓存中
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            // 返回之前的客户端对象
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    // 返回客户端对象
    return instance;
}

注册生产者

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }

    // 如果生产者组还不存在,则添加到map中,并返回null
    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    // 返回值不为null,说明已存在对应的生产者组
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }

    return true;
}

将生产者组和生产者对象映射关系添加到producerTable这个map中。

启动扫描超时请求任务

v4.9.4
startScheduledTask
scanExpiredRequest
<
>
java
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
public synchronized void startScheduledTask(DefaultMQProducerImpl producer) {
    this.producerSet.add(producer);
    if (null == scheduledExecutorService) {
        // 创建线程池
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestHouseKeepingService"));

        // 周期性调度任务
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    // 扫描超时的请求
                    RequestFutureHolder.getInstance().scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);

    }
}
java
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
private void scanExpiredRequest() {
    final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
    // 迭代请求表
    Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<String, RequestResponseFuture> next = it.next();
        RequestResponseFuture rep = next.getValue();

        // 判断是否超时了
        if (rep.isTimeout()) {
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
        }
    }

    // 处理已经超时的请求
    for (RequestResponseFuture rf : rfList) {
        try {
            // 创建超时异常
            Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
            // 设置异常原因
            rf.setCause(cause);
            // 执行请求回调
            rf.executeRequestCallback();
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}

这里会专门创建一个线程数量固定为1的线程池来周期性扫描超时的请求,并为其设置超时异常和调用请求回调。