其实WebFlux也和WebMvc一样,有一套请求的处理流程,而且两套流程也很相似。都是先获取handler方法,再获取handler适配器,然后通过handler适配器调用handler方法,最后处理结果。本文就来分析一下该过程具体的实现。
请求到达DispatcherHandler之前
WebFlux是需要运行在底层的服务器实现之上的,比较常见的就是ReactoryNetty框架。前面已经分析过ReactoryNetty是怎么处理请求,以及最后会把请求移交给上层应用设置的handler来处理的。Spring Boot中设置的这个handler其实是Spring Framework中提供类:ReactorHttpHandlerAdapter。
/*
* 该类是reactor-netty库和spring框架的适配器
*/
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
private static final Log logger = HttpLogging.forLogName(ReactorHttpHandlerAdapter.class);
private final HttpHandler httpHandler;
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "HttpHandler must not be null");
this.httpHandler = httpHandler;
}
/*
* 该方法被reactor-netty调用。
*/
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
// 处理请求,参考HttpWebHandlerAdapter
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}
Spring Boot中调用构造方法设置的属性httpHandler是DelayedInitializationHttpHandler类型的对象,而该对象封装了HttpWebHandlerAdapter。在上面的apply方法中会调用到它的handle方法。
- 关于上面的httpHandler的来源,请参考文章:Spring Boot中HttpHandler的创建过程。
- 关于ReactorHttpHandlerAdapter之前的请求处理过程,请参考ReactoryNetty中HttpServer的请求处理流程一文。
HttpWebHandlerAdapter
// 这是请求到达WebFlux领域的第一站
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
try {
request = this.forwardedHeaderTransformer.apply(request);
}
catch (Throwable ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to apply forwarded headers to " + formatRequest(request), ex);
}
response.setStatusCode(HttpStatus.BAD_REQUEST);
return response.setComplete();
}
}
// 创建exchange对象
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
// 这里获取的delegate对象是在WebHttpHandlerBuilder的build方法中调用该类构造器是传递进来的。
return getDelegate().handle(exchange) // 调用的是ExceptionHandlingWebHandler#handle
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
这里会调用到委托对象的handle方法,这个委托对象是在创建过程中被设置的,是ExceptionHandlingWebHandler。为了方便理清除调用链条,这里再次贴出Spring Boot中创建HttpHandler的核心步骤的代码。
/*
* 在SpringBoot中,HttpHandlerAutoConfiguration类有一个静态内部配置类,
* 该配置类向容器中注入了一个httpHandler,在该方法中调用了此方法以及上面的applicationContext方法。
*/
public HttpHandler build() {
/*
* 采用装饰器模式封装handler和filter,这两个属性是在上面的applicationContext方法中被设置的。
* 这里的webHandler就是核心组件DispatcherHandler
* 内部会调用默认的过滤器链
*/
WebHandler decorated = new FilteringWebHandler(this.webHandler, this.filters);
// 再封装异常处理器
decorated = new ExceptionHandlingWebHandler(decorated, this.exceptionHandlers);
// 创建适配器,并设置一些属性,由此可见adapter里面的delegate保存的是ExceptionHandlingWebHandler实例
HttpWebHandlerAdapter adapted = new HttpWebHandlerAdapter(decorated);
if (this.sessionManager != null) {
adapted.setSessionManager(this.sessionManager);
}
if (this.codecConfigurer != null) {
adapted.setCodecConfigurer(this.codecConfigurer);
}
if (this.localeContextResolver != null) {
adapted.setLocaleContextResolver(this.localeContextResolver);
}
if (this.forwardedHeaderTransformer != null) {
adapted.setForwardedHeaderTransformer(this.forwardedHeaderTransformer);
}
if (this.applicationContext != null) {
adapted.setApplicationContext(this.applicationContext);
}
// 目前方法内只有打印日志的语句,不用关注
adapted.afterPropertiesSet();
// 如果存在装饰器,则再装饰一下
return (this.httpHandlerDecorator != null ? this.httpHandlerDecorator.apply(adapted) : adapted);
}
ExceptionHandlingWebHandler
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Mono<Void> completion;
try {
// 先调用下一个WebHandlerDecorate
completion = super.handle(exchange);
}
catch (Throwable ex) {
completion = Mono.error(ex);
}
// 遍历执行WebExceptionHandler
for (WebExceptionHandler handler : this.exceptionHandlers) {
completion = completion.onErrorResume(ex -> handler.handle(exchange, ex));
}
return completion;
}
本文分析的是请求的处理流程,请忽略上面方法中调用的异常处理器,关注父类的handle方法。
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
// 默认实现,调用下一个WebHandler的handle方法
return this.delegate.handle(exchange);
}
这里又是调用的委托对象的handle方法,这里的委托对象的类型是FilteringWebHandler。
FilteringWebHandler
public class FilteringWebHandler extends WebHandlerDecorator {
private final DefaultWebFilterChain chain;
/**
* Constructor.
* @param filters the chain of filters
*/
public FilteringWebHandler(WebHandler handler, List<WebFilter> filters) {
// 设置delegate,是DispatcherHandler
super(handler);
// 同时把handler传递给webFilterChain
this.chain = new DefaultWebFilterChain(handler, filters);
}
/**
* Return a read-only list of the configured filters.
*/
public List<WebFilter> getFilters() {
return this.chain.getFilters();
}
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
/*
* 执行过滤器链
* 这里没有调用delegate的handle方法,而是由chain来执行WebHandler的handle方法
*
*/
return this.chain.filter(exchange);
}
}
public class DefaultWebFilterChain implements WebFilterChain {
private final List<WebFilter> allFilters;
// 该属性是DispatchHandler
private final WebHandler handler;
@Nullable
private final WebFilter currentFilter;
@Nullable
private final DefaultWebFilterChain chain;
/**
* Public constructor with the list of filters and the target handler to use.
* @param handler the target handler
* @param filters the filters ahead of the handler
* @since 5.1
*/
public DefaultWebFilterChain(WebHandler handler, List<WebFilter> filters) {
Assert.notNull(handler, "WebHandler is required");
this.allFilters = Collections.unmodifiableList(filters);
this.handler = handler;
DefaultWebFilterChain chain = initChain(filters, handler);
this.currentFilter = chain.currentFilter;
this.chain = chain.chain;
}
private static DefaultWebFilterChain initChain(List<WebFilter> filters, WebHandler handler) {
// 创建默认的过滤器链,封装WebFilter
DefaultWebFilterChain chain = new DefaultWebFilterChain(filters, handler, null, null);
ListIterator<? extends WebFilter> iterator = filters.listIterator(filters.size());
/*
* 从最后一个向前遍历,直到找到第一个filter,作为currentFilter。
*
* 注意:这里用了一种比较绕的方式(好不好还不一定):
* 每个filter都对应了一个DefaultWebFilterChain,
* 而该chain中的currentFilter就是下一个filter(有点绕,注意理解),
* 第一次执行这个循环其实就是在创建最后一个filter对应的chain,而该chain是在while循环上面创建的,
* 可以看到,那个chain的currentFilter是null,即表示过滤器链的结束。
*/
while (iterator.hasPrevious()) {
chain = new DefaultWebFilterChain(filters, handler, iterator.previous(), chain);
}
return chain;
}
/**
* Private constructor to represent one link in the chain.
*/
private DefaultWebFilterChain(List<WebFilter> allFilters, WebHandler handler,
@Nullable WebFilter currentFilter, @Nullable DefaultWebFilterChain chain) {
this.allFilters = allFilters;
this.currentFilter = currentFilter;
this.handler = handler;
this.chain = chain;
}
/**
* Public constructor with the list of filters and the target handler to use.
* @param handler the target handler
* @param filters the filters ahead of the handler
* @deprecated as of 5.1 this constructor is deprecated in favor of
* {@link #DefaultWebFilterChain(WebHandler, List)}.
*/
@Deprecated
public DefaultWebFilterChain(WebHandler handler, WebFilter... filters) {
this(handler, Arrays.asList(filters));
}
public List<WebFilter> getFilters() {
return this.allFilters;
}
public WebHandler getHandler() {
return this.handler;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
/*
* 如果过滤器链没被执行完,则执行过滤器。至于什么时候执行完,
* 采用了一种比较绕的方式来实现,参考上面的initChain方法。
* 否则调用handler的handle方法,其实这里的handler就是核心组件DispatcherHandler
*/
return Mono.defer(() ->
this.currentFilter != null && this.chain != null ?
// 如果过滤器链还没执行完则执行过滤器
invokeFilter(this.currentFilter, this.chain, exchange) :
// 调用DispatcherHandler中的handle方法
this.handler.handle(exchange));
}
private Mono<Void> invokeFilter(WebFilter current, DefaultWebFilterChain chain, ServerWebExchange exchange) {
String currentName = current.getClass().getName();
return current.filter(exchange, chain).checkpoint(currentName + " [DefaultWebFilterChain]");
}
}
在FilteringWebHandler中,将所有的过滤器封装进过滤器链对象DefaultWebFilterChain中,然后在handle方法中,直接调用过滤器链对象的filter方法。这个filter方法的主要逻辑就是依次执行过滤器链中的过滤器,然后再调用handler的handle方法。而这里的handler正是DispatcherHandler类型的对象。
DispatcherHandler
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
// 处理预检请求
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
// 遍历所有的handlerMapping
return Flux.fromIterable(this.handlerMappings)
// 从HandlerMapping中获取handler
.concatMap(mapping -> mapping.getHandler(exchange))
// 因为上面遍历了多个handlerMapping,可能多个mapping会返回handler,这里选取第一个。
.next()
// 如果没匹配到handlerMethod,则处理错误
.switchIfEmpty(createNotFoundError())
// 执行handler,会由HandlerAdapter处理
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
这里会先获取handler方法,再获取handler适配器,然后通过适配器调用处理器方法,最后通过结果处理器来处理结果,整个过程和MVC请求的处理流程很相似。虽然涉及的这些组件是Reactive包下的,但是其逻辑和MVC版本的都大差不差,这也是为什么我们可以不用修改Controller的代码,只需将应用类型切换为“Reactive”的原因所在。所以本文不再对三种组件进行分析,详细过程请参考Spring中WebMvc的请求处理流程。