Nacos中GrpcRequestAcceptor的工作原理

Nacos中服务端gRPC服务的启动过程一文中,介绍到gRPC启动之前会进行服务注册。当接收到服务时,会调用handler来处理。当接收到一元服务时,所要执行的handler是通过本文介绍的GrpcRequestAcceptor来处理的,本文来分析一下它是怎么工作的。


request方法

该方法的主要逻辑是根据请求类型找到对应的处理器,然后调用处理器来处理请求。

v2.1.0
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java
@Autowired
RequestHandlerRegistry requestHandlerRegistry;
    // server check.
    // 特殊类型的请求,没有为该类型设置handler,所以这里特殊处理
    if (ServerCheckRequest.class.getSimpleName().equals(type)) {
        // 获取连接id,并设置到响应中
        Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(CONTEXT_KEY_CONN_ID.get()));
        traceIfNecessary(serverCheckResponseP, false);
        responseObserver.onNext(serverCheckResponseP);
        responseObserver.onCompleted();
        return;
    }



    // 根据type查找handler对象来处理请求
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    //no handler found.
    // 如果没找到处理器,则响应错误
    if (requestHandler == null) {
        Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
        Payload payloadResponse = GrpcUtils
                .convert(ErrorResponse.build(NacosException.NO_HANDLER, "RequestHandler Not Found"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }
    
    //check connection status.
    // 获取连接id
    String connectionId = CONTEXT_KEY_CONN_ID.get();
    // 检查连接id是否有效
    boolean requestValid = connectionManager.checkValid(connectionId);
    if (!requestValid) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
        Payload payloadResponse = GrpcUtils
                .convert(ErrorResponse.build(NacosException.UN_REGISTER, "Connection is unregistered."));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }
    
    Object parseObj = null;
    try {
        // 解析请求
        parseObj = GrpcUtils.parse(grpcRequest);
    } catch (Exception e) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
        Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, e.getMessage()));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 无效请求
    if (parseObj == null) {
        Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive  ,parse request is null", connectionId);
        Payload payloadResponse = GrpcUtils
                .convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 如果请求体不是Request对象,则响应错误
    if (!(parseObj instanceof Request)) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid request receive  ,parsed payload is not a request,parseObj={}", connectionId,
                        parseObj);
        Payload payloadResponse = GrpcUtils
                .convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 转为Request对象
    Request request = (Request) parseObj;
    try {
        // 获取连接对象
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        // 创建并设置一些请求元数据
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        // 刷新客户端的存活时间
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        /*
         * 调用handler来处理请求,
         * 这里的handleRequest方法是RequestHandler中的方法,内部会调用过滤器,然后再调用各个子类实现的handle方法。
         */
        Response response = requestHandler.handleRequest(request, requestMeta);
        // 对响应对象进行转换
        Payload payloadResponse = GrpcUtils.convert(response);
        traceIfNecessary(payloadResponse, false);
        // 设置响应
        responseObserver.onNext(payloadResponse);
        // 结束请求处理
        responseObserver.onCompleted();
    } catch (Throwable e) {
        Loggers.REMOTE_DIGEST
                .error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
                        e);
        Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(
                (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
                e.getMessage()));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
    
}

是通过属性requestHandlerRegistry来获取的handler,这是一个被@Service注解修饰的类,另外还是一个应用监听器,在Spring应用上下文刷新的时候会调用它来注册处理器。

RequestHandlerRegistry

该类很简单,当监听到容器上下文刷新事件时,会从bean工厂中获取RequestHandler类型的bean,然后将其添加到registryHandlers集合中。

v2.1.0
java
core/src/main/java/com/alibaba/nacos/core/remote/RequestHandlerRegistry.java
@Service
public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {
    
    Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
    
    @Autowired
    private TpsMonitorManager tpsMonitorManager;
    
    /**
     * Get Request Handler By request Type.
     *
     * @param requestType see definitions  of sub constants classes of RequestTypeConstants
     * @return request handler.
     */
    public RequestHandler getByRequestType(String requestType) {
        // 从map中取,请求类型作为key
        return registryHandlers.get(requestType);
    }
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 获取所有RequestHandler类型的bean
        Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
        // 获取bean集合
        Collection<RequestHandler> values = beansOfType.values();

        // 遍历所有的请求处理器
        for (RequestHandler requestHandler : values) {

            // 获取bean的类型
            Class<?> clazz = requestHandler.getClass();
            boolean skip = false;
            // 遍历判断其父类是否是RequestHandler
            while (!clazz.getSuperclass().equals(RequestHandler.class)) {
                // 遍历到Object类
                if (clazz.getSuperclass().equals(Object.class)) {
                    // 设置标记,说明当前bean的类型不是RequestHandler的子类
                    skip = true;
                    // 结束遍历
                    break;
                }
                clazz = clazz.getSuperclass();
            }
            // 跳过当前遍历到的bean
            if (skip) {
                continue;
            }
            
            try {
                // 获取handle方法
                Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
                // 如果方法上修饰了@TpsControl,第二个条件默认返回true
                if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
                    // 获取@TpsControl注解
                    TpsControl tpsControl = method.getAnnotation(TpsControl.class);
                    // 获取请求的动作类型
                    String pointName = tpsControl.pointName();
                    // 创建监控点对象
                    TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
                    // 注册tps控制点
                    tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
                }
            } catch (Exception e) {
                //ignore.
            }
            // 获取其直接父类的第一个泛型实参,即请求的类型
            Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
            // 注册处理器
            registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
        }
    }
}

RequestHandler

上面说到,会调用该类的handleRequest方法来处理请求。在该方法中,主要是实现了过滤器的逻辑,然后再调用handle模板方法,让子类来处理。

v2.1.0
java
core/src/main/java/com/alibaba/nacos/core/remote/RequestHandler.java
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class RequestHandler<T extends Request, S extends Response> {
    
    @Autowired
    private RequestFilters requestFilters;
    
    /**
     * Handler request.
     *
     * @param request request
     * @param meta    request meta data
     * @return response
     * @throws NacosException nacos exception when handle request has problem.
     */
    public Response handleRequest(T request, RequestMeta meta) throws NacosException {
        // 遍历请求过滤器
        for (AbstractRequestFilter filter : requestFilters.filters) {
            try {
                // 过滤请求
                Response filterResult = filter.filter(request, meta, this.getClass());
                // 如果过滤器没有成功执行,以及返回了结果,
                if (filterResult != null && !filterResult.isSuccess()) {
                    // 则直接返回过滤器的结果
                    return filterResult;
                }
            } catch (Throwable throwable) {
                Loggers.REMOTE.error("filter error", throwable);
            }
            
        }
        // 调用具体的handler的handle方法
        return handle(request, meta);
    }
    
    /**
     * Handler request.
     *
     * @param request request
     * @param meta    request meta data
     * @return response
     * @throws NacosException nacos exception when handle request has problem.
     */
    public abstract S handle(T request, RequestMeta meta) throws NacosException;
    
}

RequestHandler有很多实现类,包括但不限于:

  • InstanceRequestHandler:负责服务注册和下线。
  • ConfigPublishRequestHandler:负责配置发布,当我们在控制台添加配置点击保存后请求会指定到这里。
  • ConfigQueryRequestHandler:负责获取配置。

如果对哪部分感兴趣,就分析对应handler的handle方法。