Netty中的NioEventLoop

NioEventLoop是Netty中最常使用的一种事件循环实现,在NioEventLoopGroup的创建过程中就会创建该类型的事件循环。本文分析一下创建过程中做了哪些操作,以及最重要的事件循环是怎样的。


创建过程

在介绍构造方法之前,先来熟悉一下NioEventLoop的继承体系。可以发现EventLoop本身就是一个EventLoopGroup,即只有1个线程的特殊事件循环组。在EventLoopGroupEventLoop称为child,而在EventLoop中将当前事件循环对象所归属的EventLoopGroup称为parent。

下面就顺着继承体系来分析NioEventLoop的创建过程。

NioEventLoop

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
/*
 * 在NioEventLoopGroup的newChild方法中国创建该类型实例时会调用该方法。
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    /*
     * 先调用父类的构造器
     * 这里会创建任务队列:newTaskQueue
     * 下面还会创建一个jdk层面的selector
     * nioEventLoop、MPSC队列、selector三者都是一对一关系
     * NioEventLoopGroup中的线程选择器的作用正是为请求选择一个NioEventLoop,并绑定到对应的selector上
     */
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
    // 在Linux系统上,这里的provider是JDK中的EPollSelectorProvider
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // 创建一个selector,是对底层JDK的Selector的封装
    final SelectorTuple selectorTuple = openSelector();
    // 将两个selector绑定在事件循环上,内部的事件循环逻辑中会用到这两个selector
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

在调用父类的构造方法之前,会先创建任务队列。

创建任务队列

v4.1.83
newTaskQueue
DEFAULT_MAX_PENDING_TASKS
<
>
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private static Queue<Runnable> newTaskQueue(
        EventLoopTaskQueueFactory queueFactory) {
    // 没有传入队列工厂
    if (queueFactory == null) {
        // 使用默认方式创建
        return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
    }
    // 使用队列工厂来创建队列
    return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // This event loop never calls takeTask()
    /*
     * 这里创建的是高性能MPSC队列,即多生产者单消费者队列。之所以说高效,是因为使用了JCTools这个包
     * 单消费者指的是nioEventLoop所在线程
     * 多消费者指的是多个业务线程
     *
     * 这里maxPendingTasks默认值就是Integer.MAX_VALUE,所以条件满足。
     */
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
            : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
java
transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
        // 默认值是最大的int值
        SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

注意在NioEventLoop中,有两个newTaskQueue方法,一个静态方法一个实例方法,上面构造方法中调用的是静态方法。如果构造方法中传入的队列工厂是null,则使用默认方式创建(调用newTaskQueue0方法)。

DEFAULT_MAX_PENDING_TASKS这个属性是定义在父类SingleThreadEventLoop中的,默认值就是最大的整型值。所以默认会调用那个无参的方法来创建队列。

v4.1.83
java
common/src/main/java/io/netty/util/internal/PlatformDependent.java
public static <T> Queue<T> newMpscQueue() {
    return Mpsc.newMpscQueue();
}
public static <T> Queue<T> newMpscQueue(final int maxCapacity) {
    return Mpsc.newMpscQueue(maxCapacity);
}

这两个方法又会调用到内部类Mpsc中的静态方法。

v4.1.83
java
common/src/main/java/io/netty/util/internal/PlatformDependent.java
private static final int MPSC_CHUNK_SIZE =  1024;
private static final int MIN_MAX_MPSC_CAPACITY =  MPSC_CHUNK_SIZE * 2;
// 这里是 1 << 30
private static final int MAX_ALLOWED_MPSC_CAPACITY = Pow2.MAX_POW2;
private static final class Mpsc {
    private static final boolean USE_MPSC_CHUNKED_ARRAY_QUEUE;

    private Mpsc() {
    }

    static {
        Object unsafe = null;
        if (hasUnsafe()) {
            // jctools goes through its own process of initializing unsafe; of
            // course, this requires permissions which might not be granted to calling code, so we
            // must mark this block as privileged too
            unsafe = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    // force JCTools to initialize unsafe
                    return UnsafeAccess.UNSAFE;
                }
            });
        }

        if (unsafe == null) {
            logger.debug("org.jctools-core.MpscChunkedArrayQueue: unavailable");
            USE_MPSC_CHUNKED_ARRAY_QUEUE = false;
        } else {
            logger.debug("org.jctools-core.MpscChunkedArrayQueue: available");
            USE_MPSC_CHUNKED_ARRAY_QUEUE = true;
        }
    }

    static <T> Queue<T> newMpscQueue(final int maxCapacity) {
        // Calculate the max capacity which can not be bigger than MAX_ALLOWED_MPSC_CAPACITY.
        // This is forced by the MpscChunkedArrayQueue implementation as will try to round it
        // up to the next power of two and so will overflow otherwise.
        final int capacity = max(
                // 使用的容量必须在允许的范围内,MAX_ALLOWED_MPSC_CAPACITY是jctools中的值,是1 << 30;
                min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY),
                // 而且必须比最小的容量大
                MIN_MAX_MPSC_CAPACITY);
        return newChunkedMpscQueue(MPSC_CHUNK_SIZE, capacity);
    }

    static <T> Queue<T> newChunkedMpscQueue(final int chunkSize, final int capacity) {
        return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscChunkedArrayQueue<T>(chunkSize, capacity)
                : new MpscChunkedAtomicArrayQueue<T>(chunkSize, capacity);
    }

    static <T> Queue<T> newMpscQueue() {
        return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
                                            : new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
    }
}

创建I/O通道选择器

v4.1.83
openSelector
SelectorTuple
<
>
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        // 在Linux系统中,这里返回的是EPollSelectorImpl
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // 该属性默认是false
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    // 判断是否存在SelectorImpl类
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 加载SelectorImpl类
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    // 判断sun.nio.ch.SelectorImpl是否是不存在(因为可能是异常对象,上面的doPrivileged失败返回了Throwable,所以不是Class),
    if (!(maybeSelectorImplClass instanceof Class) ||
        // ensure the current selector implementation is what we can instrument.
            // 或者不是上面通过openSelector()得到的Selector的父类型。
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        // 如果不存在sun.nio.ch.SelectorImpl,则记录日志
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        }
        // 不使用Netty封装的selector对象
        return new SelectorTuple(unwrappedSelector);
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    // SelectedSelectionKeySet继承了AbstractSet,相比HashSet效率更高
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 反射获取下面两个字段
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                // 如果底层的JDK是版本9及以上,而且JDK中存在Unsafe类。
                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                    // This allows us to also do this in Java9+ without any extra flags.
                    // 下面在获取两个字段在类中的偏移
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    // 如果这两个字段存在(也就是具有合适的偏移),
                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        // 则为这两个字段赋值
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                    // We could not retrieve the offset, lets try reflection as last-resort.
                }

                // 如果是JDK9以下或者不存在Unsafe类,则通过反射来修改字段

                // 设置两个属性的反射修改权限

                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }

                /*
                 * 通过反射将selectedKeys和selector进行绑定,用netty中的属性替换掉jdk中的这两个属性
                 * 当然netty是想做优化。默认的是HashSet,Netty中的SelectedSelectionKeySet直接继承了AbstractSet,效率更高。
                 */
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
        // 不使用Netty封装的选择器
        return new SelectorTuple(unwrappedSelector);
    }

    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    // 将JDK层面的Selector和Netty中的Selector建立元组,其实后者也实现了JDK层面的Selector接口
    return new SelectorTuple(unwrappedSelector,
                             // 创建Netty实现的选择器
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private static final class SelectorTuple {
    // 表示JDK层面实现的Selector
    final Selector unwrappedSelector;
    // 表示Netty中实现的Selector
    final Selector selector;

    SelectorTuple(Selector unwrappedSelector) {
        this.unwrappedSelector = unwrappedSelector;
        // 没有使用Netty封装的选择器
        this.selector = unwrappedSelector;
    }

    SelectorTuple(Selector unwrappedSelector, Selector selector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = selector;
    }
}

该方法较长,主要是在创建Selector对象后,想替换SelectorImpl类中定义的selectedKeyspublicSelectedKeys这两个字段,Netty中使用了更高效的实现。当然如果JDK中存在SelectorImpl类,而且是创建的选择器对象的父类,那么才进行字段替换。 在替换方式上,有两种情况:

  • JDK版本大于等于9,而且存在Unsafe类,那么通过Unsafe来进行字段替换;
  • 否则通过普通的反射操作来替换;

最后将选择器对象封装为SelectorTuple对象(选择器组)并返回。选择器组中封装了JDK层面的选择器,和Netty中的选择器,如果上面的替换操作成功执行,则会创建SelectedSelectionKeySetSelector这个Netty实现的选择器;如果不需要替换或者替换失败,则不创建Netty实现的选择器。

接下来再重点看看这个Selector是怎么被创建的,在Netty中的NioEventLoopGroup一文中,介绍到,在Linux系统中,SelectorProvider类的provider方法返回的是EPollSelectorProvider。所以这里看看该类是怎么创建选择器对象的。

java
jdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java
public AbstractSelector openSelector() throws IOException {
    return new EPollSelectorImpl(this);
}

可以看到,创建的是EPollSelectorImpl类型的对象。

SingleThreadEventLoop

该类中的构造器方法也很多,NioEventLoop调用的是下面这个。

v4.1.83
java
transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}

该方法中基本上没有做额外的操作,直接看父类的构造方法。

SingleThreadEventExecutor

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
        // 默认为最大的整型值
        SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    // 默认为最大的整型值
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    // 封装执行器
    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

注意,在该类中定义了属性DEFAULT_MAX_PENDING_EXECUTOR_TASKS,子类SingleThreadEventLoop中定义的是DEFAULT_MAX_PENDING_TASKS,虽然两者默认值都是最大整型值,但是不要搞混了。

封装执行器

在上面的构造器中通过ThreadExecutorMap类来封装执行器。

v4.1.83
java
common/src/main/java/io/netty/util/internal/ThreadExecutorMap.java
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
private static void setCurrentEventExecutor(EventExecutor executor) {
    mappings.set(executor);
}
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(executor, "executor");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    /*
     * 在执行器外面封装一层,为什么要封装一层?有什么意义?
     * 目的是为了实现对所要执行的任务进行封装。
     */
    return new Executor() {
        @Override
        public void execute(final Runnable command) {
            executor.execute(
                    // 对任务进行封装,在任务执行前,将执行器设置到线程本地存储中。
                    apply(command, eventExecutor));
        }
    };
}
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Runnable() {
        @Override
        public void run() {
            // 在任务执行前将事件执行器设置到线程本地存储中
            setCurrentEventExecutor(eventExecutor);
            try {
                // 执行任务,这样在任务执行过程中就可以通过线程本地存储获取到执行器。
                command.run();
            } finally {
                // 任务执行完后,清空设置到线程本地存储中的事件执行器
                setCurrentEventExecutor(null);
            }
        }
    };
}

AbstractScheduledEventExecutor

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
    super(parent);
}

AbstractEventExecutor

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java
private final EventExecutorGroup parent;
protected AbstractEventExecutor(EventExecutorGroup parent) {
    this.parent = parent;
}

注册通道

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    ObjectUtil.checkNotNull(ch, "ch");
    if (interestOps == 0) {
        throw new IllegalArgumentException("interestOps must be non-zero.");
    }
    // 检查感兴趣的是否是否为有效值
    if ((interestOps & ~ch.validOps()) != 0) {
        throw new IllegalArgumentException(
                "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
    }
    ObjectUtil.checkNotNull(task, "task");

    if (isShutdown()) {
        throw new IllegalStateException("event loop shut down");
    }

    if (inEventLoop()) {
        // 将通道注册到选择器上
        register0(ch, interestOps, task);
    } else {
        try {
            // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
            // may block for a long time while trying to obtain an internal lock that may be hold while selecting.
            // 如果不在事件循环中,则提交一个任务
            submit(new Runnable() {
                @Override
                public void run() {
                    // 在事件循环中进行注册
                    register0(ch, interestOps, task);
                }
            }).sync();
        } catch (InterruptedException ignore) {
            // Even if interrupted we did schedule it so just mark the Thread as interrupted.
            Thread.currentThread().interrupt();
        }
    }
}
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
    try {
        ch.register(unwrappedSelector, interestOps, task);
    } catch (Exception e) {
        throw new EventLoopException("failed to register a channel", e);
    }
}

在事件循环中,检测到通道的对应事件发生,则该NioTask会被执行。

注意这里的attachment是NioTask,而不是AbstractNioChannel。一般服务端和客户端的socket的通道不会在这里进行注册。

事件循环

进入事件循环

在介绍事件循环之前,先介绍一下是怎么进入到事件循环中的。在SingleThreadEventExecutor中,定义了execute方法,可以网事件循环中添加异步任务。

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
@Override
public void execute(Runnable task) {
    execute0(task);
}
private void execute0(@Schedule Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
    /*
     * 因为execute(Runnable task)方法是public的,可能被用户代码所调用,
     * 所以这里再次判断一下当前线程是否是reactor线程
     */
    boolean inEventLoop = inEventLoop();
    // 添加任务
    addTask(task);
    // 如果不在事件循环中
    if (!inEventLoop) {
        // 如果还没创建reactor线程,则创建并启动
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    // reactor线程可能被阻塞,所以需要唤醒线程
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

在私有的execute方法中,如果发现当前不在事件循环中,则会通过startThread方法来创建并启动线程。

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
private void startThread() {
    // 只有没启动才创建线程
    if (state == ST_NOT_STARTED) {
        // 如果状态为未启动,那么启动线程并设置状态为已启动
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 真正地线程创建
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

这里通过CAS来修改线程启动状态,避免启动多个线程。 真正触发创建线程是在doStartThread方法中。

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
protected abstract void run();
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            /*
             * 将线程绑定到NioEventLoop,这里创建过一次线程后,后续NioEventLoop每次execute执行任务的时候就不会再创建了
             * 虽然这里executor是ThreadPerTaskExecutor,每次execute都会new一个线程。
             * 因为后续NioEventLoop上execute任务,是往任务队列里面添加runnable实例(也就是任务),然后唤醒线程
             * 这个线程在阻塞、唤醒、执行任务之间轮流变换状态
             */
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                /*
                 * 这里执行的run方法是Reactor模型的核心,SingleThreadEventExecutor的子类NioEventLoop实现了run方法。
                 */
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
            }
        }
    });
}

在该方法中,往执行器中提交了一个任务,该任务中会调用到该类中的模板方法run,而子类NioEventLoop中实现了该方法,而且这个方法就是Netty中的事件循环的核心。

回顾上面的创建过程可知,这里的执行器是构造方法传进来的,一直追溯可知该执行器是在NioEventLoopGroup的创建时,由用户传递进来的,或者使用默认的ThreadPerTaskExecutor。对于默认情况,其会在提交任务的时候通过线程工厂来创建并启动线程。

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/ThreadPerTaskExecutor.java
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

由于上面startThread方法中判断了线程启动状态,只有没启动才会启动线程,所以对于每个NioEventLoop而言,doStartThread只会被执行一次,确保了只有一个线程被创建和启动。

事件循环主体

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
// 事件循环的核心所在
@Override
protected void run() {
    /*
     * 该变量记录了select操作的次数,用于检测JDK的selector.select()方法是否发生了bug
     * 该bug是指其内部无法阻塞线程,而导致线程空转,CPU达到100%使用率
     * 如果发生了bug,那么selectCnt会增长得十分快
     */
    int selectCnt = 0;
    /*
     * 不断地从任务队列中获取任务来执行
     * 这就是Netty中的事件循环。
     */
    for (;;) {
        try {
            int strategy;
            try {
                /*
                 * 获取select策略,对于DefaultSelectStrategy而言,
                 * 如果没有任务会返回SelectStrategy.SELECT;
                 * 如果有任务会执行supplier的get方法,在这里是selectNowSupplier,
                 *  它的get方法会执行selectNow方法,返回的就是就绪任务数量。
                 *  这个数量肯定是会大于0的,所以SelectStrategy中定义的枚举常量都是负数,是为了区分。
                 */
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    // 如果队列中没有任务
                    if (curDeadlineNanos == -1L) {
                        // 则允许select一直阻塞
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        // 如果没有任务
                        if (!hasTasks()) {
                            // 执行一次事件轮询
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }
            // 增加一次select次数
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    // 存在就绪事件
                    if (strategy > 0) {
                        // 处理IO事件
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    // 无时间限制地执行所有任务
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) { // 存在就绪事件
                final long ioStartTime = System.nanoTime();
                try {
                    // 处理IO事件
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 执行任务,先计算允许的能执行任务最久的时间
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                // 尽量少的执行任务
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            // 如果执行了任务或者处理了IO事件
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                // 重置selectCnt标志
                selectCnt = 0;
            }
            /*
             * 如果发生了异常,重新构建selector
             * (创建一个新的selector,并将旧的selector上注册的channel转移到新的selector上),
             * 至于为什么能够解决空轮询的bug,应该详细了解一些bug的原因。
             *
             * 并重置selectCnt为0
             */
            else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

上面的方法就是Netty中NioEventLoop事件循环的主体,主要可以分为两部分内容:

  • 执行I/O事件;
  • 执行异步任务

执行I/O事件

选择I/O事件

在事件循环主题中,首先会获取选择策略,下面是默认的选择策略(DefaultSelectStrategy)实现。

v4.1.83
calculateStrategy
selectNowSupplier
<
>
java
transport/src/main/java/io/netty/channel/DefaultSelectStrategy.java
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    // 根据队列中是否有任务来判断进行不同的操作
    // selectSupplier.get()中会进行selector.selectNow()
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        // 执行select操作
        return selectNow();
    }
};

传入的IntSupplierNioEventLoop中的属性selectNowSupplier。如果有无任务执行,都会执行底层的I/O事件选择操作。只不过一个是selectNow方法,一个是select方法。

v4.1.83
selectNow
select
<
>
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
int selectNow() throws IOException {
    // 执行底层的select操作,立即返回不会阻塞
    return selector.selectNow();
}
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private int select(long deadlineNanos) throws IOException {
    // 如果没有任务,则调用select进入阻塞状态
    if (deadlineNanos == NONE) {
        // 执行底层的select操作,不会阻塞
        return selector.select();
    }
    // Timeout will only be 0 if deadline is within 5 microsecs
    // 定时任务截止时间快到了(5毫秒内),中断本次轮询
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    // 根据超时时间是否有效来选择进行阻塞和非阻塞实现
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

处理I/O事件

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private void processSelectedKeys() {
    /*
     * 这里的优化逻辑请参考openSelector()方法,也就是成功替换掉SelectorImpl中的那两个字段的情况。
     */
    if (selectedKeys != null) {
        // 重点关注processSelectedKeysOptimized()方法
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

主要关注processSelectedKeysOptimized方法这种情况。

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private void processSelectedKeysOptimized() {
    // 遍历io事件,可以发现优化后使用数组的方式用于遍历可以实现更高效率
    for (int i = 0; i < selectedKeys.size; ++i) {
        // 取出io事件和对应的channel
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        /*
         * 这里为什么要置为null?
         * 直接原因:
         *       避免数组过长,导致数组尾部的元素无法被回收,从而导致内存泄漏
         */
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        // 如果注册的附件是netty的Channel
        if (a instanceof AbstractNioChannel) {
            // 处理该channel
            processSelectedKey(k, (AbstractNioChannel) a);
        } else { // 如果附件是NioTask(暂时不用分析)
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        /*
         * 判断是否需要再进行一次轮询,参考cancel(SelectedKey)方法
         * 也就是说每达到256次的连接断开,就要重新清理一下selectedKeys,其实也就是批量删除
         *
         */
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

上面的方法中在遍历选择键,并取出附件,包含两种情况:

  • IO通道;
  • 非阻塞任务;

处理IO通道

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    // 处理IO事件
    try {
        // 获取就绪的事件类型
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
           unsafe.forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        /*
         * 如果是连接事件
         * 这里如果readyOps是0,那么仍然处理,以解决JDK的bug。
         */
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 服务端这里unsafe对应的是NioMessageUnsafe
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

这里会根据就绪的事件类型来做不同的操作,比如是ACCEPT或READ事件时,会调用unsafe类的read方法。

处理NioEvent

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
    int state = 0;
    try {
        // 执行任务的回调方法
        task.channelReady(k.channel(), k);
        state = 1;
    } catch (Exception e) {
        k.cancel();
        invokeChannelUnregistered(task, k, e);
        state = 2;
    } finally {
        switch (state) {
        case 0:
            k.cancel();
            invokeChannelUnregistered(task, k, null);
            break;
        case 1:
            if (!k.isValid()) { // Cancelled by channelReady()
                invokeChannelUnregistered(task, k, null);
            }
            break;
        default:
             break;
        }
    }
}

这里会执行到NioTaskchannelReady这个回调方法。

执行异步任务

NioEventLoop的事件循环中,有3种执行异步任务的情况:

  • runAllTasks():没有指定参数,不考虑时间;
  • runAllTasks(timeout):只能执行指定时长这么久;
  • runAllTasks(0):最多执行64个任务;

第二种和第三种情况共用一个方法,所以是两个重载方法。

从调度队列中获取任务

不管是限时还是不限时执行任务的情况,都会从调度队列中获取调度任务。所以先来看一下是怎么获取的。

v4.1.83
fetchFromScheduledTaskQueue
pollScheduledTask
<
>
java
common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
private boolean fetchFromScheduledTaskQueue() {
    // 如果任务调度队列中没有任务,直接返回
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    // 获取当前时间
    long nanoTime = getCurrentTimeNanos();
    for (;;) {
        // 从任务调度队列中取任务
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        // 如果任务调度队列中没有任务了,直接返回
        if (scheduledTask == null) {
            return true;
        }
        // 将周期性调度任务放入taskQueue中
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            // 添加到taskQueue失败,则再次放回调度队列中
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}
java
common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java
protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();

    // 取出任务
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    // 如果没有任务,或者还没到任务执行的时间,
    if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
        // 则返回null
        return null;
    }
    // 将任务从调度队列中移除
    scheduledTaskQueue.remove();
    scheduledTask.setConsumed();
    // 返回任务
    return scheduledTask;
}

在该方法中,任务会从调度队列(scheduledTaskQueue)转移到普通队列(taskQueue)中。

这里先不管任务是怎么被添加到调度队列中的,后面单独写文章分析调度任务。

不限时执行

v4.1.83
runAllTasks
safeExecute
<
>
java
common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    /*
     * 只要优先级队列中还存在任务,就一直循环,不考虑时间。
     */
    do {
        // 从任务调度队列中取任务到taskQueue中
        fetchedAll = fetchFromScheduledTaskQueue();
        // 执行taskQueue中的任务
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        // 记录本次执行任务的时间
        lastExecutionTime = getCurrentTimeNanos();
    }
    // 执行延迟队列中的任务
    afterRunningAllTasks();
    return ranAtLeastOne;
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    // 从延迟队列中取任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        // 执行任务
        safeExecute(task);
        // 取任务
        task = pollTaskFrom(taskQueue);
        // 队列中的任务被执行完了,退出循环
        if (task == null) {
            return true;
        }
    }
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        // 取出任务
        Runnable task = taskQueue.poll();
        if (task != WAKEUP_TASK) {
            // 返回任务
            return task;
        }
    }
}
java
common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java
protected static void safeExecute(Runnable task) {
    try {
        runTask(task);
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}
protected static void runTask(@Execute Runnable task) {
    // 调用任务对象的run方法来执行任务
    task.run();
}

在无参的runAllTasks方法中,确实没有对其进行时间限制,只要有任务,就一直执行。

限时执行

v4.1.83
runAllTasks
safeExecute
<
>
java
common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
    protected boolean runAllTasks(long timeoutNanos) {
        // 从调度队列中获取调度任务到taskQueue中
        fetchFromScheduledTaskQueue();
        // 获取任务
        Runnable task = pollTask();
        // 如果没有任务
        if (task == null) {
            // 执行延迟队列中的任务
            afterRunningAllTasks();
            return false;
        }

        // 计算应该停止执行任务的时间
        final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
        // 记录执行了的任务数量
        long runTasks = 0;
        long lastExecutionTime;
        // 循环执行任务
        for (;;) {
            // 执行任务
            safeExecute(task);

            // 已执行的任务数量加1
            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            // 每执行64个任务,进入if块判断一下
            if ((runTasks & 0x3F) == 0) {
                // 获取当前时间
                lastExecutionTime = getCurrentTimeNanos();
                // 如果已经超过了应该停止的时间了
                if (lastExecutionTime >= deadline) {
                    // 则退出循环,结束运行
                    break;
                }
            }

            // 再次从队列中取任务
            task = pollTask();
            // 如果队列中的任务被执行完了
            if (task == null) {
                // 获取当前时间
                lastExecutionTime = getCurrentTimeNanos();
                break;
            }
        }

        // 运行延迟队列中的任务
        afterRunningAllTasks();
        // 设置本次运行结束后的时间
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    // 从延迟队列中取任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        // 执行任务
        safeExecute(task);
        // 取任务
        task = pollTaskFrom(taskQueue);
        // 队列中的任务被执行完了,退出循环
        if (task == null) {
            return true;
        }
    }
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        // 取出任务
        Runnable task = taskQueue.poll();
        if (task != WAKEUP_TASK) {
            // 返回任务
            return task;
        }
    }
}
java
common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java
protected static void safeExecute(Runnable task) {
    try {
        runTask(task);
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}
protected static void runTask(@Execute Runnable task) {
    // 调用任务对象的run方法来执行任务
    task.run();
}

在有参的runAllTasks方法中,每执行64个任务会判断一下是否到了截止时间,如果到了则停止运行任务。如果参数是0,那么只要进行一次判断,就会立即退出,所以这也是为什么说参数是0的话,只能执行64个任务。另外,如果没有任务也会退出。

任务执行后置操作

不管是限不限时,在执行了普通队列taskQueue中的任务后,都会调用afterRunningAllTasks方法,这是一个空方法,子类SingleThreadEventLoop中重写了该方法。

v4.1.83
afterRunningAllTasks
runAllTasksFrom
<
>
java
transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java
@Override
protected void afterRunningAllTasks() {
    // 执行延迟队列中的任务
    runAllTasksFrom(tailTasks);
}
java
common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    // 从延迟队列中取任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        // 执行任务
        safeExecute(task);
        // 取任务
        task = pollTaskFrom(taskQueue);
        // 队列中的任务被执行完了,退出循环
        if (task == null) {
            return true;
        }
    }
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        // 取出任务
        Runnable task = taskQueue.poll();
        if (task != WAKEUP_TASK) {
            // 返回任务
            return task;
        }
    }
}

在该方法中,不断从tailQueue任务队列中取出任务来执行。

总结

本文首先分析了NioEventLoop的继承体系,然后分析了其创建过程是怎样的,包括怎么创建的任务队列,怎么创建的通道选择器对象等。另外重点分析了事件循环,包括I/O事件是怎么处理的,又是怎么执行异步任务的。