Nacos中客户端服务注册过程

在服务中,引入Nacos服务发现的依赖并配置好服务注册中心的信息,启动后会自动将当前服务的信息注册到Nacos服务端。本文就来分析一下在客户端启动时,是怎么注册服务的。


NacosServiceRegistryAutoConfiguration

一般分析其他组件与Spring Boot的整合都从自动配置类入手,Nacos也不例外。在该自动配置类中,会注册NacosAutoServiceRegistration,间接实现了Spring Cloud中定义的AutoServiceRegistration接口,从名称就可以看出来会自动注册服务。

java
spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/registry/NacosServiceRegistryAutoConfiguration.java
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		// 默认值是true
		matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosServiceManager nacosServiceManager,
			// 保存了一些服务端的信息
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		// 创建服务注册bean
		return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}

	// 下面这个bean的参数中正好包含了上面两个bean
	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

}

这个类中注册了3个bean,都是继承自Spring Cloud中的接口。

关于Registry和Registration,可以这样理解,前者是注册处,后者是注册项,注册处用来处理注册项。

NacosAutoServiceRegistration

父类AbstractAutoServiceRegistration是Spring Cloud中定义的抽象类,实现了ApplicationListener接口,会在web服务器初始化事件发布之后进行服务注册。

java
spring-cloud-commons/src/main/java/org/springframework/cloud/client/serviceregistry/AbstractAutoServiceRegistration.java
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                .getServerNamespace())) {
            return;
        }
    }
    // 记录web服务器的端口
    this.port.compareAndSet(0, event.getWebServer().getPort());
    // 执行服务注册
    this.start();
}
public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    // 进行服务注册,并在注册前后分别发布事件
    if (!this.running.get()) {
        this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration()));
        // 执行服务注册
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        // 发布服务注册事件
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }

}
protected void register() {
    // 进行服务注册
    this.serviceRegistry.register(getRegistration());
}
protected abstract R getRegistration();

子类NacosAutoServiceRegistration中重写了getRegistration方法,该方法返回注册信息,并通过NacosServiceRegistry来的register方法来注册。

java
spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/registry/NacosAutoServiceRegistration.java
@Override
protected NacosRegistration getRegistration() {
    if (this.registration.getPort() < 0 && this.getPort().get() > 0) {
        this.registration.setPort(this.getPort().get());
    }
    Assert.isTrue(this.registration.getPort() > 0, "service.port has not been set");
    return this.registration;
}

NacosServiceRegistry

java
spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/registry/NacosServiceRegistry.java
@Override
public void register(Registration registration) {

    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }

    // 创建服务注册对象
    NamingService namingService = namingService();
    // 获取服务id和组,这个id默认就是spring.application.name
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();

    // 创建instance对象,表示当前的服务实例
    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        // 进行服务注册,进入nacos客户端的范畴
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        if (nacosDiscoveryProperties.isFailFast()) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
            rethrowRuntimeException(e);
        }
        else {
            log.warn("Failfast is false. {} register failed...{},", serviceId,
                    registration.toString(), e);
        }
    }
}

该方法的逻辑如下:

  • 获取服务注册对象;
  • 获取服务id和组,这个id默认就是spring.application.name属性,组默认是”DEFAULT_GROUP“。
  • 创建Instance对象,表示服务实例;
  • 进行服务注册,参考下面的NacosNamingService小节;

获取服务注册对象

服务注册对象是通过NacosServiceManager来管理的。

java
spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosServiceManager.java
	public NamingService getNamingService() {
		if (Objects.isNull(this.namingService)) {
			buildNamingService(nacosDiscoveryProperties.getNacosProperties());
		}
		return namingService;
	}
	private NamingService buildNamingService(Properties properties) {
		if (Objects.isNull(namingService)) {
			synchronized (NacosServiceManager.class) {
				if (Objects.isNull(namingService)) {
					// 设置属性,下次就不用再创建了
					namingService = createNewNamingService(properties);
				}
			}
		}
		return namingService;
	}
import static com.alibaba.nacos.api.NacosFactory.createMaintainService;
import static com.alibaba.nacos.api.NacosFactory.createNamingService;
	private NamingService createNewNamingService(Properties properties) {
		try {
			// 注意这里是上面导入的NacosFactory中的静态方法
			return createNamingService(properties);
		}
		catch (NacosException e) {
			throw new RuntimeException(e);
		}
	}

调用链进入了Nacos项目的代码,NacosFactory是Nacos项目中的类。

v2.1.0
NacosFactory
NamingFactory
<
>
java
api/src/main/java/com/alibaba/nacos/api/NacosFactory.java
public static NamingService createNamingService(Properties properties) throws NacosException {
    return NamingFactory.createNamingService(properties);
}
java
api/src/main/java/com/alibaba/nacos/api/naming/NamingFactory.java
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

NamingFactory的方法中,通过反射来创建了NacosNamingService实例,而该类则是服务注册的关键类。

封装Instance对象

java
spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/registry/NacosServiceRegistry.java
private Instance getNacosInstanceFromRegistration(Registration registration) {
    Instance instance = new Instance();
    instance.setIp(registration.getHost());
    instance.setPort(registration.getPort());
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
    instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
    instance.setMetadata(registration.getMetadata());
    // 默认值为true,表示是临时实例
    instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
    return instance;
}

将Spring Cloud中定义的服务注册信息Registration转为Nacos的Instance类型。

NacosNamingService

下面是创建操作,主要是在创建客户端,主要考虑gRPC客户端。

v2.1.0
java
client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
public NacosNamingService(Properties properties) throws NacosException {
    // 执行初始化操作
    init(properties);
}
private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(properties);
    initLogName(properties);
    
    this.changeNotifier = new InstancesChangeNotifier();
    // 注册发布器和订阅器
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    // 创建服务信息holder对象
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
    // 创建客户端
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}

init方法中的最后,创建了客户端对象,用于与服务端通信。

客户端对象的创建过程

NamingClientProxyDelegate

v2.1.0
java
client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
        InstancesChangeNotifier changeNotifier) throws NacosException {
    // 创建服务端信息更新服务
    this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
            changeNotifier);
    // 创建服务端列表管理器
    this.serverListManager = new ServerListManager(properties, namespace);
    this.serviceInfoHolder = serviceInfoHolder;
    // 创建安全代理对象
    this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(), NamingHttpClientManager.getInstance().getNacosRestTemplate());
    // 初始化安全代理
    initSecurityProxy(properties);
    // 创建HTTP客户端
    this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,
            serviceInfoHolder);
    // 创建GRPC客户端
    this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
            serviceInfoHolder);
}

从该类名称中就可以看到代理了两类客户端,HTTP和gRPC,本文重点分析gRPC。

NamingGrpcClientProxy

v2.1.0
java
client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
        Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    super(securityProxy);
    this.namespaceId = namespaceId;
    // 生成gRPC客户端的名称
    this.uuid = UUID.randomUUID().toString();
    // 解析请求超时时间
    this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
    Map<String, String> labels = new HashMap<String, String>();
    labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
    labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
    // 创建客户端对象,指定的类型为GRPC
    this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
    // 创建redo服务
    this.redoService = new NamingGrpcRedoService(this);
    // 启动客户端
    start(serverListFactory, serviceInfoHolder);
}

该方法步骤如下:

  • 生成gRPC客户端的名称,是个随机的UUID;
  • 解析客户端请求的超时时间;
  • 创建gRPC类型的客户端对象;
  • 创建redo服务;
  • 启动客户端;

关于gRPC客户端的创建和启动过程请参考Nacos中gRPC客户端的创建和启动过程这篇文章。

服务注册

v2.1.0
java
client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    // 注册服务
    clientProxy.registerService(serviceName, groupName, instance);
}

调用客户端代理对象来注册服务。

v2.1.0
java
client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 判断实例是否是临时的来选择客户端类型是grpc还是http
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
private NamingClientProxy getExecuteClientProxy(Instance instance) {
    // 如果是临时实例,则会使用GRPC客户端,否则还是使用HTTP客户端
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}

先根据实例类型来选择不同的客户端类型,默认是临时实例,所以会使用gRPC客户端。

v2.1.0
java
client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingGrpcClientProxy.java
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    // 注册服务
    doRegisterService(serviceName, groupName, instance);
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 创建请求对象
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    // 发起请求
    requestToServer(request, Response.class);
    redoService.instanceRegistered(serviceName, groupName);
}
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    try {
        request.putAllHeader(
                getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        // 发起请求
        Response response =
                requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}

接下来调用目标gRPC客户端对象来发送请求。注意这里创建的请求类型是InstanceRequest,在分析服务端的服务注册处理流程时,以该请求类型对应的handler作为入口点来分析。

v2.1.0
java
common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
public Response request(Request request) throws NacosException {
    return request(request, DEFAULT_TIMEOUT_MILLS);
}
public Response request(Request request, long timeoutMills) throws NacosException {
    int retryTimes = 0;
    Response response;
    Exception exceptionThrow = null;
    long start = System.currentTimeMillis();
    // 尝试发送请求,直到达到重试次数或超时
    while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            // 如果当前连接是null,或者当前客户端没有在运行中
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                // 则抛出异常
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                        "Client not connected, current status:" + rpcClientStatus.get());
            }
            // 发起请求
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            // 处理错误响应
            if (response instanceof ErrorResponse) {
                // 如果是未注册错误
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        // 设置客户端状态为不健康
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                    "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                    currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            // 添加一个重连事件
                            switchServerAsync();
                        }
                    }
                    
                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            // return response.
            lastActiveTimeStamp = System.currentTimeMillis();
            return response;
            
        } catch (Exception e) {
            if (waitReconnect) {
                try {
                    // wait client to reconnect.
                    // 等待重连
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {
                    // Do nothing.
                }
            }
            
            LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
                    request, retryTimes, e.getMessage());
            
            exceptionThrow = e;
            
        }
        retryTimes++;
        
    }

    // 如果请求超时或者超过重试次数,则将客户端状态修改为不健康
    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        // 添加一个重连事件
        switchServerAsyncOnRequestFail();
    }

    // 处理异常
    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
    }
}

默认情况下不会进行重试,引入传入的参数timeoutMills默认是-1,参考上面给出的NamingGrpcClientProxy构造方法,里面会初始化请求超时时间,默认值是-1。 如果注册请求失败,则会将实例状态改为不健康,并添加一个重连事件,并抛出异常。关于重连事件的工作机制,请参考Nacos中gRPC客户端的创建和启动过程这篇文章。

至此,Nacos服务注册的客户端工作流程已分析完毕。