XxlJob中调度中心的任务调度原理
在调度中心启动过程中,会启动调度相关的线程,本文就来分析一下这些线程是怎么工作的,以及任务是怎么被调度到的。
JobScheduleHelper
下面的方法比较长,主要是创建了两个任务调度相关的线程,在下面会详细介绍。
start
pushTimeRing
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java
public void start(){
// schedule thread
// 基础调度线程,会把任务从数据库中查出来然后交给时间轮线程来调度
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
/*
* System.currentTimeMillis()%1000表示当前秒内还没过去的微秒,5000减去它表示对齐时间(整秒)
* 至于为什么是5秒,是为了和PRE_READ_MS静态变量一致。
*/
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// 计算预读的任务数量,是快慢线程池最大处理量的20倍
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
// 获取数据库连接
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
// 获取是否是自动提交事务,备份连接中的设置,下面会将其修改为false
connAutoCommit = conn.getAutoCommit();
// 关闭事务自动提交
conn.setAutoCommit(false);
// 悲观锁,因为集群中的多个调度中心会同时访问该表
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
// 锁住schedule_lock行(默认就这一行),相当于集群中多个调度中心进行同步
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
/*
* 查询任务,获取下次执行截止时间为5秒后之前的这些任务(所以也叫预读)
* 上面循环开头会睡眠5秒,也就是说这个线程每5秒钟查询一次任务
*/
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
// 遍历待调度任务
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
// 如果当前时间超过了任务原定触发时间加上预读时间之和(说明已经过了本轮执行),则跳过
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
}
// 如果当前时间超过了任务原定触发时间,但还没超过5秒
else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
// 将任务交给时间轮调度线程
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
// 将任务交给时间轮调度线程,时间轮线程很快就会取得任务数据,不用担心需要等到下一轮(周期为1分钟)才会被处理
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
// 再次遍历任务
for (XxlJobInfo jobInfo: scheduleList) {
// 更新任务的本次和下次调度时间
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
// 提交事务
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 设置回之前的值
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
// 如果花费时间小于1秒
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
// 如果预读成功,则等到下一个整秒再读数据,否则等到下一个5秒
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
// 时间轮调度线程
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// align second
try {
// 对齐时间到整秒,睡眠1秒
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 这里也处理了前一个刻度的,避免处理耗时太长,导致跨过刻度从而丢失任务,所以这里向前校验一个刻度;
for (int i = 0; i < 2; i++) {
// 从时间轮中获取任务列表,并移除该列表
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
// 添加到待处理列表中
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
// 处理对应时刻任务列表中的任务
for (int jobId: ringItemData) {
// do trigger
// 触发任务,将任务加入快慢线程池中进行执行
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java
private void pushTimeRing(int ringSecond, int jobId){
// push async ring
// 取出对应时间轮项,ringSecond的范围是0~59
List<Integer> ringItemData = ringData.get(ringSecond);
// 将任务添加到该时间刻度上
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
}
上面方法中创建了两个线程:基础调度线程和时间轮调度线程。
- 基础调度线程:该线程每隔5秒会从数据库中查未来5秒需要被调度的任务,然后提交给事件轮调度线程来执行,最后任务的本次和下次调度时间。
- 时间轮调度线程:每整秒会从时间轮上取出任务列表并交给JobTriggerPoolHelper中的线程池来调度,另外该线程中除了处理当前时刻的任务列表以外,还会处理前一个时刻的任务列表,防止漏掉。
JobTriggerPoolHelper
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
// 默认选择快线程池
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// 该任务是否在1分钟内超过10次被判定为慢任务?是的话则被定义为慢任务,使用慢线程池
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// do trigger
// 调度任务
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
// 判断和上一次执行是否在同一个分钟内,如果不是则清空慢任务计数
if (minTim != minTim_now) {
minTim = minTim_now;
// 清空慢任务统计
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
// 计算调度任务花费的时间
long cost = System.currentTimeMillis()-start;
// 如果任务执行时间超过500ms,则该任务被定义为慢任务
if (cost > 500) { // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
// 自增计数
timeoutCount.incrementAndGet();
}
}
}
}
});
}
在addTrigger方法中,会根据任务的调度耗时长短,来选择使用快还是慢线程池。这里的慢是指在1分钟内,超过10次调度耗时都超过500毫秒。真正的调度是由XxlJobTrigger来执行的。
XxlJobTrigger
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
// JobScheduleHelper中时间轮线程传过来的下面三个参数都是null
String executorShardingParam,
String executorParam,
String addressList) {
// load data
// 获取任务详情
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
// 如果存在外部参数则使用,外部参数是指在控制台手动执行的
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
// 获取重试次数
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
// 获取执行器组
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// cover addressList
// 如果存在外部地址则使用,和外部参数类似
if (addressList!=null && addressList.trim().length()>0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// sharding param
int[] shardingParam = null;
// 处理分片参数,举例:分片参数格式为1/3,1代表index,3代表total
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
// 如果任务的负载算法为分片广播并且分片参数无效等四个条件都满足
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
// 遍历执行器地址列表
for (int i = 0; i < group.getRegistryList().size(); i++) {
// 进行任务调度
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
// 分片参数赋值为默认值
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
// 进行任务调度
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
上面是先是获取了执行器组,并根据调度策略来选择组内的执行器,然后调用processTrigger方法来执行调度。如果调度策略是分片广播,则会向组内的所有的执行器发送请求。
同一个执行器应用可能会部署多个实例,这些实例会被XxlJob归纳为一个组。
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
// 获取任务阻塞策略枚举
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
// 获取任务路由(负载均衡)策略枚举
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
// 将被拆分的分片参数再拼接起来(index/total),后面会用到
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1、save log-id
// 创建并保存任务日志
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId()); // 设置任务id
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、init trigger-param
// 封装任务触发参数
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、init address
// 这里用了一个策略模式,通过负载策略获取到实际的负载策略处理类
String address = null;
ReturnT<String> routeAddressResult = null;
// 如果存在执行器地址
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
// 如果路由策略是分片广播
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
// 如果索引合理,则选取下标为index的执行器地址
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else { // 否则选取第一个执行器的地址
address = group.getRegistryList().get(0);
}
} else {
// 根据路由策略选择执行器
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
// 如果没有地址,则返回错误
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
// 请求目标执行器并拿到结果
ReturnT<String> triggerResult = null;
if (address != null) {
// 进行任务调度
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// 5、collection trigger info
// 对任务执行结果进行处理
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null) {
triggerMsgSb.append("("+shardingParam+")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
.append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
// 6、save log trigger-info
// 更新任务日志
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
上面方法的步骤如下:
- 创建并设置任务调度参数;
- 执行任务调度;
- 对任务调度结果进行处理;
这里看看是怎么调度任务的。
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
// 获取业务对象
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// 进行任务调度
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
先是获取了ExecutorBiz,再调用其run方法。
getExecutorBiz
run
java
xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
address = address.trim();
// 从缓存中获取
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}
// set-cache
// 创建新的业务对象
executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
// 放入缓存中
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
java
xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// 发送请求到执行器
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
getExecutorBiz方法中创建的是ExecutorBizClient,因为调度器要主动请求执行器,所以调度器是客户端。 下面的方法是底层的通信实现,主要就是设置请求头,请求参数,发送请求和获取并解析响应结果。
java
xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java
public static ReturnT postBody(String url, String accessToken, int timeout, Object requestObj, Class returnTargClassOfT) {
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// trust-https
boolean useHttps = url.startsWith("https");
if (useHttps) {
HttpsURLConnection https = (HttpsURLConnection) connection;
trustAllHosts(https);
}
// connection setting
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(timeout * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
if(accessToken!=null && accessToken.trim().length()>0){
connection.setRequestProperty(XXL_JOB_ACCESS_TOKEN, accessToken);
}
// do connection
// 连接服务器
connection.connect();
// write requestBody
if (requestObj != null) {
String requestBody = GsonTool.toJson(requestObj);
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
// 写入数据
dataOutputStream.write(requestBody.getBytes("UTF-8"));
// 发送数据
dataOutputStream.flush();
dataOutputStream.close();
}
/*byte[] requestBodyBytes = requestBody.getBytes("UTF-8");
connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length));
OutputStream outwritestream = connection.getOutputStream();
outwritestream.write(requestBodyBytes);
outwritestream.flush();
outwritestream.close();*/
// valid StatusCode
// 获取响应码
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
// 读取响应
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String resultJson = result.toString();
// parse returnT
try {
// 解析响应结果
ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT);
return returnT;
} catch (Exception e) {
logger.error("xxl-job remoting (url="+url+") response content invalid("+ resultJson +").", e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting (url="+url+") response content invalid("+ resultJson +").");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting error("+ e.getMessage() +"), for url : " + url);
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
logger.error(e2.getMessage(), e2);
}
}
}