在微服务项目中配置好Nacos配置中心的信息,在启动之后,Nacos客户端会自动从配置中心中获取配置。本文就来分析一下背后的加载过程和原理。
在Spring Cloud中,配置的加载是通过PropertySourceLocator来进行的,Nacos中定义了响应的locator。在介绍Nacos的locator实现之前,先来介绍一下Spring Cloud中是怎么调用到这些locator的。
PropertySourceBootstrapConfiguration
这个类是Spring Cloud中的提供的类,并且在spring.factories文件中被配置为了BootstrapConfiguration,所以在Spring Cloud应用启动时会被加载到。 更重要的是这个类实现了ApplicationContextInitializer接口,在Spring Boot中的SpringApplication启动阶段,会被调用initialize方法。
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
List<PropertySource<?>> composite = new ArrayList<>();
// 对locator排序
AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
boolean empty = true;
ConfigurableEnvironment environment = applicationContext.getEnvironment();
// 遍历locator
for (PropertySourceLocator locator : this.propertySourceLocators) {
// 收集属性源
Collection<PropertySource<?>> source = locator.locateCollection(environment);
if (source == null || source.size() == 0) {
continue;
}
List<PropertySource<?>> sourceList = new ArrayList<>();
// 遍历属性源
for (PropertySource<?> p : source) {
if (p instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerable = (EnumerablePropertySource<?>) p;
sourceList.add(new BootstrapPropertySource<>(enumerable));
}
else {
sourceList.add(new SimpleBootstrapPropertySource(p));
}
}
logger.info("Located property source: " + sourceList);
composite.addAll(sourceList);
empty = false;
}
if (!empty) {
MutablePropertySources propertySources = environment.getPropertySources();
String logConfig = environment.resolvePlaceholders("${logging.config:}");
LogFile logFile = LogFile.get(environment);
for (PropertySource<?> p : environment.getPropertySources()) {
if (p.getName().startsWith(BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
propertySources.remove(p.getName());
}
}
// 插入属性源到environment中
insertPropertySources(propertySources, composite);
reinitializeLoggingSystem(environment, logConfig, logFile);
setLogLevels(applicationContext, environment);
handleIncludedProfiles(environment);
}
}
在该方法中,会遍历bean工厂中所有的属性源定位器(PropertySourceLocator),并调用它的locateCollection方法来获取数据源,最后将这些数据源添加到environment对象中。
public interface PropertySourceLocator {
/**
* @param environment The current Environment.
* @return A PropertySource, or null if there is none.
* @throws IllegalStateException if there is a fail-fast condition.
*/
PropertySource<?> locate(Environment environment);
default Collection<PropertySource<?>> locateCollection(Environment environment) {
return locateCollection(this, environment);
}
static Collection<PropertySource<?>> locateCollection(PropertySourceLocator locator,
Environment environment) {
// 从子类中加载属性
PropertySource<?> propertySource = locator.locate(environment);
if (propertySource == null) {
return Collections.emptyList();
}
if (CompositePropertySource.class.isInstance(propertySource)) {
Collection<PropertySource<?>> sources = ((CompositePropertySource) propertySource)
.getPropertySources();
List<PropertySource<?>> filteredSources = new ArrayList<>();
for (PropertySource<?> p : sources) {
if (p != null) {
filteredSources.add(p);
}
}
return filteredSources;
}
else {
return Arrays.asList(propertySource);
}
}
}
Nacos的客户端正是实现了Spring Cloud中的PropertySourceLocator接口,并重写了locate方法。
NacosPropertySourceLocator
在NacosConfigBootstrapConfiguration这个启动配置类中,会注册NacosPropertySourceLocator类型的bean。
@Bean
public NacosPropertySourceLocator nacosPropertySourceLocator(
NacosConfigManager nacosConfigManager) {
// 创建属性源定位器
return new NacosPropertySourceLocator(nacosConfigManager);
}
下面来看看它的locate方法是怎么实现的。
@Override
public PropertySource<?> locate(Environment env) {
nacosConfigProperties.setEnvironment(env);
// 获取configService对象
ConfigService configService = nacosConfigManager.getConfigService();
if (null == configService) {
log.warn("no instance of config service found, can't load config from nacos");
return null;
}
// 从yaml配置文件中获取配置信息
long timeout = nacosConfigProperties.getTimeout();
// 创建nacos数据源构建器,封装了配置服务对象
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
String name = nacosConfigProperties.getName();
// 获取dataId前缀
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
// 默认是以应用名称作为前缀,如果也没设置应用名称,则是null
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
// 创建属性源对象
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME);
/*
* 三中配置的优先级为:自身应用配置文件 > 额外配置文件 > 共享配置文件
* 也就是后加载的配置可以覆盖先加载的
*/
// 加载共享配置
loadSharedConfiguration(composite);
// 加载扩展配置
loadExtConfiguration(composite);
// 加载应用自身配置
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
在该方法中,会加载三种不同类型的配置:
- 共享配置;
- 扩展配置;
- 应用自身的配置;
上面三类配置会被封装在一个复合数据源中,而且后加载的优先级更高,也就是说应用自身的配置可以覆盖扩展配置和共享配置中的相同配置。
加载共享配置
private void loadSharedConfiguration(
CompositePropertySource compositePropertySource) {
// 获取共享配置元数据,元数据配置是指为获取配置设置的配置信息
List<NacosConfigProperties.Config> sharedConfigs = nacosConfigProperties
.getSharedConfigs();
if (!CollectionUtils.isEmpty(sharedConfigs)) {
// 检查一下元配置信息
checkConfiguration(sharedConfigs, "shared-configs");
// 加载配置数据
loadNacosConfiguration(compositePropertySource, sharedConfigs);
}
}
元配置信息校验,主要就是在检查dataId的有效性,不能是null或者空串。
因为需要在项目中指定配置中心的配置,如IP和地址、命名空间、组、dataId、文件扩展名等,所以文本把这个配置叫作配置元数据,相当于是配置的配置。
private void checkConfiguration(List<NacosConfigProperties.Config> configs,
String tips) {
for (int i = 0; i < configs.size(); i++) {
String dataId = configs.get(i).getDataId();
// dataId不能为null或者是空串
if (dataId == null || dataId.trim().length() == 0) {
throw new IllegalStateException(String.format(
"the [ spring.cloud.nacos.config.%s[%s] ] must give a dataId",
tips, i));
}
}
}
接下来就是加载配置了。
private void loadNacosConfiguration(final CompositePropertySource composite,
List<NacosConfigProperties.Config> configs) {
// 遍历多个配置,依次进行加载
for (NacosConfigProperties.Config config : configs) {
// 获取文件扩展
String fileExtension = config.getFileExtension();
if (StringUtils.isEmpty(fileExtension)) {
fileExtension = NacosDataParserHandler.getInstance()
.getFileExtension(config.getDataId());
}
// 加载配置数据
loadNacosDataIfPresent(composite, config.getDataId(), config.getGroup(),
fileExtension, config.isRefresh());
}
}
主要是调用了loadNacosDataIfPresent方法来加载配置数据,在下面统一来分析该方法,其他几类配置也是通过该方法来加载的。注意这里的最后一个参数是配置中设置的是否可刷新(默认值是false),而下面应用配置加载默认就是true。
加载扩展配置
private void loadExtConfiguration(CompositePropertySource compositePropertySource) {
// 获取扩展配置元数据
List<NacosConfigProperties.Config> extConfigs = nacosConfigProperties
.getExtensionConfigs();
if (!CollectionUtils.isEmpty(extConfigs)) {
// 检查配置
checkConfiguration(extConfigs, "extension-configs");
// 加载配置数据
loadNacosConfiguration(compositePropertySource, extConfigs);
}
}
这里除了检查的配置元数据不同,其他操作和共享配置的加载基本一样。
加载应用配置
private void loadApplicationConfiguration(
CompositePropertySource compositePropertySource, String dataIdPrefix,
NacosConfigProperties properties, Environment environment) {
String fileExtension = properties.getFileExtension();
String nacosGroup = properties.getGroup();
// load directly once by default
// 直接以dataId加载
loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,
fileExtension, true);
// load with suffix, which have a higher priority than the default
// 以dataId和文件扩展名加载
loadNacosDataIfPresent(compositePropertySource,
dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
// Loaded with profile, which have a higher priority than the suffix
// 以dataId、profile和文件扩展名加载,因为profile可能配置过个,所以这里用循环
for (String profile : environment.getActiveProfiles()) {
String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
fileExtension, true);
}
// 同样的道理,这里后加载的配置会覆盖先加载的
}
这里又会有3种加载:
- 直接以dataId来加载;
- 以dataId和扩展名来加载;
- 以激活的profile、dataId和文件扩展名来加载。
不管是哪种加载,最终都会调用loadNacosDataIfPresent方法,和上面的共享配置和扩展配置一样。
loadNacosDataIfPresent方法
private void loadNacosDataIfPresent(final CompositePropertySource composite,
final String dataId, final String group, String fileExtension,
boolean isRefreshable) {
// 校验dataId字段
if (null == dataId || dataId.trim().length() < 1) {
return;
}
// 校验group字段
if (null == group || group.trim().length() < 1) {
return;
}
// 加载配置
NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,
fileExtension, isRefreshable);
// 将加载到的配置添加到属性源中
this.addFirstPropertySource(composite, propertySource, false);
}
private NacosPropertySource loadNacosPropertySource(final String dataId,
final String group, String fileExtension, boolean isRefreshable) {
if (NacosContextRefresher.getRefreshCount() != 0) {
// 如果是不可刷新的
if (!isRefreshable) {
// 获取nacos的属性源
return NacosPropertySourceRepository.getNacosPropertySource(dataId,
group);
}
}
// 加载数据
return nacosPropertySourceBuilder.build(dataId, group, fileExtension,
isRefreshable);
}
private final static ConcurrentHashMap<String, NacosPropertySource> NACOS_PROPERTY_SOURCE_REPOSITORY = new ConcurrentHashMap<>();
public static NacosPropertySource getNacosPropertySource(String dataId,
String group) {
return NACOS_PROPERTY_SOURCE_REPOSITORY.get(getMapKey(dataId, group));
}
如果是不可刷新的,则会直接获取本地的缓存。 主要是通过NacosPropertySourceBuilder来加载并缓存数据。
NacosPropertySource build(String dataId, String group, String fileExtension,
boolean isRefreshable) {
// 加载数据
List<PropertySource<?>> propertySources = loadNacosData(dataId, group,
fileExtension);
// 封装为Nacos数据源
NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources,
group, dataId, new Date(), isRefreshable);
// 将加载到的数据缓存起来
NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);
return nacosPropertySource;
}
private List<PropertySource<?>> loadNacosData(String dataId, String group,
String fileExtension) {
String data = null;
try {
// 获取配置
data = configService.getConfig(dataId, group, timeout);
if (StringUtils.isEmpty(data)) {
log.warn(
"Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]",
dataId, group);
return Collections.emptyList();
}
if (log.isDebugEnabled()) {
log.debug(String.format(
"Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId,
group, data));
}
return NacosDataParserHandler.getInstance().parseNacosData(dataId, data,
fileExtension);
}
catch (NacosException e) {
log.error("get data from Nacos error,dataId:{} ", dataId, e);
}
catch (Exception e) {
log.error("parse data from Nacos error,dataId:{},data:{}", dataId, data, e);
}
return Collections.emptyList();
}
private final static ConcurrentHashMap<String, NacosPropertySource> NACOS_PROPERTY_SOURCE_REPOSITORY = new ConcurrentHashMap<>();
public static void collectNacosPropertySource(
NacosPropertySource nacosPropertySource) {
NACOS_PROPERTY_SOURCE_REPOSITORY
// 以dataId作为键
.putIfAbsent(getMapKey(nacosPropertySource.getDataId(),
nacosPropertySource.getGroup()), nacosPropertySource);
}
通过配置服务来获取配置,然后将其封装为Nacos的数据源实现,最后将数据源缓存起来。真正的加载是通过ConfigService类来实现的,注意该类是Nacos项目中的,而不是Spring Cloud Alibaba项目中的。
NacosConfigManager
在分析ConfigService之前,先来看看它是怎么被创建的。 NacosPropertySourceBuilder是在NacosPropertySourceLocator的locate方法中创建的,上面已经给出过了代码,并且通过构造方法向其传递了ConfigService对象,而该对象是通过NacosConfigManager来获取的。
在NacosConfigBootstrapConfiguration和NacosConfigAutoConfiguration两个配置类中都会注册NacosConfigManager类型的bean。
@Bean
// bean工厂中没注册过才进行注册,在NacosConfigAutoConfiguration类中也注册过该类型的bean。
@ConditionalOnMissingBean
public NacosConfigManager nacosConfigManager(
NacosConfigProperties nacosConfigProperties) {
return new NacosConfigManager(nacosConfigProperties);
}
@Bean
public NacosConfigManager nacosConfigManager(
NacosConfigProperties nacosConfigProperties) {
return new NacosConfigManager(nacosConfigProperties);
}
而在创建NacosPropertySourceLocator时,会通过构造方法注入该bean。
Nacos为什么要在这两个配置都中都注册该类型的bean呢?注意这两个配置类被加载的时机也不同,NacosConfigBootstrapConfiguration是在bootstrap父级上下文中被加载到的,而NacosConfigAutoConfiguration是在子级上下文中被加载到的,虽然上面注册了两个,但是在不同的应用上下文中。所以上面NacosPropertySourceLocator中的配置管理器是父级应用上下文中注册的bean对象。
接下来看看是怎么获取配置服务对象的。
public ConfigService getConfigService() {
if (Objects.isNull(service)) {
createConfigService(this.nacosConfigProperties);
}
return service;
}
static ConfigService createConfigService(
NacosConfigProperties nacosConfigProperties) {
// 使用DCL单例模式创建ConfigService
if (Objects.isNull(service)) {
synchronized (NacosConfigManager.class) {
try {
if (Objects.isNull(service)) {
// 利用反射创建对象并设置属性
service = NacosFactory.createConfigService(
nacosConfigProperties.assembleConfigServiceProperties());
}
}
catch (NacosException e) {
log.error(e.getMessage());
throw new NacosConnectionFailureException(
nacosConfigProperties.getServerAddr(), e.getMessage(), e);
}
}
}
return service;
}
通过NacosFactory工厂类来创建配置服务对象,这个工厂类也是Nacos项目中的类。
public static ConfigService createConfigService(Properties properties) throws NacosException {
return ConfigFactory.createConfigService(properties);
}
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
最终,在ConfigFactory中通过反射创建了NacosConfigService类型的对象。
ConfigService
创建过程
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
// 初始化命名空间
initNamespace(properties);
this.configFilterChainManager = new ConfigFilterChainManager(properties);
ServerListManager serverListManager = new ServerListManager(properties);
serverListManager.start();
// 创建worker
this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);
// will be deleted in 2.0 later versions
agent = new ServerHttpAgent(serverListManager);
}
private void initNamespace(Properties properties) {
// 设置命名空间
namespace = ParamUtil.parseNamespace(properties);
properties.put(PropertyKeyConst.NAMESPACE, namespace);
}
可以看到命名空间是在创建时就通过属性对象传递过来的,后续获取配置时只需要传递group和dataId即可。
属性获取
再次回到配置获取这条主线上。
@Override
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
// 获取配置
return getConfigInner(namespace, dataId, group, timeoutMs);
}
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = blank2defaultGroup(group);
// 检查dataId和group两个参数
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// use local config first
// 先从本地文件缓存中获取配置
String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
if (content != null) { // 从本地缓存中获取到配置
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}",
worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
// 加密相关
String encryptedDataKey = LocalEncryptedDataKeyProcessor
.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
// 获取实际的配置内容
content = cr.getContent();
return content;
}
try {
// 从服务端拉取配置
ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
cr.setContent(response.getContent());
cr.setEncryptedDataKey(response.getEncryptedDataKey());
configFilterChainManager.doFilter(null, cr);
// 获取实际的配置内容
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
worker.getAgentName(), dataId, group, tenant, ioe.toString());
}
LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}",
worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
// 获取配置内容
content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);
cr.setContent(content);
// 获取数据加密的key,从本地文件中读取
String encryptedDataKey = LocalEncryptedDataKeyProcessor
.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
先来关注ClientWorker是怎么从配置中心获取数据的。
public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout, boolean notify)
throws NacosException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
return this.agent.queryConfig(dataId, group, tenant, readTimeout, notify);
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
final Properties properties) throws NacosException {
this.configFilterChainManager = configFilterChainManager;
// 初始化
init(properties);
// 创建rpc通信客户端
agent = new ConfigRpcTransportClient(properties, serverListManager);
// 获取合适的线程数量
int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
// 创建线程池
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker");
t.setDaemon(true);
return t;
});
agent.setExecutor(executorService);
agent.start();
}
这里的agent是在构造方法中创建的,是ConfigRpcTransportClient类型的。
@Override
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
throws NacosException {
// 构建请求
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
// 设置请求头
request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
// 获取rpc客户端
RpcClient rpcClient = getOneRunningClient();
if (notify) {
CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
if (cacheData != null) {
rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
}
}
// 发起请求
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
// 下面是处理响应结果
ConfigResponse configResponse = new ConfigResponse();
if (response.isSuccess()) { // 成功获取到配置
LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
configResponse.setContent(response.getContent());
String configType;
if (StringUtils.isNotBlank(response.getContentType())) {
configType = response.getContentType();
} else {
configType = ConfigType.TEXT.getType();
}
configResponse.setConfigType(configType);
String encryptedDataKey = response.getEncryptedDataKey();
LocalEncryptedDataKeyProcessor
.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
configResponse.setEncryptedDataKey(encryptedDataKey);
return configResponse;
} else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_NOT_FOUND) { // 没有找到配置
LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, null);
LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, null);
return configResponse;
} else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_QUERY_CONFLICT) { // 配置查询冲突
LOGGER.error(
"[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
+ "tenant={}", this.getName(), dataId, group, tenant);
throw new NacosException(NacosException.CONFLICT,
"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
} else {
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", this.getName(), dataId,
group, tenant, response);
throw new NacosException(response.getErrorCode(),
"http error, code=" + response.getErrorCode() + ",msg=" + response.getMessage() + ",dataId=" + dataId + ",group=" + group
+ ",tenant=" + tenant);
}
}
在上面方法的开头,创建了ConfigQueryRequest类型的请求,可以利用该请求类型在服务端找到相应的handler(ConfigQueryRequestHandler)来分析服务端配置获取的流程。 在处理响应时,会把请求到的配置数据缓存在本地的文件中。
private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills)
throws NacosException {
try {
request.putAllHeader(super.getSecurityHeaders(resourceBuild(request)));
request.putAllHeader(super.getCommonHeader());
} catch (Exception e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
JsonObject asJsonObjectTemp = new Gson().toJsonTree(request).getAsJsonObject();
asJsonObjectTemp.remove("headers");
asJsonObjectTemp.remove("requestId");
boolean limit = Limiter.isLimit(request.getClass() + asJsonObjectTemp.toString());
if (limit) {
throw new NacosException(NacosException.CLIENT_OVER_THRESHOLD,
"More than client-side current limit threshold");
}
// 发起请求
return rpcClientInner.request(request, timeoutMills);
}
这里的rpcClientInner是什么?是在上层queryConfig方法中调用getOneRunningClient方法来获取的。
RPC客户端的创建
RpcClient getOneRunningClient() throws NacosException {
return ensureRpcClient("0");
}
private RpcClient ensureRpcClient(String taskId) throws NacosException {
synchronized (ClientWorker.this) {
Map<String, String> labels = getLabels();
Map<String, String> newLabels = new HashMap<>(labels);
newLabels.put("taskId", taskId);
// 创建RPC客户端
RpcClient rpcClient = RpcClientFactory
.createClient(uuid + "_config-" + taskId, getConnectionType(), newLabels);
if (rpcClient.isWaitInitiated()) {
// 初始化客户端处理器
initRpcClientHandler(rpcClient);
rpcClient.setTenant(getTenant());
rpcClient.clientAbilities(initAbilities());
rpcClient.start();
}
return rpcClient;
}
}
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
return createClient(clientName, connectionType, null, null, labels);
}
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels) {
// 如果连接类型不是GRPC
if (!ConnectionType.GRPC.equals(connectionType)) {
// 则抛出异常
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
// 如果客户端还不存在,则创建新对象并保存在map中
return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
try {
// 创建GrpcClient对象
GrpcClient client = new GrpcSdkClient(clientNameInner);
// 设置线程核心数和最大数
client.setThreadPoolCoreSize(threadPoolCoreSize);
client.setThreadPoolMaxSize(threadPoolMaxSize);
client.labels(labels);
// 返回client对象,会被放入CLIENT_MAP中
return client;
} catch (Throwable throwable) {
LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
throw throwable;
}
});
}
最终在RpcClientFactory的createClient方法中,创建了RPC客户端对象,是GrpcSdkClient类型的。