在服务中,引入Nacos服务发现的依赖并配置好服务注册中心的信息,启动后会自动将当前服务的信息注册到Nacos服务端。本文就来分析一下在客户端启动时,是怎么注册服务的。
NacosServiceRegistryAutoConfiguration
一般分析其他组件与Spring Boot的整合都从自动配置类入手,Nacos也不例外。在该自动配置类中,会注册NacosAutoServiceRegistration,间接实现了Spring Cloud中定义的AutoServiceRegistration接口,从名称就可以看出来会自动注册服务。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
// 默认值是true
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosServiceManager nacosServiceManager,
// 保存了一些服务端的信息
NacosDiscoveryProperties nacosDiscoveryProperties) {
// 创建服务注册bean
return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
// 下面这个bean的参数中正好包含了上面两个bean
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
这个类中注册了3个bean,都是继承自Spring Cloud中的接口。
关于Registry和Registration,可以这样理解,前者是注册处,后者是注册项,注册处用来处理注册项。
NacosAutoServiceRegistration
父类AbstractAutoServiceRegistration是Spring Cloud中定义的抽象类,实现了ApplicationListener接口,会在web服务器初始化事件发布之后进行服务注册。
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
// 记录web服务器的端口
this.port.compareAndSet(0, event.getWebServer().getPort());
// 执行服务注册
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
// 进行服务注册,并在注册前后分别发布事件
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
// 执行服务注册
register();
if (shouldRegisterManagement()) {
registerManagement();
}
// 发布服务注册事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
protected void register() {
// 进行服务注册
this.serviceRegistry.register(getRegistration());
}
protected abstract R getRegistration();
子类NacosAutoServiceRegistration中重写了getRegistration方法,该方法返回注册信息,并通过NacosServiceRegistry来的register方法来注册。
@Override
protected NacosRegistration getRegistration() {
if (this.registration.getPort() < 0 && this.getPort().get() > 0) {
this.registration.setPort(this.getPort().get());
}
Assert.isTrue(this.registration.getPort() > 0, "service.port has not been set");
return this.registration;
}
NacosServiceRegistry
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// 创建服务注册对象
NamingService namingService = namingService();
// 获取服务id和组,这个id默认就是spring.application.name
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
// 创建instance对象,表示当前的服务实例
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 进行服务注册,进入nacos客户端的范畴
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}
该方法的逻辑如下:
- 获取服务注册对象;
- 获取服务id和组,这个id默认就是spring.application.name属性,组默认是”DEFAULT_GROUP“。
- 创建Instance对象,表示服务实例;
- 进行服务注册,参考下面的NacosNamingService小节;
获取服务注册对象
服务注册对象是通过NacosServiceManager来管理的。
public NamingService getNamingService() {
if (Objects.isNull(this.namingService)) {
buildNamingService(nacosDiscoveryProperties.getNacosProperties());
}
return namingService;
}
private NamingService buildNamingService(Properties properties) {
if (Objects.isNull(namingService)) {
synchronized (NacosServiceManager.class) {
if (Objects.isNull(namingService)) {
// 设置属性,下次就不用再创建了
namingService = createNewNamingService(properties);
}
}
}
return namingService;
}
import static com.alibaba.nacos.api.NacosFactory.createMaintainService;
import static com.alibaba.nacos.api.NacosFactory.createNamingService;
private NamingService createNewNamingService(Properties properties) {
try {
// 注意这里是上面导入的NacosFactory中的静态方法
return createNamingService(properties);
}
catch (NacosException e) {
throw new RuntimeException(e);
}
}
调用链进入了Nacos项目的代码,NacosFactory是Nacos项目中的类。
public static NamingService createNamingService(Properties properties) throws NacosException {
return NamingFactory.createNamingService(properties);
}
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
return (NamingService) constructor.newInstance(properties);
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
在NamingFactory的方法中,通过反射来创建了NacosNamingService实例,而该类则是服务注册的关键类。
封装Instance对象
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight(nacosDiscoveryProperties.getWeight());
instance.setClusterName(nacosDiscoveryProperties.getClusterName());
instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
instance.setMetadata(registration.getMetadata());
// 默认值为true,表示是临时实例
instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
return instance;
}
将Spring Cloud中定义的服务注册信息Registration转为Nacos的Instance类型。
NacosNamingService
下面是创建操作,主要是在创建客户端,主要考虑gRPC客户端。
public NacosNamingService(Properties properties) throws NacosException {
// 执行初始化操作
init(properties);
}
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);
this.changeNotifier = new InstancesChangeNotifier();
// 注册发布器和订阅器
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
// 创建服务信息holder对象
this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
// 创建客户端
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
在init方法中的最后,创建了客户端对象,用于与服务端通信。
客户端对象的创建过程
NamingClientProxyDelegate
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
InstancesChangeNotifier changeNotifier) throws NacosException {
// 创建服务端信息更新服务
this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
changeNotifier);
// 创建服务端列表管理器
this.serverListManager = new ServerListManager(properties, namespace);
this.serviceInfoHolder = serviceInfoHolder;
// 创建安全代理对象
this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(), NamingHttpClientManager.getInstance().getNacosRestTemplate());
// 初始化安全代理
initSecurityProxy(properties);
// 创建HTTP客户端
this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
// 创建GRPC客户端
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
}
从该类名称中就可以看到代理了两类客户端,HTTP和gRPC,本文重点分析gRPC。
NamingGrpcClientProxy
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
super(securityProxy);
this.namespaceId = namespaceId;
// 生成gRPC客户端的名称
this.uuid = UUID.randomUUID().toString();
// 解析请求超时时间
this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
// 创建客户端对象,指定的类型为GRPC
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
// 创建redo服务
this.redoService = new NamingGrpcRedoService(this);
// 启动客户端
start(serverListFactory, serviceInfoHolder);
}
该方法步骤如下:
- 生成gRPC客户端的名称,是个随机的UUID;
- 解析客户端请求的超时时间;
- 创建gRPC类型的客户端对象;
- 创建redo服务;
- 启动客户端;
关于gRPC客户端的创建和启动过程请参考Nacos中gRPC客户端的创建和启动过程这篇文章。
服务注册
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
// 注册服务
clientProxy.registerService(serviceName, groupName, instance);
}
调用客户端代理对象来注册服务。
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
// 判断实例是否是临时的来选择客户端类型是grpc还是http
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
private NamingClientProxy getExecuteClientProxy(Instance instance) {
// 如果是临时实例,则会使用GRPC客户端,否则还是使用HTTP客户端
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
先根据实例类型来选择不同的客户端类型,默认是临时实例,所以会使用gRPC客户端。
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
// 注册服务
doRegisterService(serviceName, groupName, instance);
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
// 创建请求对象
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
// 发起请求
requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
}
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
throws NacosException {
try {
request.putAllHeader(
getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
// 发起请求
Response response =
requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
throw new NacosException(response.getErrorCode(), response.getMessage());
}
if (responseClass.isAssignableFrom(response.getClass())) {
return (T) response;
}
NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
response.getClass().getName(), responseClass.getName());
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
}
throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
接下来调用目标gRPC客户端对象来发送请求。注意这里创建的请求类型是InstanceRequest,在分析服务端的服务注册处理流程时,以该请求类型对应的handler作为入口点来分析。
public Response request(Request request) throws NacosException {
return request(request, DEFAULT_TIMEOUT_MILLS);
}
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 0;
Response response;
Exception exceptionThrow = null;
long start = System.currentTimeMillis();
// 尝试发送请求,直到达到重试次数或超时
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
boolean waitReconnect = false;
try {
// 如果当前连接是null,或者当前客户端没有在运行中
if (this.currentConnection == null || !isRunning()) {
waitReconnect = true;
// 则抛出异常
throw new NacosException(NacosException.CLIENT_DISCONNECT,
"Client not connected, current status:" + rpcClientStatus.get());
}
// 发起请求
response = this.currentConnection.request(request, timeoutMills);
if (response == null) {
throw new NacosException(SERVER_ERROR, "Unknown Exception.");
}
// 处理错误响应
if (response instanceof ErrorResponse) {
// 如果是未注册错误
if (response.getErrorCode() == NacosException.UN_REGISTER) {
synchronized (this) {
waitReconnect = true;
// 设置客户端状态为不健康
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
LoggerUtils.printIfErrorEnabled(LOGGER,
"Connection is unregistered, switch server, connectionId = {}, request = {}",
currentConnection.getConnectionId(), request.getClass().getSimpleName());
// 添加一个重连事件
switchServerAsync();
}
}
}
throw new NacosException(response.getErrorCode(), response.getMessage());
}
// return response.
lastActiveTimeStamp = System.currentTimeMillis();
return response;
} catch (Exception e) {
if (waitReconnect) {
try {
// wait client to reconnect.
// 等待重连
Thread.sleep(Math.min(100, timeoutMills / 3));
} catch (Exception exception) {
// Do nothing.
}
}
LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
request, retryTimes, e.getMessage());
exceptionThrow = e;
}
retryTimes++;
}
// 如果请求超时或者超过重试次数,则将客户端状态修改为不健康
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
// 添加一个重连事件
switchServerAsyncOnRequestFail();
}
// 处理异常
if (exceptionThrow != null) {
throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
: new NacosException(SERVER_ERROR, exceptionThrow);
} else {
throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
}
}
默认情况下不会进行重试,引入传入的参数timeoutMills默认是-1,参考上面给出的NamingGrpcClientProxy构造方法,里面会初始化请求超时时间,默认值是-1。 如果注册请求失败,则会将实例状态改为不健康,并添加一个重连事件,并抛出异常。关于重连事件的工作机制,请参考Nacos中gRPC客户端的创建和启动过程这篇文章。
至此,Nacos服务注册的客户端工作流程已分析完毕。