Netty中通道的注册
不管是服务端还是客户端的通道,都会注册到事件循环所关联的通道选择器中。注册到选择器之后,当通道有事件发生时,事件循环会通过选择器获取到这些事件,并相应调用方法来处理。本文来分析一下通道的注册过程。
EventLoopGroup接口中的register方法
EventLoopGroup接口中定义了注册通道的方法,实际上是会调用事件循环中的register方法,毕竟通道选择器被保存在事件循环对象中。
java
transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
// 选择一个事件循环,并调用其register方法
return next().register(channel);
}
EventLoop对象的register实现定义在SingleThreadEventLoop类中
java
transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
// 将通道对象封装为promise对象
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 通过通道的unsafe对象来进行注册
promise.channel().unsafe().register(this, promise);
return promise;
}
这里是调用到了Unsafe类中的register方法。
Unsafe接口中的register方法
Unsafe接口中的register方法被子类AbstractUnsafe类实现,不管是NioMessageUnsafe还是NioByteUnsafe,都对应该方法。
register
register0
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
/*
* 将通道注册到事件循环关联的选择器中
*/
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 绑定事件循环器
AbstractChannel.this.eventLoop = eventLoop;
// 判断当前线程是否是reactor线程
if (eventLoop.inEventLoop()) {
// 进行注册
register0(promise);
} else {
try {
/*
* ====================
* NioEventLoop的启动入口
* ====================
*/
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
java
transport/src/main/java/io/netty/channel/AbstractChannel.java
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 执行注册
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 会执行handlerAdded回调
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 传播channelRegistered事件
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
/*
* ======================== 特别重要 ==========================
* 这里因为调用链比较绕,所以很容易搞混
*
* 这里如果注册的是NioServerSocketChannel,那么此时还没被bind,这里isActive会返回false
* 后续执行bind的时候,会以异步的形式在HeadContext执行fireChannelActive,
* 注意同时HeadContext的channelActive()方法也会被触发调用,
* 最终会执行到AbstractNioUnsafe#doBeginRead,然后进行事件注册
*
* 这里如果注册的是NioSocketChannel,那么isActive会返回true(因为连接已经建立了),
* firstRegistration也会是true,那么就直接在这里执行了fireChannelActive,
* 然后一样地,HeadContext的channelActive()方法会被触发调用,
* 最终会执行到AbstractNioUnsafe#doBeginRead,然后进行事件注册
* ======================== 特别重要 ==========================
*/
if (isActive()) {
if (firstRegistration) {
/*
* 传播channelActive事件
*
*/
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
实际的注册是在register0方法中调用doRegister来实现的,在AbstractChannel中该方法是个模板方法,子类AbstractNioChannel重写了它。
java
transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
@Override
protected void doRegister() throws Exception {
boolean selected = false;
// 为什么要自旋?可能是担心会注册失败,所以在循环中方便重试一次。
for (;;) {
try {
/*
* 拿到Java层面的channel(ServerSocketChannel),注册到java层面的selector上,
* 并把netty中的channel作为附件挂在jdk层面的selector上,后续事件达到selector时,
* 可以拿出netty层面的channel处理
* 注意,这里注册的事件代码是0,表示还不关心任何事件,只是建立绑定关系。
* 设置的附件是当前通道对象,在事件循环中会使用该通道对象的Unsafe对象来处理事件。
*
* 后续在通道Active的时候,会注册具体感兴趣的事件。
*/
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
// 执行一次事件选择
eventLoop().selectNow();
// 再试一次注册,
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
// 第二次还是失败的话则抛出异常
throw e;
}
}
}
}
通过调用底层JDK的方法将通道注册到选择器中,注意这里将当前通道对象设置成了附件对象。后续在事件循环中,会通过该通道对象的Unsafe对象来处理就绪事件。参考Netty中的NioEventLoop一文。