Spring中WebFlux的请求处理流程

其实WebFlux也和WebMvc一样,有一套请求的处理流程,而且两套流程也很相似。都是先获取handler方法,再获取handler适配器,然后通过handler适配器调用handler方法,最后处理结果。本文就来分析一下该过程具体的实现。


请求到达DispatcherHandler之前

WebFlux是需要运行在底层的服务器实现之上的,比较常见的就是ReactoryNetty框架。前面已经分析过ReactoryNetty是怎么处理请求,以及最后会把请求移交给上层应用设置的handler来处理的。Spring Boot中设置的这个handler其实是Spring Framework中提供类:ReactorHttpHandlerAdapter

java
spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java
/*
 * 该类是reactor-netty库和spring框架的适配器
 */
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {

	private static final Log logger = HttpLogging.forLogName(ReactorHttpHandlerAdapter.class);


	private final HttpHandler httpHandler;


	public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
		Assert.notNull(httpHandler, "HttpHandler must not be null");
		this.httpHandler = httpHandler;
	}


	/*
	 * 该方法被reactor-netty调用。
	 */
	@Override
	public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
		NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
		try {
			ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
			ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);

			if (request.getMethod() == HttpMethod.HEAD) {
				response = new HttpHeadResponseDecorator(response);
			}

			// 处理请求,参考HttpWebHandlerAdapter
			return this.httpHandler.handle(request, response)
					.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
					.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
		}
		catch (URISyntaxException ex) {
			if (logger.isDebugEnabled()) {
				logger.debug("Failed to get request URI: " + ex.getMessage());
			}
			reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
			return Mono.empty();
		}
	}

}

Spring Boot中调用构造方法设置的属性httpHandlerDelayedInitializationHttpHandler类型的对象,而该对象封装了HttpWebHandlerAdapter。在上面的apply方法中会调用到它的handle方法。

HttpWebHandlerAdapter

java
spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java
// 这是请求到达WebFlux领域的第一站
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    if (this.forwardedHeaderTransformer != null) {
        try {
            request = this.forwardedHeaderTransformer.apply(request);
        }
        catch (Throwable ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to apply forwarded headers to " + formatRequest(request), ex);
            }
            response.setStatusCode(HttpStatus.BAD_REQUEST);
            return response.setComplete();
        }
    }
    // 创建exchange对象
    ServerWebExchange exchange = createExchange(request, response);

    LogFormatUtils.traceDebug(logger, traceOn ->
            exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
                    (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

    // 这里获取的delegate对象是在WebHttpHandlerBuilder的build方法中调用该类构造器是传递进来的。
    return getDelegate().handle(exchange) // 调用的是ExceptionHandlingWebHandler#handle
            .doOnSuccess(aVoid -> logResponse(exchange))
            .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
            .then(Mono.defer(response::setComplete));
}

这里会调用到委托对象的handle方法,这个委托对象是在创建过程中被设置的,是ExceptionHandlingWebHandler。为了方便理清除调用链条,这里再次贴出Spring Boot中创建HttpHandler的核心步骤的代码。

java
spring-web/src/main/java/org/springframework/web/server/adapter/WebHttpHandlerBuilder.java
/*
 * 在SpringBoot中,HttpHandlerAutoConfiguration类有一个静态内部配置类,
 * 该配置类向容器中注入了一个httpHandler,在该方法中调用了此方法以及上面的applicationContext方法。
 */
public HttpHandler build() {
    /*
     * 采用装饰器模式封装handler和filter,这两个属性是在上面的applicationContext方法中被设置的。
     * 这里的webHandler就是核心组件DispatcherHandler
     * 内部会调用默认的过滤器链
     */
    WebHandler decorated = new FilteringWebHandler(this.webHandler, this.filters);
    // 再封装异常处理器
    decorated = new ExceptionHandlingWebHandler(decorated,  this.exceptionHandlers);

    // 创建适配器,并设置一些属性,由此可见adapter里面的delegate保存的是ExceptionHandlingWebHandler实例
    HttpWebHandlerAdapter adapted = new HttpWebHandlerAdapter(decorated);
    if (this.sessionManager != null) {
        adapted.setSessionManager(this.sessionManager);
    }
    if (this.codecConfigurer != null) {
        adapted.setCodecConfigurer(this.codecConfigurer);
    }
    if (this.localeContextResolver != null) {
        adapted.setLocaleContextResolver(this.localeContextResolver);
    }
    if (this.forwardedHeaderTransformer != null) {
        adapted.setForwardedHeaderTransformer(this.forwardedHeaderTransformer);
    }
    if (this.applicationContext != null) {
        adapted.setApplicationContext(this.applicationContext);
    }
    // 目前方法内只有打印日志的语句,不用关注
    adapted.afterPropertiesSet();

    // 如果存在装饰器,则再装饰一下
    return (this.httpHandlerDecorator != null ? this.httpHandlerDecorator.apply(adapted) : adapted);
}

ExceptionHandlingWebHandler

java
spring-web/src/main/java/org/springframework/web/server/handler/ExceptionHandlingWebHandler.java
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    Mono<Void> completion;
    try {
        // 先调用下一个WebHandlerDecorate
        completion = super.handle(exchange);
    }
    catch (Throwable ex) {
        completion = Mono.error(ex);
    }

    // 遍历执行WebExceptionHandler
    for (WebExceptionHandler handler : this.exceptionHandlers) {
        completion = completion.onErrorResume(ex -> handler.handle(exchange, ex));
    }
    return completion;
}

本文分析的是请求的处理流程,请忽略上面方法中调用的异常处理器,关注父类的handle方法。

java
spring-web/src/main/java/org/springframework/web/server/handler/WebHandlerDecorator.java
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    // 默认实现,调用下一个WebHandler的handle方法
    return this.delegate.handle(exchange);
}

这里又是调用的委托对象的handle方法,这里的委托对象的类型是FilteringWebHandler

FilteringWebHandler

FilteringWebHandler
DefaultWebFilterChain
<
>
java
spring-web/src/main/java/org/springframework/web/server/handler/FilteringWebHandler.java
public class FilteringWebHandler extends WebHandlerDecorator {

	private final DefaultWebFilterChain chain;


	/**
	 * Constructor.
	 * @param filters the chain of filters
	 */
	public FilteringWebHandler(WebHandler handler, List<WebFilter> filters) {
		// 设置delegate,是DispatcherHandler
		super(handler);
		// 同时把handler传递给webFilterChain
		this.chain = new DefaultWebFilterChain(handler, filters);
	}


	/**
	 * Return a read-only list of the configured filters.
	 */
	public List<WebFilter> getFilters() {
		return this.chain.getFilters();
	}


	@Override
	public Mono<Void> handle(ServerWebExchange exchange) {
		/*
		 * 执行过滤器链
		 * 这里没有调用delegate的handle方法,而是由chain来执行WebHandler的handle方法
		 *
		 */
		return this.chain.filter(exchange);
	}

}
java
spring-web/src/main/java/org/springframework/web/server/handler/DefaultWebFilterChain.java
public class DefaultWebFilterChain implements WebFilterChain {

	private final List<WebFilter> allFilters;

	// 该属性是DispatchHandler
	private final WebHandler handler;

	@Nullable
	private final WebFilter currentFilter;

	@Nullable
	private final DefaultWebFilterChain chain;


	/**
	 * Public constructor with the list of filters and the target handler to use.
	 * @param handler the target handler
	 * @param filters the filters ahead of the handler
	 * @since 5.1
	 */
	public DefaultWebFilterChain(WebHandler handler, List<WebFilter> filters) {
		Assert.notNull(handler, "WebHandler is required");
		this.allFilters = Collections.unmodifiableList(filters);
		this.handler = handler;
		DefaultWebFilterChain chain = initChain(filters, handler);
		this.currentFilter = chain.currentFilter;
		this.chain = chain.chain;
	}

	private static DefaultWebFilterChain initChain(List<WebFilter> filters, WebHandler handler) {
		// 创建默认的过滤器链,封装WebFilter
		DefaultWebFilterChain chain = new DefaultWebFilterChain(filters, handler, null, null);
		ListIterator<? extends WebFilter> iterator = filters.listIterator(filters.size());
		/*
		 * 从最后一个向前遍历,直到找到第一个filter,作为currentFilter。
		 *
		 * 注意:这里用了一种比较绕的方式(好不好还不一定):
		 * 每个filter都对应了一个DefaultWebFilterChain,
		 * 而该chain中的currentFilter就是下一个filter(有点绕,注意理解),
		 * 第一次执行这个循环其实就是在创建最后一个filter对应的chain,而该chain是在while循环上面创建的,
		 * 可以看到,那个chain的currentFilter是null,即表示过滤器链的结束。
		 */
		while (iterator.hasPrevious()) {
			chain = new DefaultWebFilterChain(filters, handler, iterator.previous(), chain);
		}
		return chain;
	}

	/**
	 * Private constructor to represent one link in the chain.
	 */
	private DefaultWebFilterChain(List<WebFilter> allFilters, WebHandler handler,
			@Nullable WebFilter currentFilter, @Nullable DefaultWebFilterChain chain) {

		this.allFilters = allFilters;
		this.currentFilter = currentFilter;
		this.handler = handler;
		this.chain = chain;
	}

	/**
	 * Public constructor with the list of filters and the target handler to use.
	 * @param handler the target handler
	 * @param filters the filters ahead of the handler
	 * @deprecated as of 5.1 this constructor is deprecated in favor of
	 * {@link #DefaultWebFilterChain(WebHandler, List)}.
	 */
	@Deprecated
	public DefaultWebFilterChain(WebHandler handler, WebFilter... filters) {
		this(handler, Arrays.asList(filters));
	}


	public List<WebFilter> getFilters() {
		return this.allFilters;
	}

	public WebHandler getHandler() {
		return this.handler;
	}


	@Override
	public Mono<Void> filter(ServerWebExchange exchange) {
		/*
		 * 如果过滤器链没被执行完,则执行过滤器。至于什么时候执行完,
		 * 采用了一种比较绕的方式来实现,参考上面的initChain方法。
		 * 否则调用handler的handle方法,其实这里的handler就是核心组件DispatcherHandler
		 */
		return Mono.defer(() ->
				this.currentFilter != null && this.chain != null ?
						// 如果过滤器链还没执行完则执行过滤器
						invokeFilter(this.currentFilter, this.chain, exchange) :
						// 调用DispatcherHandler中的handle方法
						this.handler.handle(exchange));
	}

	private Mono<Void> invokeFilter(WebFilter current, DefaultWebFilterChain chain, ServerWebExchange exchange) {
		String currentName = current.getClass().getName();
		return current.filter(exchange, chain).checkpoint(currentName + " [DefaultWebFilterChain]");
	}

}

FilteringWebHandler中,将所有的过滤器封装进过滤器链对象DefaultWebFilterChain中,然后在handle方法中,直接调用过滤器链对象的filter方法。这个filter方法的主要逻辑就是依次执行过滤器链中的过滤器,然后再调用handler的handle方法。而这里的handler正是DispatcherHandler类型的对象。

DispatcherHandler

java
spring-webflux/src/main/java/org/springframework/web/reactive/DispatcherHandler.java
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    // 处理预检请求
    if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
        return handlePreFlight(exchange);
    }
    // 遍历所有的handlerMapping
    return Flux.fromIterable(this.handlerMappings)
            // 从HandlerMapping中获取handler
            .concatMap(mapping -> mapping.getHandler(exchange))
            // 因为上面遍历了多个handlerMapping,可能多个mapping会返回handler,这里选取第一个。
            .next()
            // 如果没匹配到handlerMethod,则处理错误
            .switchIfEmpty(createNotFoundError())
            // 执行handler,会由HandlerAdapter处理
            .flatMap(handler -> invokeHandler(exchange, handler))
            .flatMap(result -> handleResult(exchange, result));
}

这里会先获取handler方法,再获取handler适配器,然后通过适配器调用处理器方法,最后通过结果处理器来处理结果,整个过程和MVC请求的处理流程很相似。虽然涉及的这些组件是Reactive包下的,但是其逻辑和MVC版本的都大差不差,这也是为什么我们可以不用修改Controller的代码,只需将应用类型切换为“Reactive”的原因所在。所以本文不再对三种组件进行分析,详细过程请参考Spring中WebMvc的请求处理流程