RocketMQ中的MQClientInstance的创建过程
MQClientInstance是RocketMQ中客户端实现,生产者和消费者都要依靠它。它内部封装了Netty客户端,实现了nameserver和broker交互的底层细节,为上层应用提供了统一的接口。本文就来分析一下该客户端的创建过程。
MQClientInstance
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
this(clientConfig, instanceIndex, clientId, null);
}
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
// Netty客户端的配置对象
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
// 客户端请求处理器
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// 创建客户端远程通信API实现类的实例
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
// 更新nameserver的地址
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
// 客户端的id
this.clientId = clientId;
// admin控制台的实现
this.mQAdminImpl = new MQAdminImpl(this);
// 拉取消息的服务
this.pullMessageService = new PullMessageService(this);
// 负载均衡服务
this.rebalanceService = new RebalanceService(this);
// 客户端内部的生产者,为什么这里要创建生产者对象?
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
// 设置客户端的配置
this.defaultMQProducer.resetClientConfig(clientConfig);
// 消息状态管理器
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
重点关注创建MQClientApiImpl,内部封装了Netty的客户端。
MQClientAPIImpl
java
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
// 创建Netty客户端
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
// 注册RPC回调
// Inject stream rpc hook first to make reserve field signature
if (clientConfig.isEnableStreamRequestType()) {
this.remotingClient.registerRPCHook(new StreamTypeRPCHook());
}
this.remotingClient.registerRPCHook(rpcHook);
// 注册一些处理器
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
该构造方法中主要就做了三件事情:
- 创建NettyRemotingClient对象;
- 注册RPC钩子;
- 注册一些处理器,处理不同的请求码;这里注册的多种请求码,都会有同一个处理器来处理,即ClientRemotingProcessor类型的对象,其processRequest方法中就刚好对应了这几种请求码。
创建NettyRemotingClient
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 创建线程池,用来处理业务请求
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 创建事件循环组,组内只有1个事件循环
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
}
在该类的构造方法中,创建了线程池和Netty中的事件循环组。
注册RPC钩子
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@Override
public void registerRPCHook(RPCHook rpcHook) {
// 如果钩子不是null且列表中不存在,才进行添加
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
// 将钩子对象添加到列表中
rpcHooks.add(rpcHook);
}
}
这一步没什么好说的,就是将RPC钩子对象添加到列表中。
注册处理器
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
// 将处理器和线程池绑定在一起
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
// 将处理器添加到map中
this.processorTable.put(requestCode, pair);
}
这里将处理器和构造方法中创建的线程池绑定成pair,然后添加到map中。
客户端的id
MQClientInstance的构造方法是要求传入id的,那这个id是怎么生成的呢?那就要找是哪里创建的MQClientInstance,不管是生产者还是消费者,都是通过MQClientManager来管理器客户端实例的。
java
client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
// 客户端的映射表,键是clientId,值是客户端对象
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
return getOrCreateMQClientInstance(clientConfig, null);
}
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// 生成clientID,组成形式为:IP@instanceName
String clientId = clientConfig.buildMQClientId();
// 从客户端的缓存中查找对应id的客户端实例
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 利用客户端id创建新的客户端对象
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
// 加入缓存,这里使用的是ConcurrentHashMap,所以不可能有两个id的客户端对象被添加进入缓存中
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
// 返回之前的客户端对象
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
// 返回客户端对象
return instance;
}
可以看到,它是调用的ClientConfig类中的buildMQClientId方法来生成客户端的id,而且对于同一客户端id,这里只会创建一个客户端实例。先来看看id的生成。
java
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
// 实例名称
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
// 拼接IP
sb.append(this.getClientIP());
sb.append("@");
// 拼接实例名称
sb.append(this.getInstanceName());
// 默认是null,不满足条件
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
// 拼接单元名称
sb.append(this.unitName);
}
// 默认是false
if (enableStreamRequestType) {
sb.append("@");
// 拼接0
sb.append(RequestType.STREAM);
}
return sb.toString();
}
这里默认情况下只拼接了客户端所在机器的IP和实例名称,实例名称会获取配置的系统属性,默认为“DEFAULT”。但是在生产者或消费者启动时,会修改该属性。
java
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
// 替换instanceName,pid加上时间
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}
如果没有设置客户端实例的名称,那么会设置为进程的pid加上当前的纳秒时间。虽然是纳秒精度,但是如果在同一个JVM中同时创建两个客户端,还是有可能导致id相同的。如果id相同,那么只会存在一个MQClientInstance实例。那么如果两个客户端需要不同的配置,则只会有一个生效,所以针对这种情况,最好还是让其具有不同的客户端id,除了修改实例名称外,还可以设置不同的单元名称unitName。