在SpringBoot中的Web容器一文中介绍过Spring Boot在启动过程中会创建Web服务器并启动它。在[ReactorNetty中的HttpServer的创建过程]这篇文章中也介绍了创建的过程,本文就来分析一下其启动过程是怎样的。
重要类的继承和实现
在继续介绍下面内容之前,先熟悉一下HttpServer的继承和实现体系,以及HttpServerConfig的继承体系。
HttpServer
HttpServerConfig
回顾Spring Boot中的实现
@Override
public void start() throws WebServerException {
if (this.disposableServer == null) {
try {
// 启动httpServer
this.disposableServer = startHttpServer();
}
catch (Exception ex) {
PortInUseException.ifCausedBy(ex, ChannelBindException.class, (bindException) -> {
if (bindException.localPort() > 0 && !isPermissionDenied(bindException.getCause())) {
throw new PortInUseException(bindException.localPort(), ex);
}
});
throw new WebServerException("Unable to start Netty", ex);
}
if (this.disposableServer != null) { // 启动成功,打印日志
logger.info("Netty started" + getStartedOnMessage(this.disposableServer));
}
// 创建线程
startDaemonAwaitThread(this.disposableServer);
}
}
DisposableServer startHttpServer() {
HttpServer server = this.httpServer;
// 下面这个条件默认是true
if (this.routeProviders.isEmpty()) {
// 设置请求处理器
server = server.handle(this.handler);
}
else {
server = server.route(this::applyRouteProviders);
}
if (this.lifecycleTimeout != null) {
return server.bindNow(this.lifecycleTimeout);
}
/*
* 执行bind操作,HttpServer是reactor-netty提供的,所以离开了spring-boot范畴,
* 最后会一直执行到netty层面的ServerBootstrap#bind()
*/
return server.bindNow();
}
在该方法中,主要是调用了HttpServer的handle和bindNow这两个方法。在调用handle方法时传入的handler是在NettyReactiveWebServerFactory的getWebServer方法中创建的,类型为ReactorHttpHandlerAdapter,这是一个Spring Framework提供的类。
handle方法
public final HttpServer handle(
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
Objects.requireNonNull(handler, "handler");
return childObserve(new HttpServerHandle(handler));
}
public T childObserve(ConnectionObserver observer) {
Objects.requireNonNull(observer, "observer");
T dup = duplicate();
// 获取当前已经配置的observer
ConnectionObserver current = configuration().childObserver;
// 和已有的observer合并在一起
dup.configuration().childObserver = current == null ? observer : current.then(observer);
return dup;
}
先是将handler封装为observer对象,然后再和配置中已有的拼接在一起。这里暂时不深入分析太多,本文主线是分析启动过程,在下面会遇到这个observer的。
bindNow方法
/*
* 在SpringBoot中的NettyWebServer中的startHttpServer方法中调用了该方法
*/
public final DisposableServer bindNow() {
// 设置绑定操作的超时时间为45秒
return bindNow(Duration.ofSeconds(45));
}
public final DisposableServer bindNow(Duration timeout) {
Objects.requireNonNull(timeout, "timeout");
try {
// 调用bind方法,必须在timeout之内绑定完成
return Objects.requireNonNull(bind().block(timeout), "aborted");
}
catch (IllegalStateException e) {
if (e.getMessage()
.contains("blocking read")) {
throw new IllegalStateException(getClass().getSimpleName() + " couldn't be started within " + timeout.toMillis() + "ms");
}
throw e;
}
}
这里会阻塞执行绑定操作,默认最多阻塞等待45秒。
public Mono<? extends DisposableServer> bind() {
// 这里的config参考HttpServerBind的无参构造器
CONF config = configuration();
Objects.requireNonNull(config.bindAddress(), "bindAddress");
Mono<? extends DisposableServer> mono = Mono.create(sink -> {
// 在HttpServerBind中设置的以及这里获取到的都是InetSocketAddress
SocketAddress local = Objects.requireNonNull(config.bindAddress().get(), "Bind Address supplier returned null");
// HttpServerBind中的设置的是InetSocketAddress类型的
if (local instanceof InetSocketAddress) {
InetSocketAddress localInet = (InetSocketAddress) local;
// 如果地址还没解析过则解析
if (localInet.isUnresolved()) {
local = AddressUtils.createResolved(localInet.getHostName(), localInet.getPort());
}
}
boolean isDomainSocket = false;
// 创建disposableServer
DisposableBind disposableServer;
// 对于HttpServerBind来说,这里的local是InetSocketAddress类型的,不是DomainSocketAddress
if (local instanceof DomainSocketAddress) {
isDomainSocket = true;
disposableServer = new UdsDisposableBind(sink, config, local);
}
else {
// 创建server
disposableServer = new InetDisposableBind(sink, config, local);
}
/*
* 创建连接观察者,config中的childObserver默认是空操作。
* 所以在Spring Boot中会往HttpServer中添加一个handler,然后会被封装为ConnectionObserver。
*/
ConnectionObserver childObs =
new ChildObserver(config.defaultChildObserver().then(config.childObserver()));
/*
* 从config中获取eventLoop和channelInitializer来创建acceptor
* 该acceptor就是reactor模型中的重要角色,
* 这里通过config来获取EventLoopGroup、ChannelHandler、childOptions、childAttrs,
* 这些都是Netty中的概念或组件。
* 对于EventLoopGroup的获取,内部会通过LoopResources来实现。
*
* 这里的Acceptor继承了ChannelInboundHandlerAdapter,该类是Netty中的概念。
*
* 客户端连接到服务端后,Acceptor的channelRead方法会把通道注册到child事件循环中。
*/
Acceptor acceptor = new Acceptor(config.childEventLoopGroup(), config.channelInitializer(childObs, null, true),
config.childOptions, config.childAttrs, isDomainSocket);
/*
* 执行底层的bind操作
* 这里的AcceptorInitializer是一个ChannelInitializer,用于初始化channel
*/
TransportConnector.bind(config, new AcceptorInitializer(acceptor), local, isDomainSocket)
.subscribe(disposableServer);
});
// 这里默认是null,所以可以忽略
if (config.doOnBind() != null) {
// 执行bind回调
mono = mono.doOnSubscribe(s -> config.doOnBind().accept(config));
}
return mono;
}
该方法可以总结为下面几步:
- 创建socket地址对象;
- 创建DisposableBind对象;
- 创建连接观察者;
- 创建Acceptor;
- 执行底层的bind操作;
重点看一下bind操作。
@SuppressWarnings("FutureReturnValueIgnored")
public static Mono<Channel> bind(TransportConfig config, ChannelInitializer<Channel> channelInitializer,
SocketAddress bindAddress, boolean isDomainSocket) {
Objects.requireNonNull(config, "config");
Objects.requireNonNull(bindAddress, "bindAddress");
Objects.requireNonNull(channelInitializer, "channelInitializer");
/*
* 先通过doInitAndRegister创建channel对象
* config.eventLoopGroup().next()会从EventLoopGroup中选一个EventLoop
*/
return doInitAndRegister(config, channelInitializer, isDomainSocket, config.eventLoopGroup().next())
.flatMap(channel -> {
MonoChannelPromise promise = new MonoChannelPromise(channel);
// "FutureReturnValueIgnored" this is deliberate
// 执行bind操作,至此进入了netty的范畴
channel.eventLoop().execute(() -> channel.bind(bindAddress, promise.unvoid()));
return promise;
});
}
该方法又分为了下面几步:
- 获取boss事件循环组;
- 创建通道对象;
- 通过通道对象执行绑定;
下面依次来分析各个步骤的实现原理。
创建事件循环组
在创建Acceptor和创建通道时都获取了事件循环组,底层会判断如果还没创建过事件循环组就会新建。
@Override
protected final EventLoopGroup eventLoopGroup() {
// 获取boss事件循环组
return loopResources().onServerSelect(isPreferNative());
}
final EventLoopGroup childEventLoopGroup() {
/*
* 创建worker group
*
* 这里的isPreferNative返回的是父类的属性preferNative,在父类的构造器中赋值为LoopResources.DEFAULT,
* 而在LoopResources中,是获取名为reactor.netty.native的系统属性。(这里一般考虑为false的情况就行)
*/
return loopResources().onServer(isPreferNative());
}
loopResources方法
在看具体的代码之前,先要知道ReactorNetty中使用了资源对象来封装Netty中的资源,如事件循环组和通道。所以自然要通过资源对象来创建这些资源,本方法的目的在于创建资源对象。LoopResources是个接口,其实现体系如下图所示:
上面两个方法中都调用了loopResources方法,下面来看看这个方法做了什么。
public final LoopResources loopResources() {
// 这里考虑为null的情况
return loopResources != null ? loopResources : defaultLoopResources();
}
protected abstract LoopResources defaultLoopResources();
@Override
protected LoopResources defaultLoopResources() {
return HttpResources.get();
}
public static HttpResources get() {
/*
* 这里的ON_HTTP_NEW其实就是该类构造器,如果httpResources中为null,则新建一个
* 如果已存在,则直接从中获取就是了。
*/
return getOrCreate(httpResources, null, null, ON_HTTP_NEW, "http");
}
static final BiFunction<LoopResources, ConnectionProvider, HttpResources> ON_HTTP_NEW;
static final AtomicReference<HttpResources> httpResources;
static {
ON_HTTP_NEW = HttpResources::new;
httpResources = new AtomicReference<>();
}
protected static <T extends TcpResources> T getOrCreate(AtomicReference<T> ref,
@Nullable LoopResources loops,
@Nullable ConnectionProvider provider,
BiFunction<LoopResources, ConnectionProvider, T> onNew,
String name) {
T update;
for (;;) {
T resources = ref.get();
if (resources == null || loops != null || provider != null) {
// 创建资源对象
update = create(resources, loops, provider, name, onNew);
// 将新建的资源对象通过CAS操作设置到ref中
if (ref.compareAndSet(resources, update)) {
return update;
}
else {
update._dispose();
}
}
else {
return resources;
}
}
}
static <T extends TcpResources> T create(@Nullable T previous,
@Nullable LoopResources loops, @Nullable ConnectionProvider provider,
String name,
BiFunction<LoopResources, ConnectionProvider, T> onNew) {
// 对于HttpResources,这里传入的loops和provider都是null
if (previous == null) {
// 创建LoopResources和ConnectionProvider
loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
// 这里传入了默认的最大连接数为500
provider = provider == null ? ConnectionProvider.create(name, 500) : provider;
}
else {
loops = loops == null ? previous.defaultLoops : loops;
provider = provider == null ? previous.defaultProvider : provider;
}
return onNew.apply(loops, provider);
}
整个调用链条有点长,重点关注TcpResources中的create方法。该方法中创建了循环资源和连接提供器。然后用这两个对象作为参数调用HttpResources的构造方法。
LoopResources的create方法
int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
ReactorNetty.IO_WORKER_COUNT,
// 默认是(逻辑)处理器个数,但是不能少于4
"" + Math.max(Runtime.getRuntime().availableProcessors(), 4)));
int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
ReactorNetty.IO_SELECT_COUNT,
// 默认是-1,没有选择线程,DefaultLoopResources的构造方法中会修改为和worker一样的数量
"" + -1));
static LoopResources create(String prefix) {
if (Objects.requireNonNull(prefix, "prefix").isEmpty()) {
throw new IllegalArgumentException("Cannot use empty prefix");
}
// 创建默认的LoopResources实例,第二和第三个参数都是从系统属性中获取
return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT, DEFAULT_IO_WORKER_COUNT, true);
}
DefaultLoopResources(String prefix, int workerCount, boolean daemon) {
this(prefix, -1, workerCount, daemon);
}
DefaultLoopResources(String prefix, int selectCount, int workerCount, boolean daemon) {
this(prefix, selectCount, workerCount, daemon, true);
}
DefaultLoopResources(String prefix, int selectCount, int workerCount, boolean daemon, boolean colocate) {
this.running = new AtomicBoolean(true);
this.daemon = daemon;
this.workerCount = workerCount;
this.prefix = prefix;
this.colocate = colocate;
this.serverLoops = new AtomicReference<>();
this.clientLoops = new AtomicReference<>();
this.cacheNativeClientLoops = new AtomicReference<>();
this.cacheNativeServerLoops = new AtomicReference<>();
if (selectCount == -1) {
// 默认和worker组的数量一样
this.selectCount = workerCount;
this.serverSelectLoops = this.serverLoops;
this.cacheNativeSelectLoops = this.cacheNativeServerLoops;
}
else {
this.selectCount = selectCount;
this.serverSelectLoops = new AtomicReference<>();
this.cacheNativeSelectLoops = new AtomicReference<>();
}
}
这里创建的是DefaultLoopResources类型的对象,下面很快就用看到是如何利用它来创建事件循环组的。之前传入的selectCount的值是-1,所以这里会设置为和worker一样的数量。
ConnectionProvider的create方法
long DEFAULT_POOL_ACQUIRE_TIMEOUT = Long.parseLong(System.getProperty(
ReactorNetty.POOL_ACQUIRE_TIMEOUT,
// 默认最多等45秒
"" + 45000));
static ConnectionProvider create(String name, int maxConnections) {
return builder(name).maxConnections(maxConnections) // 设置最大连接数
.pendingAcquireTimeout(Duration.ofMillis(DEFAULT_POOL_ACQUIRE_TIMEOUT))
.build();
}
public ConnectionProvider build() {
// 创建默认的池化连接提供者
return new DefaultPooledConnectionProvider(this);
}
这里构建者模式用到的Builder是一个内部类,其构建的是DefautlPooledConnectionProvider类型的对象。
HttpResources的构造方法
// 创建HttpResources实例
HttpResources(LoopResources loops, ConnectionProvider provider) {
super(loops, provider);
http2ConnectionProvider = new AtomicReference<>();
}
/*
* 虽然TcpResources(本类)实现了LoopResources接口,但是却组合了LoopResources其他的实现类来实现具体的操作。
* 这里考虑defaultLoops为DefaultLoopResources的情况。
*/
final LoopResources defaultLoops;
/*
* 同样地,虽然TcpResources(本类)实现了ConnectionProvider接口,但是组合了其他的实现类来实现具体的操作。
* 种类考虑defaultProvider是DefaultConnectionProvider的情况。
*/
final ConnectionProvider defaultProvider;
final AtomicReference<AddressResolverGroup<?>> defaultResolver;
protected TcpResources(LoopResources defaultLoops, ConnectionProvider defaultProvider) {
this.defaultLoops = defaultLoops;
this.defaultProvider = defaultProvider;
this.defaultResolver = new AtomicReference<>();
}
这样,上面创建的循环资源对象和连接提供器就被设置到TcpResources中的属性中了,后续操作会依靠这两个属性。
最终loopResources方法返回的是HttpResources类型的对象。
创建或获取事件循环组
上面调用的LoopResources接口中的默认方法onServerSelect和需要子类实现的onServer方法,实际实现在TcpResources中。
@Override
public EventLoopGroup onServer(boolean useNative) {
// 调用DefaultLoopResources中的方法
return defaultLoops.onServer(useNative);
}
@Override
public EventLoopGroup onServerSelect(boolean useNative) {
// 调用DefaultLoopResources中的方法
return defaultLoops.onServerSelect(useNative);
}
这两个方法都调用了DefaultLoopResources类中实现的方法。
@Override
public EventLoopGroup onServer(boolean useNative) {
// 考虑useNative为false的情况
if (useNative && LoopResources.hasNativeSupport()) {
return cacheNativeServerLoops();
}
return cacheNioServerLoops();
}
@Override
public EventLoopGroup onServerSelect(boolean useNative) {
if (useNative && LoopResources.hasNativeSupport()) {
return cacheNativeSelectLoops();
}
return cacheNioSelectLoops();
}
本文只分析Nio的情况,上面两个方法又调用了不同的方法来获取或创建boss和worker事件循环组。
@SuppressWarnings("FutureReturnValueIgnored")
EventLoopGroup cacheNioSelectLoops() {
if (serverSelectLoops == serverLoops) {
return cacheNioServerLoops();
}
// 获取缓存
EventLoopGroup eventLoopGroup = serverSelectLoops.get();
// 没有缓存
if (null == eventLoopGroup) {
// 新建boss事件循环组,负责select
EventLoopGroup newEventLoopGroup = new NioEventLoopGroup(selectCount,
threadFactory(this, "select-nio"));
// 通过CAS设置
if (!serverSelectLoops.compareAndSet(null, newEventLoopGroup)) {
//"FutureReturnValueIgnored" this is deliberate
// 设置失败则关闭新建的事件循环组
newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
// 递归
eventLoopGroup = cacheNioSelectLoops();
}
return eventLoopGroup;
}
@SuppressWarnings("FutureReturnValueIgnored")
EventLoopGroup cacheNioServerLoops() {
// 采用属性来保存事件循环组,这也是方法名中cache的来源。
EventLoopGroup eventLoopGroup = serverLoops.get();
if (null == eventLoopGroup) {
// 创建worker事件循环组,负责连接的读写处理
EventLoopGroup newEventLoopGroup = new NioEventLoopGroup(workerCount,
threadFactory(this, "nio"));
// 通过CAS设置
if (!serverLoops.compareAndSet(null, newEventLoopGroup)) {
//"FutureReturnValueIgnored" this is deliberate
newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
/*
* 这里用递归来代替循环,如果上面的CAS操作成功,那么递归一次就可以了。
* 如果上面的CAS操作没有成功,那么一次递归直到成功为止。
*/
eventLoopGroup = cacheNioServerLoops();
}
return eventLoopGroup;
}
两个方法的逻辑很相似,都是先获取缓存中设置的事件循环组对象,如果没有获取到,则新建一个并设置到属性中,如果设置失败则递归尝试。
注意这里的方法名,一个是Select,一个是Server,分别对应boss组和worker组。别看错了以为是相同的方法,我一开始就看成了是一个方法,结果以为只创建了一个事件循环组。
通道创建和注册
static Mono<Channel> doInitAndRegister(
TransportConfig config,
ChannelInitializer<Channel> channelInitializer,
boolean isDomainSocket,
EventLoop eventLoop) {
// 获取创建Channel对象的工厂,这里返回了一个lambda,内部仍然是通过LoopResources来创建Channel
ChannelFactory<? extends Channel> channelFactory = config.connectionFactory(config.eventLoopGroup(), isDomainSocket);
Channel channel = null;
try {
// 创建channel对象
channel = channelFactory.newChannel();
if (channelInitializer instanceof ServerTransport.AcceptorInitializer) {
// 开启auto read
((ServerTransport.AcceptorInitializer) channelInitializer).acceptor.enableAutoReadTask(channel);
}
/*
* 设置Channel初始化器,内部是把Acceptor添加在pipeline的末尾了,为什么这里不直接添加Acceptor?
*/
channel.pipeline().addLast(channelInitializer);
// 设置通道的选项,这里的选项在HttpServerBind中有设置
setChannelOptions(channel, config.options, isDomainSocket);
// 设置通道的属性,默认在HttpServerBind中没有设置任何属性
setAttributes(channel, config.attrs);
}
catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
return Mono.error(t);
}
MonoChannelPromise monoChannelPromise = new MonoChannelPromise(channel);
// 将channel注册到事件循环中,但是此时还没有进行监听
channel.unsafe().register(eventLoop, monoChannelPromise);
return monoChannelPromise;
}
该方法的步骤如下:
- 创建通道工厂;
- 通过通道工厂创建通道;
- 为通道流水线添加初始化器;
- 设置通道选项和属性;
- 将通道注册到事件循环的选择器中;
通道工厂
这里获取的通道工厂是个lambda函数。
protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg, boolean isDomainSocket) {
/*
* 返回一个Channel工厂,用于创建Channel实例。这里是通过LoopResources来创建的。
* 这里考虑isDomainSocket为false的情况,所以通道类型为ServerSocketChannel。
*/
return () -> loopResources().onChannel(channelType(isDomainSocket), elg);
}
protected abstract Class<? extends Channel> channelType(boolean isDomainSocket);
@Override
protected Class<? extends Channel> channelType(boolean isDomainSocket) {
return isDomainSocket ? ServerDomainSocketChannel.class : ServerSocketChannel.class;
}
注意参数isDomainSocket是在ServerTransport的bind方法中判断并传入的,默认是false。所以channelType会返回ServerSocketChannel类型。
创建通道
这个步骤的操作就是调用lambda函数。
@Override
public <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
requireNonNull(channelType, "channelType");
requireNonNull(group, "group");
return defaultLoops.onChannel(channelType, group);
}
继续来看DefaultLoopResources中的实现,但该类中并没有重写该方法,所以看接口LoopResources中的默认方法。
/*
* 这里的channelType,重点考虑为ServerSocketChannel的情况
*/
default <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
// 这里就考虑DefaultLoopNativeDetector.NIO,其是一个DefaultLoopNIO类型的实例。
DefaultLoop channelFactory =
DefaultLoopNativeDetector.INSTANCE.supportGroup(group) ?
DefaultLoopNativeDetector.INSTANCE :
DefaultLoopNativeDetector.NIO;
return channelFactory.getChannel(channelType);
}
这里会判断使用哪种通道工厂,本文就以普通的NIO为例。
按理说这里的通道工厂才是真正的工厂,上面的只是一层封装而已。
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (CHANNEL) new NioSocketChannel();
}
if (channelClass.equals(ServerSocketChannel.class)) {
// 创建Netty组件:非阻塞服务端网络通道
return (CHANNEL) new NioServerSocketChannel();
}
if (channelClass.equals(DatagramChannel.class)) {
return (CHANNEL) new NioDatagramChannel();
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}
虽然传入了通道的类型,但是并没有通过反射来创建对象,而是手动new的。对于服务端来说,这里创建的是NioServerSocketChannel类型的通道对象。
如果对NioServerSocketChannel不是很熟悉,请参考这篇文章:Netty中的NioServerSocketChannel。
设置通道初始化器
这里设置的通道初始化器是在ServerTransport的bind方法中创建的,类型为AcceptorInitializer。
static final class AcceptorInitializer extends ChannelInitializer<Channel> {
final Acceptor acceptor;
AcceptorInitializer(Acceptor acceptor) {
this.acceptor = acceptor;
}
// 把acceptor添加到channel的管道末尾
@Override
public void initChannel(final Channel ch) {
ch.eventLoop().execute(() -> ch.pipeline().addLast(acceptor));
}
}
在通道被注册后会调用该初始化器的initChannel方法,在该方法中会向通道的流水线注册一个通道处理器,这个处理器会处理接收到的新请求。
// 该方法会处理客户端连接通道。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
/*
* 为客户端通道添加handler,这里的childHandler的类型其实是TransportChannelInitializer,是一个通道初始化器。
* 这里执行add操作后,该handler被添加到通道的pipeline后,会封装延时任务,等通道注册时会执行这些延时任务。
* 所以通道注册时会调用该handler的handlerAdd回调,在该回调中会调用initChannel方法。
*/
child.pipeline().addLast(childHandler);
// 为通道设置选项和属性
TransportConnector.setChannelOptions(child, childOptions, isDomainSocket);
TransportConnector.setAttributes(child, childAttrs);
try {
/*
* 将客户端的通道注册到worker组的事件循环中,
* 在register内部会调用next()方法来选择一个事件循环,然后将通道注册到该循环中。
*/
childGroup.register(child).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
});
}
catch (Throwable t) {
forceClose(child, t);
}
}
可以看到,这个通道处理器会为新请求的通道对象添加处理器,设置选项,并注册到worker事件循环组中。
childHandler属性
重点是这里的childHandler属性是什么?这个属性是通过Acceptor类的构造方法的参数来设置的,Acceptor对象是在ServerTransport的bind方法中被创建的,传入的handler是通过config对象创建的通道初始化器。
public final ChannelInitializer<Channel> channelInitializer(ConnectionObserver connectionObserver,
@Nullable SocketAddress remoteAddress, boolean onServer) {
requireNonNull(connectionObserver, "connectionObserver");
return new TransportChannelInitializer(this, connectionObserver, remoteAddress, onServer);
}
在本文中不深入分析该初始化器是怎么工作的,内部的实现过程还是比较复杂,现在只需要知道是处理请求的就行了,详情请参考ReactoryNetty中HttpServer的请求处理流程一文。
接下来就是对通道进行注册了,详细过程请参考这篇文章:Netty中的通道注册。
地址绑定
Netty的AbstractChannel中实现了bind方法。
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
通过流水线来执行绑定。
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// 调用通道的unsafe对象执行底层的bind操作。
unsafe.bind(localAddress, promise);
}
在流水线的HeadContext的bind方法中调用了unsafe类来进行地址绑定。
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean wasActive = isActive();
try {
// 调用JDK底层绑定端口
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
/*
* 这里判断了两个条件,避免重复发布事件,在上面的register0方法中也有发布该事件的逻辑,注意区分情况。
*/
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 传播channelActive事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
调用各种通道各自的实现,本文涉及的是NioServerSocketChannel。
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) { // JDK版本大于等于7的情况
// 通过底层JDK的通道执行bind操作,注意内部不仅会bind,还会listen
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
这里会调用底层JDK中的通道的bind方法,注意在这个方法里面,不仅会绑定地址,而且还会监听。
ReactorNetty框架虽然没有使用Netty的启动器ServerBootstrap,但是通道的操作步骤都是一样的,先注册到事件循环中的选择器上,然后执行绑定。
总结
本文从Spring Boot,到ReactoryNetty框架,再到底层Netty,全面分析了是怎么创建事件循环、又是怎么创建通道、以及怎么进行通道注册和地址绑定的。