XxlJob中执行器的注册

在客户端,我们只需要通过@XxlJob注解标记好执行器方法,那么运行中这些方法会自动被调用。实际上一个Spring Boot应用算是一个执行器,而应用中被@XxlJob修饰的方法被称为执行器方法。本文先来分析在启动过程中,是怎么注册执行器的,也就是怎么注册应用的。


注册XxlJobSpringExecutor

需要在执行器应用中往bean工厂中注册一个XxlJobSpringExecutor,看名称就知道这是XXL-JOB与Spring框架的整合。 下面是源码中给出来的样例配置。

v2.4.0
java
xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/core/config/XxlJobConfig.java
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


}

下面是配置文件样例。

v2.4.0
properties
xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job, access token
xxl.job.accessToken=default_token

### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

其中:

  • xxl.job.admin.addresses配置了调度中心的地址,下面注册执行器的时候会用到。
  • xxl.job.accessToken需要确保和调度中心中的配置一样,否则与其无法通信。
  • xxl.job.executor.xxx是执行器相关的属性,可以选择性配置,比如上面只配置了端口,地址会自动获取。

初始化

XxlJobSpringExecutor是一个SmartInitializingSingleton,所以在Spring的应用上下文启动的时候会被调用afterSingletonsInstantiated方法。

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java
// start
@Override
public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    // 加载JobHandler并做处理
    initJobHandlerMethodRepository(applicationContext);

    // 加载Glue工厂实例
    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        // 进入XxlJobExecutor主启动流程
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

先是加载任务处理器,然后再调用父类的start方法。

加载任务处理器

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // init job handler from method
    // 获取bean工厂中的所有bean
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    // 遍历bean名称
    for (String beanDefinitionName : beanDefinitionNames) {

        // get bean
        Object bean = null;
        // 获取@Lazy注解
        Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
        if (onBean!=null){ // 如果当前bean是懒加载的,则跳过
            logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
            continue;
        }else { // 获取当前bean实例
            bean = applicationContext.getBean(beanDefinitionName);
        }

        // filter method
        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            // 找bean类型中被@XxlJob注解修饰的方法
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            // 获取方法上的@XxlJob注解信息
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }

        // 不存在@XxlJob修饰的方法则跳过当前bean
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }

        // generate and register method job handler
        // 遍历类中被@XxlJob修饰的方法
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method executeMethod = methodXxlJobEntry.getKey(); // 方法对象
            XxlJob xxlJob = methodXxlJobEntry.getValue(); // @XxlJob注解对象
            // register
            // 注册job处理器
            registJobHandler(xxlJob, bean, executeMethod);
        }

    }
}

上面方法的主要逻辑就是从Spring应用上下文中获取所有bean,然后判断它里面是否存在被@XxlJob注解修饰的方法。最后会把这些方法注册到缓存中,后续收到调度中心的运行请求时会查找合适的任务处理器。

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}


public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    // 添加到缓存中
    return jobHandlerRepository.put(name, jobHandler);
}
// 注册任务处理器
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
    if (xxlJob == null) {
        return;
    }

    String name = xxlJob.value();
    //make and simplify the variables since they'll be called several times later
    Class<?> clazz = bean.getClass();
    String methodName = executeMethod.getName();
    if (name.trim().length() == 0) {
        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
    }
    if (loadJobHandler(name) != null) {
        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
    }

    // execute method
    /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
        throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                "The correct method format like \" public ReturnT<String> execute(String param) \" .");
    }
    if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
        throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                "The correct method format like \" public ReturnT<String> execute(String param) \" .");
    }*/

    executeMethod.setAccessible(true);

    // init and destroy
    Method initMethod = null;
    Method destroyMethod = null;

    if (xxlJob.init().trim().length() > 0) {
        try {
            // 获取初始化方法
            initMethod = clazz.getDeclaredMethod(xxlJob.init());
            initMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
        }
    }
    if (xxlJob.destroy().trim().length() > 0) {
        try {
            // 获取销毁方法
            destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
            destroyMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
        }
    }

    // registry jobhandler
    // 调用本方法上面的重载方法,这里将处理器封装为MethodJobHandler对象
    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

}

在该方法中,除了执行器方法本身,还会解析@XxlJob注解配置的initdestroy方法,需要注意的是这两个方法都是和执行器方法在同一个类中。最后会把执行器方法封装为MethodJobHandler并注册到jobHandlerRepository集合中。

XxlJobExecutorstart方法

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
public void start() throws Exception {

    // init logpath
    // 初始化日志路径
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    // 处理调度中心地址
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    // 启动日志文件清理线程
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    // 启动回调线程,用于将执行器的结果返回给调度中心
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    // 启动内嵌服务器EmbedServer
    initEmbedServer(address, ip, port, appname, accessToken);
}

处理调度中心地址

是可以配置多个调度中心地址的,所以这里先对多个地址进行字符串分割处理。

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
        // 处理多个调度中心地址
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {

                AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                // 添加到调度中心列表中
                adminBizList.add(adminBiz);
            }
        }
    }
}

public static List<AdminBiz> getAdminBizList(){
    return adminBizList;
}

创建和启动Netty服务器

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    // 如果没配置端口,则找一个可用的端口,从9999依次递减
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) {
        // 拼接HTTP协议的请求地址
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    // 创建Netty服务器
    embedServer = new EmbedServer();
    // 启动netty服务器
    embedServer.start(address, port, appname, accessToken);
}

这里创建了EmbedServer类型的对象,并调用了start方法。

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java
public void start(final String address, final int port, final String appname, final String accessToken) {
    // 创建处理来自调度中心请求的业务对象
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup(); // 负责监听连接
            EventLoopGroup workerGroup = new NioEventLoopGroup(); // 负责业务逻辑处理
            // 创建线程池,负责处理业务,和netty无关
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                // start server
                // 创建Netty的启动器
                ServerBootstrap bootstrap = new ServerBootstrap();
                // 设置相关的属性
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        // 检查调度中心是否存活
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        // HttpServerCodec和HttpObjectAggregator用于处理请求数据
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        // 处理业务的handler
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                // 绑定端口,以同步方式启动
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                // 启动注册线程
                startRegistry(appname, address);

                // wait util stop
                // 让netty服务器线程不会关闭
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

这里是通过创建一个线程来异步启动Netty的服务器。注意设置的最后一个通道处理器是EmbedHttpServerHandler类型的,后续分析收到调度中心请求时会分析它。

在启动之后,会启动心跳线程,该线程除了首次注册执行器以外,还会与调度中心保持心跳。

启动心跳线程

v2.4.0
java
xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java
private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
public static ExecutorRegistryThread getInstance(){
    return instance;
}
public void start(final String appname, final String address){

    // valid
    if (appname==null || appname.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    // 创建注册线程,兼顾发送心跳的任务
    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // registry
            // 不断循环向调度中心发起注册操作,避免被认为当前应用服务是下线了。
            while (!toStop) {
                try {
                    // 创建注册参数对象,第一个参数对应了xxl_job_registry表中的registry_group字段
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    // 遍历调度中心列表
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            // 向调度中心发起注册
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                // 只要一个成功,就退出for循环了,因为多个调度中心共享同一个数据库
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    if (!toStop) {
                        // 睡眠一个心跳周期
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    }
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }
            }

            // 执行到这里说明服务被停止

            // registry remove
            try {
                // 创建参数对象
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                // 遍历调度中心
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        // 服务下线
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }

                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");

        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}

执行器每隔30秒会向调度中心发起一个注册请求,其实就是在保持心跳,调度中心是会定期清理长时间没有心跳活动的执行器的。