Spring Cloud Gateway是基于Spring WebFlux的,使用了ReactorNetty框架和Netty作为底层服务器的实现。Spring Cloud Gateway和WebFlux的关联点在RoutePredicateHandlerMapping,DispatcherHandler会调用它来获取handler方法。本文就以此为入口点来分析Spring Cloud Gateway的请求处理流程。
本文不会一次性介绍所有组件是怎么来的,避免增加复杂度。而是先顺着流程分析,然后遇到什么组件才介绍该组件是怎么来的。
RoutePredicateHandlerMapping
注册
在GatewayAutoConfiguration中,会注册RoutePredicateHandlerMapping类型的bean,在DispatcherHandler中会被调用。
/*
* 这里注入的是RoutePredicateHandlerMapping,
* 但SpringBoot中的WebFluxAutoConfiguration类中会自动引入RequestMappingHandlerMapping类,
* 那么在DispatcherHandler中的handler方法中处理请求时,会不会把请求交给后者来处理?
*
* 虽然会自动引入RequestMappingHandlerMapping,但是在DispatcherHandler的handle方法中可以看到,会调用每个handlerMapping的getHandler方法,
* 然后选取第一个来处理请求。
*
* webHandler是在上面filteringWebHandler方法中创建的。
* routerLocator参数是名称为cachedCompositeRouteLocator的bean,参考上面的cachedCompositeRouteLocator方法。
*/
@Bean
@ConditionalOnMissingBean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
}
获取handler
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getLocalAddress() != null
&& exchange.getRequest().getLocalAddress().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return Mono.deferContextual(contextView -> {
exchange.getAttributes().put(GATEWAY_REACTOR_CONTEXT_ATTR, contextView);
return lookupRoute(exchange) // 查找路由
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
// 将路由信息放入上下文中
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
// 返回FilteringWebHandler给DispatcherHandler
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
});
}
该方法的关键点就在于两步:
- 查找路由;
- 返回FilteringWebHandler;
查找路由
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes() // 通过路由定位器获取到所有的路由
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route)
// 过滤筛选路由
.filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
// 根据断言获取真正的路由
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
// 校验路由
validateRoute(route, exchange);
return route;
});
/*
* TODO: trace logging if (logger.isTraceEnabled()) {
* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
*/
}
该方法中的主要逻辑是先获取路由定位器中的所有路由信息,然后通过断言来进行过滤。
SimpleHandlerAdapter
注册
Spring Cloud Gateway中使用的是这个适配器,该适配器默认情况下就是会被注册的。在Spring Framework中的WebFluxConfigurationSupport中会被注册。
@Bean
public SimpleHandlerAdapter simpleHandlerAdapter() {
return new SimpleHandlerAdapter();
}
至于WebFluxConfigurationSupport中的bean方法是怎么生效的,请参考Spring Boot中HttpHandler的创建过程中有介绍。简单来说,就是Spring Boot中会导入一个自动配置类,而它又是WebFluxConfigurationSupport的子类。
执行handler
public class SimpleHandlerAdapter implements HandlerAdapter {
@Override
public boolean supports(Object handler) {
return WebHandler.class.isAssignableFrom(handler.getClass());
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
WebHandler webHandler = (WebHandler) handler;
Mono<Void> mono = webHandler.handle(exchange);
return mono.then(Mono.empty());
}
}
可以看到,该适配器支持执行所有的WebHandler,而上面RoutePredicateHandlerMapping返回的是FilteringWebHandler,而这就正好是一个WebHandler的实现类,所以能够被SimpleHandlerAdapter处理。
FilteringWebHandler
注意,该类是Spring Cloud Gateway中提供的类,Spring Framework中也有一个同名类,不要搞混了。前者负责执行Spring Cloud Gateway中的GlobalFilter和GatewayFilter,而后者负责执行bean工厂中所有的WebFilter。
注册
Spring在创建RoutePredicateHandlerMapping的时候,就需要先获取FilteringWebHandler类型的bean。同样地,在GatewayAutoConfiguration中会注册FilteringWebHandler类型的bean。
/*
* 会将所有GlobalFilter类型的bean封装起来,注意这里的FilteringWebHandler和Spring Framework中的同名类不是同一个,
* 这里的是gateway模块定义的,要注意区分。
*/
@Bean
@ConditionalOnMissingBean
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
return new FilteringWebHandler(globalFilters);
}
创建和初始化
private final List<GatewayFilter> globalFilters;
public FilteringWebHandler(List<GlobalFilter> globalFilters) {
this.globalFilters = loadFilters(globalFilters);
}
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
// 通过适配器将GlobalFilter适配为GatewayFilter
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
// 如果过滤器实现了Ordered接口,则将GatewayFilter封装为OrderedGatewayFilter
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
在创建的时候,会传入bean工厂中所有的GlobalFilter,并为每个过滤器创建适配器,目的是统一类型为GatewayFilter。
private static class GatewayFilterAdapter implements GatewayFilter {
private final GlobalFilter delegate;
GatewayFilterAdapter(GlobalFilter delegate) {
this.delegate = delegate;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return this.delegate.filter(exchange, chain);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");
sb.append("delegate=").append(delegate);
sb.append('}');
return sb.toString();
}
}
GlobalFilter针对所有的路由,而GatewayFilter针对特定的路由。
处理请求
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
// 从上下文中获取路由信息
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
// 获取到该路由的所有filter
List<GatewayFilter> gatewayFilters = route.getFilters();
// 和全局过滤器合并在一起
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
// 合并两种filter
combined.addAll(gatewayFilters);
// TODO: needed or cached?
// 对filter进行排序
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
// 创建filter链并执行
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
该方法中先从路由中获取GatewayFilter,然后和全局过滤器进行合并和排序,最后封装到过滤器链中,并执行过滤器链。
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
private final int index;
private final List<GatewayFilter> filters;
DefaultGatewayFilterChain(List<GatewayFilter> filters) {
this.filters = filters;
this.index = 0;
}
private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
this.filters = parent.getFilters();
this.index = index;
}
public List<GatewayFilter> getFilters() {
return filters;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
// 判断是否已经执行过所有的过滤器了
if (this.index < filters.size()) {
// 取出当前所要执行的过滤器
GatewayFilter filter = filters.get(this.index);
// 对于每个filter,都创建一个DefaultGatewayFilterChain
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
// 递归调用过滤器
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
正常的过滤链操作,调用每个过滤器。Spring Cloud Gateway的操作都是封装在一个个的过滤器中的。本文主要是分析大体上请求的处理流程,而不会分析具体的过滤逻辑。