RocketMQ中的MQClientInstance的创建过程

MQClientInstance是RocketMQ中客户端实现,生产者和消费者都要依靠它。它内部封装了Netty客户端,实现了nameserver和broker交互的底层细节,为上层应用提供了统一的接口。本文就来分析一下该客户端的创建过程。


MQClientInstance

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
    this(clientConfig, instanceIndex, clientId, null);
}
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    // Netty客户端的配置对象
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    // 客户端请求处理器
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    // 创建客户端远程通信API实现类的实例
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    if (this.clientConfig.getNamesrvAddr() != null) {
        // 更新nameserver的地址
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }

    // 客户端的id
    this.clientId = clientId;

    // admin控制台的实现
    this.mQAdminImpl = new MQAdminImpl(this);

    // 拉取消息的服务
    this.pullMessageService = new PullMessageService(this);

    // 负载均衡服务
    this.rebalanceService = new RebalanceService(this);

    // 客户端内部的生产者,为什么这里要创建生产者对象?
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    // 设置客户端的配置
    this.defaultMQProducer.resetClientConfig(clientConfig);

    // 消息状态管理器
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

重点关注创建MQClientApiImpl,内部封装了Netty的客户端。

MQClientAPIImpl

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
    final ClientRemotingProcessor clientRemotingProcessor,
    RPCHook rpcHook, final ClientConfig clientConfig) {
    this.clientConfig = clientConfig;
    topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
    // 创建Netty客户端
    this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
    this.clientRemotingProcessor = clientRemotingProcessor;

    // 注册RPC回调
    // Inject stream rpc hook first to make reserve field signature
    if (clientConfig.isEnableStreamRequestType()) {
        this.remotingClient.registerRPCHook(new StreamTypeRPCHook());
    }
    this.remotingClient.registerRPCHook(rpcHook);

    // 注册一些处理器
    this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}

该构造方法中主要就做了三件事情:

  • 创建NettyRemotingClient对象;
  • 注册RPC钩子;
  • 注册一些处理器,处理不同的请求码;这里注册的多种请求码,都会有同一个处理器来处理,即ClientRemotingProcessor类型的对象,其processRequest方法中就刚好对应了这几种请求码。

创建NettyRemotingClient

v4.9.4
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
    this(nettyClientConfig, null);
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    this.nettyClientConfig = nettyClientConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    // 创建线程池,用来处理业务请求
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // 创建事件循环组,组内只有1个事件循环
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

}

在该类的构造方法中,创建了线程池和Netty中的事件循环组。

注册RPC钩子

v4.9.4
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@Override
public void registerRPCHook(RPCHook rpcHook) {
    // 如果钩子不是null且列表中不存在,才进行添加
    if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
        // 将钩子对象添加到列表中
        rpcHooks.add(rpcHook);
    }
}

这一步没什么好说的,就是将RPC钩子对象添加到列表中。

注册处理器

v4.9.4
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    // 将处理器和线程池绑定在一起
    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    // 将处理器添加到map中
    this.processorTable.put(requestCode, pair);
}

这里将处理器和构造方法中创建的线程池绑定成pair,然后添加到map中。

客户端的id

MQClientInstance的构造方法是要求传入id的,那这个id是怎么生成的呢?那就要找是哪里创建的MQClientInstance,不管是生产者还是消费者,都是通过MQClientManager来管理器客户端实例的。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
// 客户端的映射表,键是clientId,值是客户端对象
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
    new ConcurrentHashMap<String, MQClientInstance>();
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
    return getOrCreateMQClientInstance(clientConfig, null);
}

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // 生成clientID,组成形式为:IP@instanceName
    String clientId = clientConfig.buildMQClientId();
    // 从客户端的缓存中查找对应id的客户端实例
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        // 利用客户端id创建新的客户端对象
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        // 加入缓存,这里使用的是ConcurrentHashMap,所以不可能有两个id的客户端对象被添加进入缓存中
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            // 返回之前的客户端对象
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    // 返回客户端对象
    return instance;
}

可以看到,它是调用的ClientConfig类中的buildMQClientId方法来生成客户端的id,而且对于同一客户端id,这里只会创建一个客户端实例。先来看看id的生成。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
// 实例名称
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    // 拼接IP
    sb.append(this.getClientIP());

    sb.append("@");
    // 拼接实例名称
    sb.append(this.getInstanceName());

    // 默认是null,不满足条件
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        // 拼接单元名称
        sb.append(this.unitName);
    }

    // 默认是false
    if (enableStreamRequestType) {
        sb.append("@");
        // 拼接0
        sb.append(RequestType.STREAM);
    }

    return sb.toString();
}

这里默认情况下只拼接了客户端所在机器的IP和实例名称,实例名称会获取配置的系统属性,默认为“DEFAULT”。但是在生产者或消费者启动时,会修改该属性。

v4.9.4
java
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        // 替换instanceName,pid加上时间
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}

如果没有设置客户端实例的名称,那么会设置为进程的pid加上当前的纳秒时间。虽然是纳秒精度,但是如果在同一个JVM中同时创建两个客户端,还是有可能导致id相同的。如果id相同,那么只会存在一个MQClientInstance实例。那么如果两个客户端需要不同的配置,则只会有一个生效,所以针对这种情况,最好还是让其具有不同的客户端id,除了修改实例名称外,还可以设置不同的单元名称unitName