Nacos中GrpcRequestAcceptor的工作原理
在Nacos中服务端gRPC服务的启动过程一文中,介绍到gRPC启动之前会进行服务注册。当接收到服务时,会调用handler来处理。当接收到一元服务时,所要执行的handler是通过本文介绍的GrpcRequestAcceptor来处理的,本文来分析一下它是怎么工作的。
request方法
该方法的主要逻辑是根据请求类型找到对应的处理器,然后调用处理器来处理请求。
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java
@Autowired
RequestHandlerRegistry requestHandlerRegistry;
// server check.
// 特殊类型的请求,没有为该类型设置handler,所以这里特殊处理
if (ServerCheckRequest.class.getSimpleName().equals(type)) {
// 获取连接id,并设置到响应中
Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(CONTEXT_KEY_CONN_ID.get()));
traceIfNecessary(serverCheckResponseP, false);
responseObserver.onNext(serverCheckResponseP);
responseObserver.onCompleted();
return;
}
// 根据type查找handler对象来处理请求
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//no handler found.
// 如果没找到处理器,则响应错误
if (requestHandler == null) {
Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
Payload payloadResponse = GrpcUtils
.convert(ErrorResponse.build(NacosException.NO_HANDLER, "RequestHandler Not Found"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
//check connection status.
// 获取连接id
String connectionId = CONTEXT_KEY_CONN_ID.get();
// 检查连接id是否有效
boolean requestValid = connectionManager.checkValid(connectionId);
if (!requestValid) {
Loggers.REMOTE_DIGEST
.warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
Payload payloadResponse = GrpcUtils
.convert(ErrorResponse.build(NacosException.UN_REGISTER, "Connection is unregistered."));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
Object parseObj = null;
try {
// 解析请求
parseObj = GrpcUtils.parse(grpcRequest);
} catch (Exception e) {
Loggers.REMOTE_DIGEST
.warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 无效请求
if (parseObj == null) {
Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive ,parse request is null", connectionId);
Payload payloadResponse = GrpcUtils
.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 如果请求体不是Request对象,则响应错误
if (!(parseObj instanceof Request)) {
Loggers.REMOTE_DIGEST
.warn("[{}] Invalid request receive ,parsed payload is not a request,parseObj={}", connectionId,
parseObj);
Payload payloadResponse = GrpcUtils
.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 转为Request对象
Request request = (Request) parseObj;
try {
// 获取连接对象
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
// 创建并设置一些请求元数据
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// 刷新客户端的存活时间
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
/*
* 调用handler来处理请求,
* 这里的handleRequest方法是RequestHandler中的方法,内部会调用过滤器,然后再调用各个子类实现的handle方法。
*/
Response response = requestHandler.handleRequest(request, requestMeta);
// 对响应对象进行转换
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
// 设置响应
responseObserver.onNext(payloadResponse);
// 结束请求处理
responseObserver.onCompleted();
} catch (Throwable e) {
Loggers.REMOTE_DIGEST
.error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
e);
Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(
(e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
}
}
是通过属性requestHandlerRegistry来获取的handler,这是一个被@Service注解修饰的类,另外还是一个应用监听器,在Spring应用上下文刷新的时候会调用它来注册处理器。
RequestHandlerRegistry
该类很简单,当监听到容器上下文刷新事件时,会从bean工厂中获取RequestHandler类型的bean,然后将其添加到registryHandlers集合中。
java
core/src/main/java/com/alibaba/nacos/core/remote/RequestHandlerRegistry.java
@Service
public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {
Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
@Autowired
private TpsMonitorManager tpsMonitorManager;
/**
* Get Request Handler By request Type.
*
* @param requestType see definitions of sub constants classes of RequestTypeConstants
* @return request handler.
*/
public RequestHandler getByRequestType(String requestType) {
// 从map中取,请求类型作为key
return registryHandlers.get(requestType);
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 获取所有RequestHandler类型的bean
Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
// 获取bean集合
Collection<RequestHandler> values = beansOfType.values();
// 遍历所有的请求处理器
for (RequestHandler requestHandler : values) {
// 获取bean的类型
Class<?> clazz = requestHandler.getClass();
boolean skip = false;
// 遍历判断其父类是否是RequestHandler
while (!clazz.getSuperclass().equals(RequestHandler.class)) {
// 遍历到Object类
if (clazz.getSuperclass().equals(Object.class)) {
// 设置标记,说明当前bean的类型不是RequestHandler的子类
skip = true;
// 结束遍历
break;
}
clazz = clazz.getSuperclass();
}
// 跳过当前遍历到的bean
if (skip) {
continue;
}
try {
// 获取handle方法
Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
// 如果方法上修饰了@TpsControl,第二个条件默认返回true
if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
// 获取@TpsControl注解
TpsControl tpsControl = method.getAnnotation(TpsControl.class);
// 获取请求的动作类型
String pointName = tpsControl.pointName();
// 创建监控点对象
TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
// 注册tps控制点
tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
}
} catch (Exception e) {
//ignore.
}
// 获取其直接父类的第一个泛型实参,即请求的类型
Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
// 注册处理器
registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
}
}
}
RequestHandler
上面说到,会调用该类的handleRequest方法来处理请求。在该方法中,主要是实现了过滤器的逻辑,然后再调用handle模板方法,让子类来处理。
java
core/src/main/java/com/alibaba/nacos/core/remote/RequestHandler.java
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class RequestHandler<T extends Request, S extends Response> {
@Autowired
private RequestFilters requestFilters;
/**
* Handler request.
*
* @param request request
* @param meta request meta data
* @return response
* @throws NacosException nacos exception when handle request has problem.
*/
public Response handleRequest(T request, RequestMeta meta) throws NacosException {
// 遍历请求过滤器
for (AbstractRequestFilter filter : requestFilters.filters) {
try {
// 过滤请求
Response filterResult = filter.filter(request, meta, this.getClass());
// 如果过滤器没有成功执行,以及返回了结果,
if (filterResult != null && !filterResult.isSuccess()) {
// 则直接返回过滤器的结果
return filterResult;
}
} catch (Throwable throwable) {
Loggers.REMOTE.error("filter error", throwable);
}
}
// 调用具体的handler的handle方法
return handle(request, meta);
}
/**
* Handler request.
*
* @param request request
* @param meta request meta data
* @return response
* @throws NacosException nacos exception when handle request has problem.
*/
public abstract S handle(T request, RequestMeta meta) throws NacosException;
}
RequestHandler有很多实现类,包括但不限于:
- InstanceRequestHandler:负责服务注册和下线。
- ConfigPublishRequestHandler:负责配置发布,当我们在控制台添加配置点击保存后请求会指定到这里。
- ConfigQueryRequestHandler:负责获取配置。
如果对哪部分感兴趣,就分析对应handler的handle方法。