在客户端,我们只需要通过@XxlJob注解标记好执行器方法,那么运行中这些方法会自动被调用。实际上一个Spring Boot应用算是一个执行器,而应用中被@XxlJob修饰的方法被称为执行器方法。本文先来分析在启动过程中,是怎么注册执行器的,也就是怎么注册应用的。
注册XxlJobSpringExecutor
需要在执行器应用中往bean工厂中注册一个XxlJobSpringExecutor,看名称就知道这是XXL-JOB与Spring框架的整合。 下面是源码中给出来的样例配置。
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
下面是配置文件样例。
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=default_token
### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
其中:
- xxl.job.admin.addresses配置了调度中心的地址,下面注册执行器的时候会用到。
- xxl.job.accessToken需要确保和调度中心中的配置一样,否则与其无法通信。
- xxl.job.executor.xxx是执行器相关的属性,可以选择性配置,比如上面只配置了端口,地址会自动获取。
初始化
XxlJobSpringExecutor是一个SmartInitializingSingleton,所以在Spring的应用上下文启动的时候会被调用afterSingletonsInstantiated方法。
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
// 加载JobHandler并做处理
initJobHandlerMethodRepository(applicationContext);
// 加载Glue工厂实例
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
// 进入XxlJobExecutor主启动流程
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
先是加载任务处理器,然后再调用父类的start方法。
加载任务处理器
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
// 获取bean工厂中的所有bean
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
// 遍历bean名称
for (String beanDefinitionName : beanDefinitionNames) {
// get bean
Object bean = null;
// 获取@Lazy注解
Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
if (onBean!=null){ // 如果当前bean是懒加载的,则跳过
logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
continue;
}else { // 获取当前bean实例
bean = applicationContext.getBean(beanDefinitionName);
}
// filter method
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
// 找bean类型中被@XxlJob注解修饰的方法
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
// 获取方法上的@XxlJob注解信息
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
// 不存在@XxlJob修饰的方法则跳过当前bean
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
// generate and register method job handler
// 遍历类中被@XxlJob修饰的方法
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey(); // 方法对象
XxlJob xxlJob = methodXxlJobEntry.getValue(); // @XxlJob注解对象
// register
// 注册job处理器
registJobHandler(xxlJob, bean, executeMethod);
}
}
}
上面方法的主要逻辑就是从Spring应用上下文中获取所有bean,然后判断它里面是否存在被@XxlJob注解修饰的方法。最后会把这些方法注册到缓存中,后续收到调度中心的运行请求时会查找合适的任务处理器。
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
// 添加到缓存中
return jobHandlerRepository.put(name, jobHandler);
}
// 注册任务处理器
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
if (xxlJob == null) {
return;
}
String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
// execute method
/*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}*/
executeMethod.setAccessible(true);
// init and destroy
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
// 获取初始化方法
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
// 获取销毁方法
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
// registry jobhandler
// 调用本方法上面的重载方法,这里将处理器封装为MethodJobHandler对象
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
在该方法中,除了执行器方法本身,还会解析@XxlJob注解配置的init和destroy方法,需要注意的是这两个方法都是和执行器方法在同一个类中。最后会把执行器方法封装为MethodJobHandler并注册到jobHandlerRepository集合中。
XxlJobExecutor的start方法
public void start() throws Exception {
// init logpath
// 初始化日志路径
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
// 处理调度中心地址
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
// 启动日志文件清理线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
// 启动回调线程,用于将执行器的结果返回给调度中心
TriggerCallbackThread.getInstance().start();
// init executor-server
// 启动内嵌服务器EmbedServer
initEmbedServer(address, ip, port, appname, accessToken);
}
处理调度中心地址
是可以配置多个调度中心地址的,所以这里先对多个地址进行字符串分割处理。
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
// 处理多个调度中心地址
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
// 添加到调度中心列表中
adminBizList.add(adminBiz);
}
}
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
创建和启动Netty服务器
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
// 如果没配置端口,则找一个可用的端口,从9999依次递减
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address
if (address==null || address.trim().length()==0) {
// 拼接HTTP协议的请求地址
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// accessToken
if (accessToken==null || accessToken.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// start
// 创建Netty服务器
embedServer = new EmbedServer();
// 启动netty服务器
embedServer.start(address, port, appname, accessToken);
}
这里创建了EmbedServer类型的对象,并调用了start方法。
public void start(final String address, final int port, final String appname, final String accessToken) {
// 创建处理来自调度中心请求的业务对象
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 负责监听连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 负责业务逻辑处理
// 创建线程池,负责处理业务,和netty无关
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// start server
// 创建Netty的启动器
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置相关的属性
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
// 检查调度中心是否存活
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
// HttpServerCodec和HttpObjectAggregator用于处理请求数据
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
// 处理业务的handler
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
// 绑定端口,以同步方式启动
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// start registry
// 启动注册线程
startRegistry(appname, address);
// wait util stop
// 让netty服务器线程不会关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
} finally {
// stop
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
这里是通过创建一个线程来异步启动Netty的服务器。注意设置的最后一个通道处理器是EmbedHttpServerHandler类型的,后续分析收到调度中心请求时会分析它。
在启动之后,会启动心跳线程,该线程除了首次注册执行器以外,还会与调度中心保持心跳。
启动心跳线程
private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
public static ExecutorRegistryThread getInstance(){
return instance;
}
public void start(final String appname, final String address){
// valid
if (appname==null || appname.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
return;
}
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return;
}
// 创建注册线程,兼顾发送心跳的任务
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
// 不断循环向调度中心发起注册操作,避免被认为当前应用服务是下线了。
while (!toStop) {
try {
// 创建注册参数对象,第一个参数对应了xxl_job_registry表中的registry_group字段
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
// 遍历调度中心列表
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
// 向调度中心发起注册
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
// 只要一个成功,就退出for循环了,因为多个调度中心共享同一个数据库
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
if (!toStop) {
// 睡眠一个心跳周期
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
// 执行到这里说明服务被停止
// registry remove
try {
// 创建参数对象
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
// 遍历调度中心
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
// 服务下线
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
}
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
}
执行器每隔30秒会向调度中心发起一个注册请求,其实就是在保持心跳,调度中心是会定期清理长时间没有心跳活动的执行器的。