Netty中通道的注册

不管是服务端还是客户端的通道,都会注册到事件循环所关联的通道选择器中。注册到选择器之后,当通道有事件发生时,事件循环会通过选择器获取到这些事件,并相应调用方法来处理。本文来分析一下通道的注册过程。


EventLoopGroup接口中的register方法

EventLoopGroup接口中定义了注册通道的方法,实际上是会调用事件循环中的register方法,毕竟通道选择器被保存在事件循环对象中。

v4.1.83
java
transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
    // 选择一个事件循环,并调用其register方法
    return next().register(channel);
}

EventLoop对象的register实现定义在SingleThreadEventLoop类中

v4.1.83
java
transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
    // 将通道对象封装为promise对象
    return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // 通过通道的unsafe对象来进行注册
    promise.channel().unsafe().register(this, promise);
    return promise;
}

这里是调用到了Unsafe类中的register方法。

Unsafe接口中的register方法

Unsafe接口中的register方法被子类AbstractUnsafe类实现,不管是NioMessageUnsafe还是NioByteUnsafe,都对应该方法。

v4.1.83
register
register0
<
>
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
/*
 * 将通道注册到事件循环关联的选择器中
 */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    // 绑定事件循环器
    AbstractChannel.this.eventLoop = eventLoop;

    // 判断当前线程是否是reactor线程
    if (eventLoop.inEventLoop()) {
        // 进行注册
        register0(promise);
    } else {
        try {
            /*
             * ====================
             * NioEventLoop的启动入口
             * ====================
             */
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // 执行注册
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        // 会执行handlerAdded回调
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // 传播channelRegistered事件
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.

        /*
         * ======================== 特别重要 ==========================
         * 这里因为调用链比较绕,所以很容易搞混
         *
         * 这里如果注册的是NioServerSocketChannel,那么此时还没被bind,这里isActive会返回false
         * 后续执行bind的时候,会以异步的形式在HeadContext执行fireChannelActive,
         * 注意同时HeadContext的channelActive()方法也会被触发调用,
         * 最终会执行到AbstractNioUnsafe#doBeginRead,然后进行事件注册
         *
         * 这里如果注册的是NioSocketChannel,那么isActive会返回true(因为连接已经建立了),
         * firstRegistration也会是true,那么就直接在这里执行了fireChannelActive,
         * 然后一样地,HeadContext的channelActive()方法会被触发调用,
         * 最终会执行到AbstractNioUnsafe#doBeginRead,然后进行事件注册
         * ======================== 特别重要 ==========================
         */
        if (isActive()) {
            if (firstRegistration) {
                /*
                 * 传播channelActive事件
                 *
                 */
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

实际的注册是在register0方法中调用doRegister来实现的,在AbstractChannel中该方法是个模板方法,子类AbstractNioChannel重写了它。

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
@Override
protected void doRegister() throws Exception {
    boolean selected = false;

    // 为什么要自旋?可能是担心会注册失败,所以在循环中方便重试一次。
    for (;;) {
        try {
            /*
             * 拿到Java层面的channel(ServerSocketChannel),注册到java层面的selector上,
             * 并把netty中的channel作为附件挂在jdk层面的selector上,后续事件达到selector时,
             * 可以拿出netty层面的channel处理
             * 注意,这里注册的事件代码是0,表示还不关心任何事件,只是建立绑定关系。
             * 设置的附件是当前通道对象,在事件循环中会使用该通道对象的Unsafe对象来处理事件。
             *
             * 后续在通道Active的时候,会注册具体感兴趣的事件。
             */
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                // 执行一次事件选择
                eventLoop().selectNow();
                // 再试一次注册,
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                // 第二次还是失败的话则抛出异常
                throw e;
            }
        }
    }
}

通过调用底层JDK的方法将通道注册到选择器中,注意这里将当前通道对象设置成了附件对象。后续在事件循环中,会通过该通道对象的Unsafe对象来处理就绪事件。参考Netty中的NioEventLoop一文。