Nacos服务端gRPC服务的启动过程

Nacos的服务端整合了Spring Boot框架,其利用了Spring中的一些扩展点来实现启动过程中需要完成的操作。本文会分析Nacos是怎么创建和启动gRPC服务的,以及是怎么扫描、注册和映射处理请求的handler的。


创建和启动server

BaseGrpcServer是Nacos中的gRPC的主要实现,其上下继承体系如下图所示。

  • BaseRpcServer:定义了启动server的扩展点。
  • BaseGrpcServer:是主要实现,包括创建server、服务注册和启动server。
  • GrpcSdkServerGrpcClusterServer:实现了在Spring中的bean定义,以及返回不同的端口和线程池。

注册bean

v2.1.0
GrpcSdkServer
GrpcClusterServer
<
>
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcSdkServer.java
@Service
public class GrpcSdkServer extends BaseGrpcServer {
    
    private static final int PORT_OFFSET = 1000;
    
    @Override
    public int rpcPortOffset() {
        return PORT_OFFSET;
    }
    
    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        return GlobalExecutor.sdkRpcExecutor;
    }
}
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java
@Service
public class GrpcClusterServer extends BaseGrpcServer {
    
    private static final int PORT_OFFSET = 1001;
    
    @Override
    public int rpcPortOffset() {
        return PORT_OFFSET;
    }
    
    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        if (!GlobalExecutor.clusterRpcExecutor.allowsCoreThreadTimeOut()) {
            // 允许核心线程超时退出
            GlobalExecutor.clusterRpcExecutor.allowCoreThreadTimeOut(true);
        }
        return GlobalExecutor.clusterRpcExecutor;
    }
}

两者定义了在基础端口(默认是8448)上的不同偏移(下面会分析到这个偏移是怎么被用到的),以及返回不同的线程池。最重要的是两者类上被@Service注解修饰,所以会被Spring自动扫描并注册和创建。

启动扩展点

要启动gRPC服务,那要有一个调用点吧,这个调用点定义在BaseRpcServer中,利用的是Spring中的@PostConstruct扩展点。

v2.1.0
java
core/src/main/java/com/alibaba/nacos/core/remote/BaseRpcServer.java
@PostConstruct
public void start() throws Exception {
    // 获取类名
    String serverName = getClass().getSimpleName();
    // 打印日志,说明启动的哪种server实现
    Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());

    // 启动server
    startServer();

    Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
        try {
            BaseRpcServer.this.stopServer();
            Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
        } catch (Exception e) {
            Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
        }
    }));

}
public abstract void startServer() throws Exception;

上面的start方法被@PostConstruct注解修饰,会在bean创建后被调用。主要的逻辑是调用了startServer方法,这是一个模板方法,在子类BaseGrpcServer类中实现了该方法。

另外,还打印了一条日志,说明启动的server类型及端口。这里的端口会获取到子类定义的端口偏移。

v2.1.0
getServicePort
getPort
<
>
java
core/src/main/java/com/alibaba/nacos/core/remote/BaseRpcServer.java
public int getServicePort() {
    return EnvUtil.getPort() + rpcPortOffset();
}
public abstract int rpcPortOffset();
java
sys/src/main/java/com/alibaba/nacos/sys/env/EnvUtil.java
private static final String SERVER_PORT_PROPERTY = "server.port";
private static final int DEFAULT_SERVER_PORT = 8848;
public static int getPort() {
    if (port == -1) {
        // 获取server.port属性,DEFAULT_SERVER_PORT的值为8848
        port = getProperty(SERVER_PORT_PROPERTY, Integer.class, DEFAULT_SERVER_PORT);
    }
    return port;
}

在logs目录下的remote-core.log中会打印如下内容:

log
2024-10-31 17:35:22,698 INFO Nacos GrpcSdkServer Rpc server starting at port 9848
2024-10-31 17:35:22,974 INFO Nacos GrpcClusterServer Rpc server starting at port 9849

服务注册

v2.1.0
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java
@Override
public void startServer() throws Exception {

    /*
     * 该方法使用的是建造者设计模式,建造者无非就是往这个对象里面设置一些属性,最后用调用build方法创建出来
     */

    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
    
    // server interceptor to set connection id.
    // 创建服务端拦截器
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        // 拦截调用请求
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(
                ServerCall<T, S> call, // 代表入站的调用
                Metadata headers, // 调用相关的元数据
                ServerCallHandler<T, S> next // 处理调用的下一个handler
        ) {
            // 获取上下文
            Context ctx = Context.current() // 获取当前的上下文
                    // 设置一些属性到上下文中
                    // 设置连接id
                    .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                    // 设置远端IP
                    .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                    // 设置远端端口
                    .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                    // 设置本地端口
                    .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            // 判断被调用的服务是不是BiRequestStream
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                // 获取内部的通道
                Channel internalChannel = getInternalChannel(call);
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            // 拦截调用
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };

    // 添加服务,也就是进行请求服务映射
    addServices(handlerRegistry, serverInterceptor);

    // 构建server
    server = ServerBuilder
            // 设置端口
            .forPort(getServicePort())
            // 设置线程池
            .executor(getRpcExecutor())
            // 设置最大入栈消息大小
            .maxInboundMessageSize(getInboundMessageSize())
            .fallbackHandlerRegistry(handlerRegistry)
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            // 添加通信过滤器
            .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {
                    // 获取请求方地址
                    InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                    // 获取本地地址
                    InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                    // 获取请求方端口
                    int remotePort = remoteAddress.getPort();
                    // 获取本地端口
                    int localPort = localAddress.getPort();
                    // 获取请求方的IP
                    String remoteIp = remoteAddress.getAddress().getHostAddress();
                    Attributes attrWrapper = transportAttrs.toBuilder()
                            // 设置连接id,在上面的interceptor中会将TRANS_KEY_CONN_ID属性转为CONTEXT_KEY_CONN_ID
                            .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                            .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                            .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                    // 获取连接id
                    String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
                    Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
                    return attrWrapper;
                    
                }
                
                @Override
                public void transportTerminated(Attributes transportAttrs) {
                    String connectionId = null;
                    try {
                        // 获取连接id
                        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                    } catch (Exception e) {
                        // Ignore
                    }
                    if (StringUtils.isNotBlank(connectionId)) {
                        Loggers.REMOTE_DIGEST
                                .info("Connection transportTerminated,connectionId = {} ", connectionId);
                        // 注销连接
                        connectionManager.unregister(connectionId);
                    }
                }
            }).build();

    // 启动server
    server.start();
}

该方法比较长,但也就是做了下面几件事情:

  • 创建gRPC的拦截器;
  • 注册服务到拦截器中;
  • 构建server对象;
  • 启动server;

server的创建和启动是gRPC中的实现,本文主要关注服务的注册,即addServices方法。

v2.1.0
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
    
    // unary common call register.
    // 创建一元RPC请求方法
    final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
            // 设置通信方式为一元
            .setType(MethodDescriptor.MethodType.UNARY)
            // 设置方法名称
            .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
            // 设置序列化和反序列化器
            .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
            .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();

    // grpc请求服务处理映射逻辑,主要关注grpcCommonRequestAcceptor.request
    final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
            .asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));

    // 创建一元service,添加方法及其handler
    final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
            .addMethod(unaryPayloadMethod, payloadHandler).build();
    // 添加一元服务
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
    
    // bi stream register.
    // 创建双向流处理器
    final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
            (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));

    // 创建双向流方法
    final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
            // 设置通信方式为双向流
            .setType(MethodDescriptor.MethodType.BIDI_STREAMING)
            // 设置方法名
            .setFullMethodName(MethodDescriptor
                    .generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
            // 设置序列化和反序列化器
            .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
            .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();

    // 创建双向流service
    final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
            .builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
    // 添加双向流服务
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
    
}

该方法中注册了两种服务:一元(Unary)服务和双向流(BiStream)服务。每种服务的注册过程如下:

  • 创建请求的方法;
  • 创建处理请求的处理器:传入了一个lambda,处理请求时会被调用。两种服务使用的acceptor不同。
  • 根据方法和处理器创建服务;
  • 注册服务;

至此,关于Nacos中gRPC服务启动流程部分内容已分析完成。