RocketMQ中的NettyRemotingServer
NettyRemotingServer是RocketMQ中Netty服务器的实现,封装了与Netty相关的组件。如服务端启动器、事件循环组、通道处理器及其执行器组。Nameserver和broker都会依赖该组件来实现网络通信和事件处理。本文就来分析一下该组件的实现原理。
本文会以创建过程和启动过程两条主线来分析。
创建过程
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
// 设置服务器单向和异步发送的信号量
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
// 创建Netty的启动引导类
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
// 获取服务器回调执行线程数量
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
// 默认是4
publicThreadNums = 4;
}
// 创建线程池,用于处理请求
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
// 创建线程并设置指定名称
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 判断是否采用netty自己实现的epoll封装
if (useEpoll()) {
// 创建boss组,线程数为1
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
// 创建worker组,线程数为3
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
// boss组
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
// worker组
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
// 加载SSL相关信息
loadSslContext();
}
主要有两个操作:
- 创建公共线程池:如果注册处理器时,传入的线程池为null,则会使用该公共线程池来执行处理器;
- 创建Netty中的事件循环组;
启动过程
start
prepareSharableHandlers
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@Override
public void start() {
/*
* 创建执行器组,让Netty的通道处理器执行;
* 注意和构造方法中创建的publicExecutor进行区分,这个是处理业务的。
*/
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
// 这里默认返回 8
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 准备共享处理器
prepareSharableHandlers();
// 创建服务端启动器
ServerBootstrap childHandler =
// 设置boss组和worker组,前者负责连接建立,后者负责IO事件的处理
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) //分别传入boss和worker组
// 设置通道类型
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// backlog参数
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
// 地址重用
.option(ChannelOption.SO_REUSEADDR, true)
// 保活机制
.option(ChannelOption.SO_KEEPALIVE, false)
// 禁用nagle算法
.childOption(ChannelOption.TCP_NODELAY, true)
// 配置所要监听的地址和端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
// 设置用于请求通道提供服务的处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 往通道的管道中添加通道处理器
ch.pipeline()
// 负责TLS协议握手的handler
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
// 设置编码器
encoder,
// 设置解码器
new NettyDecoder(),
// Netty自带的心跳管理器,主要用来检测远端是否存活
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
// 连接管理器,负责连接的激活、断开、超时、异常等事件
connectionManageHandler,
// 服务请求处理器,负责业务处理
serverHandler
);
}
});
// 设置一些通道选项
if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
// 设置发送缓冲区大小
childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
}
if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
// 设置接收缓冲区大小
childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
}
if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
// 设置写缓冲区的低水位线和高水位线
childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
}
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
// 分配缓冲区
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 启动Netty服务器
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
// 启动Netty的事件执行器
this.nettyEventExecutor.start();
}
// 启动定时任务,扫描响应表,初始启动3秒后执行,此后每隔1秒执行1次;
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 扫描响应表,将超时的ResponseFuture移除并执行其回调
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
private void prepareSharableHandlers() {
handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
}
这个方法中的步骤如下:
- 创建事件执行器组;
- 调用prepareSharableHandlers来创建处理器;
- 设置并启动Netty服务器;
注册处理器
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
// 如果线程池为null
if (null == executor) {
// 则使用默认的公共线程池
executorThis = this.publicExecutor;
}
// 将处理器和线程池封装为pair对象
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
// 添加到处理器表中,键是请求码。后续请求到来时,可以根据请求码找到处理器和线程池。
this.processorTable.put(requestCode, pair);
}
如果传入的线程池是null,会使用默认的公共线程池。然后将执行器和线程池封装为Pair对象,最后将其添加到processTable处理器表中。
注册RPC钩子函数
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@Override
public void registerRPCHook(RPCHook rpcHook) {
// 如果还没有添加过,
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
// 则添加到列表中
rpcHooks.add(rpcHook);
}
}
如果对应的钩子函数还没有被添加,则添加到列表中。
NettyServerHandler
这是一个通道处理器,会被设置到通道流水线中的最后一个位置上。
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
// 服务端业务请求处理器的入口
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
// 处理请求
processMessageReceived(ctx, msg);
}
}
调用父类NettyRemotingAbstract中的方法来处理接收到的请求。
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
// 根据命令类型执行不同操作
switch (cmd.getType()) {
case REQUEST_COMMAND: // 是其他方发起的请求
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND: // 来自其他方的针对本机发起请求的响应
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
处理请求
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 根据请求码找到合适的处理器
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 如果没有为请求码注册处理器,则使用默认的处理器
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// 获取请求ID
final int opaque = cmd.getOpaque();
// 如果处理器信息不为null,也就是设置了默认的处理器
if (pair != null) {
// 创建一个用于执行请求的任务
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 获取远程地址
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 执行请求前置钩子
doBeforeRpcHooks(remoteAddr, cmd);
// 创建回调函数,用来实现对其他方的请求做出响应
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
// 执行请求后置钩子
doAfterRpcHooks(remoteAddr, cmd, response);
// 如果不是单向的请求
if (!cmd.isOnewayRPC()) {
// 如果响应对象
if (response != null) {
// 设置响应id为请求id
response.setOpaque(opaque);
// 标记响应状态
response.markResponseType();
// 设置序列化类型
response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
try {
// 响应其他方的请求
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
// 判断处理器类型是不是异步的
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
// 获取处理器
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
// 异步地处理请求,传入实现响应的对象
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
// 处理处理器
NettyRequestProcessor processor = pair.getObject1();
// 处理请求
RemotingCommand response = processor.processRequest(ctx, cmd);
// 直接调用响应对象的负责响应的函数
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) { // 不是单向请求
// 构建系统错误的响应对象
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
// 响应错误
ctx.writeAndFlush(response);
}
}
}
};
// 如果请求处理器拒绝请求
if (pair.getObject1().rejectRequest()) {
// 创建系统忙错误响应对象
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
// 响应系统忙
ctx.writeAndFlush(response);
return;
}
try {
// 创建任务对象,这里起一个封装效果
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 提交到处理器对应的线程池中
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) { // 发生了拒绝异常
if ((System.currentTimeMillis() % 10000) == 0) { // 如果在特定时刻,
// 则打印任务拒绝日志
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) { // 如果不是单向请求
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
// 响应系统忙
ctx.writeAndFlush(response);
}
}
} else { // 如果没有找到合适的处理器,或者也没有设置默认的处理器
String error = " request type " + cmd.getCode() + " not supported";
// 创建请求码不支持的的响应对象
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
// 响应请求码不支持
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
在该方法中,会根据请求码来找到合适的处理器,将请求封装为RequestTask并提交到线程池中。在创建的Runnable中,在处理请求前后会分别执行RPC钩子函数。
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
if (rpcHooks.size() > 0) {
// 遍历钩子对象
for (RPCHook rpcHook: rpcHooks) {
// 执行前置逻辑
rpcHook.doBeforeRequest(addr, request);
}
}
}
protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
if (rpcHooks.size() > 0) {
// 遍历钩子对象
for (RPCHook rpcHook: rpcHooks) {
// 执行后置逻辑
rpcHook.doAfterResponse(addr, request, response);
}
}
}
具体有哪些处理器,则需要根据NettyRemotingServer的使用方来决定。如果没有根据请求码来设置处理器,则会使用默认的处理器。比如nameserver就只设置了一个默认的处理器。
处理响应
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
// 获取请求id
final int opaque = cmd.getOpaque();
// 从响应表中得到future对象
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
// 设置响应命令
responseFuture.setResponseCommand(cmd);
// 从响应表中移除
responseTable.remove(opaque);
// 如果存在回调
if (responseFuture.getInvokeCallback() != null) { // 异步发送情况
// 执行回调
executeInvokeCallback(responseFuture);
} else { // 同步发送情况
// 设置响应,内部会对门闩减1,从而唤醒被阻塞的业务线程
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
先根据请求从响应表中获取到future对象,并从中移除掉,最后执行设置的响应回调函数。
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
// 获取回调执行线程池,默认没有设置
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) { // 如果设置回调执行线程池
try {
// 提交任务
executor.submit(new Runnable() {
@Override
public void run() {
try {
// 执行回调
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
// 在当前的线程中执行回调
runInThisThread = true;
}
if (runInThisThread) {
try {
// 执行回调
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
如果设置了回调线程池,则向其提交一个任务;否则就在本线程内执行回调处理。
java
remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
public void executeInvokeCallback() {
if (invokeCallback != null) {
// 只允许执行一次
if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
// 执行操作完成方法
invokeCallback.operationComplete(this);
}
}
}
会调用到业务方定义的operationComplete方法。