Nacos中gRPC客户端的创建和启动过程

在Nacos升级到2.x之后,用gRPC替代了之前版本中的gRPC,服务注册和配置获取都会基于该客户端,所以本文就来梳理一下Nacos中的gRPC客户端的创建和启动过程。


创建过程

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
    return createClient(clientName, connectionType, null, null, labels);
}
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
        Integer threadPoolMaxSize, Map<String, String> labels) {
    // 如果连接类型不是GRPC
    if (!ConnectionType.GRPC.equals(connectionType)) {
        // 则抛出异常
        throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
    }

    // 如果客户端还不存在,则创建新对象并保存在map中
    return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
        LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
        try {
            // 创建GrpcClient对象
            GrpcClient client = new GrpcSdkClient(clientNameInner);
            // 设置线程核心数和最大数
            client.setThreadPoolCoreSize(threadPoolCoreSize);
            client.setThreadPoolMaxSize(threadPoolMaxSize);
            client.labels(labels);
            // 返回client对象,会被放入CLIENT_MAP中
            return client;
        } catch (Throwable throwable) {
            LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
            throw throwable;
        }
        
    });
}

在上面的方法中,创建了GrpcSdkClient对象,并设置核心和最大线程池数量,最后将其加入缓存中。

启动过程

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
public final void start() throws NacosException {

    // 第一部分:修改客户端状态
    // 利用CAS来确保客户端只启动一次
    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }

    // 创建线程池
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.remote.worker");
        t.setDaemon(true);
        return t;
    });

    // 第二部分:提交两个异步任务

    // connection event consumer.
    // 提交任务1,处理连接成功的情况
    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take;
            try {
                // 连接创建成功会被往该队列中加入连接事件,所以这里取出事件来处理
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) { // 如果是连接建立的这种情况
                    // 触发监听器执行逻辑
                    notifyConnected();
                } else if (take.isDisConnected()) { // 如果是关闭连接的这种情况
                    // 触发监听器执行逻辑
                    notifyDisConnected();
                }
            } catch (Throwable e) {
                // Do nothing
            }
        }
    });

    // 提交任务2,处理连接失败的情况
    clientEventExecutor.submit(() -> {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                // 获取重连事件,这里是限时获取,而不会一直阻塞,默认最多等5秒
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                // 如果没有重连事件
                if (reconnectContext == null) {
                    // check alive time.
                    // 距离上次心跳检查超过了保活时间
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                        // 向服务端发起心跳检查,主要检查网络是否通畅
                        boolean isHealthy = healthCheck();
                        // 如果向服务端发起心跳失败,需要尝试重新连接
                        if (!isHealthy) { // 心跳检查失败
                            // 如果当前连接是null,则跳过
                            if (currentConnection == null) {
                                continue;
                            }
                            LoggerUtils.printIfInfoEnabled(LOGGER,
                                    "[{}] Server healthy check fail, currentConnection = {}", name,
                                    currentConnection.getConnectionId());

                            // 获取连接状态
                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            // 判断连接状态是否为已关闭,如果是则结束异步任务
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }

                            // 修改连接状态为不健康
                            boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                            if (statusFLowSuccess) { // CAS成功
                                // 重新赋值,这里没有continue,会继续执行下面的逻辑
                                reconnectContext = new ReconnectContext(null, false);
                            } else { // CAS失败,则下轮再来执行
                                continue;
                            }
                            
                        } else {
                            // 心跳检查成功,刷新时间
                            lastActiveTimeStamp = System.currentTimeMillis();
                            continue;
                        }
                    } else { // 上次心跳保活还在有效期中
                        continue;
                    }
                    
                }

                // 执行到这里说明reconnectContext不是null,处理重连事件,在下面调用的switchServerAsync方法中会设置为null
                if (reconnectContext.serverInfo != null) { // 不是心跳检查失败的情况
                    // clear recommend server if server is not in server list.
                    boolean serverExist = false;
                    // 判断目标服务器是否在服务器列表中
                    // 遍历服务端的列表
                    for (String server : getServerListFactory().getServerList()) {
                        // 解析服务端的信息
                        ServerInfo serverInfo = resolveServerInfo(server);
                        // 如果是目标服务端
                        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                            serverExist = true;
                            // 设置端口
                            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                            break;
                        }
                    }
                    // 如果目标服务器不在服务器列表中,则清除相应信息
                    if (!serverExist) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Recommend server is not in server list, ignore recommend server {}", name,
                                reconnectContext.serverInfo.getAddress());
                        
                        reconnectContext.serverInfo = null;
                        
                    }
                }
                // (重点)重新连接
                reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
            } catch (Throwable throwable) {
                // Do nothing
            }
        }
    });

    // 第三部分:比较关键
    
    // connect to server, try to connect to server sync RETRY_TIMES times, async starting if failed.
    // 创建的连接对象
    Connection connectToServer = null;
    // 修改客户端状态为启动中
    rpcClientStatus.set(RpcClientStatus.STARTING);

    // 重试次数,默认是3次
    int startUpRetryTimes = RETRY_TIMES;
    // 循环尝试建立连接,直到用尽重试次数或者连接成功
    while (startUpRetryTimes > 0 && connectToServer == null) {
        try {
            startUpRetryTimes--;
            // 获取下一个服务端信息,规避连不上的服务端
            ServerInfo serverInfo = nextRpcServer();
            
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name,
                    serverInfo);

            // 与服务端建立连接,建立成功的话connectToServer是会被成功赋值的,从而结束循环
            connectToServer = connectToServer(serverInfo);
        } catch (Throwable e) {
            LoggerUtils.printIfWarnEnabled(LOGGER,
                    "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                    name, e.getMessage(), startUpRetryTimes);
        }
        
    }

    // 第四部分

    if (connectToServer != null) { // 成功连接
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
                name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
        // 设置为当前连接
        this.currentConnection = connectToServer;
        // 修改客户端状态为运行中
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        // 添加连接事件
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
        // 如果启动失败的话,则转为异步地尝试连接
        switchServerAsync();
    }

    // 注册两个服务端请求的处理器

    registerServerRequestHandler(new ConnectResetRequestHandler());
    
    // register client detection request.
    registerServerRequestHandler(request -> {
        if (request instanceof ClientDetectionRequest) {
            return new ClientDetectionResponse();
        }
        
        return null;
    });
    
}

该方法比较长,可以分为四部分:

  • 修改客户端状态,确保只能启动一次;
  • 提交两个异步任务:
    • 任务1:处理连接成功的情况;
    • 任务2:处理连接失败的情况;
  • 建立与服务端的连接;
  • 根据连接建立成功与否,会发布不同的事件,由第二步中的两个任务来处理,另外还会注册两个处理来自服务端请求的处理器;

前面3个部分比较复杂,所以下面单独介绍,第四部分比较简单,先来看看请求处理器的注册逻辑。

设置请求处理器

客户端是会收到服务端主动发送来的请求的,比如服务端一些信息发生了更改需要同步到各个客户端,所以需要设置请求处理器。

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
protected List<ServerRequestHandler> serverRequestHandlers = new ArrayList<>();
public synchronized void registerServerRequestHandler(ServerRequestHandler serverRequestHandler) {
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Register server push request handler:{}", name,
            serverRequestHandler.getClass().getName());

    // 添加到列表中
    this.serverRequestHandlers.add(serverRequestHandler);
}

这里并没有什么高深的操作,仅仅将处理器放入列表中。

连接到服务端

在父类RpcClient类中,connectToServer是个模板方法,子类GrpcClient实现了这个方法。

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
@Override
public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
            // 创建线程池
            this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
        }
        // 这里是在nacos的8848端口上加了一个1000的偏移,所以grpc通信端口是9848
        int port = serverInfo.getServerPort() + rpcPortOffset();
        // 创建到服务端的连接
        RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
        if (newChannelStubTemp != null) {

            // 检查一下服务端是否正常
            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) { // 如果服务端有问题
                // 关闭通道
                shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
                return null;
            }

            // 创建双向流stub对象
            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                    // 传入表示连接的通道对象
                    .newStub(newChannelStubTemp.getChannel());
            // 创建连接对象
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            // 从检查请求响应中获取连接id并设置到连接对象中
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
            
            //create stream request and bind connection event to this connection.
            // 发起双向流调用
            StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
            
            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            // 设置通道
            grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
            //send a  setup request.
            // 创建连接设置请求
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            // 设置客户端的版本
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            // 设置标签
            conSetupRequest.setLabels(super.getLabels());
            // 设置客户端的能力信息
            conSetupRequest.setAbilities(super.clientAbilities);
            // 设置租户
            conSetupRequest.setTenant(super.getTenant());

            // 发起连接请求,注册连接
            grpcConn.sendRequest(conSetupRequest);
            //wait to register connection setup
            // 等待注册连接设置运行完成
            Thread.sleep(100L);
            return grpcConn;
        }
        return null;
    } catch (Exception e) {
        LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
    }
    return null;
}

该方法逻辑如下:

  • 创建线程池;
  • 将目标gRPC服务端口设置为9848;
  • 发起检查请求,服务端会返回客户端id;
  • 创建连接对象并设置一些属性;
  • 建立双向流通信;
  • 发起连接设置请求;

发起检查请求

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {
    try {
        if (requestBlockingStub == null) {
            return null;
        }
        // 创建检查请求,服务端会返回连接id
        ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
        // 对请求进行转换
        Payload grpcRequest = GrpcUtils.convert(serverCheckRequest);
        // 发起一元服务调用
        ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);
        // 限时获取响应,最多等待3s
        Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS);
        //receive connection unregister response here,not check response is success.
        // 解析响应
        return (Response) GrpcUtils.parse(response);
    } catch (Exception e) {
        LoggerUtils.printIfErrorEnabled(LOGGER,
                "Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);
        return null;
    }
}

这里会发起一个一元服务调用,下面简单看看服务端是怎么返回客户端id的。因为是一元请求,所以在服务端的GrpcRequestAcceptor中的request方法中会被特殊处理。但又是在哪里生成的呢?其实gRPC中也有过滤器和拦截器的逻辑,而在服务端的过滤器中,会生成连接id,然后在拦截器中会转换一下属性名称。

v2.1.0
GrpcRequestAcceptor
BaseGrpcServer
<
>
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java
// server check.
// 特殊类型的请求,没有为该类型设置handler,所以这里特殊处理
if (ServerCheckRequest.class.getSimpleName().equals(type)) {
    // 获取连接id,并设置到响应中
    Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(CONTEXT_KEY_CONN_ID.get()));
    traceIfNecessary(serverCheckResponseP, false);
    responseObserver.onNext(serverCheckResponseP);
    responseObserver.onCompleted();
    return;
}
java
core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java
@Override
public void startServer() throws Exception {

    /*
     * 该方法使用的是建造者设计模式,建造者无非就是往这个对象里面设置一些属性,最后用调用build方法创建出来
     */

    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
    
    // server interceptor to set connection id.
    // 创建服务端拦截器
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        // 拦截调用请求
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(
                ServerCall<T, S> call, // 代表入站的调用
                Metadata headers, // 调用相关的元数据
                ServerCallHandler<T, S> next // 处理调用的下一个handler
        ) {
            // 获取上下文
            Context ctx = Context.current() // 获取当前的上下文
                    // 设置一些属性到上下文中
                    // 设置连接id
                    .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                    // 设置远端IP
                    .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                    // 设置远端端口
                    .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                    // 设置本地端口
                    .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            // 判断被调用的服务是不是BiRequestStream
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                // 获取内部的通道
                Channel internalChannel = getInternalChannel(call);
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            // 拦截调用
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };

    // 添加服务,也就是进行请求服务映射
    addServices(handlerRegistry, serverInterceptor);

    // 构建server
    server = ServerBuilder
            // 设置端口
            .forPort(getServicePort())
            // 设置线程池
            .executor(getRpcExecutor())
            // 设置最大入栈消息大小
            .maxInboundMessageSize(getInboundMessageSize())
            .fallbackHandlerRegistry(handlerRegistry)
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            // 添加通信过滤器
            .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {
                    // 获取请求方地址
                    InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                    // 获取本地地址
                    InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                    // 获取请求方端口
                    int remotePort = remoteAddress.getPort();
                    // 获取本地端口
                    int localPort = localAddress.getPort();
                    // 获取请求方的IP
                    String remoteIp = remoteAddress.getAddress().getHostAddress();
                    Attributes attrWrapper = transportAttrs.toBuilder()
                            // 设置连接id,在上面的interceptor中会将TRANS_KEY_CONN_ID属性转为CONTEXT_KEY_CONN_ID
                            .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                            .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                            .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                    // 获取连接id
                    String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
                    Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
                    return attrWrapper;
                    
                }
                
                @Override
                public void transportTerminated(Attributes transportAttrs) {
                    String connectionId = null;
                    try {
                        // 获取连接id
                        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                    } catch (Exception e) {
                        // Ignore
                    }
                    if (StringUtils.isNotBlank(connectionId)) {
                        Loggers.REMOTE_DIGEST
                                .info("Connection transportTerminated,connectionId = {} ", connectionId);
                        // 注销连接
                        connectionManager.unregister(connectionId);
                    }
                }
            }).build();

    // 启动server
    server.start();
}

可以看待,所谓的连接id就是时间戳、客户端IP和端口的字符串拼接。客户端在从检查请求的响应中获取到连接id后,会将其设置到双向流连接对象中,后续通过双向流发起的请求会携带上该连接id。

建立双向流通信

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {
    
    return streamStub.requestBiStream(new StreamObserver<Payload>() {

        // 接收到服务端的消息时调用
        @Override
        public void onNext(Payload payload) {
            
            LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}",
                    grpcConn.getConnectionId(), payload.toString());
            try {
                // 解析接收到的消息
                Object parseBody = GrpcUtils.parse(payload);
                // 解析为Request对象
                final Request request = (Request) parseBody;
                if (request != null) {
                    
                    try {
                        // 处理来自服务端的请求
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            // 设置请求id
                            response.setRequestId(request.getRequestId());
                            // 发送响应
                            sendResponse(response);
                        } else {
                            LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(),
                                    request.getRequestId());
                        }
                        
                    } catch (Exception e) {
                        LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}",
                                grpcConn.getConnectionId(), payload.toString(), e.getMessage());
                        Response errResponse = ErrorResponse
                                .build(NacosException.CLIENT_ERROR, "Handle server request error");
                        errResponse.setRequestId(request.getRequestId());
                        sendResponse(errResponse);
                    }
                    
                }
                
            } catch (Exception e) {
                
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}",
                        grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
            }
        }
        
        @Override
        public void onError(Throwable throwable) {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            // 如果客户端正在运行且当前请求没有被抛弃
            if (isRunning && !isAbandon) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}",
                        grpcConn.getConnectionId(), throwable);
                // 将客户端状态转为不健康
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    // 添加重试事件
                    switchServerAsync();
                }
                
            } else {
                LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}",
                        grpcConn.getConnectionId(), isRunning, isAbandon);
            }
            
        }

        // 双向流关闭的时候调用
        @Override
        public void onCompleted() {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server",
                        grpcConn.getConnectionId());
                // 将客户端状态转为不健康
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    // 添加重试事件
                    switchServerAsync();
                }
                
            } else {
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}",
                        grpcConn.getConnectionId(), isRunning, isAbandon);
            }
            
        }
    });
}

这里定义了三个回调,分别在收到服务端的请求、发生错误、以及双向流关闭时的调用。重点关于一下正常情况下的回调函数。

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
protected Response handleServerRequest(final Request request) {
    
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Receive server push request, request = {}, requestId = {}", name,
            request.getClass().getSimpleName(), request.getRequestId());
    lastActiveTimeStamp = System.currentTimeMillis();
    // 遍历服务端请求处理器
    for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
        try {
            // 处理请求
            Response response = serverRequestHandler.requestReply(request);

            // 如果有处理器的返回值不是null,就直接返回
            if (response != null) {
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}", name,
                        request.getClass().getSimpleName(), request.getRequestId());
                // 返回响应
                return response;
            }
        } catch (Exception e) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] HandleServerRequest:{}, errorMessage = {}", name,
                    serverRequestHandler.getClass().getName(), e.getMessage());
        }
        
    }
    return null;
}

当收到服务端的请求后,会遍历服务端请求处理器,在start方法的第四阶段会添加两个,如果遇到一个处理器返回了非null值,则直接返回,不会执行后续的处理器。

发起连接设置请求

v2.1.0
sendRequest
convert
<
>
java
common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java
public void sendRequest(Request request) {
    Payload convert = GrpcUtils.convert(request);
    payloadStreamObserver.onNext(convert);
}
java
common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java
public static Payload convert(Request request) {

    // 创建元数据对象
    Metadata newMeta = Metadata.newBuilder()
            .setType(request.getClass().getSimpleName()) // 设置请求类型
            .setClientIp(NetUtils.localIP()) // 设置客户端的IP
            .putAllHeaders(request.getHeaders()) // 设置请求头
            .build(); // 构建
    // 清除请求对象中的头部
    request.clearHeaders();
    // 将请求体转为JSON
    String jsonString = toJson(request);

    // 创建负载构建器
    Payload.Builder builder = Payload.newBuilder();

    // 构建负载
    return builder
            // 设置请求体
            .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, StandardCharsets.UTF_8)))
            // 设置元数据
            .setMetadata(newMeta)
            // 构建负载
            .build();
    
}

在发送请求之前,会通过GrpcUtils这个工具类中的方法将客户端的请求类型和IP地址封装并设置到元数据对象中。

连接成功事件

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
protected void notifyConnected() {
    if (connectionEventListeners.isEmpty()) {
        return;
    }
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name);
    // 遍历连接事件监听器
    for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
        try {
            // 触发监听器
            connectionEventListener.onConnected();
        } catch (Throwable throwable) {
            LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name,
                    connectionEventListener.getClass().getName());
        }
    }
}
protected void notifyDisConnected() {
    if (connectionEventListeners.isEmpty()) {
        return;
    }
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name);
    // 遍历监听器
    for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
        try {
            // 执行连接断开方法
            connectionEventListener.onDisConnect();
        } catch (Throwable throwable) {
            LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name,
                    connectionEventListener.getClass().getName());
        }
    }
}

根据连接是建立还是关闭,回调各个监听器的相应方法。

连接失败事件

添加连接失败事件

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
public void switchServerAsync() {
    switchServerAsync(null, false);
}
protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    // 往队列中添加一个重连信号
    reconnectionSignal.offer(new ReconnectContext(recommendServerInfo, onRequestFail));
}

封装连接失败事件并添加到队列,上面的异步任务2会处理连接失败事件。

处理连接失败事件

重连

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    
    try {
        
        AtomicReference<ServerInfo> recommendServer = new AtomicReference<>(recommendServerInfo);
        if (onRequestFail && healthCheck()) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success, currentServer is {} ", name,
                    currentConnection.serverInfo.getAddress());
            rpcClientStatus.set(RpcClientStatus.RUNNING);
            return;
        }
        
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to reconnect to a new server, server is {}", name,
                recommendServerInfo == null ? " not appointed, will choose a random server."
                        : (recommendServerInfo.getAddress() + ", will try it once."));
        
        // loop until start client success.
        boolean switchSuccess = false;
        
        int reConnectTimes = 0;
        int retryTurns = 0;
        Exception lastException;
        // 如果没成功,就一直循环
        while (!switchSuccess && !isShutdown()) {
            
            // 1.get a new server
            ServerInfo serverInfo = null;
            try {
                // 获取下一个server信息,如果有传递进来参数,则使用传递进来的
                serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
                // 2.create a new channel to new server
                // 连接服务端
                Connection connectionNew = connectToServer(serverInfo);
                if (connectionNew != null) { // 连接成功
                    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
                            name, serverInfo.getAddress(), connectionNew.getConnectionId());
                    // successfully create a new connect.
                    if (currentConnection != null) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Abandon prev connection, server is {}, connectionId is {}", name,
                                currentConnection.serverInfo.getAddress(), currentConnection.getConnectionId());
                        // set current connection to enable connection event.
                        // 设置当前连接为已被遗弃
                        currentConnection.setAbandon(true);
                        // 关闭当前连接
                        closeConnection(currentConnection);
                    }
                    // 设置为当前连接
                    currentConnection = connectionNew;
                    // 设置客户端的状态为运行中
                    rpcClientStatus.set(RpcClientStatus.RUNNING);
                    switchSuccess = true;
                    // 添加连接成功事件
                    eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
                    return;
                }
                
                // close connection if client is already shutdown.
                if (isShutdown()) {
                    closeConnection(currentConnection);
                }
                
                lastException = null;
                
            } catch (Exception e) {
                lastException = e;
            } finally {
                recommendServer.set(null);
            }

            // 如果已经把服务端列表都试过一遍了
            if (reConnectTimes > 0 // 规避0对其他数字取余也为0的情况
                    && reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
                LoggerUtils.printIfInfoEnabled(LOGGER,
                        "[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}", name,
                        reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
                // 如果重试次数达到最大整型值
                if (Integer.MAX_VALUE == retryTurns) {
                    // 调整为50
                    retryTurns = 50;
                } else {
                    // 自增重试次数
                    retryTurns++;
                }
            }

            // 自增重连计数
            reConnectTimes++;
            
            try {
                // sleep x milliseconds to switch next server.
                // 如果客户端不在运行中
                if (!isRunning()) {
                    // first round, try servers at a delay 100ms;second round, 200ms; max delays 5s. to be reconsidered.
                    // 重试次数越多,睡眠越久,但最多5秒
                    Thread.sleep(Math.min(retryTurns + 1, 50) * 100L);
                }
            } catch (InterruptedException e) {
                // Do nothing.
                // set the interrupted flag
                Thread.currentThread().interrupt();
            }
        }
        
        if (isShutdown()) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown, stop reconnect to server", name);
        }
        
    } catch (Exception e) {
        LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}", name, e);
    }
}

注意,在上面添加连接失败事情时,会将recommendServerInfo设置为null,所以重连时会通过轮询的方式尝试与服务端建立连接,直到连接成功。如果重连还是失败,则进行睡眠,失败次数越多,睡眠越久。

心跳检查

该异步任务除了重连以外,还负责心跳检查。如果没有连接失败事件而且距离上次的保活时间超过了保活时间,则会发送心跳请求。如果心跳检查失败,则会将当前连接的状态修改为不健康,并且会重新连接。

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
private boolean healthCheck() {
    // 创建健康检查请求
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    if (this.currentConnection == null) {
        return false;
    }
    try {
        // 发起心跳检查请求
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        // not only check server is ok, also check connection is register.
        // 返回是否请求成功
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        // ignore
    }
    return false;
}

Nacos专门定义了心跳检查请求类型,使用到的连接对象就是建立连接时返回的GrpcConnection对象。