NioEventLoop是Netty中最常使用的一种事件循环实现,在NioEventLoopGroup的创建过程中就会创建该类型的事件循环。本文分析一下创建过程中做了哪些操作,以及最重要的事件循环是怎样的。
创建过程
在介绍构造方法之前,先来熟悉一下NioEventLoop的继承体系。可以发现EventLoop本身就是一个EventLoopGroup,即只有1个线程的特殊事件循环组。在EventLoopGroup将EventLoop称为child,而在EventLoop中将当前事件循环对象所归属的EventLoopGroup称为parent。
下面就顺着继承体系来分析NioEventLoop的创建过程。
NioEventLoop
/*
* 在NioEventLoopGroup的newChild方法中国创建该类型实例时会调用该方法。
*/
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
/*
* 先调用父类的构造器
* 这里会创建任务队列:newTaskQueue
* 下面还会创建一个jdk层面的selector
* nioEventLoop、MPSC队列、selector三者都是一对一关系
* NioEventLoopGroup中的线程选择器的作用正是为请求选择一个NioEventLoop,并绑定到对应的selector上
*/
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
// 在Linux系统上,这里的provider是JDK中的EPollSelectorProvider
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 创建一个selector,是对底层JDK的Selector的封装
final SelectorTuple selectorTuple = openSelector();
// 将两个selector绑定在事件循环上,内部的事件循环逻辑中会用到这两个selector
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
在调用父类的构造方法之前,会先创建任务队列。
创建任务队列
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
// 没有传入队列工厂
if (queueFactory == null) {
// 使用默认方式创建
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
// 使用队列工厂来创建队列
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
/*
* 这里创建的是高性能MPSC队列,即多生产者单消费者队列。之所以说高效,是因为使用了JCTools这个包
* 单消费者指的是nioEventLoop所在线程
* 多消费者指的是多个业务线程
*
* 这里maxPendingTasks默认值就是Integer.MAX_VALUE,所以条件满足。
*/
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
// 默认值是最大的int值
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
注意在NioEventLoop中,有两个newTaskQueue方法,一个静态方法一个实例方法,上面构造方法中调用的是静态方法。如果构造方法中传入的队列工厂是null,则使用默认方式创建(调用newTaskQueue0方法)。
DEFAULT_MAX_PENDING_TASKS这个属性是定义在父类SingleThreadEventLoop中的,默认值就是最大的整型值。所以默认会调用那个无参的方法来创建队列。
public static <T> Queue<T> newMpscQueue() {
return Mpsc.newMpscQueue();
}
public static <T> Queue<T> newMpscQueue(final int maxCapacity) {
return Mpsc.newMpscQueue(maxCapacity);
}
这两个方法又会调用到内部类Mpsc中的静态方法。
private static final int MPSC_CHUNK_SIZE = 1024;
private static final int MIN_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * 2;
// 这里是 1 << 30
private static final int MAX_ALLOWED_MPSC_CAPACITY = Pow2.MAX_POW2;
private static final class Mpsc {
private static final boolean USE_MPSC_CHUNKED_ARRAY_QUEUE;
private Mpsc() {
}
static {
Object unsafe = null;
if (hasUnsafe()) {
// jctools goes through its own process of initializing unsafe; of
// course, this requires permissions which might not be granted to calling code, so we
// must mark this block as privileged too
unsafe = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
// force JCTools to initialize unsafe
return UnsafeAccess.UNSAFE;
}
});
}
if (unsafe == null) {
logger.debug("org.jctools-core.MpscChunkedArrayQueue: unavailable");
USE_MPSC_CHUNKED_ARRAY_QUEUE = false;
} else {
logger.debug("org.jctools-core.MpscChunkedArrayQueue: available");
USE_MPSC_CHUNKED_ARRAY_QUEUE = true;
}
}
static <T> Queue<T> newMpscQueue(final int maxCapacity) {
// Calculate the max capacity which can not be bigger than MAX_ALLOWED_MPSC_CAPACITY.
// This is forced by the MpscChunkedArrayQueue implementation as will try to round it
// up to the next power of two and so will overflow otherwise.
final int capacity = max(
// 使用的容量必须在允许的范围内,MAX_ALLOWED_MPSC_CAPACITY是jctools中的值,是1 << 30;
min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY),
// 而且必须比最小的容量大
MIN_MAX_MPSC_CAPACITY);
return newChunkedMpscQueue(MPSC_CHUNK_SIZE, capacity);
}
static <T> Queue<T> newChunkedMpscQueue(final int chunkSize, final int capacity) {
return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscChunkedArrayQueue<T>(chunkSize, capacity)
: new MpscChunkedAtomicArrayQueue<T>(chunkSize, capacity);
}
static <T> Queue<T> newMpscQueue() {
return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
: new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
}
}
创建I/O通道选择器
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 在Linux系统中,这里返回的是EPollSelectorImpl
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 该属性默认是false
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 判断是否存在SelectorImpl类
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 加载SelectorImpl类
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
// 判断sun.nio.ch.SelectorImpl是否是不存在(因为可能是异常对象,上面的doPrivileged失败返回了Throwable,所以不是Class),
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
// 或者不是上面通过openSelector()得到的Selector的父类型。
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
// 如果不存在sun.nio.ch.SelectorImpl,则记录日志
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
// 不使用Netty封装的selector对象
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// SelectedSelectionKeySet继承了AbstractSet,相比HashSet效率更高
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 反射获取下面两个字段
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
// 如果底层的JDK是版本9及以上,而且JDK中存在Unsafe类。
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
// 下面在获取两个字段在类中的偏移
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
// 如果这两个字段存在(也就是具有合适的偏移),
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
// 则为这两个字段赋值
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
// 如果是JDK9以下或者不存在Unsafe类,则通过反射来修改字段
// 设置两个属性的反射修改权限
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
/*
* 通过反射将selectedKeys和selector进行绑定,用netty中的属性替换掉jdk中的这两个属性
* 当然netty是想做优化。默认的是HashSet,Netty中的SelectedSelectionKeySet直接继承了AbstractSet,效率更高。
*/
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
// 不使用Netty封装的选择器
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 将JDK层面的Selector和Netty中的Selector建立元组,其实后者也实现了JDK层面的Selector接口
return new SelectorTuple(unwrappedSelector,
// 创建Netty实现的选择器
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
private static final class SelectorTuple {
// 表示JDK层面实现的Selector
final Selector unwrappedSelector;
// 表示Netty中实现的Selector
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
// 没有使用Netty封装的选择器
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
该方法较长,主要是在创建Selector对象后,想替换SelectorImpl类中定义的selectedKeys和publicSelectedKeys这两个字段,Netty中使用了更高效的实现。当然如果JDK中存在SelectorImpl类,而且是创建的选择器对象的父类,那么才进行字段替换。 在替换方式上,有两种情况:
- JDK版本大于等于9,而且存在Unsafe类,那么通过Unsafe来进行字段替换;
- 否则通过普通的反射操作来替换;
最后将选择器对象封装为SelectorTuple对象(选择器组)并返回。选择器组中封装了JDK层面的选择器,和Netty中的选择器,如果上面的替换操作成功执行,则会创建SelectedSelectionKeySetSelector这个Netty实现的选择器;如果不需要替换或者替换失败,则不创建Netty实现的选择器。
接下来再重点看看这个Selector是怎么被创建的,在Netty中的NioEventLoopGroup一文中,介绍到,在Linux系统中,SelectorProvider类的provider方法返回的是EPollSelectorProvider。所以这里看看该类是怎么创建选择器对象的。
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
可以看到,创建的是EPollSelectorImpl类型的对象。
SingleThreadEventLoop
该类中的构造器方法也很多,NioEventLoop调用的是下面这个。
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
该方法中基本上没有做额外的操作,直接看父类的构造方法。
SingleThreadEventExecutor
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
// 默认为最大的整型值
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
// 默认为最大的整型值
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
// 封装执行器
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
注意,在该类中定义了属性DEFAULT_MAX_PENDING_EXECUTOR_TASKS,子类SingleThreadEventLoop中定义的是DEFAULT_MAX_PENDING_TASKS,虽然两者默认值都是最大整型值,但是不要搞混了。
封装执行器
在上面的构造器中通过ThreadExecutorMap类来封装执行器。
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
/*
* 在执行器外面封装一层,为什么要封装一层?有什么意义?
* 目的是为了实现对所要执行的任务进行封装。
*/
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(
// 对任务进行封装,在任务执行前,将执行器设置到线程本地存储中。
apply(command, eventExecutor));
}
};
}
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
// 在任务执行前将事件执行器设置到线程本地存储中
setCurrentEventExecutor(eventExecutor);
try {
// 执行任务,这样在任务执行过程中就可以通过线程本地存储获取到执行器。
command.run();
} finally {
// 任务执行完后,清空设置到线程本地存储中的事件执行器
setCurrentEventExecutor(null);
}
}
};
}
AbstractScheduledEventExecutor
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}
AbstractEventExecutor
private final EventExecutorGroup parent;
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
注册通道
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
ObjectUtil.checkNotNull(ch, "ch");
if (interestOps == 0) {
throw new IllegalArgumentException("interestOps must be non-zero.");
}
// 检查感兴趣的是否是否为有效值
if ((interestOps & ~ch.validOps()) != 0) {
throw new IllegalArgumentException(
"invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
}
ObjectUtil.checkNotNull(task, "task");
if (isShutdown()) {
throw new IllegalStateException("event loop shut down");
}
if (inEventLoop()) {
// 将通道注册到选择器上
register0(ch, interestOps, task);
} else {
try {
// Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
// may block for a long time while trying to obtain an internal lock that may be hold while selecting.
// 如果不在事件循环中,则提交一个任务
submit(new Runnable() {
@Override
public void run() {
// 在事件循环中进行注册
register0(ch, interestOps, task);
}
}).sync();
} catch (InterruptedException ignore) {
// Even if interrupted we did schedule it so just mark the Thread as interrupted.
Thread.currentThread().interrupt();
}
}
}
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
try {
ch.register(unwrappedSelector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}
在事件循环中,检测到通道的对应事件发生,则该NioTask会被执行。
注意这里的attachment是NioTask,而不是AbstractNioChannel。一般服务端和客户端的socket的通道不会在这里进行注册。
事件循环
进入事件循环
在介绍事件循环之前,先介绍一下是怎么进入到事件循环中的。在SingleThreadEventExecutor中,定义了execute方法,可以网事件循环中添加异步任务。
@Override
public void execute(Runnable task) {
execute0(task);
}
private void execute0(@Schedule Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
/*
* 因为execute(Runnable task)方法是public的,可能被用户代码所调用,
* 所以这里再次判断一下当前线程是否是reactor线程
*/
boolean inEventLoop = inEventLoop();
// 添加任务
addTask(task);
// 如果不在事件循环中
if (!inEventLoop) {
// 如果还没创建reactor线程,则创建并启动
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
// reactor线程可能被阻塞,所以需要唤醒线程
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
在私有的execute方法中,如果发现当前不在事件循环中,则会通过startThread方法来创建并启动线程。
private void startThread() {
// 只有没启动才创建线程
if (state == ST_NOT_STARTED) {
// 如果状态为未启动,那么启动线程并设置状态为已启动
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 真正地线程创建
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
这里通过CAS来修改线程启动状态,避免启动多个线程。 真正触发创建线程是在doStartThread方法中。
protected abstract void run();
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
/*
* 将线程绑定到NioEventLoop,这里创建过一次线程后,后续NioEventLoop每次execute执行任务的时候就不会再创建了
* 虽然这里executor是ThreadPerTaskExecutor,每次execute都会new一个线程。
* 因为后续NioEventLoop上execute任务,是往任务队列里面添加runnable实例(也就是任务),然后唤醒线程
* 这个线程在阻塞、唤醒、执行任务之间轮流变换状态
*/
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
/*
* 这里执行的run方法是Reactor模型的核心,SingleThreadEventExecutor的子类NioEventLoop实现了run方法。
*/
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
}
}
});
}
在该方法中,往执行器中提交了一个任务,该任务中会调用到该类中的模板方法run,而子类NioEventLoop中实现了该方法,而且这个方法就是Netty中的事件循环的核心。
回顾上面的创建过程可知,这里的执行器是构造方法传进来的,一直追溯可知该执行器是在NioEventLoopGroup的创建时,由用户传递进来的,或者使用默认的ThreadPerTaskExecutor。对于默认情况,其会在提交任务的时候通过线程工厂来创建并启动线程。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
由于上面startThread方法中判断了线程启动状态,只有没启动才会启动线程,所以对于每个NioEventLoop而言,doStartThread只会被执行一次,确保了只有一个线程被创建和启动。
事件循环主体
// 事件循环的核心所在
@Override
protected void run() {
/*
* 该变量记录了select操作的次数,用于检测JDK的selector.select()方法是否发生了bug
* 该bug是指其内部无法阻塞线程,而导致线程空转,CPU达到100%使用率
* 如果发生了bug,那么selectCnt会增长得十分快
*/
int selectCnt = 0;
/*
* 不断地从任务队列中获取任务来执行
* 这就是Netty中的事件循环。
*/
for (;;) {
try {
int strategy;
try {
/*
* 获取select策略,对于DefaultSelectStrategy而言,
* 如果没有任务会返回SelectStrategy.SELECT;
* 如果有任务会执行supplier的get方法,在这里是selectNowSupplier,
* 它的get方法会执行selectNow方法,返回的就是就绪任务数量。
* 这个数量肯定是会大于0的,所以SelectStrategy中定义的枚举常量都是负数,是为了区分。
*/
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
// 如果队列中没有任务
if (curDeadlineNanos == -1L) {
// 则允许select一直阻塞
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
// 如果没有任务
if (!hasTasks()) {
// 执行一次事件轮询
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
// 增加一次select次数
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
// 存在就绪事件
if (strategy > 0) {
// 处理IO事件
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// 无时间限制地执行所有任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) { // 存在就绪事件
final long ioStartTime = System.nanoTime();
try {
// 处理IO事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// 执行任务,先计算允许的能执行任务最久的时间
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// 尽量少的执行任务
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
// 如果执行了任务或者处理了IO事件
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
// 重置selectCnt标志
selectCnt = 0;
}
/*
* 如果发生了异常,重新构建selector
* (创建一个新的selector,并将旧的selector上注册的channel转移到新的selector上),
* 至于为什么能够解决空轮询的bug,应该详细了解一些bug的原因。
*
* 并重置selectCnt为0
*/
else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
上面的方法就是Netty中NioEventLoop事件循环的主体,主要可以分为两部分内容:
- 执行I/O事件;
- 执行异步任务
执行I/O事件
选择I/O事件
在事件循环主题中,首先会获取选择策略,下面是默认的选择策略(DefaultSelectStrategy)实现。
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 根据队列中是否有任务来判断进行不同的操作
// selectSupplier.get()中会进行selector.selectNow()
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
// 执行select操作
return selectNow();
}
};
传入的IntSupplier是NioEventLoop中的属性selectNowSupplier。如果有无任务执行,都会执行底层的I/O事件选择操作。只不过一个是selectNow方法,一个是select方法。
int selectNow() throws IOException {
// 执行底层的select操作,立即返回不会阻塞
return selector.selectNow();
}
private int select(long deadlineNanos) throws IOException {
// 如果没有任务,则调用select进入阻塞状态
if (deadlineNanos == NONE) {
// 执行底层的select操作,不会阻塞
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
// 定时任务截止时间快到了(5毫秒内),中断本次轮询
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
// 根据超时时间是否有效来选择进行阻塞和非阻塞实现
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
处理I/O事件
private void processSelectedKeys() {
/*
* 这里的优化逻辑请参考openSelector()方法,也就是成功替换掉SelectorImpl中的那两个字段的情况。
*/
if (selectedKeys != null) {
// 重点关注processSelectedKeysOptimized()方法
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
主要关注processSelectedKeysOptimized方法这种情况。
private void processSelectedKeysOptimized() {
// 遍历io事件,可以发现优化后使用数组的方式用于遍历可以实现更高效率
for (int i = 0; i < selectedKeys.size; ++i) {
// 取出io事件和对应的channel
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
/*
* 这里为什么要置为null?
* 直接原因:
* 避免数组过长,导致数组尾部的元素无法被回收,从而导致内存泄漏
*/
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// 如果注册的附件是netty的Channel
if (a instanceof AbstractNioChannel) {
// 处理该channel
processSelectedKey(k, (AbstractNioChannel) a);
} else { // 如果附件是NioTask(暂时不用分析)
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
/*
* 判断是否需要再进行一次轮询,参考cancel(SelectedKey)方法
* 也就是说每达到256次的连接断开,就要重新清理一下selectedKeys,其实也就是批量删除
*
*/
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
上面的方法中在遍历选择键,并取出附件,包含两种情况:
- IO通道;
- 非阻塞任务;
处理IO通道
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
// 处理IO事件
try {
// 获取就绪的事件类型
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
unsafe.forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
/*
* 如果是连接事件
* 这里如果readyOps是0,那么仍然处理,以解决JDK的bug。
*/
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 服务端这里unsafe对应的是NioMessageUnsafe
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
这里会根据就绪的事件类型来做不同的操作,比如是ACCEPT或READ事件时,会调用unsafe类的read方法。
处理NioEvent
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0;
try {
// 执行任务的回调方法
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
default:
break;
}
}
}
这里会执行到NioTask的channelReady这个回调方法。
执行异步任务
在NioEventLoop的事件循环中,有3种执行异步任务的情况:
- runAllTasks():没有指定参数,不考虑时间;
- runAllTasks(timeout):只能执行指定时长这么久;
- runAllTasks(0):最多执行64个任务;
第二种和第三种情况共用一个方法,所以是两个重载方法。
从调度队列中获取任务
不管是限时还是不限时执行任务的情况,都会从调度队列中获取调度任务。所以先来看一下是怎么获取的。
private boolean fetchFromScheduledTaskQueue() {
// 如果任务调度队列中没有任务,直接返回
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
// 获取当前时间
long nanoTime = getCurrentTimeNanos();
for (;;) {
// 从任务调度队列中取任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
// 如果任务调度队列中没有任务了,直接返回
if (scheduledTask == null) {
return true;
}
// 将周期性调度任务放入taskQueue中
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
// 添加到taskQueue失败,则再次放回调度队列中
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
// 取出任务
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
// 如果没有任务,或者还没到任务执行的时间,
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
// 则返回null
return null;
}
// 将任务从调度队列中移除
scheduledTaskQueue.remove();
scheduledTask.setConsumed();
// 返回任务
return scheduledTask;
}
在该方法中,任务会从调度队列(scheduledTaskQueue)转移到普通队列(taskQueue)中。
这里先不管任务是怎么被添加到调度队列中的,后面单独写文章分析调度任务。
不限时执行
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
/*
* 只要优先级队列中还存在任务,就一直循环,不考虑时间。
*/
do {
// 从任务调度队列中取任务到taskQueue中
fetchedAll = fetchFromScheduledTaskQueue();
// 执行taskQueue中的任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
// 记录本次执行任务的时间
lastExecutionTime = getCurrentTimeNanos();
}
// 执行延迟队列中的任务
afterRunningAllTasks();
return ranAtLeastOne;
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 从延迟队列中取任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
// 执行任务
safeExecute(task);
// 取任务
task = pollTaskFrom(taskQueue);
// 队列中的任务被执行完了,退出循环
if (task == null) {
return true;
}
}
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
// 取出任务
Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) {
// 返回任务
return task;
}
}
}
protected static void safeExecute(Runnable task) {
try {
runTask(task);
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
protected static void runTask(@Execute Runnable task) {
// 调用任务对象的run方法来执行任务
task.run();
}
在无参的runAllTasks方法中,确实没有对其进行时间限制,只要有任务,就一直执行。
限时执行
protected boolean runAllTasks(long timeoutNanos) {
// 从调度队列中获取调度任务到taskQueue中
fetchFromScheduledTaskQueue();
// 获取任务
Runnable task = pollTask();
// 如果没有任务
if (task == null) {
// 执行延迟队列中的任务
afterRunningAllTasks();
return false;
}
// 计算应该停止执行任务的时间
final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
// 记录执行了的任务数量
long runTasks = 0;
long lastExecutionTime;
// 循环执行任务
for (;;) {
// 执行任务
safeExecute(task);
// 已执行的任务数量加1
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// 每执行64个任务,进入if块判断一下
if ((runTasks & 0x3F) == 0) {
// 获取当前时间
lastExecutionTime = getCurrentTimeNanos();
// 如果已经超过了应该停止的时间了
if (lastExecutionTime >= deadline) {
// 则退出循环,结束运行
break;
}
}
// 再次从队列中取任务
task = pollTask();
// 如果队列中的任务被执行完了
if (task == null) {
// 获取当前时间
lastExecutionTime = getCurrentTimeNanos();
break;
}
}
// 运行延迟队列中的任务
afterRunningAllTasks();
// 设置本次运行结束后的时间
this.lastExecutionTime = lastExecutionTime;
return true;
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 从延迟队列中取任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
// 执行任务
safeExecute(task);
// 取任务
task = pollTaskFrom(taskQueue);
// 队列中的任务被执行完了,退出循环
if (task == null) {
return true;
}
}
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
// 取出任务
Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) {
// 返回任务
return task;
}
}
}
protected static void safeExecute(Runnable task) {
try {
runTask(task);
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
protected static void runTask(@Execute Runnable task) {
// 调用任务对象的run方法来执行任务
task.run();
}
在有参的runAllTasks方法中,每执行64个任务会判断一下是否到了截止时间,如果到了则停止运行任务。如果参数是0,那么只要进行一次判断,就会立即退出,所以这也是为什么说参数是0的话,只能执行64个任务。另外,如果没有任务也会退出。
任务执行后置操作
不管是限不限时,在执行了普通队列taskQueue中的任务后,都会调用afterRunningAllTasks方法,这是一个空方法,子类SingleThreadEventLoop中重写了该方法。
@Override
protected void afterRunningAllTasks() {
// 执行延迟队列中的任务
runAllTasksFrom(tailTasks);
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 从延迟队列中取任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
// 执行任务
safeExecute(task);
// 取任务
task = pollTaskFrom(taskQueue);
// 队列中的任务被执行完了,退出循环
if (task == null) {
return true;
}
}
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
// 取出任务
Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) {
// 返回任务
return task;
}
}
}
在该方法中,不断从tailQueue任务队列中取出任务来执行。
总结
本文首先分析了NioEventLoop的继承体系,然后分析了其创建过程是怎样的,包括怎么创建的任务队列,怎么创建的通道选择器对象等。另外重点分析了事件循环,包括I/O事件是怎么处理的,又是怎么执行异步任务的。