Netty中的NioEventLoopGroup

NioEventLoopGroup是Netty中最常被使用的一种事件循环组,本文分析一下该类型的对象是怎么创建的,以及在内部是怎么创建事件循环EventLoop的。


构造方法

在介绍构造方法之前,先熟悉一下NioEventLoopGroup的继承体系。

下面是NioEventLoopGroup及其两个父类中的两个构造方法,注意两个父类中一个是”Loop“,一个是”Executor“。

v4.1.83
NioEventLoopGroup
MultithreadEventLoopGroup
MultithreadEventExecutorGroup
<
>
java
transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java
/**
 * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
 * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
public NioEventLoopGroup() {
    this(0);
}

/**
 * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
 * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

/**
 * Create a new instance using the default number of threads, the given {@link ThreadFactory} and the
 * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
public NioEventLoopGroup(ThreadFactory threadFactory) {
    this(0, threadFactory, SelectorProvider.provider());
}

/**
 * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
 * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
/*
 * 在reactor-netty的DefaultLoopResources中调用的就是该构造方法。
 */
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    /*
     * 这里通过JDK的SelectorProvider来获取provider,在Linux平台下默认是EPollSelectorProvider。
     */
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

/**
 * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
 * {@link SelectorProvider}.
 */
public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    /*
     * 这里传入了默认的选择策略工厂,用于创建默认的选择策略对象。
     */
    this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
    final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    /*
     * 这里传入了拒绝执行的处理器,具体的实现是抛出异常。注意这里的拒绝处理器不是JUC中线程池里面的拒绝处理器,虽然两个接口同名。
     */
    super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
            RejectedExecutionHandlers.reject());
}

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler,
                         final EventLoopTaskQueueFactory taskQueueFactory) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
            rejectedExecutionHandler, taskQueueFactory);
}

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         SelectorProvider selectorProvider,
                         SelectStrategyFactory selectStrategyFactory,
                         RejectedExecutionHandler rejectedExecutionHandler,
                         EventLoopTaskQueueFactory taskQueueFactory,
                         EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
java
transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    /*
     * 先从系统属性中获取,否则采用默认值即可用逻辑处理器数量的2倍。
     */
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

}
/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    /*
     * 默认情况下这里传入的线程数为0,所以会使用默认的线程数计算策略
     * 默认的线程数是CPU数量乘以2
     */
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
 * EventExecutorChooserFactory, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
java
common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    // 这里传入的事件执行器为ThreadPerTaskExecutor类型的实例。
    this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    /*
     * 这里传入了默认的事件执行器选择器工厂,用于创建默认的事件执行器选择器。
     * 内部会根据事件执行器的数量是否是2的幂来选择创建不同的事件执行器选择器。
     */
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");

    if (executor == null) {
        /*
         * 这里传入了默认的线程工厂,是DefaultThreadFactory类型的。
         * 创建的线程是Thread的子类FastThreadLocalThread,对ThreadLocal进行了优化。
         */
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 创建NioEventLoop,默认情况下个数是CPU核心数的2倍
    children = new EventExecutor[nThreads];
    // 每个线程对应一个NioEventLoop
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 创建事件执行器
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
        }
    }

    // 创建一个线程选择器
    chooser = chooserFactory.newChooser(children);

    // 运行结束监听器
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        // 设置运行结束监听器
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

NioEventLoopGroup中定义了大量的构造方法,而且有不同的各种参数,看得眼花缭乱,但是先别乱,注意父类MultithreadEventExecutorGroup中的构造器,代码行数最多的那个就是主要实现。在这个主要的构造方法实现中,也就是关注下面3个参数:

  • 线程数;
  • 执行器;
  • 事件执行器选择器工厂;

其他实参都被封装成了参数数组,在创建事件循环对象的时候会被用到。在主构造方法中主要做了下面几件事情:

  • 如果执行器是null,则创建默认的执行器。
  • 通过newChild方法来创建指定线程数量个数的事件执行器,对于NioEventLoopGroup而言就是在创建事件循环(NioEventLoop),这就是事件循环组的含义,包含多个事件循环。
  • 创建线程选择器,因为一个事件循环组中包含多个事件循环,当进行事件分派的时候,需要按照一定策略来进行选择,所以Netty将这部分逻辑抽象到了事件执行器选择器中。而事件执行器选择器工厂就是用来创建事件执行器选择器的。

如果不传入对应的参数,那么Netty会使用默认值,下面先来看看每种默认组件都是什么。

默认组件

先来看主要的构造方法所需的参数的默认值,再分析变长参数中的默认值。

默认线程数量

如果没有指定,则在MultithreadEventLoopGroup中会使用默认值,即当前机器上所有可用逻辑核个数的2倍。

v4.1.83
java
transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    /*
     * 先从系统属性中获取,否则采用默认值即可用逻辑处理器数量的2倍。
     */
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

}

执行器ThreadPerTaskExecutor

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/ThreadPerTaskExecutor.java
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

该类型直接实现了JDK中的顶层接口Executor,在每次执行任务时都会通过线程工厂来创建一个新线程来执行。

线程工厂

在上面创建ThreadPerTaskExecutor的时候,是需要传入线程工厂的,如果没有传入线程工厂,则会调用newDefaultThreadFactory方法来创建。该方法在父类MultithreadEventExecutorGroup和子类MultithreadEventLoopGroup中都有实现,所以会调用到MultithreadEventLoopGroup中的该方法。

v4.1.83
MultithreadEventLoopGroup
MultithreadEventExecutorGroup
<
>
java
transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java
@Override
protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
java
common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java
protected ThreadFactory newDefaultThreadFactory() {
    // 创建默认的线程工厂对象
    return new DefaultThreadFactory(getClass());
}

父子类中的实现都是创建DefaultThreadFactory类型实例,只不过线程的优先级有差异。子类中指定了使用最高优先级,而父类中没有指定,使用默认的普通优先级。

事件执行器选择器工厂DefaultEventExecutorChooserFactory

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorChooserFactory.java
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // 根据nioEventLoop的数量是否是2的幂,来创建不同的线程选择器
    if (isPowerOfTwo(executors.length)) {
        // 根据位运算来实现
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        // 通过简单累加取模来实现选择
        return new GenericEventExecutorChooser(executors);
    }
}

这里会根据线程数量是2的幂,比如2、4、8、16等,来创建不同的选择器,如果是2的幂,那么会使用性能更好的选择器,否则使用普通的选择器。

选择器提供者SelectionProvider

注意这里的选择器,是指IO多路复用中的选择器(Selector),而不是上面事件执行器选择器chooser。Netty中使用的是默认值是JDK提供。

java
jdk/src/share/classes/java/nio/channels/spi/SelectorProvider.java
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        // java.nio.channels.spi.SelectorProvider 属性指定实现类
                        if (loadProviderFromProperty())
                            return provider;
                        // SPI 指定实现类
                        if (loadProviderAsService())
                            return provider;
                        // 默认实现,Windows 和 Linux 下不同
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

该方法中分为了三种实现,获取属性中指定的,通过SPI获取预配置好的,以及使用默认的。这里看一下默认实现。

java
jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java
public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    
    if (osname.equals("Linux")) // 如果是Linux系统
        // EPollSelectorProvider中创建的选择器是EPollSelectorImpl
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
    Class<SelectorProvider> c;
    try {
        // 加载指定的类
        c = (Class<SelectorProvider>)Class.forName(cn);
    } catch (ClassNotFoundException x) {
        throw new AssertionError(x);
    }
    try {
        // 反射创建指定类的实例
        return c.newInstance();
    } catch (IllegalAccessException | InstantiationException x) {
        throw new AssertionError(x);
    }

}

可以看到,如果是在Linux系统中,默认使用的是EPollSelectorProvider,该类提供的选择器类型是EpollSelectorImpl

拒绝执行处理器RejectionExecutionHandler

v4.1.83
java
common/src/main/java/io/netty/util/concurrent/RejectedExecutionHandlers.java
public static RejectedExecutionHandler reject() {
    return REJECT;
}
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    @Override
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
        throw new RejectedExecutionException();
    }
};

该拒绝执行处理器中会直接抛出拒绝执行异常。

创建事件循环

父类MultithreadEventExecutorGroup中的newChild是个模板方法,在子类NioEventLoopGroup中有实现。

v4.1.83
java
transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    /*
     * 从变长参数中获取特定类型的参数
     * 这是的实现好像不是很优雅,这些参数也是该类的构造器中按照super(...)传上去的,结果又一路原封不动传回来。
     * 通过一个对象来封装这些参数也比这样传好吧。
     */
    SelectorProvider selectorProvider = (SelectorProvider) args[0];
    SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
    RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
    EventLoopTaskQueueFactory taskQueueFactory = null;
    EventLoopTaskQueueFactory tailTaskQueueFactory = null;

    // 这里先考虑args.length就为3的情况。
    int argsLength = args.length;
    if (argsLength > 3) {
        taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
    }
    if (argsLength > 4) {
        tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
    }
    // 创建非阻塞事件循环对象
    return new NioEventLoop(this, executor, selectorProvider,
            selectStrategyFactory.newSelectStrategy(),
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

在该方法中,按照一定的顺序分别从变长参数数组中获取参数并强转类型。然后使用这些参数创建事件循环NioEventLoop对象,该对象非常重要,其run方法就是事件循环核心,通过一个无限循环不断处理异步任务和IO事件。 具体创建的细节参考Netty中的NioEventLoop

这里的实现感觉很不优雅,两个问题:

  • 使用参数类型强转;
  • 父类构造器中用不到的参数,也向上传递了; 完全可以把这5个参数定义为NioEventLoopGroup类中的属性,在其构造器中,把传进来的各个属性赋值给属性,仅需向上传递父类需要的参数。这样在newChild方法中就可以直接使用各个属性,而不用依次获取变长参数并且进行类型强转了。

总结

本文分析了NioEventLoopGroup的创建过程,以及使用到了哪些默认组件,最后分析了是怎么创建事件循环NioEventLoop对象的。