Spring Cloud Gateway中的请求处理流程

Spring Cloud Gateway是基于Spring WebFlux的,使用了ReactorNetty框架和Netty作为底层服务器的实现。Spring Cloud Gateway和WebFlux的关联点在RoutePredicateHandlerMappingDispatcherHandler会调用它来获取handler方法。本文就以此为入口点来分析Spring Cloud Gateway的请求处理流程。


本文不会一次性介绍所有组件是怎么来的,避免增加复杂度。而是先顺着流程分析,然后遇到什么组件才介绍该组件是怎么来的。

RoutePredicateHandlerMapping

注册

GatewayAutoConfiguration中,会注册RoutePredicateHandlerMapping类型的bean,在DispatcherHandler中会被调用。

java
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java
/*
 * 这里注入的是RoutePredicateHandlerMapping,
 * 但SpringBoot中的WebFluxAutoConfiguration类中会自动引入RequestMappingHandlerMapping类,
 * 那么在DispatcherHandler中的handler方法中处理请求时,会不会把请求交给后者来处理?
 *
 * 虽然会自动引入RequestMappingHandlerMapping,但是在DispatcherHandler的handle方法中可以看到,会调用每个handlerMapping的getHandler方法,
 * 然后选取第一个来处理请求。
 *
 * webHandler是在上面filteringWebHandler方法中创建的。
 * routerLocator参数是名称为cachedCompositeRouteLocator的bean,参考上面的cachedCompositeRouteLocator方法。
 */
@Bean
@ConditionalOnMissingBean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,
        RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
    return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
}

获取handler

java
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/RoutePredicateHandlerMapping.java
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
    // don't handle requests on management port if set and different than server port
    if (this.managementPortType == DIFFERENT && this.managementPort != null
            && exchange.getRequest().getLocalAddress() != null
            && exchange.getRequest().getLocalAddress().getPort() == this.managementPort) {
        return Mono.empty();
    }
    exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

    return Mono.deferContextual(contextView -> {
        exchange.getAttributes().put(GATEWAY_REACTOR_CONTEXT_ATTR, contextView);
        return lookupRoute(exchange) // 查找路由
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    // 将路由信息放入上下文中
                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    // 返回FilteringWebHandler给DispatcherHandler
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    });
}

该方法的关键点就在于两步:

  • 查找路由;
  • 返回FilteringWebHandler

查找路由

java
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/RoutePredicateHandlerMapping.java
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
    return this.routeLocator.getRoutes() // 通过路由定位器获取到所有的路由
            // individually filter routes so that filterWhen error delaying is not a
            // problem
            .concatMap(route -> Mono.just(route)
                    // 过滤筛选路由
                    .filterWhen(r -> {
                        // add the current route we are testing
                        exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                        // 根据断言获取真正的路由
                        return r.getPredicate().apply(exchange);
                    })
                    // instead of immediately stopping main flux due to error, log and
                    // swallow it
                    .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
                    .onErrorResume(e -> Mono.empty()))
            // .defaultIfEmpty() put a static Route not found
            // or .switchIfEmpty()
            // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
            .next()
            // TODO: error handling
            .map(route -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Route matched: " + route.getId());
                }
                // 校验路由
                validateRoute(route, exchange);
                return route;
            });

    /*
     * TODO: trace logging if (logger.isTraceEnabled()) {
     * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
     */
}

该方法中的主要逻辑是先获取路由定位器中的所有路由信息,然后通过断言来进行过滤。

SimpleHandlerAdapter

注册

Spring Cloud Gateway中使用的是这个适配器,该适配器默认情况下就是会被注册的。在Spring Framework中的WebFluxConfigurationSupport中会被注册。

java
spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java
@Bean
public SimpleHandlerAdapter simpleHandlerAdapter() {
    return new SimpleHandlerAdapter();
}

至于WebFluxConfigurationSupport中的bean方法是怎么生效的,请参考Spring Boot中HttpHandler的创建过程中有介绍。简单来说,就是Spring Boot中会导入一个自动配置类,而它又是WebFluxConfigurationSupport的子类。

执行handler

java
spring-webflux/src/main/java/org/springframework/web/reactive/result/SimpleHandlerAdapter.java
public class SimpleHandlerAdapter implements HandlerAdapter {

	@Override
	public boolean supports(Object handler) {
		return WebHandler.class.isAssignableFrom(handler.getClass());
	}

	@Override
	public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
		WebHandler webHandler = (WebHandler) handler;
		Mono<Void> mono = webHandler.handle(exchange);
		return mono.then(Mono.empty());
	}

}

可以看到,该适配器支持执行所有的WebHandler,而上面RoutePredicateHandlerMapping返回的是FilteringWebHandler,而这就正好是一个WebHandler的实现类,所以能够被SimpleHandlerAdapter处理。

FilteringWebHandler

注意,该类是Spring Cloud Gateway中提供的类,Spring Framework中也有一个同名类,不要搞混了。前者负责执行Spring Cloud Gateway中的GlobalFilterGatewayFilter,而后者负责执行bean工厂中所有的WebFilter

注册

Spring在创建RoutePredicateHandlerMapping的时候,就需要先获取FilteringWebHandler类型的bean。同样地,在GatewayAutoConfiguration中会注册FilteringWebHandler类型的bean。

java
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java
/*
 * 会将所有GlobalFilter类型的bean封装起来,注意这里的FilteringWebHandler和Spring Framework中的同名类不是同一个,
 * 这里的是gateway模块定义的,要注意区分。
 */
@Bean
@ConditionalOnMissingBean
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
    return new FilteringWebHandler(globalFilters);
}

创建和初始化

java
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/FilteringWebHandler.java
private final List<GatewayFilter> globalFilters;
public FilteringWebHandler(List<GlobalFilter> globalFilters) {
    this.globalFilters = loadFilters(globalFilters);
}
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
    return filters.stream().map(filter -> {
        // 通过适配器将GlobalFilter适配为GatewayFilter
        GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);

        // 如果过滤器实现了Ordered接口,则将GatewayFilter封装为OrderedGatewayFilter
        if (filter instanceof Ordered) {
            int order = ((Ordered) filter).getOrder();
            return new OrderedGatewayFilter(gatewayFilter, order);
        }
        return gatewayFilter;
    }).collect(Collectors.toList());
}

在创建的时候,会传入bean工厂中所有的GlobalFilter,并为每个过滤器创建适配器,目的是统一类型为GatewayFilter

java
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/FilteringWebHandler.java
private static class GatewayFilterAdapter implements GatewayFilter {

    private final GlobalFilter delegate;

    GatewayFilterAdapter(GlobalFilter delegate) {
        this.delegate = delegate;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return this.delegate.filter(exchange, chain);
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");
        sb.append("delegate=").append(delegate);
        sb.append('}');
        return sb.toString();
    }

}

GlobalFilter针对所有的路由,而GatewayFilter针对特定的路由。

处理请求

java
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/FilteringWebHandler.java
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    // 从上下文中获取路由信息
    Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
    // 获取到该路由的所有filter
    List<GatewayFilter> gatewayFilters = route.getFilters();

    // 和全局过滤器合并在一起
    List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
    // 合并两种filter
    combined.addAll(gatewayFilters);
    // TODO: needed or cached?
    // 对filter进行排序
    AnnotationAwareOrderComparator.sort(combined);

    if (logger.isDebugEnabled()) {
        logger.debug("Sorted gatewayFilterFactories: " + combined);
    }

    // 创建filter链并执行
    return new DefaultGatewayFilterChain(combined).filter(exchange);
}

该方法中先从路由中获取GatewayFilter,然后和全局过滤器进行合并和排序,最后封装到过滤器链中,并执行过滤器链。

java
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/FilteringWebHandler.java
private static class DefaultGatewayFilterChain implements GatewayFilterChain {

    private final int index;

    private final List<GatewayFilter> filters;

    DefaultGatewayFilterChain(List<GatewayFilter> filters) {
        this.filters = filters;
        this.index = 0;
    }

    private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
        this.filters = parent.getFilters();
        this.index = index;
    }

    public List<GatewayFilter> getFilters() {
        return filters;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange) {
        return Mono.defer(() -> {
            // 判断是否已经执行过所有的过滤器了
            if (this.index < filters.size()) {
                // 取出当前所要执行的过滤器
                GatewayFilter filter = filters.get(this.index);
                // 对于每个filter,都创建一个DefaultGatewayFilterChain
                DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                // 递归调用过滤器
                return filter.filter(exchange, chain);
            }
            else {
                return Mono.empty(); // complete
            }
        });
    }

}

正常的过滤链操作,调用每个过滤器。Spring Cloud Gateway的操作都是封装在一个个的过滤器中的。本文主要是分析大体上请求的处理流程,而不会分析具体的过滤逻辑。