XxlJob中执行器任务调度执行过程

在《XxlJob中执行器的注册》一文中讲到,执行器启动时会创建一个Netty服务器,用于与调度中心通信。本文分析执行器在收到调度中心的请求后,是如何一步步执行到我们注册的执行器方法的。


EmbedHttpServerHandler

这是一个EmbedServer类中的内部类,是被作为最后一个通道处理器设置到Netty的通道流水线中。当收到请求时,它的channelRead0方法会被调用。

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java
/**
 * Handles one HTTP request sent by the admin (scheduling center).
 *
 * <p>The body, URI, method, keep-alive flag and access-token header are copied out of
 * {@code msg} on the calling thread. The rest of the work is submitted to {@code bizThreadPool}:
 * dispatching via {@code process}, serializing the result with {@code GsonTool}, and writing it
 * back with {@code writeResponse}.
 *
 * @param ctx channel context used to write the response
 * @param msg the full HTTP request
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // Copy every request field into locals; the pooled task below only uses these copies.
    final String body = msg.content().toString(CharsetUtil.UTF_8);
    final String requestUri = msg.uri();
    final HttpMethod method = msg.method();
    final boolean keepConnection = HttpUtil.isKeepAlive(msg);
    final String tokenFromHeader = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

    // Dispatch -> serialize -> respond, all on a business-pool thread.
    Runnable invocation = new Runnable() {
        @Override
        public void run() {
            Object result = process(method, requestUri, body, tokenFromHeader);
            String json = GsonTool.toJson(result);
            writeResponse(ctx, keepConnection, json);
        }
    };
    bizThreadPool.execute(invocation);
}

收到请求时,先在当前线程中取出请求体、URI、请求方法、keep-alive标志以及请求头中的token,然后向业务线程池bizThreadPool提交一个任务;该任务调用process方法处理请求,将结果序列化为JSON后写回响应。

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java
/**
 * Validates a request received from the admin and dispatches it to {@code executorBiz} by URI.
 *
 * <p>Checks, in order:
 * <ul>
 *   <li>the method must be {@code POST};</li>
 *   <li>the URI must be non-blank;</li>
 *   <li>when {@code accessToken} is configured (non-blank), the request's token must equal it.</li>
 * </ul>
 *
 * @param httpMethod     HTTP method of the request
 * @param uri            request URI; selects the {@code executorBiz} operation
 * @param requestData    request body, decoded with {@code GsonTool} into the endpoint's parameter type
 * @param accessTokenReq access token read from the request header; may be {@code null}
 * @return the operation's result, or a {@code FAIL_CODE} {@code ReturnT} when validation fails,
 *         the URI is unknown, or the operation throws
 */
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    // valid: only POST is supported
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    // valid: the URI is the dispatch key, so it must be present
    if (uri == null || uri.trim().length() == 0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    // valid: verify the access token when one is configured. The comparison is constant-time so
    // response latency does not reveal how many leading characters of a guessed token are right.
    if (accessToken != null
            && accessToken.trim().length() > 0
            && !tokenMatches(accessToken, accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping: each URI decodes its own parameter type and calls executorBiz
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run": // trigger a job execution
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}

/**
 * Constant-time equivalent of {@code expected.equals(actual)}.
 *
 * <p>Once the lengths match, every character is examined no matter where the first mismatch is.
 *
 * @param expected configured token, non-null
 * @param actual   token supplied by the caller, may be {@code null}
 * @return {@code true} iff both strings contain exactly the same characters
 */
private boolean tokenMatches(String expected, String actual) {
    if (actual == null || actual.length() != expected.length()) {
        return false;
    }
    int diff = 0;
    for (int i = 0; i < expected.length(); i++) {
        diff |= expected.charAt(i) ^ actual.charAt(i);
    }
    return diff == 0;
}

可以看到,这里先校验了请求方法、URI和access token,然后根据不同的URI调用executorBiz的不同方法。本文重点关注“/run”。

ExecutorBizImpl

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java
/**
 * Handles a "/run" trigger from the admin.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>resolve the job handler for the trigger's glue type;</li>
 *   <li>decide whether the job's currently registered JobThread can be reused;</li>
 *   <li>apply the executor block strategy;</li>
 *   <li>queue the trigger on the (possibly newly registered) JobThread.</li>
 * </ol>
 *
 * @param triggerParam trigger request decoded from the admin's request body
 * @return the result of {@code jobThread.pushTriggerQueue(triggerParam)}, or a FAIL_CODE result
 *         when the BEAN handler is not found, the GLUE_GROOVY source fails to load, the glue type
 *         is invalid, or the DISCARD_LATER strategy rejects this trigger
 */
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    // Thread currently registered for this job (may be null) and the handler it runs.
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    // Non-null whenever the old thread has to be replaced; handed to registJobThread below.
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    // BEAN mode
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        // Handler registered under the trigger's executorHandler name
        // (the @XxlJob-annotated method).
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        // The existing thread runs a different handler instance (reference comparison) ...
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            // ... so drop both; a new thread/handler pair is set up below.
            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            // Use the freshly looked-up handler for this job.
            jobHandler = newJobHandler;
            // No handler registered under that name: reject the trigger.
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        // Replace the thread unless it already runs a GlueJobHandler with the same glue update time.
        // NOTE(review): `==` is a value comparison only if both getGlueUpdatetime() methods return a
        // primitive long; confirm the declared types.
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        // Compile/instantiate the Groovy source; a load failure is returned as FAIL_CODE.
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

        // valid old jobThread
        // Same rule as GLUE_GROOVY, but for ScriptJobHandler.
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else { // unknown glue type
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    // Only relevant when the existing thread survived the checks above.
    if (jobThread != null) {
        // Resolve the block strategy (falls back to null when unmatched; such a value reaches the
        // final else branch and the trigger is simply queued).
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        // DISCARD_LATER: drop this trigger if the thread is busy
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) { // thread is executing or has queued triggers: reject this one
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
            // COVER_EARLY: replace a busy thread with a new one
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) { // registJobThread will stop the old thread; its queued triggers are abandoned
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                // Null out only the thread so a new one is registered below; jobHandler is kept.
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        // Create and start a new thread for this job; registJobThread stops any thread it replaces.
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push data to queue
    // The JobThread consumes its queue asynchronously; this call only enqueues.
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

该方法是执行器方的核心实现,主要逻辑如下:

  • 根据任务ID查找所在的线程对象;
  • 对于BEAN模式,查找任务处理器方法,在启动的时候扫描了这些方法;其他模式本文不做分析;
  • 如果没有与任务绑定的线程,或者该线程所绑定的处理器已经发生变化(例如处理器被更换、GLUE类型或源码被修改),那么会新建一个。
  • 如果存在任务对应的线程,那么:
    • 如果阻塞策略是丢弃后续调度,并且该线程正在执行任务或队列中还有未执行的调度,则直接返回错误,放弃本次调度;
    • 如果阻塞策略是覆盖之前调度,并且该线程正在执行任务或队列中还有未执行的调度,则会创建一个新的线程来执行本次调度,并停止、中断之前的线程;
    • 其他情况下直接复用该线程,将本次调度排队执行;
  • 最后将请求参数放入队列中,线程会异步读取并处理;

查找任务线程

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
// Job threads currently registered with this executor, keyed by job id.
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

/**
 * Looks up the thread registered for a job.
 *
 * @param jobId id of the job
 * @return the registered thread, or {@code null} when none is registered for {@code jobId}
 */
public static JobThread loadJobThread(int jobId){
    Integer key = Integer.valueOf(jobId);
    JobThread registered = jobThreadRepository.get(key);
    return registered;
}

查找执行器方法

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
// Job handlers known to this executor, keyed by handler name.
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

/**
 * Looks up a job handler by name.
 *
 * @param name handler name (for BEAN jobs, the trigger's executorHandler)
 * @return the handler registered under {@code name}, or {@code null} when there is none
 */
public static IJobHandler loadJobHandler(String name){
    IJobHandler found = jobHandlerRepository.get(name);
    return found;
}

这里就是用调度中心传来的executorHandler名称,去匹配@XxlJob注解中的value属性。

注册线程

上面已经说过,以下三种情况会创建新的线程:

  • 没有与任务关联的线程;
  • 任务关联的线程所绑定的处理器已发生变化(处理器被更换,或GLUE类型/源码被修改);
  • 阻塞策略是覆盖之前调度,且原线程正在执行任务或队列中还有未执行的调度;
v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
// Job threads currently registered with this executor, keyed by job id.
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

/**
 * Creates and starts a new {@link JobThread} for a job and installs it in the registry.
 *
 * <p>Whatever thread the registry previously held for the same job is told to stop (with
 * {@code removeOldReason}) and then interrupted.
 *
 * @param jobId           id of the job the new thread serves
 * @param handler         handler the new thread will run
 * @param removeOldReason stop reason given to the replaced thread, if there is one
 * @return the newly started thread
 */
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
    JobThread freshThread = new JobThread(jobId, handler);
    freshThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

    // Map#put returns the previous mapping, i.e. exactly the thread being replaced (or null).
    JobThread replaced = jobThreadRepository.put(jobId, freshThread);
    if (replaced == null) {
        return freshThread;
    }

    // Ask the old thread to stop, then interrupt it in case it is blocked or mid-execution.
    replaced.toStop(removeOldReason);
    replaced.interrupt();
    return freshThread;
}

可以看到,新线程创建并启动后,会替换掉缓存中该任务原有的线程;如果原线程存在(无论它当前是否正在执行任务),则调用toStop通知其停止并将其中断。这既支持了覆盖之前调度的阻塞策略,也用于在处理器或GLUE代码变更后替换掉旧线程。

JobThread

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java
/**
 * Main loop of a job thread.
 *
 * <p>Lifecycle:
 * <ol>
 *   <li>Call {@code handler.init()} once.</li>
 *   <li>Until {@code toStop} is set, repeatedly poll {@code triggerQueue} (3-second timeout per
 *       poll) and execute each trigger. Every executed trigger is reported through
 *       {@code TriggerCallbackThread.pushCallBack}.</li>
 *   <li>After the loop, report triggers still left in the queue as killed.</li>
 *   <li>Call {@code handler.destroy()} once.</li>
 * </ol>
 */
@Override
public void run() {

	// init
	try {
        // Run the handler's init hook once per thread (per the article: the method named by the
        // init attribute of @XxlJob). Failures are only logged.
        handler.init();
    } catch (Throwable e) {
		logger.error(e.getMessage(), e);
    }

    // execute
    // Loop until toStop is set.
    while(!toStop){
        running = false;
        // Number of loop iterations since the last trigger arrived; reset to 0 below.
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // To check the toStop signal we need to keep cycling, so queue.take() cannot be used;
            // poll(timeout) is used instead.
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                running = true;
                idleTimes = 0;
                // NOTE(review): presumably this set tracks logIds that are queued (e.g. for
                // duplicate detection when pushing) — confirm against pushTriggerQueue.
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                        triggerParam.getJobId(),
                        triggerParam.getExecutorParams(),
                        logFileName,
                        triggerParam.getBroadcastIndex(),
                        triggerParam.getBroadcastTotal());

                // init job context
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // execute
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout: run the handler on a separate thread and wait at most
                    // executorTimeout seconds for it.
                    Thread futureThread = null;
                    try {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {

                                // init job context: set again because this runs on futureThread
                                // rather than on this JobThread (presumably the context is
                                // stored per thread — confirm in XxlJobContext).
                                XxlJobContext.setXxlJobContext(xxlJobContext);

                                // Run the job
                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        // An exception thrown by handler.execute() surfaces here as an
                        // ExecutionException; it is not caught below and is handled by the
                        // outer catch (Throwable).
                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {

                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);

                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        // Interrupt the worker whether it finished, failed or timed out.
                        // NOTE(review): NullPointerException if futureThread was never assigned
                        // (only if creating the FutureTask/Thread itself threw).
                        futureThread.interrupt();
                    }
                } else {
                    // just execute: run the job on this thread
                    handler.execute();
                }

                // valid execute handle data
                // A handleCode <= 0 is treated as "result lost" and marked as a failure;
                // otherwise handleMsg is truncated to 50000 characters (plus "...").
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                            ?tempHandleMsg.substring(0, 50000).concat("...")
                            :tempHandleMsg;
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg()
                );

            } else {
                // Nothing arrived in this poll. After more than 30 consecutive empty iterations
                // (at least ~90s given the 3s poll timeout) remove this idle thread from the
                // executor, but only if the queue is still empty at that moment.
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            // If the thread is being stopped, record the stop reason in the job log first.
            // NOTE(review): this path also runs when poll() itself is interrupted
            // (triggerParam == null); XxlJobHelper then works on whatever context this thread
            // last set — confirm that is harmless.
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // handle result: record the full stack trace as the failure message
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();

            XxlJobHelper.handleFail(errorMsg);

            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            // Report the outcome of every trigger taken from the queue in this iteration.
            if(triggerParam != null) {
                // callback handler info
                if (!toStop) {
                    // common: report the handler's own result
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    		triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg() )
                    );
                } else {
                    // is killed: report a failure carrying the stop reason instead
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    		triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_CODE_FAIL,
                            stopReason + " [job running, killed]" )
                    );
                }
            }
        }
    }

    // callback trigger request in queue
    // Triggers still queued after the loop were never executed; report each one as killed.
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_CODE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // destroy
    try {
        // Run the handler's destroy hook once (per the article: the method named by the destroy
        // attribute of @XxlJob). Failures are only logged.
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // NOTE(review): the message says "hashCode" but the argument is Thread.currentThread()
    // itself (rendered via its toString()).
    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

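在上面的方法中,线程循环地从队列中取出调度请求,并通过handler.execute()调用到我们编写的执行器方法(BEAN模式下通过反射调用)。线程启动时会先执行一次我们通过@XxlJob注解指定的init方法,线程退出前会执行一次destroy方法,它们并不是在每次任务执行的前后都被调用。如果设置了执行超时时间,任务会在单独的线程中执行,超时或结束后该线程会被中断。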