ReactorNetty中HttpServer的请求处理流程

在启动过程中分析到,当客户端的请求到来时,会为其设置一个特殊的ReactiveBridge处理器来处理请求,从名称就可以看出来,该处理器是Netty和反应式的桥接器。本文就来分析一下,当新请求到来时,该处理器是怎么处理的。


强烈建议在阅读本文之前先熟悉下面两篇文章中的内容,否则容易不知道讲的是什么。

前情回顾

当Acceptor收到请求后,会往请求的通道对象中添加一个通道初始化器TransportChannelInitializer,然后并注册到worker事件循环组中。注册后会调用这个通道初始化器的initChannel方法。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/TransportConfig.java
@Override
protected void initChannel(Channel channel) {
    ChannelPipeline pipeline = channel.pipeline();


    /*
     * 设置reactiveBridge通道处理器
     * 第二个参数是连接操作提供者,会用来创建连接操作对象。
     * 对于HttpServerConfig而言,没有实现TransportConfig中的方法,而后者的默认实现是返回null。
     */
    ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);

    // 追加通道初始化操作
    config.defaultOnChannelInit()
            // 添加配置中的初始化器
          .then(config.doOnChannelInit)
            // 执行通道初始化
          .onChannelInit(connectionObserver, channel, remoteAddress);

    // 从pipeline中移除该handler,因为初始化器执行一次就够了。
    pipeline.remove(this);

    if (log.isDebugEnabled()) {
        log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
    }
}

在该方法中,会往请求的通道对象中注册一个通道处理器。在调用addReactiveBridge方法时,除了通道对象外,第二个参数是通道操作提供器,第三个参数是连接观察者(会处理通道事件)。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java
public static void addReactiveBridge(Channel ch, OnSetup opsFactory, ConnectionObserver listener) {
    requireNonNull(ch, "channel");
    requireNonNull(opsFactory, "opsFactory");
    requireNonNull(listener, "listener");
    // 注册通道处理器
    ch.pipeline()
      .addLast(NettyPipeline.ReactiveBridge, new ChannelOperationsHandler(opsFactory, listener));
}

添加的通道处理器是ChannelOperationsHandler类型的。

添加了ReactiveBridge处理器后,会执行通道初始化操作。

通道初始化

初始化操作调用的是HttpServerConfig中的内部类HttpServerChannelInitializer中的onChannelInit方法。

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java
configureHttp11Pipeline(
        channel.pipeline(),
        accessLogEnabled,
        accessLog,
        compressPredicate(compressPredicate, minCompressionSize),
        cookieDecoder,
        cookieEncoder,
        false,
        decoder,
        formDecoderProvider,
        forwardedHeaderHandler,
        httpMessageLogFactory,
        idleTimeout,
        observer,
        mapHandle,
        maxKeepAliveRequests,
        methodTagValue,
        metricsRecorder,
        minCompressionSize,
        readTimeout,
        requestTimeout,
        uriTagValue);

该方法比较复杂,忽略的SSL和其他HTTP版本的实现,来看看configureHttp11Pipeline方法的执行过程。

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java
@SuppressWarnings("deprecation")
static void configureHttp11Pipeline(ChannelPipeline p,
        boolean accessLogEnabled,
        @Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
        @Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
        ServerCookieDecoder cookieDecoder,
        ServerCookieEncoder cookieEncoder,
        boolean channelOpened,
        HttpRequestDecoderSpec decoder,
        HttpServerFormDecoderProvider formDecoderProvider,
        @Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
        HttpMessageLogFactory httpMessageLogFactory,
        @Nullable Duration idleTimeout,
        ConnectionObserver listener,
        @Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
        int maxKeepAliveRequests,
        @Nullable Function<String, String> methodTagValue,
        @Nullable ChannelMetricsRecorder metricsRecorder,
        int minCompressionSize,
        @Nullable Duration readTimeout,
        @Nullable Duration requestTimeout,
        @Nullable Function<String, String> uriTagValue) {
    HttpDecoderConfig decoderConfig = new HttpDecoderConfig();
    decoderConfig.setMaxInitialLineLength(decoder.maxInitialLineLength())
                 .setMaxHeaderSize(decoder.maxHeaderSize())
                 .setMaxChunkSize(decoder.maxChunkSize())
                 .setValidateHeaders(decoder.validateHeaders())
                 .setInitialBufferSize(decoder.initialBufferSize())
                 .setAllowDuplicateContentLengths(decoder.allowDuplicateContentLengths());
    p.addBefore(NettyPipeline.ReactiveBridge,
                NettyPipeline.HttpCodec,
                new HttpServerCodec(decoderConfig))
     .addBefore(NettyPipeline.ReactiveBridge, // 添加在ReactiveBridge之前
                NettyPipeline.HttpTrafficHandler,
                // 创建通道处理器
                new HttpTrafficHandler(compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider,
                        forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests,
                        readTimeout, requestTimeout));

}

在该方法中添加了一些通道处理器,重点关注一下HttpTrafficHandler

HttpTrafficHandler

这里不是要介绍是怎么进行HTTP流量的处理的,而是重点关注HttpServerOperations对象的创建和绑定。

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
            HttpServerOperations ops;
                // 创建HTTP服务的操作对象
                ops = new HttpServerOperations(Connection.from(ctx.channel()),
                        listener,
                        request,
                        compress,
                        connectionInfo,
                        cookieDecoder,
                        cookieEncoder,
                        formDecoderProvider,
                        httpMessageLogFactory,
                        false,
                        mapHandle,
                        readTimeout,
                        requestTimeout,
                        secure,
                        timestamp);
            // 绑定到通道对象中
            ops.bind();
}

该方法非常长,只需要关注与HttpServerOperations相关的部分即可。创建好ops对象后,会将其绑定到通道的属性中,下面会获取。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/Connection.java
default Connection bind() {
    channel().attr(ReactorNetty.CONNECTION)
             .set(this);
    return this;
}

因为HttpTrafficHandler被添加在ChannelOperationHandler的前面,所以下面会获取到绑定的ops对象。

ChannelOperationHandler

本文关注的是请求入站处理,所以直接看channelRead方法。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperationsHandler.java
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public final void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf) {
        return;
    }
    try {
        // 这里主要考虑返回值类型是HttpServerOperations
        ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
        if (ops != null) {
            ops.onInboundNext(ctx, msg);
        }
        else {
            ReferenceCountUtil.release(msg);
        }
    }
    catch (Throwable err) {
    }
}

该方法中就两个操作:

  • 获取通道操作对象;
  • 调用操作对象的onInboundNext方法。

获取通道操作对象

v1.1.17
get
from
<
>
java
reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java
@Nullable
public static ChannelOperations<?, ?> get(Channel ch) {
    return Connection.from(ch)
                     .as(ChannelOperations.class);
}
java
reactor-netty-core/src/main/java/reactor/netty/Connection.java
static Connection from(Channel channel) {
    requireNonNull(channel, "channel");
    // 如果通道属性中存在
    if (channel.hasAttr(ReactorNetty.CONNECTION)) {
        // 则从属性中获取
        return channel.attr(ReactorNetty.CONNECTION)
                      .get();
    }
    // 新建对象
    return new ReactorNetty.SimpleConnection(channel).bind();
}

在上面的from方法中,会进入if语句块,获取到上面绑定的ops对象。

调用onInboundNext方法

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java
@Override
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        try {
            // 接收请求
            listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);
        }
        return;
    }
}

这里调用到的listener是从TransportChannelInitializerinitChannel方法中一路传进去的,就是ConnectionObserver类型的属性connectionObserver,这个属性又是ServerTransportbind方法传入的,就是同方法中定义的变量childObsChildObserver类型的变量,是ServerTransport中的内部类)。 总之,这里会传播REQUEST_RECEIVED请求状态,连接观察者如果对该状态感兴趣则会处理。

v1.1.17
onStateChange
bind
<
>
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java
@Override
public void onStateChange(Connection connection, State newState) {
    if (newState == State.DISCONNECTING) {
        if (connection.channel().isActive() && !connection.isPersistent()) {
            connection.dispose();
        }
    }

    // 处理连接状态
    childObs.onStateChange(connection, newState);
}
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java
public Mono<? extends DisposableServer> bind() {
    // 这里的config参考HttpServerBind的无参构造器
    CONF config = configuration();
    Objects.requireNonNull(config.bindAddress(), "bindAddress");

    Mono<? extends DisposableServer> mono =  Mono.create(sink -> {
        // 在HttpServerBind中设置的以及这里获取到的都是InetSocketAddress
        SocketAddress local = Objects.requireNonNull(config.bindAddress().get(), "Bind Address supplier returned null");
        // HttpServerBind中的设置的是InetSocketAddress类型的
        if (local instanceof InetSocketAddress) {
            InetSocketAddress localInet = (InetSocketAddress) local;

            // 如果地址还没解析过则解析
            if (localInet.isUnresolved()) {
                local = AddressUtils.createResolved(localInet.getHostName(), localInet.getPort());
            }
        }

        boolean isDomainSocket = false;
        // 创建disposableServer
        DisposableBind disposableServer;
        // 对于HttpServerBind来说,这里的local是InetSocketAddress类型的,不是DomainSocketAddress
        if (local instanceof DomainSocketAddress) {
            isDomainSocket = true;
            disposableServer = new UdsDisposableBind(sink, config, local);
        }
        else {
            // 创建server
            disposableServer = new InetDisposableBind(sink, config, local);
        }

        /*
         * 创建连接观察者,config中的childObserver默认是空操作。
         * 所以在Spring Boot中会往HttpServer中添加一个handler,然后会被封装为ConnectionObserver。
         */
        ConnectionObserver childObs =
                new ChildObserver(config.defaultChildObserver().then(config.childObserver()));
        /*
         * 从config中获取eventLoop和channelInitializer来创建acceptor
         * 该acceptor就是reactor模型中的重要角色,
         * 这里通过config来获取EventLoopGroup、ChannelHandler、childOptions、childAttrs,
         * 这些都是Netty中的概念或组件。
         * 对于EventLoopGroup的获取,内部会通过LoopResources来实现。
         *
         * 这里的Acceptor继承了ChannelInboundHandlerAdapter,该类是Netty中的概念。
         *
         * 客户端连接到服务端后,Acceptor的channelRead方法会把通道注册到child事件循环中。
         */
        Acceptor acceptor = new Acceptor(config.childEventLoopGroup(), config.channelInitializer(childObs, null, true),
                config.childOptions, config.childAttrs, isDomainSocket);
        /*
         * 执行底层的bind操作
         * 这里的AcceptorInitializer是一个ChannelInitializer,用于初始化channel
         */
        TransportConnector.bind(config, new AcceptorInitializer(acceptor), local, isDomainSocket)
                          .subscribe(disposableServer);
    });

    // 这里默认是null,所以可以忽略
    if (config.doOnBind() != null) {
        // 执行bind回调
        mono = mono.doOnSubscribe(s -> config.doOnBind().accept(config));
    }
    return mono;
}

ChildObserverchildObs是config对象中配置的连接观察者对象,那是哪里往config中配置了连接观察者对象呢? 实际上在Spring Boot中为HttpServer对象设置处理器的时候,就会把处理器封装为一个连接观察者对象,然后添加到config对象中。

v2.7.x
java
spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/embedded/netty/NettyWebServer.java
DisposableServer startHttpServer() {
    HttpServer server = this.httpServer;
    // 下面这个条件默认是true
    if (this.routeProviders.isEmpty()) {
        // 设置请求处理器
        server = server.handle(this.handler);
    }
    else {
        server = server.route(this::applyRouteProviders);
    }
    if (this.lifecycleTimeout != null) {
        return server.bindNow(this.lifecycleTimeout);
    }
    /*
     * 执行bind操作,HttpServer是reactor-netty提供的,所以离开了spring-boot范畴,
     * 最后会一直执行到netty层面的ServerBootstrap#bind()
     */
    return server.bindNow();
}

Spring Boot会调用HttpServerhandle方法。

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java
public final HttpServer handle(
        BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
    Objects.requireNonNull(handler, "handler");
    return childObserve(new HttpServerHandle(handler));
}

可以看到,这里把我们传入的处理器封装为了HttpServerHandle对象。而该对象刚好对请求的REQUEST_RECEIVED状态感兴趣。

HttpServerHandle

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java
static final class HttpServerHandle implements ConnectionObserver {

    final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler;

    HttpServerHandle(BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
        this.handler = handler;
    }

    @Override
    @SuppressWarnings("FutureReturnValueIgnored")
    public void onStateChange(Connection connection, State newState) {
        if (newState == HttpServerState.REQUEST_RECEIVED) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);
                }
                // 封装连接
                HttpServerOperations ops = (HttpServerOperations) connection;
                // 调用apply方法,处理连接
                Publisher<Void> publisher = handler.apply(ops, ops);
                Mono<Void> mono = Mono.deferContextual(ctx -> {
                    ops.currentContext = Context.of(ctx);
                    return Mono.fromDirect(publisher);
                });
                if (ops.mapHandle != null) {
                    mono = ops.mapHandle.apply(mono, connection);
                }
                mono.subscribe(ops.disposeSubscriber());
            }
            catch (Throwable t) {
                log.error(format(connection.channel(), ""), t);
                //"FutureReturnValueIgnored" this is deliberate
                connection.channel()
                          .close();
            }
        }
    }
}

在上面的onStateChange方法中,会调用handler的apply方法,在上面Spring Boot中设置的是Spring Framework提供的ReactorHttpHandlerAdapter。本文仅关注ReactorNetty框架中的实现,不深入Spring Framework中分析了。

总结

通过本文,我们了解了在ReactorNetty框架中请求是怎么被处理的,以及怎么进入我们设置的handler的。从而达到了知其然,更知其所以然的目的。