在启动过程中分析到,当客户端的请求到来时,会为其设置一个特殊的ReactiveBridge处理器来处理请求,从名称就可以看出来,该处理器是Netty和反应式的桥接器。本文就来分析一下,当新请求到来时,该处理器是怎么处理的。
强烈建议在阅读本文之前先熟悉下面两篇文章中的内容,否则容易不知道讲的是什么。
前情回顾
当Acceptor收到请求后,会往请求的通道对象中添加一个通道初始化器TransportChannelInitializer,然后并注册到worker事件循环组中。注册后会调用这个通道初始化器的initChannel方法。
@Override
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
/*
* 设置reactiveBridge通道处理器
* 第二个参数是连接操作提供者,会用来创建连接操作对象。
* 对于HttpServerConfig而言,没有实现TransportConfig中的方法,而后者的默认实现是返回null。
*/
ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);
// 追加通道初始化操作
config.defaultOnChannelInit()
// 添加配置中的初始化器
.then(config.doOnChannelInit)
// 执行通道初始化
.onChannelInit(connectionObserver, channel, remoteAddress);
// 从pipeline中移除该handler,因为初始化器执行一次就够了。
pipeline.remove(this);
if (log.isDebugEnabled()) {
log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
}
}
在该方法中,会往请求的通道对象中注册一个通道处理器。在调用addReactiveBridge方法时,除了通道对象外,第二个参数是通道操作提供器,第三个参数是连接观察者(会处理通道事件)。
public static void addReactiveBridge(Channel ch, OnSetup opsFactory, ConnectionObserver listener) {
requireNonNull(ch, "channel");
requireNonNull(opsFactory, "opsFactory");
requireNonNull(listener, "listener");
// 注册通道处理器
ch.pipeline()
.addLast(NettyPipeline.ReactiveBridge, new ChannelOperationsHandler(opsFactory, listener));
}
添加的通道处理器是ChannelOperationsHandler类型的。
添加了ReactiveBridge处理器后,会执行通道初始化操作。
通道初始化
初始化操作调用的是HttpServerConfig中的内部类HttpServerChannelInitializer中的onChannelInit方法。
configureHttp11Pipeline(
channel.pipeline(),
accessLogEnabled,
accessLog,
compressPredicate(compressPredicate, minCompressionSize),
cookieDecoder,
cookieEncoder,
false,
decoder,
formDecoderProvider,
forwardedHeaderHandler,
httpMessageLogFactory,
idleTimeout,
observer,
mapHandle,
maxKeepAliveRequests,
methodTagValue,
metricsRecorder,
minCompressionSize,
readTimeout,
requestTimeout,
uriTagValue);
该方法比较复杂,忽略的SSL和其他HTTP版本的实现,来看看configureHttp11Pipeline方法的执行过程。
@SuppressWarnings("deprecation")
static void configureHttp11Pipeline(ChannelPipeline p,
boolean accessLogEnabled,
@Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
boolean channelOpened,
HttpRequestDecoderSpec decoder,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
HttpMessageLogFactory httpMessageLogFactory,
@Nullable Duration idleTimeout,
ConnectionObserver listener,
@Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
int maxKeepAliveRequests,
@Nullable Function<String, String> methodTagValue,
@Nullable ChannelMetricsRecorder metricsRecorder,
int minCompressionSize,
@Nullable Duration readTimeout,
@Nullable Duration requestTimeout,
@Nullable Function<String, String> uriTagValue) {
HttpDecoderConfig decoderConfig = new HttpDecoderConfig();
decoderConfig.setMaxInitialLineLength(decoder.maxInitialLineLength())
.setMaxHeaderSize(decoder.maxHeaderSize())
.setMaxChunkSize(decoder.maxChunkSize())
.setValidateHeaders(decoder.validateHeaders())
.setInitialBufferSize(decoder.initialBufferSize())
.setAllowDuplicateContentLengths(decoder.allowDuplicateContentLengths());
p.addBefore(NettyPipeline.ReactiveBridge,
NettyPipeline.HttpCodec,
new HttpServerCodec(decoderConfig))
.addBefore(NettyPipeline.ReactiveBridge, // 添加在ReactiveBridge之前
NettyPipeline.HttpTrafficHandler,
// 创建通道处理器
new HttpTrafficHandler(compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider,
forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests,
readTimeout, requestTimeout));
}
在该方法中添加了一些通道处理器,重点关注一下HttpTrafficHandler。
HttpTrafficHandler
这里不是要介绍是怎么进行HTTP流量的处理的,而是重点关注HttpServerOperations对象的创建和绑定。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
HttpServerOperations ops;
// 创建HTTP服务的操作对象
ops = new HttpServerOperations(Connection.from(ctx.channel()),
listener,
request,
compress,
connectionInfo,
cookieDecoder,
cookieEncoder,
formDecoderProvider,
httpMessageLogFactory,
false,
mapHandle,
readTimeout,
requestTimeout,
secure,
timestamp);
// 绑定到通道对象中
ops.bind();
}
该方法非常长,只需要关注与HttpServerOperations相关的部分即可。创建好ops对象后,会将其绑定到通道的属性中,下面会获取。
default Connection bind() {
channel().attr(ReactorNetty.CONNECTION)
.set(this);
return this;
}
因为HttpTrafficHandler被添加在ChannelOperationHandler的前面,所以下面会获取到绑定的ops对象。
ChannelOperationHandler
本文关注的是请求入站处理,所以直接看channelRead方法。
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public final void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf) {
return;
}
try {
// 这里主要考虑返回值类型是HttpServerOperations
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops != null) {
ops.onInboundNext(ctx, msg);
}
else {
ReferenceCountUtil.release(msg);
}
}
catch (Throwable err) {
}
}
该方法中就两个操作:
- 获取通道操作对象;
- 调用操作对象的onInboundNext方法。
获取通道操作对象
@Nullable
public static ChannelOperations<?, ?> get(Channel ch) {
return Connection.from(ch)
.as(ChannelOperations.class);
}
static Connection from(Channel channel) {
requireNonNull(channel, "channel");
// 如果通道属性中存在
if (channel.hasAttr(ReactorNetty.CONNECTION)) {
// 则从属性中获取
return channel.attr(ReactorNetty.CONNECTION)
.get();
}
// 新建对象
return new ReactorNetty.SimpleConnection(channel).bind();
}
在上面的from方法中,会进入if语句块,获取到上面绑定的ops对象。
调用onInboundNext方法
@Override
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
try {
// 接收请求
listener().onStateChange(this, HttpServerState.REQUEST_RECEIVED);
}
return;
}
}
这里调用到的listener是从TransportChannelInitializer的initChannel方法中一路传进去的,就是ConnectionObserver类型的属性connectionObserver,这个属性又是ServerTransport的bind方法传入的,就是同方法中定义的变量childObs(ChildObserver类型的变量,是ServerTransport中的内部类)。 总之,这里会传播REQUEST_RECEIVED请求状态,连接观察者如果对该状态感兴趣则会处理。
@Override
public void onStateChange(Connection connection, State newState) {
if (newState == State.DISCONNECTING) {
if (connection.channel().isActive() && !connection.isPersistent()) {
connection.dispose();
}
}
// 处理连接状态
childObs.onStateChange(connection, newState);
}
public Mono<? extends DisposableServer> bind() {
// 这里的config参考HttpServerBind的无参构造器
CONF config = configuration();
Objects.requireNonNull(config.bindAddress(), "bindAddress");
Mono<? extends DisposableServer> mono = Mono.create(sink -> {
// 在HttpServerBind中设置的以及这里获取到的都是InetSocketAddress
SocketAddress local = Objects.requireNonNull(config.bindAddress().get(), "Bind Address supplier returned null");
// HttpServerBind中的设置的是InetSocketAddress类型的
if (local instanceof InetSocketAddress) {
InetSocketAddress localInet = (InetSocketAddress) local;
// 如果地址还没解析过则解析
if (localInet.isUnresolved()) {
local = AddressUtils.createResolved(localInet.getHostName(), localInet.getPort());
}
}
boolean isDomainSocket = false;
// 创建disposableServer
DisposableBind disposableServer;
// 对于HttpServerBind来说,这里的local是InetSocketAddress类型的,不是DomainSocketAddress
if (local instanceof DomainSocketAddress) {
isDomainSocket = true;
disposableServer = new UdsDisposableBind(sink, config, local);
}
else {
// 创建server
disposableServer = new InetDisposableBind(sink, config, local);
}
/*
* 创建连接观察者,config中的childObserver默认是空操作。
* 所以在Spring Boot中会往HttpServer中添加一个handler,然后会被封装为ConnectionObserver。
*/
ConnectionObserver childObs =
new ChildObserver(config.defaultChildObserver().then(config.childObserver()));
/*
* 从config中获取eventLoop和channelInitializer来创建acceptor
* 该acceptor就是reactor模型中的重要角色,
* 这里通过config来获取EventLoopGroup、ChannelHandler、childOptions、childAttrs,
* 这些都是Netty中的概念或组件。
* 对于EventLoopGroup的获取,内部会通过LoopResources来实现。
*
* 这里的Acceptor继承了ChannelInboundHandlerAdapter,该类是Netty中的概念。
*
* 客户端连接到服务端后,Acceptor的channelRead方法会把通道注册到child事件循环中。
*/
Acceptor acceptor = new Acceptor(config.childEventLoopGroup(), config.channelInitializer(childObs, null, true),
config.childOptions, config.childAttrs, isDomainSocket);
/*
* 执行底层的bind操作
* 这里的AcceptorInitializer是一个ChannelInitializer,用于初始化channel
*/
TransportConnector.bind(config, new AcceptorInitializer(acceptor), local, isDomainSocket)
.subscribe(disposableServer);
});
// 这里默认是null,所以可以忽略
if (config.doOnBind() != null) {
// 执行bind回调
mono = mono.doOnSubscribe(s -> config.doOnBind().accept(config));
}
return mono;
}
ChildObserver的childObs是config对象中配置的连接观察者对象,那是哪里往config中配置了连接观察者对象呢? 实际上在Spring Boot中为HttpServer对象设置处理器的时候,就会把处理器封装为一个连接观察者对象,然后添加到config对象中。
DisposableServer startHttpServer() {
HttpServer server = this.httpServer;
// 下面这个条件默认是true
if (this.routeProviders.isEmpty()) {
// 设置请求处理器
server = server.handle(this.handler);
}
else {
server = server.route(this::applyRouteProviders);
}
if (this.lifecycleTimeout != null) {
return server.bindNow(this.lifecycleTimeout);
}
/*
* 执行bind操作,HttpServer是reactor-netty提供的,所以离开了spring-boot范畴,
* 最后会一直执行到netty层面的ServerBootstrap#bind()
*/
return server.bindNow();
}
Spring Boot会调用HttpServer的handle方法。
public final HttpServer handle(
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
Objects.requireNonNull(handler, "handler");
return childObserve(new HttpServerHandle(handler));
}
可以看到,这里把我们传入的处理器封装为了HttpServerHandle对象。而该对象刚好对请求的REQUEST_RECEIVED状态感兴趣。
HttpServerHandle
static final class HttpServerHandle implements ConnectionObserver {
final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler;
HttpServerHandle(BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
this.handler = handler;
}
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onStateChange(Connection connection, State newState) {
if (newState == HttpServerState.REQUEST_RECEIVED) {
try {
if (log.isDebugEnabled()) {
log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);
}
// 封装连接
HttpServerOperations ops = (HttpServerOperations) connection;
// 调用apply方法,处理连接
Publisher<Void> publisher = handler.apply(ops, ops);
Mono<Void> mono = Mono.deferContextual(ctx -> {
ops.currentContext = Context.of(ctx);
return Mono.fromDirect(publisher);
});
if (ops.mapHandle != null) {
mono = ops.mapHandle.apply(mono, connection);
}
mono.subscribe(ops.disposeSubscriber());
}
catch (Throwable t) {
log.error(format(connection.channel(), ""), t);
//"FutureReturnValueIgnored" this is deliberate
connection.channel()
.close();
}
}
}
}
在上面的onStateChange方法中,会调用handler的apply方法,在上面Spring Boot中设置的是Spring Framework提供的ReactorHttpHandlerAdapter。本文仅关注ReactorNetty框架中的实现,不深入Spring Framework中分析了。
总结
通过本文,我们了解了在ReactorNetty框架中请求是怎么被处理的,以及怎么进入我们设置的handler的。从而达到了知其然,更知其所以然的目的。