XxlJob中调度中心如何管理执行器
在执行器应用启动的时候会创建一个周期性向执行器发起注册的线程,调度中心也会周期性扫描长时间没有收到心跳请求的执行器并删除它。本文就来分析一下调度中心作为服务端是怎么管理执行器的。
在XxlJobScheduler的init方法中,会启动服务注册线程。
JobRegistryHelper
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java
public void start(){
// for registry or remove
// 创建用于处理执行器注册或下线的线程池
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
}
});
// for monitor
// 创建执行器注册监视线程
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// auto registry group
// 从xxl_job_group表中获取自动注册(addressType = 0)的所有执行器组数据
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
/*
* 清理xxl_job_registry表中距离上次心跳时间超过最大时间3倍的执行器
* RegistryConfig.DEAD_TIMEOUT的值为90,也就是说明超过90秒没收到执行器的心跳,则认为该执行器是掉线了,就移除它。
*/
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
// 从注册表中移除无效的执行器
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
// 从xxl_job_registry表中取出活跃执行器(更新时间位于当前时间减去90秒范围内的执行器)
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
// 遍历执行器
for (XxlJobRegistry item: list) {
// 客户端注册的时候,默认就是EXECUTOR
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
// 获取registry_key字段,就是客户端配置的appname
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
// 将registry_value字段(执行器的地址)放入列表中
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
// 将地址列表放入map中
appAddressMap.put(appname, registryList);
}
}
}
// fresh group address
// 遍历执行器组列表
for (XxlJobGroup group: groupList) {
// 获取对应应用的执行器列表,用group的appname去匹配xxl_job_registry中的registry_key字段(也就是客户端配置的appname)
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
// 对多个执行器地址进行字符串拼接
StringBuilder addressListSB = new StringBuilder();
// 将多个地址用逗号分割
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
// 设置最新的活跃执行器地址
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
// 更新xxl_job_group表
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
// 睡眠一个心跳周期时间
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
// 设置为守护线程
registryMonitorThread.setDaemon(true);
// 设置线程名称
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
// 启动线程
registryMonitorThread.start();
}
重点关注这个monitor线程,会从xxl_job_registry表中查询出超过3倍心跳周期的没有收到心跳请求的执行器,并从表中删除它。对于正常的执行器,会把它的地址拼接在一起,然后设置到xxl_job_group表中的相应记录上。
执行器注册
java
xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java
@Controller
@RequestMapping("/api")
public class JobApiController {
@Resource
private AdminBiz adminBiz;
/**
* api
*
* @param uri
* @param data
* @return
*/
@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
// valid
// 如果请求方法不是POST,直接返回错误
if (!"POST".equalsIgnoreCase(request.getMethod())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
// 如果请求的URL为空,也返回错误
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
// 如果没有设置accessToken,也返回错误
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) { // 服务注册
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) { // 服务下线
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
}
}
这是一个复合的控制器,对于执行器注册来说URI是“registry”,所以会调用registry方法。
java
xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return JobRegistryHelper.getInstance().registry(registryParam);
}
实际的注册操作还是通过JobRegistryHelper来实现的。
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java
public ReturnT<String> registry(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// async execute
// 注册注册任务
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
// 更新xxl_job_registry表
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
// 如果是第一次注册则插入
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
在这里会往线程池中提交任务来异步地处理注册请求,这个线程池就是在上面介绍的init方法中创建的。如果数据库中的xxl_job_registry表中没有记录,说明是首次注册,就新增记录;如果已经存在记录,则更新记录的update_time字段。
执行器应用的xxl.job.executor.appname配置对应了xxl_job_registry表中的registry_key和xxl_job_group表中的appname字段。