NioServerSocketChannel是Netty中的一种服务端网络通道,是一种最常被使用的通道类型。本文会分析该类对象的创建过程是怎样的,以及是怎么实现地址绑定和监听,和怎么接受客户端的连接请求的。
创建过程
在介绍其构造方法之前,先来熟悉一下NioServerSocketChannel的继承体系。
NioServerSocketChannel
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
/*
* 这里获取JDK层面的SelectorProvider,在Linux系统中是EPollSelectorProvider
*/
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
private static final Method OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY =
SelectorProviderUtil.findOpenMethod("openServerSocketChannel");
/**
* Create a new instance
*/
public NioServerSocketChannel() {
// 传入默认的选择器提供者
this(DEFAULT_SELECTOR_PROVIDER);
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioServerSocketChannel(SelectorProvider provider) {
this(provider, null);
}
/**
* Create a new instance using the given {@link SelectorProvider} and protocol family (supported only since JDK 15).
*/
public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
// 先创建JDK层面的ServerSocketChannel,再对其进行封装。
this(newChannel(provider, family));
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
/*
* 通过父类构造器来进行封装,这里传入了该channel所感兴趣的事件为OP_ACCEPT,表示处理客户端的连接。
*/
super(null, channel, SelectionKey.OP_ACCEPT);
/*
* 创建配置
* 这里传入了JDK层面的ServerSocketChannel中的ServerSocket对象
*/
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
如果使用无参构造方法,没有传入SelectorProvider的话,则使用默认的。在Linux平台上,默认创建的是EPollSelectorImpl类的实例。
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
// java.nio.channels.spi.SelectorProvider 属性指定实现类
if (loadProviderFromProperty())
return provider;
// SPI 指定实现类
if (loadProviderAsService())
return provider;
// 默认实现,Windows 和 Linux 下不同
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux")) // 如果是Linux系统
// EPollSelectorProvider中创建的选择器是EPollSelectorImpl
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
Class<SelectorProvider> c;
try {
// 加载指定的类
c = (Class<SelectorProvider>)Class.forName(cn);
} catch (ClassNotFoundException x) {
throw new AssertionError(x);
}
try {
// 反射创建指定类的实例
return c.newInstance();
} catch (IllegalAccessException | InstantiationException x) {
throw new AssertionError(x);
}
}
而在newChannel方法中则会通过工具类来反射调用EPollSelectorProvider的openServerSocketChannel方法。
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
/*
* 这里获取JDK层面的SelectorProvider,在Linux系统中是EPollSelectorProvider
*/
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
private static final Method OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY =
SelectorProviderUtil.findOpenMethod("openServerSocketChannel");
/*
* 创建服务端网络通道
*/
private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
try {
// 创建通道
ServerSocketChannel channel =
SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
/*
* 这里通过JDK的SelectorProvider来创建服务端网络通道,具体逻辑参考SelectorProviderImpl。
*/
return channel == null ? provider.openServerSocketChannel() : channel;
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
static <C extends Channel> C newChannel(Method method, SelectorProvider provider,
InternetProtocolFamily family) throws IOException {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
if (family != null && method != null) {
try {
@SuppressWarnings("unchecked")
// 反射调用方法
C channel = (C) method.invoke(
provider, ProtocolFamilyConverter.convert(family));
return channel;
} catch (InvocationTargetException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
return null;
}
实际调用到的方法是在EPollSelectorProvider的父类SelectorProviderImpl中。
public ServerSocketChannel openServerSocketChannel() throws IOException {
// 创建服务端的网络通道
return new ServerSocketChannelImpl(this);
}
可以看到,创建的是ServerSocketChannelImpl类型的对象。
在创建完服务端通道对象后,就调用了父类的构造方法。
AbstractNioMessageChannel
/*
* parent参数是Netty中的Channel,第二个参数是JDK中的SelectableChannel
*/
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
继续调用父类的构造方法。
AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// 设置底层JDK的通道对象
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 配置为非阻塞模式
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
在该方法中,将JDK层面的服务端通道设置为了非阻塞模式。
AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
/*
* 下面分别创建了通道ID,操作底层IO的Unsafe类的实例以及通道流水线
*/
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在该构造方法中,最重要的是创建了Unsafe对象和ChannelPipeline通道流水线对象。
创建Unsafe对象
在AbstractChannel中的newUnsafe方法是个模板方法,在子类AbstractNioMessageChannel中有实现。
@Override
protected AbstractNioUnsafe newUnsafe() {
// 创建unsafe对象
return new NioMessageUnsafe();
}
创建ChannelPipeline对象
protected DefaultChannelPipeline newChannelPipeline() {
// 创建默认通道流水线
return new DefaultChannelPipeline(this);
}
这里创建的是Netty中的通道流水线的默认实现DefaultChannelPipeline。
绑定地址和监听
在NioServerSocketChannel类中,只定义了与绑定地址相关的方法,而没有监听的方法,这是怎么回事?别着急,继续往下看。
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) { // JDK版本大于等于7的情况
// 通过底层JDK的通道执行bind操作,注意内部不仅会bind,还会listen
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
在上面的方法中,通过JDK层面的通道对象来进行绑定的(只分析JDK版本大于等于7的情况,毕竟现在没有哪个公司还在用低于7的版本吧?)。
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (lock) {
if (!isOpen())
throw new ClosedChannelException();
if (isBound())
throw new AlreadyBoundException();
// 创建地址对象
InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
// 检查端口是否可以被监听
sm.checkListen(isa.getPort());
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
// 执行绑定
Net.bind(fd, isa.getAddress(), isa.getPort());
// 执行监听,backlog默认是50
Net.listen(fd, backlog < 1 ? 50 : backlog);
synchronized (stateLock) {
localAddress = Net.localAddress(fd);
}
}
return this;
}
可以看到,在该方法中不仅完成了bind,还进行了listen操作。所以Netty在NioServerSocketChannel中没有定义listen相关方法。
接受客户端连接请求
在NioEventLoop的事件循环中,如果发现通道的就绪事件为ACCEPT,那么会调用Unsafe对象的read方法来处理。对于NioServerSocketChannel而言,上面看到了创建的是NioMessageUnsafe对象。在该类中,实现了read方法。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
// 必需要由reactor线程调用
assert eventLoop().inEventLoop();
// 获取服务端通道的配置
final ChannelConfig config = config();
// 获取channel对应的pipeline
final ChannelPipeline pipeline = pipeline();
// 分配接收缓冲区
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
/*
* 创建新的NioSocketChannel,并将新连接的通道对象保存到readBuf中。
*/
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
}
/*
* 这里判断了是否应该继续读
* 一次只会读取部分通道,不会一直卡在这里。
*/
while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
// 设置并绑定NioSocketChannel
int size = readBuf.size();
// 对每个新通道执行流水线处理
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
// 当前轮询的事件已处理,清空集合,以便下次处理。
readBuf.clear();
allocHandle.readComplete();
// 广播readComplete事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
接收连接
在上面的read方法中,调用的doReadMessages方法是AbstractNioMessageChannel中的模板方法,子类NioServerSocketChannel中实现了该方法。
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 执行Accept,创建JDK层面的channel,用于表示新连接
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 将新连接通道封装为NioSocketChannel对象
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
// 执行accept操作
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
在该方法中,通过底层JDK中的accept方法来接受连接请求,然后将新连接的JDK通道对象封装为Netty中的NioSocketChannel,并以当前NioServerSocketChannel作为parent。
public SocketChannel accept() throws IOException {
synchronized (lock) {
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
try {
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
// 调用操作系统的accept系统调用
n = accept0(this.fd, newfd, isaa);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
break;
}
} finally {
thread = 0;
end(n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
// 默认设置为阻塞模式,(为什么不直接设置为非阻塞模式?)
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
// 创建socket对象
sc = new SocketChannelImpl(provider(), newfd, isa);
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
}
}
注册新连接通道
在NioMessageUnsafe类的read方法中,会调用fireChannelRead方法来向通道流水线中发起read事件,并传入新连接的通道对象让通道处理器处理。 如果使用了ServerBootstrap这一启动器来启动的话,那么会向NioServerSocketChannel的通道流水线中注册一个用于处理新连接的通道处理器ServerBootstrapAcceptor。
@Override
void init(Channel channel) {
// 为通道设置空的选项集合
setChannelOptions(channel, newOptionsArray(), logger);
// 为通道设置空的属性集合
setAttributes(channel, newAttributesArray());
// 获取通道的流水线
ChannelPipeline p = channel.pipeline();
// 下面lambda中会用到,所以这里转为effective final。
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
// 添加通道初始化器
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
// 获取通道流水线
final ChannelPipeline pipeline = ch.pipeline();
// 获取自定义的ChannelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
// 将自定义的通道处理器添加到流水线末尾
pipeline.addLast(handler);
}
// 设置一个自定义的ChannelHandler
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加acceptor
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
而在该通过处理器中,会将新连接的通道注册到子事件循环组中。
// 新连接接入时被调用
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 为什么这里msg可以直接转为Channel
final Channel child = (Channel) msg;
/*
* 给新连接添加用户自定义的handler处理器,这个childHandler是通过ServerBootstrap.childHandler方法传入的。
* 通常是一个特殊的ChannelHandler,即ChannelInitializer。
* channel注册后,会执行ChannelInitializer的initChannel方法,然后并删除自身。
*/
child.pipeline().addLast(childHandler);
// 设置选项,主要是和TCP有关的一些参数
setChannelOptions(child, childOptions, logger);
// 设置属性
setAttributes(child, childAttrs);
try {
/*
* 绑定新连接请求的channel到reactor线程,这里childGroup就是指的worker线程组
*/
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
这样,boss事件循环组的任务就完成了,接下来就是由worker事件循环组来负责数据读写了。
总结
本文介绍了Netty中的NioServerSocketChannel这一最常使用的服务端通道的相关原理。包括创建过程,以及地址绑定和监听的实现,最后介绍了接受新连接请求的实现过程。