ReactorNetty中HttpServer的启动过程

SpringBoot中的Web容器一文中介绍过Spring Boot在启动过程中会创建Web服务器并启动它。在[ReactorNetty中的HttpServer的创建过程]这篇文章中也介绍了创建的过程,本文就来分析一下其启动过程是怎样的。


重要类的继承和实现

在继续介绍下面内容之前,先熟悉一下HttpServer的继承和实现体系,以及HttpServerConfig的继承体系。

HttpServer

HttpServerConfig

回顾Spring Boot中的实现

v2.7.x
java
spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/embedded/netty/NettyWebServer.java
@Override
public void start() throws WebServerException {
    if (this.disposableServer == null) {
        try {
            // 启动httpServer
            this.disposableServer = startHttpServer();
        }
        catch (Exception ex) {
            PortInUseException.ifCausedBy(ex, ChannelBindException.class, (bindException) -> {
                if (bindException.localPort() > 0 && !isPermissionDenied(bindException.getCause())) {
                    throw new PortInUseException(bindException.localPort(), ex);
                }
            });
            throw new WebServerException("Unable to start Netty", ex);
        }
        if (this.disposableServer != null) { // 启动成功,打印日志
            logger.info("Netty started" + getStartedOnMessage(this.disposableServer));
        }

        // 创建线程
        startDaemonAwaitThread(this.disposableServer);
    }
}
DisposableServer startHttpServer() {
    HttpServer server = this.httpServer;
    // 下面这个条件默认是true
    if (this.routeProviders.isEmpty()) {
        // 设置请求处理器
        server = server.handle(this.handler);
    }
    else {
        server = server.route(this::applyRouteProviders);
    }
    if (this.lifecycleTimeout != null) {
        return server.bindNow(this.lifecycleTimeout);
    }
    /*
     * 执行bind操作,HttpServer是reactor-netty提供的,所以离开了spring-boot范畴,
     * 最后会一直执行到netty层面的ServerBootstrap#bind()
     */
    return server.bindNow();
}

在该方法中,主要是调用了HttpServerhandlebindNow这两个方法。在调用handle方法时传入的handler是在NettyReactiveWebServerFactorygetWebServer方法中创建的,类型为ReactorHttpHandlerAdapter,这是一个Spring Framework提供的类。

handle方法

v1.1.17
handle
childObserve
<
>
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java
public final HttpServer handle(
        BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
    Objects.requireNonNull(handler, "handler");
    return childObserve(new HttpServerHandle(handler));
}
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java
public T childObserve(ConnectionObserver observer) {
    Objects.requireNonNull(observer, "observer");
    T dup = duplicate();
    // 获取当前已经配置的observer
    ConnectionObserver current = configuration().childObserver;
    // 和已有的observer合并在一起
    dup.configuration().childObserver = current == null ? observer : current.then(observer);
    return dup;
}

先是将handler封装为observer对象,然后再和配置中已有的拼接在一起。这里暂时不深入分析太多,本文主线是分析启动过程,在下面会遇到这个observer的。

bindNow方法

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java
/*
 * 在SpringBoot中的NettyWebServer中的startHttpServer方法中调用了该方法
 */
public final DisposableServer bindNow() {
    // 设置绑定操作的超时时间为45秒
    return bindNow(Duration.ofSeconds(45));
}
public final DisposableServer bindNow(Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    try {
        // 调用bind方法,必须在timeout之内绑定完成
        return Objects.requireNonNull(bind().block(timeout), "aborted");
    }
    catch (IllegalStateException e) {
        if (e.getMessage()
             .contains("blocking read")) {
            throw new IllegalStateException(getClass().getSimpleName() + " couldn't be started within " + timeout.toMillis() + "ms");
        }
        throw e;
    }
}

这里会阻塞执行绑定操作,默认最多阻塞等待45秒。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java
public Mono<? extends DisposableServer> bind() {
    // 这里的config参考HttpServerBind的无参构造器
    CONF config = configuration();
    Objects.requireNonNull(config.bindAddress(), "bindAddress");

    Mono<? extends DisposableServer> mono =  Mono.create(sink -> {
        // 在HttpServerBind中设置的以及这里获取到的都是InetSocketAddress
        SocketAddress local = Objects.requireNonNull(config.bindAddress().get(), "Bind Address supplier returned null");
        // HttpServerBind中的设置的是InetSocketAddress类型的
        if (local instanceof InetSocketAddress) {
            InetSocketAddress localInet = (InetSocketAddress) local;

            // 如果地址还没解析过则解析
            if (localInet.isUnresolved()) {
                local = AddressUtils.createResolved(localInet.getHostName(), localInet.getPort());
            }
        }

        boolean isDomainSocket = false;
        // 创建disposableServer
        DisposableBind disposableServer;
        // 对于HttpServerBind来说,这里的local是InetSocketAddress类型的,不是DomainSocketAddress
        if (local instanceof DomainSocketAddress) {
            isDomainSocket = true;
            disposableServer = new UdsDisposableBind(sink, config, local);
        }
        else {
            // 创建server
            disposableServer = new InetDisposableBind(sink, config, local);
        }

        /*
         * 创建连接观察者,config中的childObserver默认是空操作。
         * 所以在Spring Boot中会往HttpServer中添加一个handler,然后会被封装为ConnectionObserver。
         */
        ConnectionObserver childObs =
                new ChildObserver(config.defaultChildObserver().then(config.childObserver()));
        /*
         * 从config中获取eventLoop和channelInitializer来创建acceptor
         * 该acceptor就是reactor模型中的重要角色,
         * 这里通过config来获取EventLoopGroup、ChannelHandler、childOptions、childAttrs,
         * 这些都是Netty中的概念或组件。
         * 对于EventLoopGroup的获取,内部会通过LoopResources来实现。
         *
         * 这里的Acceptor继承了ChannelInboundHandlerAdapter,该类是Netty中的概念。
         *
         * 客户端连接到服务端后,Acceptor的channelRead方法会把通道注册到child事件循环中。
         */
        Acceptor acceptor = new Acceptor(config.childEventLoopGroup(), config.channelInitializer(childObs, null, true),
                config.childOptions, config.childAttrs, isDomainSocket);
        /*
         * 执行底层的bind操作
         * 这里的AcceptorInitializer是一个ChannelInitializer,用于初始化channel
         */
        TransportConnector.bind(config, new AcceptorInitializer(acceptor), local, isDomainSocket)
                          .subscribe(disposableServer);
    });

    // 这里默认是null,所以可以忽略
    if (config.doOnBind() != null) {
        // 执行bind回调
        mono = mono.doOnSubscribe(s -> config.doOnBind().accept(config));
    }
    return mono;
}

该方法可以总结为下面几步:

  • 创建socket地址对象;
  • 创建DisposableBind对象;
  • 创建连接观察者;
  • 创建Acceptor;
  • 执行底层的bind操作;

重点看一下bind操作。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/TransportConnector.java
@SuppressWarnings("FutureReturnValueIgnored")
public static Mono<Channel> bind(TransportConfig config, ChannelInitializer<Channel> channelInitializer,
        SocketAddress bindAddress, boolean isDomainSocket) {
    Objects.requireNonNull(config, "config");
    Objects.requireNonNull(bindAddress, "bindAddress");
    Objects.requireNonNull(channelInitializer, "channelInitializer");

    /*
     * 先通过doInitAndRegister创建channel对象
     * config.eventLoopGroup().next()会从EventLoopGroup中选一个EventLoop
     */
    return doInitAndRegister(config, channelInitializer, isDomainSocket, config.eventLoopGroup().next())
            .flatMap(channel -> {
                MonoChannelPromise promise = new MonoChannelPromise(channel);
                // "FutureReturnValueIgnored" this is deliberate
                // 执行bind操作,至此进入了netty的范畴
                channel.eventLoop().execute(() -> channel.bind(bindAddress, promise.unvoid()));
                return promise;
            });
}

该方法又分为了下面几步:

  • 获取boss事件循环组;
  • 创建通道对象;
  • 通过通道对象执行绑定;

下面依次来分析各个步骤的实现原理。

创建事件循环组

在创建Acceptor和创建通道时都获取了事件循环组,底层会判断如果还没创建过事件循环组就会新建。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransportConfig.java
@Override
protected final EventLoopGroup eventLoopGroup() {
    // 获取boss事件循环组
    return loopResources().onServerSelect(isPreferNative());
}
final EventLoopGroup childEventLoopGroup() {
    /*
     * 创建worker group
     *
     * 这里的isPreferNative返回的是父类的属性preferNative,在父类的构造器中赋值为LoopResources.DEFAULT,
     * 而在LoopResources中,是获取名为reactor.netty.native的系统属性。(这里一般考虑为false的情况就行)
     */
    return loopResources().onServer(isPreferNative());
}

loopResources方法

在看具体的代码之前,先要知道ReactorNetty中使用了资源对象来封装Netty中的资源,如事件循环组和通道。所以自然要通过资源对象来创建这些资源,本方法的目的在于创建资源对象。LoopResources是个接口,其实现体系如下图所示:

上面两个方法中都调用了loopResources方法,下面来看看这个方法做了什么。

v1.1.17
loopResources
defaultLoopResources
get
getOrCreate
<
>
java
reactor-netty-core/src/main/java/reactor/netty/transport/TransportConfig.java
public final LoopResources loopResources() {
    // 这里考虑为null的情况
    return loopResources != null ? loopResources : defaultLoopResources();
}
protected abstract LoopResources defaultLoopResources();
java
reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java
@Override
protected LoopResources defaultLoopResources() {
    return HttpResources.get();
}
java
reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java
public static HttpResources get() {
    /*
     * 这里的ON_HTTP_NEW其实就是该类构造器,如果httpResources中为null,则新建一个
     * 如果已存在,则直接从中获取就是了。
     */
    return getOrCreate(httpResources, null, null, ON_HTTP_NEW, "http");
}
static final BiFunction<LoopResources, ConnectionProvider, HttpResources> ON_HTTP_NEW;

static final AtomicReference<HttpResources>                          httpResources;

static {
    ON_HTTP_NEW = HttpResources::new;
    httpResources = new AtomicReference<>();
}
java
reactor-netty-core/src/main/java/reactor/netty/tcp/TcpResources.java
protected static <T extends TcpResources> T getOrCreate(AtomicReference<T> ref,
        @Nullable LoopResources loops,
        @Nullable ConnectionProvider provider,
        BiFunction<LoopResources, ConnectionProvider, T> onNew,
        String name) {
    T update;
    for (;;) {
        T resources = ref.get();
        if (resources == null || loops != null || provider != null) {
            // 创建资源对象
            update = create(resources, loops, provider, name, onNew);
            // 将新建的资源对象通过CAS操作设置到ref中
            if (ref.compareAndSet(resources, update)) {
                return update;
            }
            else {
                update._dispose();
            }
        }
        else {
            return resources;
        }
    }
}
static <T extends TcpResources> T create(@Nullable T previous,
        @Nullable LoopResources loops, @Nullable ConnectionProvider provider,
        String name,
        BiFunction<LoopResources, ConnectionProvider, T> onNew) {
    // 对于HttpResources,这里传入的loops和provider都是null

    if (previous == null) {
        // 创建LoopResources和ConnectionProvider
        loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
        // 这里传入了默认的最大连接数为500
        provider = provider == null ? ConnectionProvider.create(name, 500) : provider;
    }
    else {
        loops = loops == null ? previous.defaultLoops : loops;
        provider = provider == null ? previous.defaultProvider : provider;
    }
    return onNew.apply(loops, provider);
}

整个调用链条有点长,重点关注TcpResources中的create方法。该方法中创建了循环资源和连接提供器。然后用这两个对象作为参数调用HttpResources的构造方法。

LoopResourcescreate方法

v1.1.17
LoopResources
DefaultLoopResources
<
>
java
reactor-netty-core/src/main/java/reactor/netty/resources/LoopResources.java
int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
        ReactorNetty.IO_WORKER_COUNT,
        // 默认是(逻辑)处理器个数,但是不能少于4
        "" + Math.max(Runtime.getRuntime().availableProcessors(), 4)));
int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
        ReactorNetty.IO_SELECT_COUNT,
        // 默认是-1,没有选择线程,DefaultLoopResources的构造方法中会修改为和worker一样的数量
        "" + -1));
static LoopResources create(String prefix) {
    if (Objects.requireNonNull(prefix, "prefix").isEmpty()) {
        throw new IllegalArgumentException("Cannot use empty prefix");
    }
    // 创建默认的LoopResources实例,第二和第三个参数都是从系统属性中获取
    return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT, DEFAULT_IO_WORKER_COUNT, true);
}
java
reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java
DefaultLoopResources(String prefix, int workerCount, boolean daemon) {
    this(prefix, -1, workerCount, daemon);
}

DefaultLoopResources(String prefix, int selectCount, int workerCount, boolean daemon) {
    this(prefix, selectCount, workerCount, daemon, true);
}

DefaultLoopResources(String prefix, int selectCount, int workerCount, boolean daemon, boolean colocate) {
    this.running = new AtomicBoolean(true);
    this.daemon = daemon;
    this.workerCount = workerCount;
    this.prefix = prefix;
    this.colocate = colocate;

    this.serverLoops = new AtomicReference<>();
    this.clientLoops = new AtomicReference<>();

    this.cacheNativeClientLoops = new AtomicReference<>();
    this.cacheNativeServerLoops = new AtomicReference<>();

    if (selectCount == -1) {
        // 默认和worker组的数量一样
        this.selectCount = workerCount;
        this.serverSelectLoops = this.serverLoops;
        this.cacheNativeSelectLoops = this.cacheNativeServerLoops;
    }
    else {
        this.selectCount = selectCount;
        this.serverSelectLoops = new AtomicReference<>();
        this.cacheNativeSelectLoops = new AtomicReference<>();
    }
}

这里创建的是DefaultLoopResources类型的对象,下面很快就用看到是如何利用它来创建事件循环组的。之前传入的selectCount的值是-1,所以这里会设置为和worker一样的数量。

ConnectionProvidercreate方法

v1.1.17
create
build
<
>
java
reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java
long DEFAULT_POOL_ACQUIRE_TIMEOUT = Long.parseLong(System.getProperty(
        ReactorNetty.POOL_ACQUIRE_TIMEOUT,
        // 默认最多等45秒
        "" + 45000));
static ConnectionProvider create(String name, int maxConnections) {
    return builder(name).maxConnections(maxConnections) // 设置最大连接数
                        .pendingAcquireTimeout(Duration.ofMillis(DEFAULT_POOL_ACQUIRE_TIMEOUT))
                        .build();
}
java
reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java
public ConnectionProvider build() {
    // 创建默认的池化连接提供者
    return new DefaultPooledConnectionProvider(this);
}

这里构建者模式用到的Builder是一个内部类,其构建的是DefautlPooledConnectionProvider类型的对象。

HttpResources的构造方法

v1.1.17
HttpResources
TcpResources
<
>
java
reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java
// 创建HttpResources实例
HttpResources(LoopResources loops, ConnectionProvider provider) {
    super(loops, provider);
    http2ConnectionProvider = new AtomicReference<>();
}
java
reactor-netty-core/src/main/java/reactor/netty/tcp/TcpResources.java
/*
 * 虽然TcpResources(本类)实现了LoopResources接口,但是却组合了LoopResources其他的实现类来实现具体的操作。
 * 这里考虑defaultLoops为DefaultLoopResources的情况。
 */
final LoopResources                            defaultLoops;
/*
 * 同样地,虽然TcpResources(本类)实现了ConnectionProvider接口,但是组合了其他的实现类来实现具体的操作。
 * 种类考虑defaultProvider是DefaultConnectionProvider的情况。
 */
final ConnectionProvider                       defaultProvider;
final AtomicReference<AddressResolverGroup<?>> defaultResolver;
protected TcpResources(LoopResources defaultLoops, ConnectionProvider defaultProvider) {
    this.defaultLoops = defaultLoops;
    this.defaultProvider = defaultProvider;
    this.defaultResolver = new AtomicReference<>();
}

这样,上面创建的循环资源对象和连接提供器就被设置到TcpResources中的属性中了,后续操作会依靠这两个属性。

最终loopResources方法返回的是HttpResources类型的对象。

创建或获取事件循环组

上面调用的LoopResources接口中的默认方法onServerSelect和需要子类实现的onServer方法,实际实现在TcpResources中。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/tcp/TcpResources.java
@Override
public EventLoopGroup onServer(boolean useNative) {
    // 调用DefaultLoopResources中的方法
    return defaultLoops.onServer(useNative);
}
@Override
public EventLoopGroup onServerSelect(boolean useNative) {
    // 调用DefaultLoopResources中的方法
    return defaultLoops.onServerSelect(useNative);
}

这两个方法都调用了DefaultLoopResources类中实现的方法。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java
@Override
public EventLoopGroup onServer(boolean useNative) {
    // 考虑useNative为false的情况
    if (useNative && LoopResources.hasNativeSupport()) {
        return cacheNativeServerLoops();
    }
    return cacheNioServerLoops();
}
@Override
public EventLoopGroup onServerSelect(boolean useNative) {
    if (useNative && LoopResources.hasNativeSupport()) {
        return cacheNativeSelectLoops();
    }
    return cacheNioSelectLoops();
}

本文只分析Nio的情况,上面两个方法又调用了不同的方法来获取或创建boss和worker事件循环组。

v1.1.17
boss组
worker组
<
>
java
reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java
@SuppressWarnings("FutureReturnValueIgnored")
EventLoopGroup cacheNioSelectLoops() {
    if (serverSelectLoops == serverLoops) {
        return cacheNioServerLoops();
    }

    // 获取缓存
    EventLoopGroup eventLoopGroup = serverSelectLoops.get();
    // 没有缓存
    if (null == eventLoopGroup) {
        // 新建boss事件循环组,负责select
        EventLoopGroup newEventLoopGroup = new NioEventLoopGroup(selectCount,
                threadFactory(this, "select-nio"));
        // 通过CAS设置
        if (!serverSelectLoops.compareAndSet(null, newEventLoopGroup)) {
            //"FutureReturnValueIgnored" this is deliberate
            // 设置失败则关闭新建的事件循环组
            newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
        }
        // 递归
        eventLoopGroup = cacheNioSelectLoops();
    }
    return eventLoopGroup;
}
java
reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java
@SuppressWarnings("FutureReturnValueIgnored")
EventLoopGroup cacheNioServerLoops() {
    // 采用属性来保存事件循环组,这也是方法名中cache的来源。
    EventLoopGroup eventLoopGroup = serverLoops.get();
    if (null == eventLoopGroup) {
        // 创建worker事件循环组,负责连接的读写处理
        EventLoopGroup newEventLoopGroup = new NioEventLoopGroup(workerCount,
                threadFactory(this, "nio"));
        // 通过CAS设置
        if (!serverLoops.compareAndSet(null, newEventLoopGroup)) {
            //"FutureReturnValueIgnored" this is deliberate
            newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
        }
        /*
         * 这里用递归来代替循环,如果上面的CAS操作成功,那么递归一次就可以了。
         * 如果上面的CAS操作没有成功,那么一次递归直到成功为止。
         */
        eventLoopGroup = cacheNioServerLoops();
    }
    return eventLoopGroup;
}

两个方法的逻辑很相似,都是先获取缓存中设置的事件循环组对象,如果没有获取到,则新建一个并设置到属性中,如果设置失败则递归尝试。

注意这里的方法名,一个是Select,一个是Server,分别对应boss组和worker组。别看错了以为是相同的方法,我一开始就看成了是一个方法,结果以为只创建了一个事件循环组。

通道创建和注册

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/TransportConnector.java
static Mono<Channel> doInitAndRegister(
        TransportConfig config,
        ChannelInitializer<Channel> channelInitializer,
        boolean isDomainSocket,
        EventLoop eventLoop) {
    // 获取创建Channel对象的工厂,这里返回了一个lambda,内部仍然是通过LoopResources来创建Channel
    ChannelFactory<? extends Channel> channelFactory = config.connectionFactory(config.eventLoopGroup(), isDomainSocket);

    Channel channel = null;
    try {
        // 创建channel对象
        channel = channelFactory.newChannel();
        if (channelInitializer instanceof ServerTransport.AcceptorInitializer) {
            // 开启auto read
            ((ServerTransport.AcceptorInitializer) channelInitializer).acceptor.enableAutoReadTask(channel);
        }
        /*
         * 设置Channel初始化器,内部是把Acceptor添加在pipeline的末尾了,为什么这里不直接添加Acceptor?
         */
        channel.pipeline().addLast(channelInitializer);
        // 设置通道的选项,这里的选项在HttpServerBind中有设置
        setChannelOptions(channel, config.options, isDomainSocket);
        // 设置通道的属性,默认在HttpServerBind中没有设置任何属性
        setAttributes(channel, config.attrs);
    }
    catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
        }
        return Mono.error(t);
    }

    MonoChannelPromise monoChannelPromise = new MonoChannelPromise(channel);
    // 将channel注册到事件循环中,但是此时还没有进行监听
    channel.unsafe().register(eventLoop, monoChannelPromise);
    return monoChannelPromise;
}

该方法的步骤如下:

  • 创建通道工厂;
  • 通过通道工厂创建通道;
  • 为通道流水线添加初始化器;
  • 设置通道选项和属性;
  • 将通道注册到事件循环的选择器中;

通道工厂

这里获取的通道工厂是个lambda函数。

v1.1.17
connectionFactory
channelType
<
>
java
reactor-netty-core/src/main/java/reactor/netty/transport/TransportConfig.java
protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg, boolean isDomainSocket) {
    /*
     * 返回一个Channel工厂,用于创建Channel实例。这里是通过LoopResources来创建的。
     * 这里考虑isDomainSocket为false的情况,所以通道类型为ServerSocketChannel。
     */
    return () -> loopResources().onChannel(channelType(isDomainSocket), elg);
}
protected abstract Class<? extends Channel> channelType(boolean isDomainSocket);
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransportConfig.java
@Override
protected Class<? extends Channel> channelType(boolean isDomainSocket) {
    return isDomainSocket ? ServerDomainSocketChannel.class : ServerSocketChannel.class;
}

注意参数isDomainSocket是在ServerTransportbind方法中判断并传入的,默认是false。所以channelType会返回ServerSocketChannel类型。

创建通道

这个步骤的操作就是调用lambda函数。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/tcp/TcpResources.java
@Override
public <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
    requireNonNull(channelType, "channelType");
    requireNonNull(group, "group");
    return defaultLoops.onChannel(channelType, group);
}

继续来看DefaultLoopResources中的实现,但该类中并没有重写该方法,所以看接口LoopResources中的默认方法。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/resources/LoopResources.java
/*
 * 这里的channelType,重点考虑为ServerSocketChannel的情况
 */
default <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
    // 这里就考虑DefaultLoopNativeDetector.NIO,其是一个DefaultLoopNIO类型的实例。
    DefaultLoop channelFactory =
            DefaultLoopNativeDetector.INSTANCE.supportGroup(group) ?
                    DefaultLoopNativeDetector.INSTANCE :
                    DefaultLoopNativeDetector.NIO;

    return channelFactory.getChannel(channelType);
}

这里会判断使用哪种通道工厂,本文就以普通的NIO为例。

按理说这里的通道工厂才是真正的工厂,上面的只是一层封装而已。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopNIO.java
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
    if (channelClass.equals(SocketChannel.class)) {
        return (CHANNEL) new NioSocketChannel();
    }
    if (channelClass.equals(ServerSocketChannel.class)) {
        // 创建Netty组件:非阻塞服务端网络通道
        return (CHANNEL) new NioServerSocketChannel();
    }
    if (channelClass.equals(DatagramChannel.class)) {
        return (CHANNEL) new NioDatagramChannel();
    }
    throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

虽然传入了通道的类型,但是并没有通过反射来创建对象,而是手动new的。对于服务端来说,这里创建的是NioServerSocketChannel类型的通道对象。

如果对NioServerSocketChannel不是很熟悉,请参考这篇文章:Netty中的NioServerSocketChannel

设置通道初始化器

这里设置的通道初始化器是在ServerTransportbind方法中创建的,类型为AcceptorInitializer

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java
static final class AcceptorInitializer extends ChannelInitializer<Channel> {

    final Acceptor acceptor;

    AcceptorInitializer(Acceptor acceptor) {
        this.acceptor = acceptor;
    }

    // 把acceptor添加到channel的管道末尾
    @Override
    public void initChannel(final Channel ch) {
        ch.eventLoop().execute(() -> ch.pipeline().addLast(acceptor));
    }
}

在通道被注册后会调用该初始化器的initChannel方法,在该方法中会向通道的流水线注册一个通道处理器,这个处理器会处理接收到的新请求。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java
// 该方法会处理客户端连接通道。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    /*
     * 为客户端通道添加handler,这里的childHandler的类型其实是TransportChannelInitializer,是一个通道初始化器。
     * 这里执行add操作后,该handler被添加到通道的pipeline后,会封装延时任务,等通道注册时会执行这些延时任务。
     * 所以通道注册时会调用该handler的handlerAdd回调,在该回调中会调用initChannel方法。
     */
    child.pipeline().addLast(childHandler);

    // 为通道设置选项和属性
    TransportConnector.setChannelOptions(child, childOptions, isDomainSocket);
    TransportConnector.setAttributes(child, childAttrs);

    try {
        /*
         * 将客户端的通道注册到worker组的事件循环中,
         * 在register内部会调用next()方法来选择一个事件循环,然后将通道注册到该循环中。
         */
        childGroup.register(child).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
        });
    }
    catch (Throwable t) {
        forceClose(child, t);
    }
}

可以看到,这个通道处理器会为新请求的通道对象添加处理器,设置选项,并注册到worker事件循环组中。

childHandler属性

重点是这里的childHandler属性是什么?这个属性是通过Acceptor类的构造方法的参数来设置的,Acceptor对象是在ServerTransportbind方法中被创建的,传入的handler是通过config对象创建的通道初始化器。

v1.1.17
java
reactor-netty-core/src/main/java/reactor/netty/transport/TransportConfig.java
public final ChannelInitializer<Channel> channelInitializer(ConnectionObserver connectionObserver,
        @Nullable SocketAddress remoteAddress, boolean onServer) {
    requireNonNull(connectionObserver, "connectionObserver");
    return new TransportChannelInitializer(this, connectionObserver, remoteAddress, onServer);
}

在本文中不深入分析该初始化器是怎么工作的,内部的实现过程还是比较复杂,现在只需要知道是处理请求的就行了,详情请参考ReactoryNetty中HttpServer的请求处理流程一文。

接下来就是对通道进行注册了,详细过程请参考这篇文章:Netty中的通道注册

地址绑定

Netty的AbstractChannel中实现了bind方法。

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline.bind(localAddress);
}

通过流水线来执行绑定。

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    // 调用通道的unsafe对象执行底层的bind操作。
    unsafe.bind(localAddress, promise);
}

在流水线的HeadContext的bind方法中调用了unsafe类来进行地址绑定。

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }


    boolean wasActive = isActive();
    try {
        // 调用JDK底层绑定端口
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    /*
     * 这里判断了两个条件,避免重复发布事件,在上面的register0方法中也有发布该事件的逻辑,注意区分情况。
     */
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 传播channelActive事件
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

调用各种通道各自的实现,本文涉及的是NioServerSocketChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) { // JDK版本大于等于7的情况
        // 通过底层JDK的通道执行bind操作,注意内部不仅会bind,还会listen
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

这里会调用底层JDK中的通道的bind方法,注意在这个方法里面,不仅会绑定地址,而且还会监听。

ReactorNetty框架虽然没有使用Netty的启动器ServerBootstrap,但是通道的操作步骤都是一样的,先注册到事件循环中的选择器上,然后执行绑定。

总结

本文从Spring Boot,到ReactoryNetty框架,再到底层Netty,全面分析了是怎么创建事件循环、又是怎么创建通道、以及怎么进行通道注册和地址绑定的。