在Nacos升级到2.x之后,用gRPC替代了1.x版本中基于HTTP的通信方式,服务注册和配置获取都会基于gRPC客户端,所以本文就来梳理一下Nacos中gRPC客户端的创建和启动过程。
创建过程
/**
 * Creates (or returns the already cached) RPC client without explicit thread-pool sizing.
 *
 * <p>Delegates to the five-argument overload, passing {@code null} for both the core and the maximum
 * thread-pool size.
 *
 * @param clientName     name under which the client is cached
 * @param connectionType transport type; the delegate only accepts GRPC
 * @param labels         labels attached to the client
 * @return the client cached under {@code clientName}
 */
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
    final Integer unspecifiedPoolSize = null;
    return createClient(clientName, connectionType, unspecifiedPoolSize, unspecifiedPoolSize, labels);
}
/**
 * Returns the RPC client cached under {@code clientName}, building a {@code GrpcSdkClient} the first time.
 *
 * <p>A newly built client gets the given thread-pool core/max sizes and labels before it is stored in
 * {@code CLIENT_MAP}. Construction failures are logged and rethrown.
 *
 * @param clientName         name under which the client is cached
 * @param connectionType     transport type; only GRPC is supported
 * @param threadPoolCoreSize core thread-pool size handed to the client (may be {@code null})
 * @param threadPoolMaxSize  maximum thread-pool size handed to the client (may be {@code null})
 * @param labels             labels attached to the client
 * @return the cached or newly created client
 * @throws UnsupportedOperationException if {@code connectionType} is not GRPC
 */
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
        Integer threadPoolMaxSize, Map<String, String> labels) {
    boolean isGrpc = ConnectionType.GRPC.equals(connectionType);
    if (!isGrpc) {
        throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
    }
    // The factory only runs when no client exists for this name; its result is what CLIENT_MAP stores.
    return CLIENT_MAP.computeIfAbsent(clientName, newClientName -> {
        LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
        try {
            GrpcClient grpcClient = new GrpcSdkClient(newClientName);
            grpcClient.setThreadPoolCoreSize(threadPoolCoreSize);
            grpcClient.setThreadPoolMaxSize(threadPoolMaxSize);
            grpcClient.labels(labels);
            return grpcClient;
        } catch (Throwable cause) {
            LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, cause);
            throw cause;
        }
    });
}
在上面的方法中,如果缓存中还没有该名称的客户端,就创建GrpcSdkClient对象,设置线程池的核心线程数、最大线程数以及标签,最后将其放入缓存CLIENT_MAP中;如果连接类型不是GRPC,则直接抛出UnsupportedOperationException。
启动过程
public final void start() throws NacosException {
// Part 1: move the client state from INITIALIZED to STARTING.
// The CAS ensures start() only proceeds once; any later call returns immediately.
boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
if (!success) {
return;
}
// Scheduled pool with 2 core threads; threads are daemons named "com.alibaba.nacos.client.remote.worker".
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
});
// Part 2: submit two long-running background tasks.
// connection event consumer.
// Task 1: drain eventLinkedBlockingQueue and notify listeners of connect / disconnect events.
clientEventExecutor.submit(() -> {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take;
try {
// Blocks until an event is queued (e.g. the CONNECTED event offered once a connection is established).
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) { // connection established
// Invoke the registered listeners' onConnected().
notifyConnected();
} else if (take.isDisConnected()) { // connection closed
// Invoke the registered listeners' onDisConnect().
notifyDisConnected();
}
} catch (Throwable e) {
// Do nothing
}
}
});
// Task 2: handle reconnect signals and run the periodic health check.
clientEventExecutor.submit(() -> {
while (true) {
try {
if (isShutdown()) {
break;
}
// Wait at most keepAliveTime ms for a reconnect signal instead of blocking indefinitely.
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
// No reconnect signal arrived within the wait.
if (reconnectContext == null) {
// check alive time.
// At least keepAliveTime has passed since the last recorded activity.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
// Probe the server over the current connection.
boolean isHealthy = healthCheck();
// A failed probe may lead to a reconnect below.
if (!isHealthy) { // health check failed
// No connection yet: nothing to mark unhealthy, try again next round.
if (currentConnection == null) {
continue;
}
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Server healthy check fail, currentConnection = {}", name,
currentConnection.getConnectionId());
// Read the current client state.
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
// Stop this task once the client has been shut down.
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
// Mark the client UNHEALTHY, but only if the state is still the one just read.
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) { // CAS succeeded
// Synthesize a reconnect request with no recommended server; no continue, so it falls through to the reconnect below.
reconnectContext = new ReconnectContext(null, false);
} else { // CAS lost to a concurrent state change; retry next round
continue;
}
} else {
// Health check passed: refresh the activity timestamp.
lastActiveTimeStamp = System.currentTimeMillis();
continue;
}
} else { // still within keepAliveTime of the last activity
continue;
}
}
// reconnectContext is non-null here: either a queued signal or the one synthesized above.
if (reconnectContext.serverInfo != null) { // a specific server was recommended
// clear recommend server if server is not in server list.
boolean serverExist = false;
// Look for the recommended server's IP in the configured server list.
for (String server : getServerListFactory().getServerList()) {
// Parse the server-list entry.
ServerInfo serverInfo = resolveServerInfo(server);
// Match on IP only.
if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
serverExist = true;
// Take the port from the server-list entry.
reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
break;
}
}
// Recommended server is not in the list: drop the recommendation.
if (!serverExist) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Recommend server is not in server list, ignore recommend server {}", name,
reconnectContext.serverInfo.getAddress());
reconnectContext.serverInfo = null;
}
}
// (Key step) reconnect, to the recommended server first if one is still set.
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
// Do nothing
}
}
});
// Part 3: connect synchronously on start-up.
// connect to server, try to connect to server sync RETRY_TIMES times, async starting if failed.
// The connection obtained on start-up, if any.
Connection connectToServer = null;
// Set the state to STARTING (the CAS above already did this).
rpcClientStatus.set(RpcClientStatus.STARTING);
// Number of synchronous attempts: RETRY_TIMES (noted as 3 by default).
int startUpRetryTimes = RETRY_TIMES;
// Keep trying until a connection is obtained or the attempts are used up.
while (startUpRetryTimes > 0 && connectToServer == null) {
try {
startUpRetryTimes--;
// Pick the next server to try.
ServerInfo serverInfo = nextRpcServer();
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name,
serverInfo);
// A non-null result ends the loop; null (failure) lets it try again.
connectToServer = connectToServer(serverInfo);
} catch (Throwable e) {
LoggerUtils.printIfWarnEnabled(LOGGER,
"[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
name, e.getMessage(), startUpRetryTimes);
}
}
// Part 4: publish the outcome and register the server-request handlers.
if (connectToServer != null) { // connected
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
// Make it the current connection.
this.currentConnection = connectToServer;
// The client is now RUNNING.
rpcClientStatus.set(RpcClientStatus.RUNNING);
// Queue a CONNECTED event for task 1.
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
} else {
// All synchronous attempts failed: queue a reconnect signal for task 2.
switchServerAsync();
}
// Register two handlers for requests pushed by the server.
registerServerRequestHandler(new ConnectResetRequestHandler());
// register client detection request.
registerServerRequestHandler(request -> {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
return null;
});
}
该方法比较长,可以分为四部分:
- 修改客户端状态,确保只能启动一次;
- 提交两个异步任务:
- 任务1:处理连接事件(连接建立和连接断开),通知连接事件监听器;
- 任务2:处理重连事件(连接失败),并定期进行心跳检查;
- 建立与服务端的连接;
- 根据连接建立成功与否,会发布不同的事件,由第二步中的两个任务来处理,另外还会注册两个处理来自服务端请求的处理器;
前面3个部分比较复杂,所以下面单独介绍,第四部分比较简单,先来看看请求处理器的注册逻辑。
设置请求处理器
客户端是会收到服务端主动发送来的请求的,比如服务端一些信息发生了更改需要同步到各个客户端,所以需要设置请求处理器。
/**
 * Handlers consulted, in registration order, for requests pushed by the server.
 *
 * <p>Copy-on-write because {@code start()} registers handlers after the bi-directional stream is already bound.
 * As a result, {@code handleServerRequest} may iterate this list on a stream-callback thread while a handler is
 * being added. A plain {@code ArrayList} could throw {@code ConcurrentModificationException} there, or the
 * iterating thread might not see the addition.
 */
protected List<ServerRequestHandler> serverRequestHandlers = new java.util.concurrent.CopyOnWriteArrayList<>();

/**
 * Registers a handler for requests pushed by the server; handlers are tried in registration order.
 *
 * @param serverRequestHandler handler to append (must not be {@code null}; its class name is logged)
 */
public synchronized void registerServerRequestHandler(ServerRequestHandler serverRequestHandler) {
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Register server push request handler:{}", name,
            serverRequestHandler.getClass().getName());
    this.serverRequestHandlers.add(serverRequestHandler);
}
这里并没有什么高深的操作,仅仅将处理器放入列表中。
连接到服务端
在父类RpcClient中,connectToServer是一个抽象方法(start和reconnect等流程会调用它),由子类GrpcClient负责实现。
/**
 * Opens a gRPC connection to {@code serverInfo} and performs the Nacos connection handshake.
 *
 * <p>Steps:
 * <ol>
 *   <li>Lazily create the gRPC executor.</li>
 *   <li>Dial {@code serverPort + rpcPortOffset()}.</li>
 *   <li>Run a unary {@code ServerCheckRequest} to obtain the server-assigned connection id.</li>
 *   <li>Open the bi-directional request stream.</li>
 *   <li>Send a {@code ConnectionSetupRequest} over that stream.</li>
 * </ol>
 *
 * <p>If any step throws, the half-built connection is marked abandoned and its channel is shut down, so a
 * failed attempt does not leak a channel. An interrupt is re-asserted on the current thread.
 *
 * @param serverInfo target server
 * @return the established connection, or {@code null} if the channel could not be created, the server check
 *         failed, or any step threw
 */
@Override
public Connection connectToServer(ServerInfo serverInfo) {
    ManagedChannel channel = null;
    GrpcConnection grpcConn = null;
    try {
        if (grpcExecutor == null) {
            this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
        }
        // gRPC port = server port + offset (e.g. 8848 + 1000 = 9848 with the default offset).
        int port = serverInfo.getServerPort() + rpcPortOffset();
        RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
        if (newChannelStubTemp == null) {
            return null;
        }
        channel = (ManagedChannel) newChannelStubTemp.getChannel();
        // The server check returns the connection id the server assigned to this transport.
        Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
        // instanceof is false for null, so this also covers a missing response.
        if (!(response instanceof ServerCheckResponse)) {
            shuntDownChannel(channel);
            return null;
        }
        BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                .newStub(newChannelStubTemp.getChannel());
        grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
        grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
        // create stream request and bind connection event to this connection.
        StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
        // stream observer to send response to server
        grpcConn.setPayloadStreamObserver(payloadStreamObserver);
        grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
        grpcConn.setChannel(channel);
        // send a setup request carrying client version, labels, abilities and tenant.
        ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
        conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
        conSetupRequest.setLabels(super.getLabels());
        conSetupRequest.setAbilities(super.clientAbilities);
        conSetupRequest.setTenant(super.getTenant());
        grpcConn.sendRequest(conSetupRequest);
        // wait to register connection setup
        Thread.sleep(100L);
        return grpcConn;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
        releaseFailedConnectionAttempt(grpcConn, channel);
    }
    return null;
}

/**
 * Marks a half-built connection abandoned and shuts its channel down, so a failed attempt does not leak it.
 *
 * @param grpcConn connection built so far, or {@code null}
 * @param channel  channel opened so far, or {@code null}
 */
private void releaseFailedConnectionAttempt(GrpcConnection grpcConn, ManagedChannel channel) {
    if (grpcConn != null) {
        // The stream observer ignores onError/onCompleted for abandoned connections, so shutting the
        // channel down below does not trigger another server switch.
        grpcConn.setAbandon(true);
    }
    if (channel != null) {
        try {
            shuntDownChannel(channel);
        } catch (Exception ignored) {
            // Best-effort cleanup; the connect failure has already been logged.
        }
    }
}
该方法逻辑如下:
- 创建线程池;
- 计算gRPC服务端口:在服务端口上加上偏移量(默认8848+1000=9848);
- 发起服务端检查请求,服务端会在响应中返回连接id;
- 创建连接对象并设置一些属性;
- 建立双向流通信;
- 发起连接设置请求;
发起检查请求
/**
 * Sends a unary {@code ServerCheckRequest} and waits up to 3 seconds for the reply.
 *
 * <p>If the call fails or times out, the outstanding future is cancelled so that the call is not left running.
 * An interrupt received while waiting is re-asserted on the current thread.
 *
 * @param ip                  server IP (used for logging)
 * @param port                server gRPC port (used for logging)
 * @param requestBlockingStub stub to send the request with; {@code null} yields {@code null}
 * @return the parsed response, or {@code null} if the stub is {@code null} or the call failed
 */
private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {
    if (requestBlockingStub == null) {
        return null;
    }
    ListenableFuture<Payload> responseFuture = null;
    try {
        Payload grpcRequest = GrpcUtils.convert(new ServerCheckRequest());
        responseFuture = requestBlockingStub.request(grpcRequest);
        Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS);
        // receive connection unregister response here, not check response is success.
        return (Response) GrpcUtils.parse(response);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        if (responseFuture != null) {
            // No-op if the future already completed; otherwise stops waiting on a call nobody will read.
            responseFuture.cancel(true);
        }
        LoggerUtils.printIfErrorEnabled(LOGGER,
                "Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);
        return null;
    }
}
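这里会发起一个一元(unary)调用,下面简单看看服务端是怎么返回连接id的。ServerCheckRequest在服务端GrpcRequestAcceptor的request方法中会被特殊处理:服务端直接把连接id放入响应并返回。那连接id又是在哪里生成的呢?gRPC服务端提供了传输过滤器(ServerTransportFilter)和拦截器(ServerInterceptor)。连接id在传输过滤器的transportReady方法中生成,并存入传输属性TRANS_KEY_CONN_ID;随后拦截器把它放入调用上下文CONTEXT_KEY_CONN_ID,供request方法读取。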
// server check.
// ServerCheckRequest has no registered handler, so it is answered directly here.
if (ServerCheckRequest.class.getSimpleName().equals(type)) {
// Reply with the connection id read from the gRPC Context (CONTEXT_KEY_CONN_ID), then complete the call.
Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(CONTEXT_KEY_CONN_ID.get()));
traceIfNecessary(serverCheckResponseP, false);
responseObserver.onNext(serverCheckResponseP);
responseObserver.onCompleted();
return;
}
@Override
public void startServer() throws Exception {
/*
 * Builds the gRPC server with ServerBuilder (builder pattern): configure the properties, then call build().
 */
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// server interceptor to set connection id.
// Interceptor that copies the transport attributes set by the transport filter below into the call Context.
ServerInterceptor serverInterceptor = new ServerInterceptor() {
// Intercepts every incoming call.
@Override
public <T, S> ServerCall.Listener<T> interceptCall(
ServerCall<T, S> call, // the inbound call
Metadata headers, // metadata (headers) of the call
ServerCallHandler<T, S> next // next handler in the chain
) {
// Derive a Context from the current one
Context ctx = Context.current() // current context
// and attach the per-transport attributes to it:
// connection id
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
// remote IP
.withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
// remote port
.withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
// local port
.withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
// For calls to the bi-directional stream service, also expose the underlying channel.
if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
// Obtain the internal channel of this call.
Channel internalChannel = getInternalChannel(call);
ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
}
// Continue the call with ctx attached.
return Contexts.interceptCall(ctx, call, headers, next);
}
};
// Register the services (request-to-handler mappings) behind the interceptor.
addServices(handlerRegistry, serverInterceptor);
// Build the server.
server = ServerBuilder
// listening port
.forPort(getServicePort())
// executor for call handling
.executor(getRpcExecutor())
// maximum inbound message size
.maxInboundMessageSize(getInboundMessageSize())
.fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
// Transport filter: assigns the connection id when a transport becomes ready and unregisters it on termination.
.addTransportFilter(new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
// remote (client) address
InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
// local address
InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
// remote port
int remotePort = remoteAddress.getPort();
// local port
int localPort = localAddress.getPort();
// remote IP
String remoteIp = remoteAddress.getAddress().getHostAddress();
Attributes attrWrapper = transportAttrs.toBuilder()
// Connection id = "<currentTimeMillis>_<remoteIp>_<remotePort>"; the interceptor above copies TRANS_KEY_CONN_ID into CONTEXT_KEY_CONN_ID.
.set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
.set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build();
// Read back the connection id for logging.
String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
return attrWrapper;
}
@Override
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
// connection id assigned in transportReady (if any)
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
Loggers.REMOTE_DIGEST
.info("Connection transportTerminated,connectionId = {} ", connectionId);
// Unregister the connection from connectionManager.
connectionManager.unregister(connectionId);
}
}
}).build();
// Start the server.
server.start();
}
可以看到,所谓的连接id就是时间戳、客户端IP和客户端端口用下划线拼接成的字符串。客户端从检查请求的响应中获取到连接id后,会将其设置到连接对象中,后续通过双向流发起的请求会携带上该连接id。
建立双向流通信
/**
 * Opens the bi-directional request stream for {@code grpcConn} and installs the inbound observer.
 *
 * <p>Inbound payloads are parsed as {@code Request}s and dispatched to {@code handleServerRequest}. The reply,
 * or an {@code ErrorResponse} if handling throws, is sent back tagged with the request's id.
 *
 * <p>A stream error or completion may happen while the connection is still in use, meaning the client is
 * running and the connection is not abandoned. In that case the client moves from RUNNING to UNHEALTHY and,
 * if that transition succeeds, an asynchronous server switch is scheduled.
 *
 * @param streamStub stub created on the connection's channel
 * @param grpcConn   connection the stream belongs to
 * @return observer used to send payloads to the server over this stream
 */
private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {
    return streamStub.requestBiStream(new StreamObserver<Payload>() {
        // Called for every message the server sends on this stream.
        @Override
        public void onNext(Payload payload) {
            LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}",
                    grpcConn.getConnectionId(), payload.toString());
            try {
                Object parseBody = GrpcUtils.parse(payload);
                // A payload that is not a Request fails this cast and is logged by the outer catch.
                final Request request = (Request) parseBody;
                if (request != null) {
                    try {
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            // The reply carries the id of the request it answers.
                            response.setRequestId(request.getRequestId());
                            sendResponse(response);
                        } else {
                            LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(),
                                    request.getRequestId());
                        }
                    } catch (Exception e) {
                        // The exception is passed last, without a placeholder of its own, so SLF4J logs it with
                        // its stack trace. NOTE(review): assumes LoggerUtils forwards the varargs to SLF4J as-is.
                        LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}",
                                grpcConn.getConnectionId(), payload.toString(), e);
                        Response errResponse = ErrorResponse
                                .build(NacosException.CLIENT_ERROR, "Handle server request error");
                        errResponse.setRequestId(request.getRequestId());
                        sendResponse(errResponse);
                    }
                }
            } catch (Exception e) {
                // Same trailing-exception convention as above, so the cause is not lost.
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}",
                        grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8(), e);
            }
        }

        // Called when the stream fails.
        @Override
        public void onError(Throwable throwable) {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            // Only react if the client is running and this connection is still the one in use.
            if (isRunning && !isAbandon) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}",
                        grpcConn.getConnectionId(), throwable);
                // Only the thread that wins RUNNING -> UNHEALTHY schedules the switch.
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            } else {
                LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}",
                        grpcConn.getConnectionId(), isRunning, isAbandon);
            }
        }

        // Called when the server closes the stream.
        @Override
        public void onCompleted() {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server",
                        grpcConn.getConnectionId());
                // Only the thread that wins RUNNING -> UNHEALTHY schedules the switch.
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            } else {
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}",
                        grpcConn.getConnectionId(), isRunning, isAbandon);
            }
        }
    });
}
这里定义了三个回调,分别在收到服务端的请求、发生错误以及双向流关闭时被调用。下面重点关注一下正常情况下的回调onNext,它会调用handleServerRequest来处理服务端发来的请求。
/**
 * Dispatches a server-pushed request to the registered handlers and returns the first non-null reply.
 *
 * <p>Refreshes {@code lastActiveTimeStamp} first. Handlers are tried in list order. An exception thrown by a
 * handler is logged (message only) and the next handler is tried.
 *
 * @param request request pushed by the server
 * @return the first non-null handler response, or {@code null} if no handler produced one
 */
protected Response handleServerRequest(final Request request) {
    final String requestType = request.getClass().getSimpleName();
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Receive server push request, request = {}, requestId = {}", name,
            requestType, request.getRequestId());
    lastActiveTimeStamp = System.currentTimeMillis();
    for (ServerRequestHandler handler : serverRequestHandlers) {
        try {
            Response reply = handler.requestReply(request);
            if (reply == null) {
                // This handler does not answer this request type; ask the next one.
                continue;
            }
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}", name,
                    requestType, request.getRequestId());
            return reply;
        } catch (Exception e) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] HandleServerRequest:{}, errorMessage = {}", name,
                    handler.getClass().getName(), e.getMessage());
        }
    }
    return null;
}
收到服务端的请求后,会按注册顺序遍历服务端请求处理器(start方法的第四部分注册了两个)。一旦某个处理器返回了非null值,就直接返回该响应,不再执行后续的处理器;处理器抛出的异常只会被记录日志,随后继续尝试下一个处理器。
发起连接设置请求
/**
 * Converts {@code request} to a gRPC {@code Payload} with {@code GrpcUtils.convert} and writes it to the
 * bi-directional stream.
 *
 * @param request request to send
 */
public void sendRequest(Request request) {
    payloadStreamObserver.onNext(GrpcUtils.convert(request));
}
/**
 * Converts a request into a gRPC {@code Payload}.
 *
 * <p>The metadata carries the request's simple class name as its type, the local IP, and the request headers.
 * The headers are then cleared from {@code request}, which mutates the caller's object. This is presumably done
 * so they are not duplicated in the body. Finally, the request is serialized to JSON and stored as the UTF-8
 * body.
 *
 * @param request request to convert; its headers are cleared as a side effect
 * @return the payload to send over gRPC
 */
public static Payload convert(Request request) {
    String requestType = request.getClass().getSimpleName();
    String localIp = NetUtils.localIP();
    Metadata metadata = Metadata.newBuilder()
            .setType(requestType)
            .setClientIp(localIp)
            .putAllHeaders(request.getHeaders())
            .build();
    request.clearHeaders();
    ByteString body = ByteString.copyFrom(toJson(request), StandardCharsets.UTF_8);
    return Payload.newBuilder()
            .setBody(Any.newBuilder().setValue(body))
            .setMetadata(metadata)
            .build();
}
在发送请求之前,会通过GrpcUtils工具类的convert方法,把请求类型、客户端IP和请求头封装到元数据对象中,然后清空请求对象上的请求头,再把请求序列化为JSON作为负载的请求体。
连接成功事件
/**
 * Calls {@code onConnected()} on every registered connection event listener.
 *
 * <p>A listener that throws does not stop the others. Its failure is logged together with the throwable,
 * which is passed as the trailing argument so that SLF4J records the stack trace.
 * NOTE(review): this assumes LoggerUtils forwards the varargs to SLF4J unchanged.
 */
protected void notifyConnected() {
    if (connectionEventListeners.isEmpty()) {
        return;
    }
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name);
    for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
        try {
            connectionEventListener.onConnected();
        } catch (Throwable throwable) {
            LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name,
                    connectionEventListener.getClass().getName(), throwable);
        }
    }
}
/**
 * Calls {@code onDisConnect()} on every registered connection event listener.
 *
 * <p>A listener that throws does not stop the others. Its failure is logged together with the throwable,
 * which is passed as the trailing argument so that SLF4J records the stack trace.
 * NOTE(review): this assumes LoggerUtils forwards the varargs to SLF4J unchanged.
 */
protected void notifyDisConnected() {
    if (connectionEventListeners.isEmpty()) {
        return;
    }
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name);
    for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
        try {
            connectionEventListener.onDisConnect();
        } catch (Throwable throwable) {
            LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name,
                    connectionEventListener.getClass().getName(), throwable);
        }
    }
}
根据连接是建立还是关闭,回调各个监听器的相应方法。
连接失败事件
添加连接失败事件
/**
 * Queues a reconnect signal that has no recommended server and is not marked as caused by a failed request.
 */
public void switchServerAsync() {
    final ServerInfo noRecommendedServer = null;
    switchServerAsync(noRecommendedServer, false);
}
/**
 * Queues a reconnect signal on {@code reconnectionSignal} for the background reconnect task.
 *
 * <p>The result of {@code offer} is ignored. NOTE(review): if the queue is bounded and full, the signal is
 * dropped silently — confirm the queue type.
 *
 * @param recommendServerInfo server to try first, or {@code null}
 * @param onRequestFail       whether the switch was triggered by a failed request
 */
protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    ReconnectContext signal = new ReconnectContext(recommendServerInfo, onRequestFail);
    reconnectionSignal.offer(signal);
}
将重连信号封装为ReconnectContext并加入reconnectionSignal队列,由上面的异步任务2来处理。
处理连接失败事件
重连
/**
 * Reconnects the client, looping until a new connection is established or the client shuts down.
 *
 * <p>If {@code onRequestFail} is set and a health check on the current connection succeeds, the client is
 * simply marked RUNNING again. Otherwise servers are tried one at a time; the recommended server, if any, is
 * used for the first attempt only.
 *
 * <p>On success:
 * <ul>
 *   <li>the previous connection is marked abandoned and closed;</li>
 *   <li>the new connection becomes current;</li>
 *   <li>the client becomes RUNNING;</li>
 *   <li>a CONNECTED event is queued.</li>
 * </ul>
 *
 * <p>While the client is not RUNNING, the loop sleeps between attempts. The delay is 100 ms times
 * (completed rounds over the server list + 1), capped at 5 s.
 *
 * <p>All exceptions are logged, not thrown.
 *
 * @param recommendServerInfo server to try first, or {@code null} to let {@code nextRpcServer()} choose
 * @param onRequestFail       whether the reconnect was triggered by a failed request
 */
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    try {
        AtomicReference<ServerInfo> recommendServer = new AtomicReference<>(recommendServerInfo);
        if (onRequestFail && healthCheck()) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success, currentServer is {} ", name,
                    currentConnection.serverInfo.getAddress());
            rpcClientStatus.set(RpcClientStatus.RUNNING);
            return;
        }
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to reconnect to a new server, server is {}", name,
                recommendServerInfo == null ? " not appointed, will choose a random server."
                        : (recommendServerInfo.getAddress() + ", will try it once."));
        // loop until start client success.
        boolean switchSuccess = false;
        int reConnectTimes = 0;
        int retryTurns = 0;
        Exception lastException;
        while (!switchSuccess && !isShutdown()) {
            // 1.get a new server
            ServerInfo serverInfo = null;
            try {
                // The recommended server is only used once: the finally block clears it.
                serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
                // 2.create a new channel to new server
                Connection connectionNew = connectToServer(serverInfo);
                if (connectionNew != null) {
                    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
                            name, serverInfo.getAddress(), connectionNew.getConnectionId());
                    // successfully create a new connect.
                    if (currentConnection != null) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Abandon prev connection, server is {}, connectionId is {}", name,
                                currentConnection.serverInfo.getAddress(), currentConnection.getConnectionId());
                        // Mark the old connection abandoned first so its stream callbacks are ignored.
                        currentConnection.setAbandon(true);
                        closeConnection(currentConnection);
                    }
                    currentConnection = connectionNew;
                    rpcClientStatus.set(RpcClientStatus.RUNNING);
                    switchSuccess = true;
                    eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
                    return;
                }
                // close connection if client is already shutdown.
                if (isShutdown()) {
                    closeConnection(currentConnection);
                }
                lastException = null;
            } catch (Exception e) {
                lastException = e;
            } finally {
                recommendServer.set(null);
            }
            // A "turn" is one full pass over the server list. With an empty list, every attempt counts as a
            // turn so the back-off still grows. Previously "% 0" threw ArithmeticException, which the outer
            // catch swallowed, silently ending the reconnect loop.
            int serverCount = RpcClient.this.serverListFactory.getServerList().size();
            if (reConnectTimes > 0 && (serverCount == 0 || reConnectTimes % serverCount == 0)) {
                LoggerUtils.printIfInfoEnabled(LOGGER,
                        "[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}", name,
                        reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
                if (Integer.MAX_VALUE == retryTurns) {
                    retryTurns = 50;
                } else {
                    retryTurns++;
                }
            }
            reConnectTimes++;
            try {
                // sleep x milliseconds to switch next server.
                if (!isRunning()) {
                    // first round, try servers at a delay 100ms;second round, 200ms; max delays 5s. to be reconsidered.
                    Thread.sleep(Math.min(retryTurns + 1, 50) * 100L);
                }
            } catch (InterruptedException e) {
                // set the interrupted flag
                // NOTE(review): once the flag is set, later Thread.sleep calls throw immediately, so the loop
                // retries without delay until it connects or the client shuts down.
                Thread.currentThread().interrupt();
            }
        }
        if (isShutdown()) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown, stop reconnect to server", name);
        }
    } catch (Exception e) {
        LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}", name, e);
    }
}
注意,通过无参的switchServerAsync()添加重连事件时,recommendServerInfo为null。因此重连时会通过nextRpcServer()依次尝试各个服务端,直到连接成功或客户端关闭;即使指定了推荐服务端,也只会尝试一次(finally中会将其置为null)。如果客户端不处于运行状态,每次失败后都会睡眠一段时间:每把服务端列表完整尝试一轮,睡眠时间增加100毫秒,最多5秒。
心跳检查
异步任务2除了重连以外,还负责心跳检查。如果在keepAliveTime内没有收到重连事件,并且距离上次活跃时间(lastActiveTimeStamp)已超过keepAliveTime,就会发送心跳请求。心跳成功则刷新活跃时间;心跳失败则会把客户端状态修改为UNHEALTHY,并发起一次重连。
/**
 * Sends a {@code HealthCheckRequest} over the current connection with a 3-second timeout.
 *
 * <p>A successful response shows that the server is reachable and that it accepts requests on this connection.
 *
 * @return {@code true} only if a response arrived and reports success; {@code false} when there is no current
 *         connection, the response is {@code null}/unsuccessful, or the request throws {@code NacosException}
 */
private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    if (this.currentConnection != null) {
        try {
            Response response = this.currentConnection.request(healthCheckRequest, 3000L);
            return response != null && response.isSuccess();
        } catch (NacosException ignored) {
            // A failed request is reported as an unhealthy server.
        }
    }
    return false;
}
Nacos专门定义了心跳检查请求类型,使用到的连接对象就是建立连接时返回的GrpcConnection对象。