Netty中的NioServerSocketChannel

NioServerSocketChannel是Netty中的一种服务端网络通道,是一种最常被使用的通道类型。本文会分析该类对象的创建过程是怎样的,以及是怎么实现地址绑定和监听,和怎么接受客户端的连接请求的。


创建过程

在介绍其构造方法之前,先来熟悉一下NioServerSocketChannel的继承体系。

NioServerSocketChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
/*
 * 这里获取JDK层面的SelectorProvider,在Linux系统中是EPollSelectorProvider
 */
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

private static final Method OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY =
        SelectorProviderUtil.findOpenMethod("openServerSocketChannel");
/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    // 传入默认的选择器提供者
    this(DEFAULT_SELECTOR_PROVIDER);
}

/**
 * Create a new instance using the given {@link SelectorProvider}.
 */
public NioServerSocketChannel(SelectorProvider provider) {
    this(provider, null);
}

/**
 * Create a new instance using the given {@link SelectorProvider} and protocol family (supported only since JDK 15).
 */
public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
    // 先创建JDK层面的ServerSocketChannel,再对其进行封装。
    this(newChannel(provider, family));
}

/**
 * Create a new instance using the given {@link ServerSocketChannel}.
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    /*
     * 通过父类构造器来进行封装,这里传入了该channel所感兴趣的事件为OP_ACCEPT,表示处理客户端的连接。
     */
    super(null, channel, SelectionKey.OP_ACCEPT);
    /*
     * 创建配置
     * 这里传入了JDK层面的ServerSocketChannel中的ServerSocket对象
     */
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

如果使用无参构造方法,没有传入SelectorProvider的话,则使用默认的。在Linux平台上,默认创建的是EPollSelectorImpl类的实例。

provider
create
<
>
java
jdk/src/share/classes/java/nio/channels/spi/SelectorProvider.java
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        // java.nio.channels.spi.SelectorProvider 属性指定实现类
                        if (loadProviderFromProperty())
                            return provider;
                        // SPI 指定实现类
                        if (loadProviderAsService())
                            return provider;
                        // 默认实现,Windows 和 Linux 下不同
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
java
jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java
public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    
    if (osname.equals("Linux")) // 如果是Linux系统
        // EPollSelectorProvider中创建的选择器是EPollSelectorImpl
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
    Class<SelectorProvider> c;
    try {
        // 加载指定的类
        c = (Class<SelectorProvider>)Class.forName(cn);
    } catch (ClassNotFoundException x) {
        throw new AssertionError(x);
    }
    try {
        // 反射创建指定类的实例
        return c.newInstance();
    } catch (IllegalAccessException | InstantiationException x) {
        throw new AssertionError(x);
    }

}

而在newChannel方法中则会通过工具类来反射调用EPollSelectorProvideropenServerSocketChannel方法。

v4.1.83
newChannel
newChannel
<
>
java
transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
/*
 * 这里获取JDK层面的SelectorProvider,在Linux系统中是EPollSelectorProvider
 */
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

private static final Method OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY =
        SelectorProviderUtil.findOpenMethod("openServerSocketChannel");
    /*
     * 创建服务端网络通道
     */
    private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
        try {
            // 创建通道
            ServerSocketChannel channel =
                    SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
            /*
             * 这里通过JDK的SelectorProvider来创建服务端网络通道,具体逻辑参考SelectorProviderImpl。
             */
            return channel == null ? provider.openServerSocketChannel() : channel;
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }
java
transport/src/main/java/io/netty/channel/socket/nio/SelectorProviderUtil.java
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
static <C extends Channel> C newChannel(Method method, SelectorProvider provider,
                                                InternetProtocolFamily family) throws IOException {
    /**
     *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
     *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
     *
     *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
     */
    if (family != null && method != null) {
        try {
            @SuppressWarnings("unchecked")
                    // 反射调用方法
            C channel = (C) method.invoke(
                    provider, ProtocolFamilyConverter.convert(family));
            return channel;
        } catch (InvocationTargetException e) {
            throw new IOException(e);
        } catch (IllegalAccessException e) {
            throw new IOException(e);
        }
    }
    return null;
}

实际调用到的方法是在EPollSelectorProvider的父类SelectorProviderImpl中。

java
jdk/src/share/classes/sun/nio/ch/SelectorProviderImpl.java
public ServerSocketChannel openServerSocketChannel() throws IOException {
    // 创建服务端的网络通道
    return new ServerSocketChannelImpl(this);
}

可以看到,创建的是ServerSocketChannelImpl类型的对象。

在创建完服务端通道对象后,就调用了父类的构造方法。

AbstractNioMessageChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
/*
 * parent参数是Netty中的Channel,第二个参数是JDK中的SelectableChannel
 */
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

继续调用父类的构造方法。

AbstractNioChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    // 设置底层JDK的通道对象
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // 配置为非阻塞模式
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                        "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

在该方法中,将JDK层面的服务端通道设置为了非阻塞模式。

AbstractChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    /*
     * 下面分别创建了通道ID,操作底层IO的Unsafe类的实例以及通道流水线
     */
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

在该构造方法中,最重要的是创建了Unsafe对象和ChannelPipeline通道流水线对象。

创建Unsafe对象

AbstractChannel中的newUnsafe方法是个模板方法,在子类AbstractNioMessageChannel中有实现。

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
@Override
protected AbstractNioUnsafe newUnsafe() {
    // 创建unsafe对象
    return new NioMessageUnsafe();
}

创建ChannelPipeline对象

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
protected DefaultChannelPipeline newChannelPipeline() {
    // 创建默认通道流水线
    return new DefaultChannelPipeline(this);
}

这里创建的是Netty中的通道流水线的默认实现DefaultChannelPipeline

绑定地址和监听

NioServerSocketChannel类中,只定义了与绑定地址相关的方法,而没有监听的方法,这是怎么回事?别着急,继续往下看。

v4.1.83
java
transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) { // JDK版本大于等于7的情况
        // 通过底层JDK的通道执行bind操作,注意内部不仅会bind,还会listen
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

在上面的方法中,通过JDK层面的通道对象来进行绑定的(只分析JDK版本大于等于7的情况,毕竟现在没有哪个公司还在用低于7的版本吧?)。

java
jdk/src/share/classes/sun/nio/ch/ServerSocketChannelImpl.java
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (lock) {
        if (!isOpen())
            throw new ClosedChannelException();
        if (isBound())
            throw new AlreadyBoundException();
        // 创建地址对象
        InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
            Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            // 检查端口是否可以被监听
            sm.checkListen(isa.getPort());
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        // 执行绑定
        Net.bind(fd, isa.getAddress(), isa.getPort());
        // 执行监听,backlog默认是50
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        synchronized (stateLock) {
            localAddress = Net.localAddress(fd);
        }
    }
    return this;
}

可以看到,在该方法中不仅完成了bind,还进行了listen操作。所以Netty在NioServerSocketChannel中没有定义listen相关方法。

接受客户端连接请求

NioEventLoop的事件循环中,如果发现通道的就绪事件为ACCEPT,那么会调用Unsafe对象的read方法来处理。对于NioServerSocketChannel而言,上面看到了创建的是NioMessageUnsafe对象。在该类中,实现了read方法。

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        // 必需要由reactor线程调用
        assert eventLoop().inEventLoop();
        // 获取服务端通道的配置
        final ChannelConfig config = config();
        // 获取channel对应的pipeline
        final ChannelPipeline pipeline = pipeline();
        // 分配接收缓冲区
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    /*
                     * 创建新的NioSocketChannel,并将新连接的通道对象保存到readBuf中。
                     */
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                }
                /*
                 * 这里判断了是否应该继续读
                 * 一次只会读取部分通道,不会一直卡在这里。
                 */
                while (continueReading(allocHandle));
            } catch (Throwable t) {
                exception = t;
            }

            // 设置并绑定NioSocketChannel
            int size = readBuf.size();
            // 对每个新通道执行流水线处理
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            // 当前轮询的事件已处理,清空集合,以便下次处理。
            readBuf.clear();
            allocHandle.readComplete();
            // 广播readComplete事件
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

接收连接

在上面的read方法中,调用的doReadMessages方法是AbstractNioMessageChannel中的模板方法,子类NioServerSocketChannel中实现了该方法。

v4.1.83
doReadMessages
accept
<
>
java
transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // 执行Accept,创建JDK层面的channel,用于表示新连接
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // 将新连接通道封装为NioSocketChannel对象
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
java
common/src/main/java/io/netty/util/internal/SocketUtils.java
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
            @Override
            public SocketChannel run() throws IOException {
                // 执行accept操作
                return serverSocketChannel.accept();
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

在该方法中,通过底层JDK中的accept方法来接受连接请求,然后将新连接的JDK通道对象封装为Netty中的NioSocketChannel,并以当前NioServerSocketChannel作为parent。

java
jdk/src/share/classes/sun/nio/ch/ServerSocketChannelImpl.java
public SocketChannel accept() throws IOException {
    synchronized (lock) {
        if (!isOpen())
            throw new ClosedChannelException();
        if (!isBound())
            throw new NotYetBoundException();
        SocketChannel sc = null;

        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        try {
            begin();
            if (!isOpen())
                return null;
            thread = NativeThread.current();
            for (;;) {
                // 调用操作系统的accept系统调用
                n = accept0(this.fd, newfd, isaa);
                if ((n == IOStatus.INTERRUPTED) && isOpen())
                    continue;
                break;
            }
        } finally {
            thread = 0;
            end(n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;

        // 默认设置为阻塞模式,(为什么不直接设置为非阻塞模式?)
        IOUtil.configureBlocking(newfd, true);
        InetSocketAddress isa = isaa[0];
        // 创建socket对象
        sc = new SocketChannelImpl(provider(), newfd, isa);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(),
                               isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    }
}

注册新连接通道

NioMessageUnsafe类的read方法中,会调用fireChannelRead方法来向通道流水线中发起read事件,并传入新连接的通道对象让通道处理器处理。 如果使用了ServerBootstrap这一启动器来启动的话,那么会向NioServerSocketChannel的通道流水线中注册一个用于处理新连接的通道处理器ServerBootstrapAcceptor

v4.1.83
java
transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java
@Override
void init(Channel channel) {
    // 为通道设置空的选项集合
    setChannelOptions(channel, newOptionsArray(), logger);
    // 为通道设置空的属性集合
    setAttributes(channel, newAttributesArray());

    // 获取通道的流水线
    ChannelPipeline p = channel.pipeline();

    // 下面lambda中会用到,所以这里转为effective final。
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    // 添加通道初始化器
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            // 获取通道流水线
            final ChannelPipeline pipeline = ch.pipeline();
            // 获取自定义的ChannelHandler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // 将自定义的通道处理器添加到流水线末尾
                pipeline.addLast(handler);
            }

            // 设置一个自定义的ChannelHandler
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 添加acceptor
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

而在该通过处理器中,会将新连接的通道注册到子事件循环组中。

v4.1.83
java
transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java
// 新连接接入时被调用
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 为什么这里msg可以直接转为Channel
    final Channel child = (Channel) msg;

    /*
     * 给新连接添加用户自定义的handler处理器,这个childHandler是通过ServerBootstrap.childHandler方法传入的。
     * 通常是一个特殊的ChannelHandler,即ChannelInitializer。
     * channel注册后,会执行ChannelInitializer的initChannel方法,然后并删除自身。
     */
    child.pipeline().addLast(childHandler);

    // 设置选项,主要是和TCP有关的一些参数
    setChannelOptions(child, childOptions, logger);
    // 设置属性
    setAttributes(child, childAttrs);

    try {
        /*
         * 绑定新连接请求的channel到reactor线程,这里childGroup就是指的worker线程组
         */
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

这样,boss事件循环组的任务就完成了,接下来就是由worker事件循环组来负责数据读写了。

总结

本文介绍了Netty中的NioServerSocketChannel这一最常使用的服务端通道的相关原理。包括创建过程,以及地址绑定和监听的实现,最后介绍了接受新连接请求的实现过程。