Nacos服务端gRPC服务的启动过程
Nacos的服务端整合了Spring Boot框架,其利用了Spring中的一些扩展点来实现启动过程中需要完成的操作。本文会分析Nacos是怎么创建和启动gRPC服务的,以及是怎么扫描、注册和映射处理请求的handler的。
创建和启动server
BaseGrpcServer是Nacos中的gRPC的主要实现,其上下继承体系如下图所示。
- BaseRpcServer:定义了启动server的扩展点。
- BaseGrpcServer:是主要实现,包括创建server、服务注册和启动server。
- GrpcSdkServer和GrpcClusterServer:实现了在Spring中的bean定义,以及返回不同的端口和线程池。
注册bean
GrpcSdkServer
GrpcClusterServer
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcSdkServer.java
@Service
public class GrpcSdkServer extends BaseGrpcServer {
private static final int PORT_OFFSET = 1000;
@Override
public int rpcPortOffset() {
return PORT_OFFSET;
}
@Override
public ThreadPoolExecutor getRpcExecutor() {
return GlobalExecutor.sdkRpcExecutor;
}
}
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java
@Service
public class GrpcClusterServer extends BaseGrpcServer {
private static final int PORT_OFFSET = 1001;
@Override
public int rpcPortOffset() {
return PORT_OFFSET;
}
@Override
public ThreadPoolExecutor getRpcExecutor() {
if (!GlobalExecutor.clusterRpcExecutor.allowsCoreThreadTimeOut()) {
// 允许核心线程超时退出
GlobalExecutor.clusterRpcExecutor.allowCoreThreadTimeOut(true);
}
return GlobalExecutor.clusterRpcExecutor;
}
}
两者定义了在基础端口(默认是8448)上的不同偏移(下面会分析到这个偏移是怎么被用到的),以及返回不同的线程池。最重要的是两者类上被@Service注解修饰,所以会被Spring自动扫描并注册和创建。
启动扩展点
要启动gRPC服务,那要有一个调用点吧,这个调用点定义在BaseRpcServer中,利用的是Spring中的@PostConstruct扩展点。
java
core/src/main/java/com/alibaba/nacos/core/remote/BaseRpcServer.java
@PostConstruct
public void start() throws Exception {
// 获取类名
String serverName = getClass().getSimpleName();
// 打印日志,说明启动的哪种server实现
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());
// 启动server
startServer();
Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
try {
BaseRpcServer.this.stopServer();
Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
} catch (Exception e) {
Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
}
}));
}
public abstract void startServer() throws Exception;
上面的start方法被@PostConstruct注解修饰,会在bean创建后被调用。主要的逻辑是调用了startServer方法,这是一个模板方法,在子类BaseGrpcServer类中实现了该方法。
另外,还打印了一条日志,说明启动的server类型及端口。这里的端口会获取到子类定义的端口偏移。
getServicePort
getPort
java
core/src/main/java/com/alibaba/nacos/core/remote/BaseRpcServer.java
public int getServicePort() {
return EnvUtil.getPort() + rpcPortOffset();
}
public abstract int rpcPortOffset();
java
sys/src/main/java/com/alibaba/nacos/sys/env/EnvUtil.java
private static final String SERVER_PORT_PROPERTY = "server.port";
private static final int DEFAULT_SERVER_PORT = 8848;
public static int getPort() {
if (port == -1) {
// 获取server.port属性,DEFAULT_SERVER_PORT的值为8848
port = getProperty(SERVER_PORT_PROPERTY, Integer.class, DEFAULT_SERVER_PORT);
}
return port;
}
在logs目录下的remote-core.log中会打印如下内容:
log
2024-10-31 17:35:22,698 INFO Nacos GrpcSdkServer Rpc server starting at port 9848
2024-10-31 17:35:22,974 INFO Nacos GrpcClusterServer Rpc server starting at port 9849
服务注册
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java
@Override
public void startServer() throws Exception {
/*
* 该方法使用的是建造者设计模式,建造者无非就是往这个对象里面设置一些属性,最后用调用build方法创建出来
*/
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// server interceptor to set connection id.
// 创建服务端拦截器
ServerInterceptor serverInterceptor = new ServerInterceptor() {
// 拦截调用请求
@Override
public <T, S> ServerCall.Listener<T> interceptCall(
ServerCall<T, S> call, // 代表入站的调用
Metadata headers, // 调用相关的元数据
ServerCallHandler<T, S> next // 处理调用的下一个handler
) {
// 获取上下文
Context ctx = Context.current() // 获取当前的上下文
// 设置一些属性到上下文中
// 设置连接id
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
// 设置远端IP
.withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
// 设置远端端口
.withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
// 设置本地端口
.withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
// 判断被调用的服务是不是BiRequestStream
if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
// 获取内部的通道
Channel internalChannel = getInternalChannel(call);
ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
}
// 拦截调用
return Contexts.interceptCall(ctx, call, headers, next);
}
};
// 添加服务,也就是进行请求服务映射
addServices(handlerRegistry, serverInterceptor);
// 构建server
server = ServerBuilder
// 设置端口
.forPort(getServicePort())
// 设置线程池
.executor(getRpcExecutor())
// 设置最大入栈消息大小
.maxInboundMessageSize(getInboundMessageSize())
.fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
// 添加通信过滤器
.addTransportFilter(new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
// 获取请求方地址
InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
// 获取本地地址
InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
// 获取请求方端口
int remotePort = remoteAddress.getPort();
// 获取本地端口
int localPort = localAddress.getPort();
// 获取请求方的IP
String remoteIp = remoteAddress.getAddress().getHostAddress();
Attributes attrWrapper = transportAttrs.toBuilder()
// 设置连接id,在上面的interceptor中会将TRANS_KEY_CONN_ID属性转为CONTEXT_KEY_CONN_ID
.set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
.set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build();
// 获取连接id
String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
return attrWrapper;
}
@Override
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
// 获取连接id
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
Loggers.REMOTE_DIGEST
.info("Connection transportTerminated,connectionId = {} ", connectionId);
// 注销连接
connectionManager.unregister(connectionId);
}
}
}).build();
// 启动server
server.start();
}
该方法比较长,但也就是做了下面几件事情:
- 创建gRPC的拦截器;
- 注册服务到拦截器中;
- 构建server对象;
- 启动server;
server的创建和启动是gRPC中的实现,本文主要关注服务的注册,即addServices方法。
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// unary common call register.
// 创建一元RPC请求方法
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
// 设置通信方式为一元
.setType(MethodDescriptor.MethodType.UNARY)
// 设置方法名称
.setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
// 设置序列化和反序列化器
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
// grpc请求服务处理映射逻辑,主要关注grpcCommonRequestAcceptor.request
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
// 创建一元service,添加方法及其handler
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
.addMethod(unaryPayloadMethod, payloadHandler).build();
// 添加一元服务
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// bi stream register.
// 创建双向流处理器
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
// 创建双向流方法
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
// 设置通信方式为双向流
.setType(MethodDescriptor.MethodType.BIDI_STREAMING)
// 设置方法名
.setFullMethodName(MethodDescriptor
.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
// 设置序列化和反序列化器
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
// 创建双向流service
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
// 添加双向流服务
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}
该方法中注册了两种服务:一元(Unary)服务和双向流(BiStream)服务。每种服务的注册过程如下:
- 创建请求的方法;
- 创建处理请求的处理器:传入了一个lambda,处理请求时会被调用。两种服务使用的acceptor不同。
- 根据方法和处理器创建服务;
- 注册服务;
至此,关于Nacos中gRPC服务启动流程部分内容已分析完成。