XxlJob中执行器任务调度执行过程
在《XxlJob中执行器的注册》一文中讲到,执行器在启动时会创建一个Netty服务器,该服务器用于与调度中心通信。本文就来分析一下,执行器在收到调度中心的请求时,是怎么执行到我们注册的执行器方法的。
EmbedHttpServerHandler
这是EmbedServer类中的一个内部类,它被作为最后一个通道处理器添加到Netty的通道流水线(pipeline)中。当收到请求时,它的channelRead0方法会被调用。
java
xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java
/**
 * Receives one HTTP request from the channel and hands its processing to the business thread pool.
 *
 * <p>The request body, URI, method, keep-alive flag and access-token header are read from
 * {@code msg} before the task is submitted; the submitted task calls {@code process}, serializes
 * the result to JSON and writes it back through {@code writeResponse}.
 *
 * @param ctx channel context used to write the response
 * @param msg the HTTP request to handle
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // Extract everything the worker task needs from the request before handing off.
    final HttpMethod httpMethod = msg.method();
    final String uri = msg.uri();
    final String requestData = msg.content().toString(CharsetUtil.UTF_8);
    final String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
    final boolean keepAlive = HttpUtil.isKeepAlive(msg);

    // Run the request on the business thread pool: process -> serialize -> respond.
    bizThreadPool.execute(() -> {
        Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
        writeResponse(ctx, keepAlive, GsonTool.toJson(responseObj));
    });
}
收到请求时,会往线程池中提交一个任务,该任务中会调用process方法来处理请求。
java
xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java
/**
 * Validates an incoming request and dispatches it to the matching {@code executorBiz} operation.
 *
 * <p>A request is rejected with a {@code FAIL_CODE} result when it is not a POST, when the URI is
 * blank, or when a non-blank access token is configured and the supplied token does not match it.
 * Any exception thrown while parsing the body or running the operation is logged and converted
 * into a {@code FAIL_CODE} result.
 *
 * @param httpMethod     HTTP method of the request; only POST is accepted
 * @param uri            request URI used to select the operation
 * @param requestData    request body, parsed as JSON into the operation's parameter object
 * @param accessTokenReq access token supplied by the caller; may be {@code null}
 * @return the operation's result, or a {@code ReturnT} with {@code FAIL_CODE} describing the failure
 */
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    // valid: request method
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    // valid: request uri
    if (uri == null || uri.trim().length() == 0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    // valid: access token (only enforced when a non-blank token is configured)
    if (accessToken != null && accessToken.trim().length() > 0) {
        // The token comes from an untrusted request, so compare in constant time rather than with
        // String.equals, which returns at the first differing character.
        boolean tokenMatches = accessTokenReq != null
                && java.security.MessageDigest.isEqual(
                        accessToken.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                        accessTokenReq.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        if (!tokenMatches) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
    }

    // services mapping: pick the operation by request uri
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run": // trigger a job execution
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}
可以看到,这里根据不同的URI,调用了executorBiz的不同方法,本文重点关注“/run”。
ExecutorBizImpl
java
xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java
/**
 * Handles a trigger request: resolves the job handler and job thread for the job, applies the
 * executor block strategy, and pushes the trigger onto the job thread's queue.
 *
 * @param triggerParam trigger request; this method reads its job id, glue type, executor handler
 *                     name, glue source / update time and executor block strategy
 * @return the result of pushing the trigger onto the job thread's queue, or a FAIL_CODE result
 *         when the handler cannot be found or loaded, the glue type is not valid, or the
 *         DISCARD_LATER strategy drops the trigger
 */
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
// look up the job thread currently registered for this job id (null if there is none)
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
// BEAN mode
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler
// look up the handler registered under the requested name (i.e. the @XxlJob-annotated method)
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
// the existing thread is bound to a different handler than the one requested
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
// drop the references to the old thread and handler; new ones are set below
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
// use the handler that was just looked up
jobHandler = newJobHandler;
// no handler registered under that name: fail the request
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread
// keep the old thread only if its handler is a GlueJobHandler with the same glue update time
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
try {
// build a new handler instance from the glue source carried by the request
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
// valid old jobThread
// keep the old thread only if its handler is a ScriptJobHandler with the same glue update time
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
} else { // glue type is not valid
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// executor block strategy
// only relevant when an existing thread is being reused
if (jobThread != null) {
// resolve the block strategy carried by the request (null when it matches none)
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
// strategy: discard later triggers
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) { // thread is running or has queued triggers: drop this trigger
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
// strategy: cover earlier triggers
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) { // thread is running or has queued triggers: replace it with a new thread for this trigger
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
// set to null so that a new thread is registered below
jobThread = null;
}
} else {
// just queue trigger
}
}
// replace thread (new or exists invalid)
if (jobThread == null) {
// register a job thread for this job id; registJobThread creates and starts the new thread
// and stops the previously registered one, if any
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// push data to queue
// enqueue the trigger; the job thread takes triggers from its queue and executes them asynchronously
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
该方法是执行器方的核心实现,主要逻辑如下:
- 根据任务ID查找所在的线程对象;
- 对于BEAN模式,查找任务处理器方法,在启动的时候扫描了这些方法;其他模式本文不做分析;
- 如果没有与任务绑定的线程,或者已有线程绑定的处理器与本次调度所需的处理器不一致(处理器发生了变更),那么会新建一个线程。
- 如果存在任务对应的线程,那么:
  - 如果阻塞策略是丢弃后续调度,并且线程正在执行任务或者队列中还有未执行的调度请求,则直接返回错误;
  - 如果阻塞策略是覆盖之前的调度,并且线程正在执行任务或者队列中还有未执行的调度请求,则会创建一个新的线程来执行,并且会中断之前的线程;
- 最后将请求参数放入队列中,线程会异步读取并处理;
查找任务线程
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
// Job threads known to this executor, keyed by job id.
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<>();

/**
 * Looks up the job thread registered for a job.
 *
 * @param jobId id of the job
 * @return the registered job thread, or {@code null} when the repository has no entry for {@code jobId}
 */
public static JobThread loadJobThread(int jobId) {
    JobThread registered = jobThreadRepository.get(jobId);
    return registered;
}
查找执行器方法
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
// Job handlers known to this executor, keyed by handler name.
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<>();

/**
 * Looks up a job handler by name.
 *
 * @param name handler name used as the repository key
 * @return the matching handler, or {@code null} when the repository has no entry for {@code name}
 */
public static IJobHandler loadJobHandler(String name) {
    IJobHandler registered = jobHandlerRepository.get(name);
    return registered;
}
这里就是根据@XxlJob注解中的value属性来做的匹配。
注册线程
上面已经说过,有三种情况会创建新的线程:
- 没有与任务关联的线程;
- 任务关联的线程所绑定的处理器与本次调度所需的处理器不一致;
- 阻塞策略是覆盖之前的调度,并且线程正在执行任务或者队列中还有未执行的调度请求;
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
// Job threads known to this executor, keyed by job id.
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<>();

/**
 * Creates and starts a new job thread for a job and registers it under the job id, stopping and
 * interrupting the thread that was previously registered for that id, if there was one.
 *
 * @param jobId           id of the job
 * @param handler         handler the new thread is created with
 * @param removeOldReason reason passed to the replaced thread's {@code toStop}
 * @return the newly created and started job thread
 */
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
    final JobThread freshThread = new JobThread(jobId, handler);
    freshThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", jobId, handler);

    // Map.put returns the value previously mapped to the key, i.e. the thread being replaced.
    final JobThread replacedThread = jobThreadRepository.put(jobId, freshThread);
    if (replacedThread == null) {
        return freshThread;
    }

    // A thread was already registered for this job: signal it to stop, then interrupt it.
    replacedThread.toStop(removeOldReason);
    replacedThread.interrupt();
    return freshThread;
}
可以看到,如果已经存在线程在执行任务,那么会将其中断,这是对覆盖之前调度的阻塞策略的支持。
JobThread
java
xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java
/**
 * Main loop of the job thread.
 *
 * <p>Calls the handler's {@code init()} once, then repeatedly polls the trigger queue and executes
 * the handler for each trigger until {@code toStop} is set. For every trigger taken from the queue
 * a callback is pushed to {@code TriggerCallbackThread}. After the loop ends, every trigger still
 * in the queue is reported as failed via a callback, and the handler's {@code destroy()} is called.
 */
@Override
public void run() {
// init
try {
// run the handler's init method (the method named by the init attribute of @XxlJob)
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
// keep looping until the thread is told to stop
while(!toStop){
running = false;
idleTimes++;
TriggerParam triggerParam = null;
try {
// poll with a timeout instead of a blocking take(), so the loop condition re-checks the toStop signal periodically
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobContext xxlJobContext = new XxlJobContext(
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal());
// init job context
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout: run the handler on a separate thread and wait at most getExecutorTimeout() seconds
Thread futureThread = null;
try {
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
// init job context (set again from inside the task, which runs on futureThread)
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute the job
handler.execute();
return true;
}
});
futureThread = new Thread(futureTask);
futureThread.start();
Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
XxlJobHelper.log(e);
// handle result
XxlJobHelper.handleTimeout("job execute timeout ");
} finally {
// always interrupt the helper thread, whether it finished, timed out or failed
futureThread.interrupt();
}
} else {
// just execute
// no timeout configured: execute the job directly on this thread
handler.execute();
}
// valid execute handle data
// a handle code <= 0 is treated as a lost result and reported as a failure
if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
XxlJobHelper.handleFail("job handle result lost.");
} else {
// truncate the handle message to 50000 characters, appending "..." when it was cut
String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
?tempHandleMsg.substring(0, 50000).concat("...")
:tempHandleMsg;
XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
}
XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
+ XxlJobContext.getXxlJobContext().getHandleCode()
+ ", handleMsg = "
+ XxlJobContext.getXxlJobContext().getHandleMsg()
);
} else {
// nothing was polled: after more than 30 consecutive idle iterations, ask the executor to remove this job thread
if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
}
} catch (Throwable e) {
if (toStop) {
XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
// handle result: record the stack trace as the failure message
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
XxlJobHelper.handleFail(errorMsg);
XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
} finally {
// a callback is pushed only when a trigger was actually taken from the queue in this iteration
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// normal case: report the handle code and message stored in the job context
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
XxlJobContext.getXxlJobContext().getHandleMsg() )
);
} else {
// is killed: report a failure carrying the stop reason
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,
stopReason + " [job running, killed]" )
);
}
}
}
}
// callback trigger request in queue
// the loop has ended: report every trigger still waiting in the queue as failed
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,
stopReason + " [job not executed, in the job queue, killed.]")
);
}
}
// destroy
try {
// run the handler's destroy method (the method named by the destroy attribute of @XxlJob)
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
在上面的方法中,会通过反射调用到我们编写的执行器方法。需要注意的是,通过@XxlJob注解指定的init和destroy方法并不是在每次任务执行前后调用:init方法只在任务线程启动时(进入循环之前)执行一次,destroy方法只在任务线程终止时(退出循环之后)执行一次。