RocketMQ中的生产者的创建和启动过程
RocketMQ中的生产者是消息的发送方,本文先来了解一下生产者的创建和启动过程,后续也能够更好地理解消息的发送过程。
例子
先来通过一个例子找到主线,下面的例子是官方源码提供的demo。
java
example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.1.100:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置为延迟消息,参数是延迟级别
msg.setDelayTimeLevel(3);
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
本文先暂不关注消息发送,仅关注生产者客户端的创建和启动。
DefaultMQProducer
创建过程
java
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public DefaultMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
这里创建了DefaultMQProducerImpl实例,这个类型和DefaultMQProducer有什么区别?前者是RocketMQ中的内部实现,而后者是用来屏蔽实现细节的,可以把后者看成是前者的门面。
启动过程
java
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@Override
public void start() throws MQClientException {
// 设置命名空间和生产者组
this.setProducerGroup(withNamespace(this.producerGroup));
// 启动生产者实现对象
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) { // 默认是null
try {
// 启动消息轨迹跟踪服务
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
先是设置了生产者组,再调用内部实现对象的start方法。
这里调用的withNamespace方法在RocketMQ的源码中经常出现,主要作用是对资源名称前面加上命名空间的名称。
DefaultMQProducerImpl
创建过程
java
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 异步发送消息的任务队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 异步发送消息的线程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
// 核心线程数和最大线程数都是当前机器可用的线程数
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
在该构造器中,创建了用于异步发送的任务队列和线程池。
启动过程
java
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
// 根据当前服务的状态,执行不同的分支
switch (this.serviceState) {
// 服务刚刚创建,那么需要启动服务
case CREATE_JUST:
// 设置状态,下面启动成功再修改为RUNNING
this.serviceState = ServiceState.START_FAILED;
// 检查生产者的配置信息
this.checkConfig();
// 生成instanceName
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 获取客户端实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 向MQClientInstance中的producerTable注册producer
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 添加一个默认主题
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
// 启动客户端,调用到MQClientInstance中的start方法
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 修改服务状态为正在运行
this.serviceState = ServiceState.RUNNING;
break;
// 如果服务状态是其他,则抛出异常,说明start方法仅能启动一次。
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 发送心跳消息给所有broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 启动定时任务为:移除超时的请求,并执行异常回调
RequestFutureHolder.getInstance().startScheduledTask(this);
}
该方法的步骤如下:
- 检查生产者的配置信息;
- 生成实例名称;
- 获取MQClientInstance对象;
- 注册生产者;
- 启动MQClientInstance对象,请参考RocketMQ中MQClientInstance的启动过程一文。
- 发送心跳信息给broker;
- 启动超时任务:移除超时的请求;
生成实例名称
如果生产者所在组不是“CLIENT_INNER_PRODUCER”,则会修改客户端的实例名称。
java
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
// 替换instanceName,pid加上时间
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}
实例名称会参与客户端id的创建。
获取MQClientInstance对象
获取或创建MQClientIntance对象,内部会判断客户端的id,如果id相同,则会返回同一个客户端;如果还没创建过,则新建对象并加入缓存中。
java
client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
return getOrCreateMQClientInstance(clientConfig, null);
}
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// 生成clientID,组成形式为:IP@instanceName
String clientId = clientConfig.buildMQClientId();
// 从客户端的缓存中查找对应id的客户端实例
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 利用客户端id创建新的客户端对象
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
// 加入缓存,这里使用的是ConcurrentHashMap,所以不可能有两个id的客户端对象被添加进入缓存中
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
// 返回之前的客户端对象
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
// 返回客户端对象
return instance;
}
注册生产者
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
// 如果生产者组还不存在,则添加到map中,并返回null
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
// 返回值不为null,说明已存在对应的生产者组
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
将生产者组和生产者对象映射关系添加到producerTable这个map中。
启动扫描超时请求任务
startScheduledTask
scanExpiredRequest
java
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
public synchronized void startScheduledTask(DefaultMQProducerImpl producer) {
this.producerSet.add(producer);
if (null == scheduledExecutorService) {
// 创建线程池
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestHouseKeepingService"));
// 周期性调度任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 扫描超时的请求
RequestFutureHolder.getInstance().scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
}
}
java
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
private void scanExpiredRequest() {
final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
// 迭代请求表
Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, RequestResponseFuture> next = it.next();
RequestResponseFuture rep = next.getValue();
// 判断是否超时了
if (rep.isTimeout()) {
it.remove();
rfList.add(rep);
log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
}
}
// 处理已经超时的请求
for (RequestResponseFuture rf : rfList) {
try {
// 创建超时异常
Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
// 设置异常原因
rf.setCause(cause);
// 执行请求回调
rf.executeRequestCallback();
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
这里会专门创建一个线程数量固定为1的线程池来周期性扫描超时的请求,并为其设置超时异常和调用请求回调。