NioSocketChannel in Netty

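On the client side of a Netty application, NioSocketChannel is the usual channel type. On the server side, NioServerSocketChannel also creates channels of this type to represent the client connections it accepts. This article walks through how the class is implemented.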


Creation

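Before going into the constructors, a note on NioSocketChannel's inheritance hierarchy: the sections below walk the constructor chain from NioSocketChannel up through AbstractNioByteChannel, AbstractNioChannel and AbstractChannel.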

NioSocketChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static final Method OPEN_SOCKET_CHANNEL_WITH_FAMILY =
        SelectorProviderUtil.findOpenMethod("openSocketChannel");

private final SocketChannelConfig config;
/**
 * Create a new instance
 */
public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}

/**
 * Create a new instance using the given {@link SelectorProvider}.
 */
public NioSocketChannel(SelectorProvider provider) {
    this(provider, null);
}

/**
 * Create a new instance using the given {@link SelectorProvider} and protocol family (supported only since JDK 15).
 */
public NioSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
    this(newChannel(provider, family));
}

/**
 * Create a new instance using the given {@link SocketChannel}.
 */
public NioSocketChannel(SocketChannel socket) {
    this(null, socket);
}

/**
 * Create a new instance
 *
 * @param parent    the {@link Channel} which created this instance or {@code null} if it was created by the user
 * @param socket    the {@link SocketChannel} which will be used
 */
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

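If the no-argument constructor is used and no SelectorProvider is passed in, the default provider is used. On Linux the default provider is an EPollSelectorProvider, and the selectors it creates are EPollSelectorImpl instances.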

java
jdk/src/share/classes/java/nio/channels/spi/SelectorProvider.java
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        // Implementation class named by the java.nio.channels.spi.SelectorProvider system property
                        if (loadProviderFromProperty())
                            return provider;
                        // Implementation class registered through SPI
                        if (loadProviderAsService())
                            return provider;
                        // Platform default; differs between Windows and Linux
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
java
jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java
public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    
    if (osname.equals("Linux")) // on Linux
        // the selector created by EPollSelectorProvider is an EPollSelectorImpl
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
    Class<SelectorProvider> c;
    try {
        // Load the named class
        c = (Class<SelectorProvider>)Class.forName(cn);
    } catch (ClassNotFoundException x) {
        throw new AssertionError(x);
    }
    try {
        // Create an instance of that class by reflection
        return c.newInstance();
    } catch (IllegalAccessException | InstantiationException x) {
        throw new AssertionError(x);
    }

}
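
To check which provider and selector a given JVM actually picks, the lookup above can be exercised directly. The following is a minimal sketch using only JDK classes. The class name SelectorProviderProbe is made up for illustration, and the printed class names depend on the platform and JDK version.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical helper, not part of Netty or the JDK.
final class SelectorProviderProbe {

    /** Returns the implementation class name of the system-wide default provider. */
    static String providerClassName() {
        return SelectorProvider.provider().getClass().getName();
    }

    /** Opens a selector from the default provider and returns its implementation class name. */
    static String selectorClassName() {
        try (Selector selector = SelectorProvider.provider().openSelector()) {
            return selector.getClass().getName();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("provider: " + providerClassName());
        System.out.println("selector: " + selectorClassName());
    }
}
```

On a typical Linux JDK this is expected to report sun.nio.ch.EPollSelectorProvider and sun.nio.ch.EPollSelectorImpl. The provider is cached, so repeated calls to SelectorProvider.provider() return the same object.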

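The newChannel method goes through a utility class. When a protocol family is given and the JDK provides openSocketChannel(ProtocolFamily), the utility invokes that method on the provider by reflection. Otherwise, as with the no-argument constructor where family is null, it returns null and newChannel falls back to calling provider.openSocketChannel() directly.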

v4.1.83
java
transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java
private static SocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
    try {
        SocketChannel channel = SelectorProviderUtil.newChannel(OPEN_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
        return channel == null ? provider.openSocketChannel() : channel;
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}
java
transport/src/main/java/io/netty/channel/socket/nio/SelectorProviderUtil.java
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
static <C extends Channel> C newChannel(Method method, SelectorProvider provider,
                                                InternetProtocolFamily family) throws IOException {
    /**
     *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
     *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
     *
     *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
     */
    if (family != null && method != null) {
        try {
            @SuppressWarnings("unchecked")
            // Invoke the method by reflection
            C channel = (C) method.invoke(
                    provider, ProtocolFamilyConverter.convert(family));
            return channel;
        } catch (InvocationTargetException e) {
            throw new IOException(e);
        } catch (IllegalAccessException e) {
            throw new IOException(e);
        }
    }
    return null;
}

On Linux, the openSocketChannel() method that ends up being called is defined in SelectorProviderImpl, the superclass of EPollSelectorProvider.

java
jdk/src/share/classes/sun/nio/ch/SelectorProviderImpl.java
public SocketChannel openSocketChannel() throws IOException {
    return new SocketChannelImpl(this);
}
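
The lookup-then-fallback logic can be reproduced with plain JDK reflection. The sketch below is an illustration, not Netty's code: OpenChannelSketch and its method names are hypothetical, and it assumes that SelectorProvider.openSocketChannel(ProtocolFamily) is only present on JDK 15 or later, as the constructor Javadoc above states.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ProtocolFamily;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical illustration of "reflective call if available, plain call otherwise".
final class OpenChannelSketch {

    /** Looks up openSocketChannel(ProtocolFamily); returns null when the JDK lacks it. */
    static Method findOpenWithFamily() {
        try {
            return SelectorProvider.class.getMethod("openSocketChannel", ProtocolFamily.class);
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    /** Opens a channel through the provider, using the family-aware overload when possible. */
    static SocketChannel open(SelectorProvider provider, ProtocolFamily family) {
        try {
            Method method = findOpenWithFamily();
            if (family != null && method != null) {
                return (SocketChannel) method.invoke(provider, family);
            }
            // No family requested, or an older JDK: use the no-argument variant.
            return provider.openSocketChannel();
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("reflective open failed", e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Opens a channel, reports whether it came up open, and closes it again. */
    static boolean opensAndCloses(ProtocolFamily family) {
        try (SocketChannel channel = open(SelectorProvider.provider(), family)) {
            return channel.isOpen();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling OpenChannelSketch.opensAndCloses(null) takes the fallback branch, which corresponds to what the no-argument NioSocketChannel constructor ends up doing.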

Next, the superclass constructors are called.

AbstractNioByteChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

This constructor passes OP_READ as the read interest op, so READ is the event this channel is interested in.

AbstractNioChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    // Store the underlying JDK channel object
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // Switch to non-blocking mode
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                        "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

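This constructor switches the channel to non-blocking mode. Recall that a channel returned by accept on the JDK's server channel is in blocking mode by default, so the explicit call is needed for accepted connections as well.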
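
The default mode is easy to confirm with the JDK alone. A small sketch, with the hypothetical class name BlockingModeSketch, that opens an unconnected SocketChannel and makes the same configureBlocking(false) call:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SocketChannel;

// Hypothetical helper for illustration only.
final class BlockingModeSketch {

    /** Returns {blocking before, blocking after} for a freshly opened channel. */
    static boolean[] toggle() {
        try (SocketChannel channel = SocketChannel.open()) {
            boolean before = channel.isBlocking();  // JDK channels start in blocking mode
            channel.configureBlocking(false);       // the call made in the constructor above
            boolean after = channel.isBlocking();
            return new boolean[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The switch matters because a selectable channel has to be in non-blocking mode before it can be registered with a Selector; registering a blocking channel throws IllegalBlockingModeException.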

AbstractChannel

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    /*
     * Create the channel ID, the Unsafe instance that performs the low-level I/O, and the channel pipeline
     */
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

The important steps in this constructor are the creation of the Unsafe object and the ChannelPipeline object.

Creating the Unsafe object

newUnsafe is an abstract method in AbstractChannel that the constructor calls as a hook; the subclass AbstractNioByteChannel implements it.

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java
@Override
protected AbstractNioUnsafe newUnsafe() {
    // Create the unsafe object
    return new NioByteUnsafe();
}

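This implementation creates a NioByteUnsafe instance. NioSocketChannel overrides newUnsafe once more and returns NioSocketChannelUnsafe, a subclass of NioByteUnsafe.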
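
The pattern of a base-class constructor asking the subclass for a collaborator can be shown in isolation. This is a simplified sketch: IoOps, BaseChannel and ByteStreamChannel are hypothetical stand-ins for Unsafe, AbstractChannel and AbstractNioByteChannel, not Netty types.

```java
// Hypothetical stand-ins that mirror the shape of AbstractChannel and newUnsafe().
final class FactoryHookSketch {

    /** Stand-in for Netty's Unsafe. */
    interface IoOps {
        String name();
    }

    /** Stand-in for AbstractChannel: the constructor asks the subclass for its IoOps. */
    abstract static class BaseChannel {
        final IoOps ops;

        BaseChannel() {
            // Runs before subclass field initializers, so newOps() must not rely on subclass state.
            ops = newOps();
        }

        protected abstract IoOps newOps();
    }

    /** Stand-in for a byte-oriented channel that supplies its own IoOps. */
    static class ByteStreamChannel extends BaseChannel {
        @Override
        protected IoOps newOps() {
            return () -> "byte-ops";
        }
    }

    /** Creates the subclass and returns the name of the IoOps chosen during construction. */
    static String opsNameOfByteChannel() {
        return new ByteStreamChannel().ops.name();
    }
}
```

The base class fixes the construction order (ID, Unsafe, pipeline in Netty's case) while each subclass decides which concrete object performs the I/O.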

Connecting

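When a client is started through Bootstrap, its connect method is normally called to establish the connection to the server. That call eventually reaches the connect method of Unsafe, which is implemented in AbstractNioUnsafe.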

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        // Perform the connect
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        if (connectPromise != null && !connectPromise.isDone()
                                && connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out: " + remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {

                // Runs when the promise completes; only a cancelled connect needs cleanup here
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}
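
The timeout handling in connect follows a common pattern: schedule a task that fails the promise if it is still pending, and cancel that task once the promise is done. The sketch below reproduces the pattern with JDK classes only. It is a simplification under stated assumptions: CompletableFuture stands in for ChannelPromise, a ScheduledExecutorService stands in for the event loop, the class ConnectTimeoutSketch is hypothetical, and the timer is cancelled on any completion, whereas the listener shown above only reacts to cancellation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of "fail the promise on timeout, cancel the timer on completion".
final class ConnectTimeoutSketch {

    /** Arms a timeout for a pending connect represented by the given promise. */
    static ScheduledFuture<?> armTimeout(ScheduledExecutorService loop,
                                         CompletableFuture<Void> promise,
                                         long timeoutMillis) {
        ScheduledFuture<?> timeout = loop.schedule(() -> {
            // No effect if the promise has already completed, similar in spirit to tryFailure.
            promise.completeExceptionally(new TimeoutException("connection timed out"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Whatever completes the promise first also cancels the timer.
        promise.whenComplete((ok, err) -> timeout.cancel(false));
        return timeout;
    }

    /** Returns true when a connect that never finishes is failed by the timeout. */
    static boolean timesOut() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Void> promise = new CompletableFuture<>();
            armTimeout(loop, promise, 50);
            promise.get(5, TimeUnit.SECONDS);
            return false; // not expected: nothing completes the promise normally
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (InterruptedException | TimeoutException e) {
            return false;
        } finally {
            loop.shutdownNow();
        }
    }

    /** Returns true when completing the promise first cancels the timer. */
    static boolean completionCancelsTimer() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Void> promise = new CompletableFuture<>();
            ScheduledFuture<?> timeout = armTimeout(loop, promise, 60_000);
            promise.complete(null); // the connect finished in time
            return timeout.isCancelled();
        } finally {
            loop.shutdownNow();
        }
    }
}
```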

AbstractNioUnsafe.connect calls doConnect to establish the connection. doConnect is declared abstract in AbstractNioChannel and implemented by the subclass NioSocketChannel.

v4.1.83
java
transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        // A client usually does not need to bind a local address
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        // Call connect on the underlying JDK channel
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) { // not connected yet
            // so register interest in the CONNECT event
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}
java
common/src/main/java/io/netty/util/internal/SocketUtils.java
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
        throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
                return socketChannel.connect(remoteAddress);
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

doConnect uses the SocketUtils utility class to call connect on the underlying JDK channel.

java
jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java
public boolean connect(SocketAddress sa) throws IOException {
    int localPort = 0;

    synchronized (readLock) {
        synchronized (writeLock) {
            ensureOpenAndUnconnected();
            InetSocketAddress isa = Net.checkAddress(sa);
            SecurityManager sm = System.getSecurityManager();
            if (sm != null)
                sm.checkConnect(isa.getAddress().getHostAddress(),
                                isa.getPort());
            synchronized (blockingLock()) {
                int n = 0;
                try {
                    try {
                        begin();
                        synchronized (stateLock) {
                            if (!isOpen()) {
                                return false;
                            }
                            // notify hook only if unbound
                            if (localAddress == null) {
                                NetHooks.beforeTcpConnect(fd,
                                                       isa.getAddress(),
                                                       isa.getPort());
                            }
                            readerThread = NativeThread.current();
                        }
                        for (;;) {
                            InetAddress ia = isa.getAddress();
                            if (ia.isAnyLocalAddress())
                                ia = InetAddress.getLocalHost();
                            // Issue the operating system's connect call
                            n = Net.connect(fd,
                                            ia,
                                            isa.getPort());
                            if (  (n == IOStatus.INTERRUPTED)
                                  && isOpen())
                                continue;
                            break;
                        }

                    } finally {
                        readerCleanup();
                        end((n > 0) || (n == IOStatus.UNAVAILABLE));
                        assert IOStatus.check(n);
                    }
                } catch (IOException x) {
                    // If an exception was thrown, close the channel after
                    // invoking end() so as to avoid bogus
                    // AsynchronousCloseExceptions
                    close();
                    throw x;
                }
                synchronized (stateLock) {
                    remoteAddress = isa;
                    if (n > 0) {

                        // Connection succeeded; disallow further
                        // invocation
                        state = ST_CONNECTED;
                        if (isOpen())
                            localAddress = Net.localAddress(fd);
                        return true;
                    }
                    // If nonblocking and no exception then connection
                    // pending; disallow another invocation
                    if (!isBlocking())
                        state = ST_PENDING;
                    else
                        assert false;
                }
            }
            return false;
        }
    }
}
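
The whole non-blocking connect sequence (connect, register interest in OP_CONNECT when the result is false, then finishConnect once the selector reports readiness) can be tried out with the JDK alone. The sketch below is an illustration under assumptions: NonBlockingConnectSketch is a hypothetical class, the server is a throwaway ServerSocketChannel on the loopback interface that never accepts, and the five-second deadline is arbitrary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical illustration of a non-blocking connect driven by a Selector.
final class NonBlockingConnectSketch {

    /** Connects to a throwaway loopback server and reports whether the channel ends up connected. */
    static boolean connectToLoopback() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 lets the operating system pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            SelectionKey key = client.register(selector, 0); // no interest yet

            // May return true at once, or false while the handshake is still pending.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                key.interestOps(SelectionKey.OP_CONNECT);
                long deadline = System.nanoTime() + 5_000_000_000L;
                while (!connected && System.nanoTime() < deadline) {
                    selector.select(500);
                    if (key.isConnectable()) {
                        connected = client.finishConnect();
                    }
                    selector.selectedKeys().clear();
                }
                if (connected) {
                    key.interestOps(0); // connect is done, stop watching for it
                }
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The false return value of connect corresponds to the ST_PENDING branch in SocketChannelImpl.connect above: the call did not fail, it simply has not completed yet.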