ReactorNetty中HttpServer的创建过程

ReactorNetty是Spring官方实现的一款框架,被广泛作为Spring中WebFlux的服务器实现,比如Spring Cloud Gateway默认就使用该框架。在SpringBoot中的Web容器一文中介绍过Reactive容器大概的启动过程,但是并没有深入分析,本文就来深入ReactorNetty看一下是怎么实现的。


回顾Spring Boot中的实现

v2.7.x
NettyReactiveWebServerFactory
NettyWebServer
<
>
java
spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/embedded/netty/NettyReactiveWebServerFactory.java
	@Override
	public WebServer getWebServer(HttpHandler httpHandler) {
		// 创建server,类型是HttpServerBind
		HttpServer httpServer = createHttpServer();
		/*
		 * 创建适配器,该适配器是Reactor-Netty和Spring的适配器,
		 * 在该适配器的apply方法中会调用httpHandler来处理请求,即请求进入了Spring的范畴。
		 * 至于这里的httpHandler是什么,参考HttpHandlerAutoConfiguration,该类中会注入该类型的bean。
		 */
		ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
		// 创建NettyWebServer实例,对httpServer进行了封装,并传入了请求处理器handlerAdapter
		NettyWebServer webServer = createNettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout,
				getShutdown());
		webServer.setRouteProviders(this.routeProviders);
		return webServer;
	}
	NettyWebServer createNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter,
			Duration lifecycleTimeout, Shutdown shutdown) {
		return new NettyWebServer(httpServer, handlerAdapter, lifecycleTimeout, shutdown);
	}
private HttpServer createHttpServer() {
    // 创建的是HttpServerBind类型的实例
    HttpServer server = HttpServer.create();
    if (this.resourceFactory != null) {
        LoopResources resources = this.resourceFactory.getLoopResources();
        Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
        // 设置绑定所需的地址信息,this::getListenAddress会返回所要监听的地址和端口信息
        server = server.runOn(resources).bindAddress(this::getListenAddress);
    }
    else {
        server = server.bindAddress(this::getListenAddress);
    }
    // SSL相关
    if (getSsl() != null && getSsl().isEnabled()) {
        server = customizeSslConfiguration(server);
    }
    // 压缩相关
    if (getCompression() != null && getCompression().getEnabled()) {
        CompressionCustomizer compressionCustomizer = new CompressionCustomizer(getCompression());
        server = compressionCustomizer.apply(server);
    }
    // 设置协议
    server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders);
    // 应用自定义设置
    return applyCustomizers(server);
}
java
spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/embedded/netty/NettyWebServer.java
public NettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter, Duration lifecycleTimeout,
        Shutdown shutdown) {
    Assert.notNull(httpServer, "HttpServer must not be null");
    Assert.notNull(handlerAdapter, "HandlerAdapter must not be null");
    this.lifecycleTimeout = lifecycleTimeout;
    // 负责请求处理
    this.handler = handlerAdapter;
    // 执行底层的创建过程
    this.httpServer = httpServer.channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()));
    this.gracefulShutdown = (shutdown == Shutdown.GRACEFUL) ? new GracefulShutdown(() -> this.disposableServer)
            : null;
}

NettyReactiveWebServerFactorygetWebServer方法中,会调用createHttpServer方法来创建HttpServer对象,然后在NettyWebServer的构造方法中,会设置为其通道组。

创建HttpServer

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java
public static HttpServer create() {
    return HttpServerBind.INSTANCE;
}

这里返回的是HttpServerBind中的INSTANCE属性,HttpServerBindHttpServer的实现类。

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerBind.java
static final HttpServerBind INSTANCE = new HttpServerBind();

final HttpServerConfig config;
HttpServerBind() {
    Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(MapUtils.calculateInitialCapacity(2));
    // 禁用自动读
    childOptions.put(ChannelOption.AUTO_READ, false);
    // 禁用TCP的nagle算法
    childOptions.put(ChannelOption.TCP_NODELAY, true);
    // 创建默认配置
    this.config = new HttpServerConfig(
            // parent channel的选型,parent channel是指负责建立连接的channel
            Collections.singletonMap(ChannelOption.SO_REUSEADDR, true),
            // child channel的选项,child channel是指负责客户端数据读写的channel
            childOptions,
            // localAddress或bindAddress
            () -> new InetSocketAddress(DEFAULT_PORT));
}

HttpServerBind(HttpServerConfig config) {
    this.config = config;
}

在该构造方法中,会设置通道选项和创建HTTP服务的配置。

下图是HttpServer的继承和实现体系。

地址绑定

其实除了地址绑定,还有很多其他属性可以设置,但本文仅以最重要的地址绑定为例。

这里其实没有真正进行地址绑定,而是先设置地址信息。

v1.1.17
HttpServer
Transport
<
>
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java
// 绑定监听的地址端口
@Override
public final HttpServer bindAddress(Supplier<? extends SocketAddress> bindAddressSupplier) {
    return super.bindAddress(bindAddressSupplier);
}
java
reactor-netty-core/src/main/java/reactor/netty/transport/Transport.java
public T bindAddress(Supplier<? extends SocketAddress> bindAddressSupplier) {
    Objects.requireNonNull(bindAddressSupplier, "bindAddressSupplier");
    T dup = duplicate();
    // 设置所要绑定的地址端口信息
    dup.configuration().bindAddress = bindAddressSupplier;
    return dup;
}
protected abstract T duplicate();

这里调用了HttpServer的父类Transport中的方法。调用的duplicate方法在父类中是个模板方法,实现在子类HttpServerBind中。

v1.1.17
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerBind.java
@Override
protected HttpServer duplicate() {
    return new HttpServerBind(new HttpServerConfig(config));
}

这里重新创建了一个HttpServerConfig对象,然后再调用有参构造方法来创建HttpServerBind对象。