NettyRemotingServer in RocketMQ

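NettyRemotingServer is RocketMQ's implementation of a Netty server. It wraps the Netty-related components: the server bootstrap, the event loop groups, and the channel handlers together with the executor group that runs them. Both the NameServer and the Broker rely on it for network communication and event handling. This article analyzes how it is implemented.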


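The analysis follows two main lines, the creation process and the startup process, and then looks at how processors and RPC hooks are registered and how requests and responses are handled.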

Creation

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
    this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    // Semaphores limiting concurrent oneway and async requests sent by the server
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    // Create Netty's server bootstrap
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    // Number of threads for the server callback executor
    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        // Fall back to 4 threads
        publicThreadNums = 4;
    }

    // Create the public thread pool that runs request processors
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            // Create the thread with a numbered name
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // Decide whether to use Netty's native epoll transport
    if (useEpoll()) {
        // Boss group with a single thread
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        // Worker (selector) group: serverSelectorThreads threads, 3 by default
        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        // Boss group (NIO)
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        // Worker (selector) group (NIO)
        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    // Load the SSL context
    loadSslContext();
}

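The constructor does three main things:

  • creates the public thread pool: when a processor is registered with a null executor, this pool runs it;
  • creates Netty's event loop groups, a boss group with one thread and a worker (selector) group, using either epoll or NIO;
  • loads the SSL context.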
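The constructor repeats one pattern several times: a ThreadFactory that numbers its threads with an AtomicInteger. The sketch below pulls that pattern into a small JDK-only helper, together with the fallback to 4 threads for the public pool. NamedThreadFactory and PublicExecutorSketch are hypothetical names, not RocketMQ classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper equivalent to the anonymous ThreadFactory classes in the
// constructor: threads are named <prefix><n>, with n starting at 1.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger threadIndex = new AtomicInteger(0);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + threadIndex.incrementAndGet());
    }
}

class PublicExecutorSketch {
    // Same rule as the constructor: a non-positive configured value means 4 threads.
    static int resolveThreads(int configured) {
        return configured <= 0 ? 4 : configured;
    }

    // Fixed-size pool whose threads are named NettyServerPublicExecutor_1, _2, ...
    static ExecutorService create(int configured) {
        return Executors.newFixedThreadPool(resolveThreads(configured),
            new NamedThreadFactory("NettyServerPublicExecutor_"));
    }
}
```

Readable thread names pay off in thread dumps. A busy NettyServerPublicExecutor_3 points at request processing, while NettyServerNIOSelector_3_1 points at Netty I/O.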

Startup

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@Override
public void start() {
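    /*
     * Create the event executor group that runs the Netty channel handlers added below.
     * Do not confuse it with publicExecutor created in the constructor, which runs the business processors.
     */
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            // serverWorkerThreads, 8 by default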
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // Create the sharable handlers
    prepareSharableHandlers();

    // Configure the server bootstrap
    ServerBootstrap childHandler =
            // Boss group accepts connections; worker group handles I/O events
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) // boss group, worker group
            // Channel type: epoll or NIO
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            // Accept queue length (backlog)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
            // Allow address reuse
            .option(ChannelOption.SO_REUSEADDR, true)
            // TCP keep-alive disabled
            .option(ChannelOption.SO_KEEPALIVE, false)
            // Disable Nagle's algorithm on accepted connections
            .childOption(ChannelOption.TCP_NODELAY, true)
            // Address and port to listen on
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            // Initializer that builds the pipeline of each accepted connection
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
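                    // Add the channel handlers to the pipeline
                    ch.pipeline()
                        // Handler responsible for the TLS handshake
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            // Encoder
                            encoder,
                            // Decoder
                            new NettyDecoder(),
                            // Netty's built-in idle-state handler: fires an event when the connection has had
                            // no reads or writes for serverChannelMaxIdleTimeSeconds
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            // Connection manager: handles connection active, inactive, idle-timeout and exception events
                            connectionManageHandler,
                            // Request handler: passes received commands on to the business processors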
                            serverHandler
                        );
                }
            });
    // Optional options for accepted (child) channels
    if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
        log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
        // Send buffer size
        childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
    }
    if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
        log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
        // Receive buffer size
        childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
    }
    if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
        log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
                nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
        // Low and high water marks of the write buffer
        childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
    }

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        // Use the pooled ByteBuf allocator
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        // Bind and start the Netty server, then record the actual port
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        // Start the executor that delivers channel events to the ChannelEventListener
        this.nettyEventExecutor.start();
    }

    // Start the timer that scans the response table: first run after 3 seconds, then once per second
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                // Scan the response table, remove timed-out ResponseFutures and run their callbacks
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
private void prepareSharableHandlers() {
    handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
    encoder = new NettyEncoder();
    connectionManageHandler = new NettyConnectManageHandler();
    serverHandler = new NettyServerHandler();
}

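start() performs the following steps:

  • creates the event executor group that runs the channel handlers;
  • calls prepareSharableHandlers to create the handlers shared by all connections;
  • configures the Netty server and binds it to the listening port;
  • starts nettyEventExecutor if a ChannelEventListener was supplied;
  • schedules the timer task that scans the response table.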
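The timer scheduled at the end of start() calls scanResponseTable, whose body is not shown in this article. The sketch below shows the idea in simplified form, assuming each pending request records its start time and timeout. Expired entries are removed from the table and returned so their callbacks can run afterwards. PendingRequest and ResponseTableScanner are hypothetical, and the real expiry condition may differ (for example, by adding a grace period).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ResponseFuture, keeping only what expiry needs.
class PendingRequest {
    final int opaque;
    final long beginTimestamp;
    final long timeoutMillis;

    PendingRequest(int opaque, long beginTimestamp, long timeoutMillis) {
        this.opaque = opaque;
        this.beginTimestamp = beginTimestamp;
        this.timeoutMillis = timeoutMillis;
    }
}

class ResponseTableScanner {
    final Map<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>();

    // Removes every entry whose deadline is at or before 'now' and returns the removed
    // entries, so their callbacks can run after the table has been cleaned up.
    List<PendingRequest> scan(long now) {
        List<PendingRequest> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest req = it.next().getValue();
            if (req.beginTimestamp + req.timeoutMillis <= now) {
                it.remove();
                expired.add(req);
            }
        }
        return expired;
    }

    // Same schedule as start(): first scan after 3 seconds, then once per second.
    void startScanning(Timer timer) {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                for (PendingRequest req : scan(System.currentTimeMillis())) {
                    System.out.println("request " + req.opaque + " timed out");
                }
            }
        }, 1000 * 3, 1000);
    }
}
```

Passing the current time into scan() as a parameter keeps the expiry rule testable without waiting for the timer.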

Registering processors

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
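    // If no executor was given,
    if (null == executor) {
        // fall back to the public thread pool
        executorThis = this.publicExecutor;
    }

    // Bundle the processor and its executor into a Pair
    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    // Store it in the processor table keyed by request code, so an incoming request can find its processor and executor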
    this.processorTable.put(requestCode, pair);
}

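If the executor passed in is null, the default public thread pool is used. The processor and the executor are then wrapped in a Pair, which is stored in the processorTable.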
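The registration rule can be sketched in plain Java. RequestProcessor, ProcessorBinding and ProcessorRegistry are hypothetical stand-ins for NettyRequestProcessor, Pair<NettyRequestProcessor, ExecutorService> and the processorTable field. Only the null-executor fallback is modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for NettyRequestProcessor.
interface RequestProcessor {
    String process(String request);
}

// Hypothetical stand-in for Pair<NettyRequestProcessor, ExecutorService>.
final class ProcessorBinding {
    final RequestProcessor processor;
    final ExecutorService executor;

    ProcessorBinding(RequestProcessor processor, ExecutorService executor) {
        this.processor = processor;
        this.executor = executor;
    }
}

class ProcessorRegistry {
    private final ExecutorService publicExecutor;
    private final Map<Integer, ProcessorBinding> processorTable = new HashMap<>();

    ProcessorRegistry(ExecutorService publicExecutor) {
        this.publicExecutor = publicExecutor;
    }

    // Same rule as registerProcessor: a null executor falls back to publicExecutor,
    // and a later registration for the same code replaces the earlier one.
    void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
        ExecutorService executorThis = executor == null ? publicExecutor : executor;
        processorTable.put(requestCode, new ProcessorBinding(processor, executorThis));
    }

    // Returns null when nothing is registered for the code.
    ProcessorBinding lookup(int requestCode) {
        return processorTable.get(requestCode);
    }
}
```

Giving a heavy processor its own executor keeps it from starving the processors that share publicExecutor.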

Registering RPC hooks

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@Override
public void registerRPCHook(RPCHook rpcHook) {
    // If the hook is non-null and not yet registered,
    if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
        // add it to the list
        rpcHooks.add(rpcHook);
    }
}

A hook is added to the list only if it is non-null and has not been registered before.
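The sketch below models the hook list with a hypothetical Hook interface that takes strings instead of RemotingCommand objects. It shows two behaviors of registerRPCHook and of doBeforeRpcHooks/doAfterRpcHooks. First, null or duplicate hooks are ignored. Second, hooks run in registration order: before-hooks ahead of the processor, and after-hooks once a response exists. HookChain and RecordingHook are also hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, string-based stand-in for RocketMQ's RPCHook.
interface Hook {
    void doBeforeRequest(String addr, String request);

    void doAfterResponse(String addr, String request, String response);
}

// Hook that records every call into a shared log, for demonstration.
class RecordingHook implements Hook {
    private final String name;
    private final List<String> log;

    RecordingHook(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void doBeforeRequest(String addr, String request) {
        log.add(name + ".before");
    }

    @Override
    public void doAfterResponse(String addr, String request, String response) {
        log.add(name + ".after");
    }
}

class HookChain {
    private final List<Hook> rpcHooks = new ArrayList<>();

    // Same guard as registerRPCHook: skip null and already-registered hooks.
    void registerRPCHook(Hook hook) {
        if (hook != null && !rpcHooks.contains(hook)) {
            rpcHooks.add(hook);
        }
    }

    int size() {
        return rpcHooks.size();
    }

    // Before-hooks, then the processor, then after-hooks, each in registration order.
    String invoke(String addr, String request, UnaryOperator<String> processor) {
        for (Hook hook : rpcHooks) {
            hook.doBeforeRequest(addr, request);
        }
        String response = processor.apply(request);
        for (Hook hook : rpcHooks) {
            hook.doAfterResponse(addr, request, response);
        }
        return response;
    }
}
```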

NettyServerHandler

This channel handler is added as the last handler of the channel pipeline.

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    // Entry point for commands received by the server
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        // Process the received command
        processMessageReceived(ctx, msg);
    }
}

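It calls processMessageReceived, inherited from the parent class NettyRemotingAbstract, to handle the received command, which may be either a request or a response.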

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        // Dispatch on the command type
        switch (cmd.getType()) {
            case REQUEST_COMMAND: // a request initiated by the peer
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND: // the peer's response to a request this side sent
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
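processMessageReceived only needs cmd.getType(). As far as we understand RemotingCommand, the command type and the oneway marker are kept as bits of an int flag field. Those bits are what markResponseType() and isOnewayRPC() in processRequestCommand operate on. The sketch below reproduces that idea with hypothetical FlagCommand, CommandKind and KindDispatcher types. The bit positions are assumptions, not taken from this article.

```java
// Hypothetical mirror of RemotingCommandType.
enum CommandKind { REQUEST_COMMAND, RESPONSE_COMMAND }

// Hypothetical command that keeps its type and oneway marker as bits of an int flag.
class FlagCommand {
    private static final int RESPONSE_BIT = 1;      // assumed: bit 0 set means "response"
    private static final int ONEWAY_BIT = 1 << 1;   // assumed: bit 1 set means "oneway request"
    private int flag;

    void markResponseType() {
        flag |= RESPONSE_BIT;
    }

    void markOnewayRPC() {
        flag |= ONEWAY_BIT;
    }

    boolean isOnewayRPC() {
        return (flag & ONEWAY_BIT) == ONEWAY_BIT;
    }

    CommandKind getType() {
        return (flag & RESPONSE_BIT) == RESPONSE_BIT ? CommandKind.RESPONSE_COMMAND : CommandKind.REQUEST_COMMAND;
    }
}

class KindDispatcher {
    // Names the method processMessageReceived would call for this command.
    static String route(FlagCommand cmd) {
        if (cmd == null) {
            return "ignored";
        }
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                return "processRequestCommand";
            case RESPONSE_COMMAND:
                return "processResponseCommand";
            default:
                return "ignored";
        }
    }
}
```

Packing the markers into one int keeps the header small. A response is just a command with the response bit set, which is why the callback in processRequestCommand calls markResponseType() before writing it.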

Handling requests

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // Find the processor registered for this request code
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    // If none is registered for this code, use the default processor
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;

    // Request ID
    final int opaque = cmd.getOpaque();

    // pair is non-null if a processor is registered for this code or a default processor is set
    if (pair != null) {
        // Build the task that processes the request
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    // Remote address
                    String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    // Run the before-request hooks
                    doBeforeRpcHooks(remoteAddr, cmd);
                    // Callback that sends the response back to the peer
                    final RemotingResponseCallback callback = new RemotingResponseCallback() {
                        @Override
                        public void callback(RemotingCommand response) {
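                            // Run the after-response hooks
                            doAfterRpcHooks(remoteAddr, cmd, response);
                            // Only non-oneway requests get a response
                            if (!cmd.isOnewayRPC()) {
                                // A response was produced
                                if (response != null) {
                                    // Set the response ID to the request ID
                                    response.setOpaque(opaque);
                                    // Mark the command as a response
                                    response.markResponseType();
                                    // Use the request's serialization type
                                    response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
                                    try {
                                        // Send the response to the peer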
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
                                }
                            }
                        }
                    };
                    // Is the processor asynchronous?
                    if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        // Get the processor
                        AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                        // Process asynchronously, passing the callback that sends the response
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else {
                        // Get the processor
                        NettyRequestProcessor processor = pair.getObject1();
                        // Process the request synchronously
                        RemotingCommand response = processor.processRequest(ctx, cmd);
                        // Send the response through the callback right away
                        callback.callback(response);
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) { // not a oneway request
                        // Build a SYSTEM_ERROR response
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        // Send the error response
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        // If the processor rejects the request (flow control)
        if (pair.getObject1().rejectRequest()) {
            // Build a SYSTEM_BUSY response
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            // Reply that the system is busy
            ctx.writeAndFlush(response);
            return;
        }

        try {
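            // Wrap the Runnable, the channel and the command in a RequestTask
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            // Submit it to the processor's thread pool
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) { // the thread pool rejected the task
            if ((System.currentTimeMillis() % 10000) == 0) { // rate-limit the warning: only when the timestamp is a multiple of 10000 ms
                // log the rejection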
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) { // not a oneway request
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                // Reply that the system is busy
                ctx.writeAndFlush(response);
            }
        }
    } else { // no processor for this code and no default processor

        String error = " request type " + cmd.getCode() + " not supported";
        // Build a REQUEST_CODE_NOT_SUPPORTED response
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        // Reply that the request code is not supported
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

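This method looks up the processor by request code, wraps the request in a RequestTask and submits it to the thread pool paired with that processor. Inside the Runnable, the RPC hooks run before and after the request is processed. If the processor rejects the request, the peer receives a SYSTEM_BUSY response. If the thread pool rejects the task, the peer also receives SYSTEM_BUSY, but only for a non-oneway request. If no processor is found, the peer receives REQUEST_CODE_NOT_SUPPORTED.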
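The control flow of processRequestCommand can be condensed into a plain-Java sketch. SimpleProcessor, Binding and RequestDispatchSketch are hypothetical. Responses are plain strings, a Consumer<String> stands in for ctx.writeAndFlush, and an Executor stands in for the processor's ExecutorService. RPC hooks, opaque handling and async processors are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

// Hypothetical processor contract modeled on NettyRequestProcessor.
interface SimpleProcessor {
    String processRequest(String body) throws Exception;

    // Flow-control switch; most processors never reject.
    default boolean rejectRequest() {
        return false;
    }
}

// Hypothetical stand-in for Pair<NettyRequestProcessor, ExecutorService>.
final class Binding {
    final SimpleProcessor processor;
    final Executor executor;

    Binding(SimpleProcessor processor, Executor executor) {
        this.processor = processor;
        this.executor = executor;
    }
}

class RequestDispatchSketch {
    static final String SYSTEM_ERROR = "SYSTEM_ERROR";
    static final String SYSTEM_BUSY = "SYSTEM_BUSY";
    static final String NOT_SUPPORTED = "REQUEST_CODE_NOT_SUPPORTED";

    final Map<Integer, Binding> processorTable = new HashMap<>();
    Binding defaultRequestProcessor; // used when no binding exists for a code

    // 'reply' plays the role of ctx.writeAndFlush(response).
    void processRequestCommand(int code, String body, boolean oneway, Consumer<String> reply) {
        Binding matched = processorTable.get(code);
        Binding pair = matched == null ? defaultRequestProcessor : matched;
        if (pair == null) {
            reply.accept(NOT_SUPPORTED); // no processor and no default
            return;
        }
        if (pair.processor.rejectRequest()) {
            reply.accept(SYSTEM_BUSY); // processor-level flow control
            return;
        }
        Runnable run = () -> {
            try {
                String response = pair.processor.processRequest(body);
                if (!oneway && response != null) {
                    reply.accept(response);
                }
            } catch (Throwable e) {
                if (!oneway) {
                    reply.accept(SYSTEM_ERROR);
                }
            }
        };
        try {
            pair.executor.execute(run);
        } catch (RejectedExecutionException e) {
            if (!oneway) {
                reply.accept(SYSTEM_BUSY); // thread pool is saturated
            }
        }
    }
}
```

Answering SYSTEM_BUSY instead of queueing without bound gives the client a chance to back off. This is why both the processor-level rejectRequest() check and the thread-pool rejection end in the same response code.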

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
    if (rpcHooks.size() > 0) {
        // Iterate over the hooks
        for (RPCHook rpcHook: rpcHooks) {
            // Run the before-request logic
            rpcHook.doBeforeRequest(addr, request);
        }
    }
}
protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
    if (rpcHooks.size() > 0) {
        // Iterate over the hooks
        for (RPCHook rpcHook: rpcHooks) {
            // Run the after-response logic
            rpcHook.doAfterResponse(addr, request, response);
        }
    }
}

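Which processors are registered depends on the component that uses NettyRemotingServer. If no processor is registered for a request code, the default processor is used. The NameServer, for example, registers only a default processor.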

Handling responses

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    // Request ID
    final int opaque = cmd.getOpaque();
    // Look up the future in the response table
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        // Attach the response command
        responseFuture.setResponseCommand(cmd);

        // Remove it from the response table
        responseTable.remove(opaque);

        // If a callback is set
        if (responseFuture.getInvokeCallback() != null) { // asynchronous call
            // Run the callback
            executeInvokeCallback(responseFuture);
        } else { // synchronous call
            // Store the response; this counts down the latch and wakes the blocked business thread
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

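The ResponseFuture is looked up in the response table by the request ID (opaque) and removed from it. For an asynchronous call the registered callback is executed. For a synchronous call, putResponse stores the response and wakes the waiting thread. A response that matches no pending request is only logged.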
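The matching step can be sketched with a ConcurrentHashMap keyed by opaque and a CountDownLatch per request. MiniResponseFuture and ResponseMatcher are hypothetical, trimmed-down versions of ResponseFuture and the responseTable logic. Timeouts, semaphores and the send path are omitted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, trimmed-down ResponseFuture.
class MiniResponseFuture {
    final int opaque;
    final Consumer<MiniResponseFuture> invokeCallback; // null for synchronous calls
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile String responseCommand;

    MiniResponseFuture(int opaque, Consumer<MiniResponseFuture> invokeCallback) {
        this.opaque = opaque;
        this.invokeCallback = invokeCallback;
    }

    void setResponseCommand(String response) {
        this.responseCommand = response;
    }

    // Stores the response and releases the thread blocked in waitResponse.
    void putResponse(String response) {
        this.responseCommand = response;
        countDownLatch.countDown();
    }

    // What a synchronous caller blocks on after sending its request.
    String waitResponse(long timeoutMillis) throws InterruptedException {
        countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return responseCommand;
    }

    boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    String getResponseCommand() {
        return responseCommand;
    }
}

class ResponseMatcher {
    final ConcurrentHashMap<Integer, MiniResponseFuture> responseTable = new ConcurrentHashMap<>();

    // Returns false when no pending request matches (the real code logs a warning).
    boolean processResponseCommand(int opaque, String response) {
        // remove() looks up and removes the entry in one atomic step
        MiniResponseFuture future = responseTable.remove(opaque);
        if (future == null) {
            return false;
        }
        if (future.invokeCallback != null) {
            future.setResponseCommand(response);   // asynchronous call
            future.invokeCallback.accept(future);
        } else {
            future.putResponse(response);          // synchronous call: wake the waiting thread
        }
        return true;
    }
}
```

The sketch uses a single remove() where the original calls get() and then remove(). With a single remove(), only one caller can ever obtain a given future.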

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
private void executeInvokeCallback(final ResponseFuture responseFuture) {
    boolean runInThisThread = false;
    // Executor for running callbacks; NettyRemotingServer returns its publicExecutor here
    ExecutorService executor = this.getCallbackExecutor();
    if (executor != null) { // a callback executor is available
        try {
            // Submit the task
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Run the callback
                        responseFuture.executeInvokeCallback();
                    } catch (Throwable e) {
                        log.warn("execute callback in executor exception, and callback throw", e);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            runInThisThread = true;
            log.warn("execute callback in executor exception, maybe executor busy", e);
        }
    } else {
        // Run the callback in the current thread
        runInThisThread = true;
    }

    if (runInThisThread) {
        try {
            // Run the callback
            responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
            log.warn("executeInvokeCallback Exception", e);
        } finally {
            responseFuture.release();
        }
    }
}

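If a callback executor is available, the callback is submitted to it as a task. If there is no executor, or if the submission throws (for example because the executor is busy), the callback runs in the current thread instead. Either way, responseFuture.release() is called once the callback has run.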
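The fallback rule in executeInvokeCallback can be isolated in a few lines. CallbackRunner is a hypothetical helper. It returns where the callback ran so the behavior is easy to check, and it omits responseFuture.release().

```java
import java.util.concurrent.Executor;

class CallbackRunner {
    // Runs the callback on 'executor' when there is one. If there is none, or the
    // executor refuses the task, the callback runs on the calling thread instead.
    // Returns where it ran so the behavior is easy to check.
    static String run(Executor executor, Runnable callback) {
        // Exceptions thrown by the callback are contained here, so they cannot be
        // mistaken for a refused submission (the real code logs them).
        Runnable guarded = () -> {
            try {
                callback.run();
            } catch (Throwable t) {
                // swallowed in this sketch; the real code logs a warning
            }
        };
        if (executor != null) {
            try {
                executor.execute(guarded);
                return "executor";
            } catch (Exception e) {
                // e.g. RejectedExecutionException when the pool is saturated or shut down
            }
        }
        guarded.run();
        return "caller";
    }
}
```

Falling back to the caller's thread means a saturated pool delays callbacks instead of dropping them, at the cost of briefly occupying the thread that delivered the response.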

v4.9.4: remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
public void executeInvokeCallback() {
    if (invokeCallback != null) {
        // Run at most once
        if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
            // Invoke operationComplete
            invokeCallback.operationComplete(this);
        }
    }
}

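This ends up calling the operationComplete method defined by the caller. The compareAndSet on executeCallbackOnlyOnce ensures the callback runs at most once for a given ResponseFuture.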
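The once-only guard is the standard compare-and-set idiom. Both the response path (processResponseCommand) and the timeout path (the response table scan started in start()) lead to callback execution. The AtomicBoolean ensures that only the first caller runs the business callback for a given future. OnceCallback is a hypothetical reduction of that part of ResponseFuture.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class OnceCallback {
    private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
    private final Runnable invokeCallback;

    OnceCallback(Runnable invokeCallback) {
        this.invokeCallback = invokeCallback;
    }

    // Returns true only for the single call that actually ran the callback;
    // compareAndSet flips false -> true for exactly one caller, even under races.
    boolean executeInvokeCallback() {
        if (invokeCallback != null && executeCallbackOnlyOnce.compareAndSet(false, true)) {
            invokeCallback.run();
            return true;
        }
        return false;
    }
}
```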