NioEventLoopGroup是Netty中最常被使用的一种事件循环组,本文分析一下该类型的对象是怎么创建的,以及在内部是怎么创建事件循环EventLoop的。
构造方法
在介绍构造方法之前,先熟悉一下NioEventLoopGroup的继承体系。
下面是NioEventLoopGroup及其两个父类中的两个构造方法,注意两个父类中一个是”Loop“,一个是”Executor“。
/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
/**
* Create a new instance using the default number of threads, the given {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, SelectorProvider.provider());
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
/*
* 在reactor-netty的DefaultLoopResources中调用的就是该构造方法。
*/
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
/*
* 这里通过JDK的SelectorProvider来获取provider,在Linux平台下默认是EPollSelectorProvider。
*/
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* {@link SelectorProvider}.
*/
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
/*
* 这里传入了默认的选择策略工厂,用于创建默认的选择策略对象。
*/
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
/*
* 这里传入了拒绝执行的处理器,具体的实现是抛出异常。注意这里的拒绝处理器不是JUC中线程池里面的拒绝处理器,虽然两个接口同名。
*/
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler,
final EventLoopTaskQueueFactory taskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory);
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectorProvider selectorProvider,
SelectStrategyFactory selectStrategyFactory,
RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory,
EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
/*
* 先从系统属性中获取,否则采用默认值即可用逻辑处理器数量的2倍。
*/
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
/*
* 默认情况下这里传入的线程数为0,所以会使用默认的线程数计算策略
* 默认的线程数是CPU数量乘以2
*/
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
* EventExecutorChooserFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
// 这里传入的事件执行器为ThreadPerTaskExecutor类型的实例。
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
/*
* 这里传入了默认的事件执行器选择器工厂,用于创建默认的事件执行器选择器。
* 内部会根据事件执行器的数量是否是2的幂来选择创建不同的事件执行器选择器。
*/
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
if (executor == null) {
/*
* 这里传入了默认的线程工厂,是DefaultThreadFactory类型的。
* 创建的线程是Thread的子类FastThreadLocalThread,对ThreadLocal进行了优化。
*/
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 创建NioEventLoop,默认情况下个数是CPU核心数的2倍
children = new EventExecutor[nThreads];
// 每个线程对应一个NioEventLoop
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建事件执行器
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
}
}
// 创建一个线程选择器
chooser = chooserFactory.newChooser(children);
// 运行结束监听器
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
// 设置运行结束监听器
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
NioEventLoopGroup中定义了大量的构造方法,而且有不同的各种参数,看得眼花缭乱,但是先别乱,注意父类MultithreadEventExecutorGroup中的构造器,代码行数最多的那个就是主要实现。在这个主要的构造方法实现中,也就是关注下面3个参数:
- 线程数;
- 执行器;
- 事件执行器选择器工厂;
其他实参都被封装成了参数数组,在创建事件循环对象的时候会被用到。在主构造方法中主要做了下面几件事情:
- 如果执行器是null,则创建默认的执行器。
- 通过newChild方法来创建指定线程数量个数的事件执行器,对于NioEventLoopGroup而言就是在创建事件循环(NioEventLoop),这就是事件循环组的含义,包含多个事件循环。
- 创建线程选择器,因为一个事件循环组中包含多个事件循环,当进行事件分派的时候,需要按照一定策略来进行选择,所以Netty将这部分逻辑抽象到了事件执行器选择器中。而事件执行器选择器工厂就是用来创建事件执行器选择器的。
如果不传入对应的参数,那么Netty会使用默认值,下面先来看看每种默认组件都是什么。
默认组件
先来看主要的构造方法所需的参数的默认值,再分析变长参数中的默认值。
默认线程数量
如果没有指定,则在MultithreadEventLoopGroup中会使用默认值,即当前机器上所有可用逻辑核个数的2倍。
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
/*
* 先从系统属性中获取,否则采用默认值即可用逻辑处理器数量的2倍。
*/
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
执行器ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
该类型直接实现了JDK中的顶层接口Executor,在每次执行任务时都会通过线程工厂来创建一个新线程来执行。
线程工厂
在上面创建ThreadPerTaskExecutor的时候,是需要传入线程工厂的,如果没有传入线程工厂,则会调用newDefaultThreadFactory方法来创建。该方法在父类MultithreadEventExecutorGroup和子类MultithreadEventLoopGroup中都有实现,所以会调用到MultithreadEventLoopGroup中的该方法。
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
protected ThreadFactory newDefaultThreadFactory() {
// 创建默认的线程工厂对象
return new DefaultThreadFactory(getClass());
}
父子类中的实现都是创建DefaultThreadFactory类型实例,只不过线程的优先级有差异。子类中指定了使用最高优先级,而父类中没有指定,使用默认的普通优先级。
事件执行器选择器工厂DefaultEventExecutorChooserFactory
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 根据nioEventLoop的数量是否是2的幂,来创建不同的线程选择器
if (isPowerOfTwo(executors.length)) {
// 根据位运算来实现
return new PowerOfTwoEventExecutorChooser(executors);
} else {
// 通过简单累加取模来实现选择
return new GenericEventExecutorChooser(executors);
}
}
这里会根据线程数量是2的幂,比如2、4、8、16等,来创建不同的选择器,如果是2的幂,那么会使用性能更好的选择器,否则使用普通的选择器。
选择器提供者SelectionProvider
注意这里的选择器,是指IO多路复用中的选择器(Selector),而不是上面事件执行器选择器chooser。Netty中使用的是默认值是JDK提供。
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
// java.nio.channels.spi.SelectorProvider 属性指定实现类
if (loadProviderFromProperty())
return provider;
// SPI 指定实现类
if (loadProviderAsService())
return provider;
// 默认实现,Windows 和 Linux 下不同
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
该方法中分为了三种实现,获取属性中指定的,通过SPI获取预配置好的,以及使用默认的。这里看一下默认实现。
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux")) // 如果是Linux系统
// EPollSelectorProvider中创建的选择器是EPollSelectorImpl
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
Class<SelectorProvider> c;
try {
// 加载指定的类
c = (Class<SelectorProvider>)Class.forName(cn);
} catch (ClassNotFoundException x) {
throw new AssertionError(x);
}
try {
// 反射创建指定类的实例
return c.newInstance();
} catch (IllegalAccessException | InstantiationException x) {
throw new AssertionError(x);
}
}
可以看到,如果是在Linux系统中,默认使用的是EPollSelectorProvider,该类提供的选择器类型是EpollSelectorImpl。
拒绝执行处理器RejectionExecutionHandler
public static RejectedExecutionHandler reject() {
return REJECT;
}
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
该拒绝执行处理器中会直接抛出拒绝执行异常。
创建事件循环
父类MultithreadEventExecutorGroup中的newChild是个模板方法,在子类NioEventLoopGroup中有实现。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
/*
* 从变长参数中获取特定类型的参数
* 这是的实现好像不是很优雅,这些参数也是该类的构造器中按照super(...)传上去的,结果又一路原封不动传回来。
* 通过一个对象来封装这些参数也比这样传好吧。
*/
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
// 这里先考虑args.length就为3的情况。
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
// 创建非阻塞事件循环对象
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
在该方法中,按照一定的顺序分别从变长参数数组中获取参数并强转类型。然后使用这些参数创建事件循环NioEventLoop对象,该对象非常重要,其run方法就是事件循环核心,通过一个无限循环不断处理异步任务和IO事件。 具体创建的细节参考Netty中的NioEventLoop。
这里的实现感觉很不优雅,两个问题:
- 使用参数类型强转;
- 父类构造器中用不到的参数,也向上传递了; 完全可以把这5个参数定义为NioEventLoopGroup类中的属性,在其构造器中,把传进来的各个属性赋值给属性,仅需向上传递父类需要的参数。这样在newChild方法中就可以直接使用各个属性,而不用依次获取变长参数并且进行类型强转了。
总结
本文分析了NioEventLoopGroup的创建过程,以及使用到了哪些默认组件,最后分析了是怎么创建事件循环NioEventLoop对象的。