当前位置:   article > 正文

【手把手】分布式定时任务调度解析之xxl-job_定时任务xxljob

定时任务xxljob

1、xxl-job好像很火?

在之前我写的讲解Quartz中有介绍过,Quartz有差不多二十年的历史,调度模型已经非常成熟了,而且很容易集成到Spring中去,用来执行业务任务是一个很好的选择。但是越早的设计存在的问题也越明显,比如:
1、调度逻辑(Scheduler)和任务类耦合在同一个项目中,随着调度任务数量逐渐增多,同时调度任务逻辑逐渐加重,调度系统的整体性能会受到很大的影响;
2、Quartz集群的节点之间负载结果是随机的,谁抢到了数据库行锁就由谁去执行任务,这就有可能出现旱的旱死,涝的涝死的情况,发挥不了机器的性能;
3、Quartz本身没有提供动态调度和管理界面的功能,需要自己根据API进行开发;
4、Quartz的日志记录、数据统计、监控不是特别完善;
所以xxl-job和Elastic-Job都是对Quartz进行了封装,用起来更简单,功能更强大。

Quartz中最重要的三个对象:Job(作业)、Trigger(触发器)、Scheduler(调度器)。而xxl-job的调度原理就是,调度线程在一个while循环中不断地获取一定数量的即将触发的Trigger,拿到绑定的Job,包装成工作线程执行。当然,不管在任何调度系统中,底层都是线程模型。如果要自己写一个调度系统,一定要对多线程并发这一块有比较深入的学习,比如线程怎么启动怎么wait,怎么notify ,怎么加锁等等。

其实xxl-job一开始也只是一个大众点评的程序员的业余之作,于2015年开源:https://github.com/xuxueli/xxl-job。众所周知,大众点评因为被美团收购了,现在是美团点评。xxl是作者名字许雪里的首字母简写,除了xxl-job之外作者还开源了很多其他组件,现在一共有11个开源项目。到目前为止使用xxl-job的公司有几百家,算上那些没有登记的公司,实际上应该有几千家。在xxl-job早期的版本中,直接使用了Quartz的调度模型,直到2019年7月7日发布的7.27 版本才移除Quartz依赖。实际上即使重构代码移除了Quartz的依赖,xxl-job中也到处是Quartz的影子。比如任务、调度器、触发器的三个维度的经典设计,不过后面自从2.2.0版本开始,版本更新几乎没有什么变化。

跟老牌的Quartz相比,xxl-job拥有更加丰富的功能,总体上可以分成三类:
1、性能的提升:可以调度更多的任务;
2、可靠性的提升:任务超时、失败、故障转移的处理;
3、运维更加便捷:提供操作界面、有用户权限、详细的日志、提供通知配置、自动生成报表;

2、快速入门

认识xxl-job目录结构

以2.2.0版本为基础,进行xxl-job的介绍:https://github.com/xuxueli/xxl-job/releases/tag/v2.2.0,将源码包下载下来之后,用IDEA打开项目,xxl-job整体的目录结构如下:

doc:这里面全是一些相关的文档资料,包括依赖的数据库脚本语言,其实没啥好说的,该看看,该执行执行;

xxl-job-admin:相当于Quartz中的Scheduler调度器。在Quartz中将调度器和任务耦合在一起。而在xxl-job中将调度器单独剥离出来作为一个独立的工程,不再是任务调度项目中的一个线程;

xxl-job-core:在使用的时候其实也无需过多关注,这个是核心依赖jar包,只要通过坐标引入依赖直接使用即可;

xxl-job-executor-samples:任务的执行器,可以理解成是一个演示用的业务实例。也就是说,当在实际项目中需要使用xxl-job的时候,可以参考samples中的设计和写法,它引什么你引什么,它怎么写你怎么写,照葫芦画瓢就完事儿了。

xxl-job表结构解析

xxl-job对于任务数据的管理依赖于数据库,很多人以为xxl-job只支持MySQL数据库, 这个说法不够准确。因为xxl-job本身对于数据库的连接是基于JDBC去连接,换句话说,只要是JDBC多支持的数据库xxl-job其实都可以使用。但是如果想要使用MySQL以外其它的数据库的话,得去改动一下xxl-job的源码,原生的xxl-job其实不支持的东西还是挺多的,没有那么万能好用说实话。

在正式的使用xxl-job之前,需要先对xxl-job所依赖的表结构做个简单的认知,表结构脚本存放在:doc/db/tables_xxl_job.sql

xxl_job_group:执行器信息表。维护任务执行器信息;

xxl_job_info:调度扩展信息表。用于保存xxl-job调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;

xxl_job_lock:任务调度锁表;

xxl_job_log:调度日志表。用于保存xxl-job任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;

xxl_job_log_report:调度日志报表。用户存储xxl-job任务调度日志的报表,调度中心报表功能页面会用到;

xxl_job_logglue:任务glue日志。用于保存glue更新历史,用于支持glue的版本回溯功能;glue就是说可以写一段代码,直接嵌入程序中执行,这是一种非常危险的方式。即是说可以通过外部的定时任务调度,嵌一段代码到业务中,这种操作方式非常的危险,如果有生产库的权限,排查其问题来就相当麻烦了,代码中有什么错都不知道;

xxl_job_registry:执行器注册表。维护在线的执行器和调度中心机器地址信息;

xxl_job_user:系统用户表。登录后台管理的账号和密码,这个可以在配置文件中配置;

xxl-job的调度器和业务执行是独立的,调度器决定任务的调度,并且通过HTTP的方式调用执行器接口执行任务。所以至少需要一个调度中心和一个执行器,当然也可以集群部署。

构建调度中心

如上所说,在xxl-job中将调度器单独剥离出来成为一个独立的工程——调度中心,这也就意味着调度中心完全可以单独打成一个jar包,然后丢到操作系统中直接启动成为一个独立的进程,真正做到和执行的任务互不侵犯。

那么要想打包一个独立的应用来说,最需要注意的是什么,那必须是它的配置文件:


    
    
  1. ### web
  2. server.port= 7391
  3. server.servlet.context-path=/xxl-job-admin
  4. ### actuator
  5. management.server.servlet.context-path=/actuator
  6. management.health.mail.enabled= false
  7. ### resources
  8. spring.mvc.servlet.load-on-startup= 0
  9. spring.mvc. static-path-pattern=/ static /**
  10. spring.resources.static-locations=classpath:/static/
  11. ### freemarker
  12. spring.freemarker.templateLoaderPath=classpath:/templates/
  13. spring.freemarker.suffix=.ftl
  14. spring.freemarker.charset=UTF-8
  15. spring.freemarker.request-context-attribute=request
  16. spring.freemarker.settings.number_format=0.##########
  17. ### mybatis
  18. mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
  19. #mybatis.type-aliases-package=com.xxl.job.admin.core.model
  20. ### xxl-job, datasource
  21. spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
  22. spring.datasource.username=root
  23. spring.datasource.password=Lee@0629
  24. spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
  25. ### xxl-job-user
  26. xxl.job.login.username=admin
  27. xxl.job.login.password=123456
  28. ### datasource-pool
  29. spring.datasource.type=com.zaxxer.hikari.HikariDataSource
  30. spring.datasource.hikari.minimum-idle=10
  31. spring.datasource.hikari.maximum-pool-size=30
  32. spring.datasource.hikari.auto-commit=true
  33. spring.datasource.hikari.idle-timeout=30000
  34. spring.datasource.hikari.pool-name=HikariCP
  35. spring.datasource.hikari.max-lifetime=900000
  36. spring.datasource.hikari.connection-timeout=10000
  37. spring.datasource.hikari.connection-test-query=SELECT 1
  38. ### xxl-job, email
  39. spring.mail.host=smtp.qq.com
  40. spring.mail.port=25
  41. spring.mail.username=xxx@qq.com
  42. spring.mail.password=xxx
  43. spring.mail.properties.mail.smtp.auth=true
  44. spring.mail.properties.mail.smtp.starttls.enable=true
  45. spring.mail.properties.mail.smtp.starttls.required=true
  46. spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
  47. ### xxl-job, access token
  48. xxl.job.accessToken=
  49. ### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")
  50. xxl.job.i18n=zh_CN
  51. ## xxl-job, triggerpool max size
  52. xxl.job.triggerpool.fast.max=200
  53. xxl.job.triggerpool.slow.max=100
  54. ### xxl-job, log retention days
  55. xxl.job.logretentiondays=30
  • 1

没想到xxl-job用的还是.properties文件,我还是比较习惯使用.yml文件。言归正规,在这个配置文件中有几个点需要格外关注一下:

1、关于数据库的链接信息需要修改成自己的数据库服务链接信息;

2、显示配置管理后台的用户信息,这个用户的信息需要和xxl-job依赖的数据库中的xxl_job_user表中数据一致,没有显示配置的话,好像会概率出现登录不上的情况;

3、xxl.job.accessToken= 这个值暂时不用填什么,不过值得注意的是,这个token涉及到xxl-job的一个漏洞。在没有设置token的情况下,可以通过某些方式改掉程序;

将一些必要的信息修改完之后,启动xxl-job-admin服务,访问:http://localhost:7391/xxl-job-admin/,使用自带的账号/密码[admin/123456]登录

构建执行器

执行器负责任务的具体执行,分配线程。执行器需要注册到调度中心,这样调度器才知道怎么选择执行器,或者说做路由。执行器的执行结果,也需要通过回调的方式通知调度器。xxl-job提供了6个执行器的demo,这里选用xxl-job-executor-sample-springboot工程。

既然要使用到xxl-job,自然第一件事就是要引入相关的依赖,demo中已经引入好了

引入完相关依赖之后,开始折腾配置文件

这里就该详细讲解一下 xxl.job.accessToken 这个属性。xxl-job本身存在一个非常非常严重它自己又不承认叫漏洞的东西,相比当年的log4j还要严重,但多多少少也是沾了国产的光,毕竟国内自己的软件,容忍度也比较高。

如果有经常关注开源产品的话,应该会知道无论是阿里云还是腾讯云都曝出过xxl-job的一个远程执行漏洞。当没有填写 xxl.job.accessToken 的话,执行器和调度器的访问是不需要验证身份的,就可以手动的注入一段代码到业务逻辑中。设想一下,假设是在支付的时候,通过嵌入一段代码更改用户的账号密码,这是完全可以做到的,网上也有很多相关验证的文章感兴趣可以去翻翻看看。但是作者给出的答复是,因为没有配 xxl.job.accessToken 才可以,要是把 xxl.job.accessToken 配了不就不存在问题了。换句话说,就是自定义一段字符串,在调度器和执行器两边的配置问价都配置成相同的密文。但是如果别人要是知道了这个密文,还是可以注入进来,所以如果想要确保安全性,就得将这个密文设置成动态生成的,确保每次两边拿到的token不仅一致,而且每次都不一样才行。这又给开发带来了难度,我相信大部分使用xxl-job的配置文件里面是没有配置token的。

xxl.job.executor.appname=xxl-job-executor-feenix,这个属性也有个坑。当第一次启动执行器之后,这个执行器的名称会被写入到xxl_job_group表中。一旦被写入到表中之后,执行器的名称就不可以再更改了,即使在配置文件中更改了名称之后,数据库中的名称也不会同步更改。然后任务在被执行的时候,就会报一个找不到执行器的错误,所以名字一旦确定之后最好不要再改。

手动新增执行器

执行器启动之后会自动向admin端注册自己,这一步是没有问题的, 在xxl_job_registry表也看到了注册记录

因此,仍然需要在执行器管理界面去新增执行器,执行器的信息根据自己的项目信息填,选择【自动注册】即可

新增成功之后,就可以在xxl_job_group表里看到我们的执行器信息。有一点需要注意:xxl_job_group新增成功之后,address_list字段可能没有立即有值,这是因为心跳注册存在短暂延时,可以稍等下刷新即可看到

那为什么示例执行器不需要手动在界面新增就可以显示,而我自己的执行器需要手动新增?按照官网的操作说明,构建调度中心的时候需要执行里面的sql文件tables_xxl_job.sql。在sql里面,有下面一句标红的sql,正是这条sql手动把示例执行器插入到了xxl_job_group里面,这也就是示例执行器为什么不需要在界面手动新增的原因

添加任务

现在调度中心有了,执行器也有了,下面就可以派活给工人做了,也就是创建任务。登录调度中心,打开任务管理界面,新增任务

数据库中也可以看到相关的任务信息

启动任务 

任务启动后的执行结果,可以在xxl_job_log表中看到

3、xxl-job中的任务属性

刚刚在添加任务的时候,看到一堆乱七八糟的参数要去指定,第一眼头都大了。其实xxl-job中的参数还算是比价好理解的,下面对这些参数一个一个来详细讲解。

执行器

这没什么好说的,这个选项就是把已经注册到调度中心的执行器列出来,指定当前任务要由哪个执行器去执行。执行器列表有两个执行器,这里也是两个执行器可供选择。

任务描述

这个更没什么好说的,就是对当前任务的执行内容或目的进行简单的说明,好让别人知道这个任务是干什么的。 

这个属性主要是针对于一个执行器集群中选择具体的哪个执行器去执行任务。刚刚在上面【执行器】中选择了使用【自定义执行器】,但是这个【自定义执行器】实质上是由多台机器组成的一个集群,那么具体由哪台机器来执行任务呢,就是根据这个路由策略来规定具体的路由规则。

之前有说Quartz集群的节点之间负载结果是随机的,谁抢到了数据库行锁就由谁去执行任务,这就有可能出现旱的旱死,涝的涝死的情况,发挥不了机器的性能,xxl-job则提供了丰富的路由策略:

策略参数值详细含义
第一个FIRST固定选择第一个机器
最后一个LAST固定选择最后一个机器
轮询ROUND依次选择执行
随机RANDOM随机选择在线的机器
一致性HASHCONSISTENT_HASH每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上
最不经常使用LEAST_FREQUENTLY_USED使用频率最低的机器优先被选举
最近最久未使用LEAST_RECENTLY_USED最久未使用的机器优先被选举
故障转移FAILOVER按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
忙碌转移BUSYOVER按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
分片广播SHARDING_BROADCAST广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务

Cron

在前面介绍Quartz的文章里有说过,Quartz支持4种类型的触发器,虽然相比较于CronTrigger,其它3种触发器基本是万年不用。到了xxl-job这里,直接将其它3种全部删掉,只支持Cron这一种类型的触发规则。不过有意思的是,它并不只是单纯的把cron表达式填进去,作者也知道这年头没几个人天天去研究cron表达式怎么写的,直接集成了cron的图形生成工具:

运行模式

xxl-job支持的运行模式分为两种:Bean模式和Glue模式。

Bean模式就是预先在执行器中写好需要执行的任务代码,然后将这个任务作为Bean交给Spring去管理。在官方提供的实例中非常详细的说明了Bean的写法规则:

通过Bean模式可以让开发者快速上手,专注于业务逻辑本身,类似于Quartz中实现Job接口之后实实现的executor方法一样,而在实际开发中基本也都是使用这一模式。

Glue的意思是胶水,什么情况下需要用到胶水,将两个物体需要仅仅黏在一起的时候就需要用到胶水。也就是说,Glue模式其实就是在写的好好的一段代码中间插入一段代码,就好像是胶水把上下段代码黏在一起一样,在生产环境就是纯纯的搅屎棍。在生产环境是绝对严令禁止使用Glue模式,在网页上可以随意输入一段代码嵌入到生产环境中,是不是一件细思极恐的事情。虽然作者跑出来说只要配上token就没关系,但是如果使用这个功能的人本身就是开发人员呢,那天心情不好直接给你嵌段代码进行删库删表那不完犊子了。

JobHandler

这个属性是配合Bean模式进行使用,当选择了使用Bean模式之后,就需要在这里填入自定义的JobHandler的名称,也就是执行器中自定义的任务方法上的@XxlJob主中的值。

执行器通过这个JobHandler找到对应的任务进行执行。

阻塞处理策略

指的是任务的一次运行还没有结束的时候,下一次调度的时间又到了,这个时候怎么处理。xxl-job提供了三种不同的策略方式:

策略参数值详细含义
单机串行(默认)SERIAL_EXECUTION对当前线程不做任何处理,并在当前线程的队列里增加一个执行任务,一次只执行一个任务。调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行。
丢弃后续调度DISCARD_LATER如果当前线程阻塞,后续任务不再执行,直接返回失败,阻塞就不再执行了。调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败。
覆盖之前调度COVER_EARLY创建一个移除原因,新建一个线程去执行后续任务,杀掉当前线程。调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务。

子任务ID

如果需要在本任务执行结束并且执行成功的时候触发另外一个任务,那么就可以把另外的任务作为本任务的子任务运行。因为每个任务都拥有一个唯一的任务ID,任务ID可以从任务列表获取

只需要把JobId填上就可以,比如:下载对账文件的任务成功以后,开始解析文件入库。入库成功以后,开始对账,这样多个任务就实现了串行调度。

任务超时时间 & 失败重试次数

对于超时时间其实是可以控制的,虽然说这些任务是异步调度,但是这些线程一定是等它执行完了,再去调度器中汇报执行结果。如果在这里设置了超时时间为1s,那么1s之后不管线程执行完没有都不关心了,直接将这个线程给干掉,配合失败重试次数,重新尝试执行任务。

负责人

任务出问题了找个背锅的。

报警邮件

调度中心配置文件中配置好的邮件地址,没有预先配置好的话无法成功发送邮件

简单任务

首先需要明确一点,xxl-job中所有的任务,开发的流程都是一样的:

1、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param) {.....具体执行的任务内容.....}";

返回类型不可修改,因为在返回类型中已经定义好了相应的返回码,通过这些返回码,页面才能知道执行的具体结果是如何

方法名随便,能给到开发者最大的自由了。方法的参数也不可以改,类型必须是String,其实这个参数就是在添加任务时的【任务参数】的值,页面上填了啥,这里就能取到啥。

2、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值;

value这个值填在【JobHandler】中,调度中心根据这个名字来执行对应的任务。除此之外,还有另外两个参数 init 和 destroy,和Quartz中的监听器相似。定义好方法名之后,可以在方法执行前和方法结束后执行

3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;只有通过此种方式打印的日志才会被记录到xxl-job的数据库中。

以上便是创建了一个任务,非常的简单。而xxl-job中所谓的5种不同类型的任务,都是基于简单任务为基础,进行不同的任务内容处理就是一种不容的任务。比如生命周期任务,就是简单任务减伤init 和 destroy 参数之后,就变成了所谓的生命周期任务。除此之外,其它的4中任务类型分别是:

分片任务

首先在创建任务的时候,【路由策略】选择【分片广播】

xxl-job提供了获取分片参数的工具,只需要根据获取到的不同的分片号执行相应的业务逻辑即可


    
    
  1. /**
  2. * 2、分片广播任务
  3. */
  4. @XxlJob("shardingJobHandler")
  5. public ReturnT<String> shardingJobHandler (String param) throws Exception {
  6. // 分片参数
  7. ShardingUtil. ShardingVO shardingVO = ShardingUtil.getShardingVo();
  8. XxlJobLogger.log( "分片参数:当前分片序号 = {}, 总分片数 = {}",
  9. shardingVO.getIndex(), shardingVO.getTotal());
  10. // 业务逻辑
  11. for ( int i = 0; i < shardingVO.getTotal(); i++) {
  12. // 获取当前的分片号
  13. if (i == shardingVO.getIndex()) {
  14. // 根据不同的分片号,处理不同的逻辑业务
  15. XxlJobLogger.log( "第 {} 片, 命中分片开始处理", i);
  16. } else {
  17. XxlJobLogger.log( "第 {} 片, 忽略", i);
  18. }
  19. }
  20. return ReturnT.SUCCESS;
  21. }
  • 1

命令行任务

其实我一直不是很明白,为什么会在Java的代码中去跑命令行。不过有时候想一想,这玩意儿拿来方便运维人员似乎很不错,比如每天某个时刻,需要将数据库的文件备份到指定目录下。这个备份的命令就可以使用这个命令行任务来每天定时执行,不需要运维人员每天手动去备份。但是类似于这种需求让运维写个定时脚本放在服务器上执行不就完了.....

这种类型的任务也是基于简单任务而来,将需要执行的命令在建立任务的时候,通过【任务参数】传递到方法中,拿到这个参数后调用Runtime.getRuntime().exec(command);执行命令即可。


    
    
  1. /**
  2. * 3、命令行任务
  3. */
  4. @XxlJob("commandJobHandler")
  5. public ReturnT<String> commandJobHandler (String param) throws Exception {
  6. String command = param;
  7. int exitValue = - 1;
  8. BufferedReader bufferedReader = null;
  9. try {
  10. // command process
  11. Process process = Runtime.getRuntime().exec(command);
  12. BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
  13. bufferedReader = new BufferedReader( new InputStreamReader(bufferedInputStream));
  14. // command log
  15. String line;
  16. while ((line = bufferedReader.readLine()) != null) {
  17. XxlJobLogger.log(line);
  18. }
  19. // command exit
  20. process.waitFor();
  21. exitValue = process.exitValue();
  22. } catch (Exception e) {
  23. XxlJobLogger.log(e);
  24. } finally {
  25. if (bufferedReader != null) {
  26. bufferedReader.close();
  27. }
  28. }
  29. if (exitValue == 0) {
  30. return IJobHandler.SUCCESS;
  31. } else {
  32. return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value(" + exitValue + ") is failed");
  33. }
  34. }
  • 1

HTTP任务

xxl-job定义了具体的http请求的规则,将根据规则写好的http请求参数在建立任务的时候,通过【任务参数】传递到方法中,方法中解析校验请求参数,然后对目标地址进行请求


    
    
  1. /**
  2. * 4、跨平台Http任务
  3. * 参数示例:
  4. * "url: http://www.baidu.com\n" +
  5. * "method: get\n" +
  6. * "data: content\n";
  7. */
  8. @XxlJob("httpJobHandler")
  9. public ReturnT<String> httpJobHandler (String param) throws Exception {
  10. // param parse
  11. if (param == null || param.trim().length() == 0) {
  12. XxlJobLogger.log( "param[" + param + "] invalid.");
  13. return ReturnT.FAIL;
  14. }
  15. String[] httpParams = param.split( "\n");
  16. String url = null;
  17. String method = null;
  18. String data = null;
  19. for (String httpParam : httpParams) {
  20. if (httpParam.startsWith( "url:")) {
  21. url = httpParam.substring(httpParam.indexOf( "url:") + 4).trim();
  22. }
  23. if (httpParam.startsWith( "method:")) {
  24. method = httpParam.substring(httpParam.indexOf( "method:") + 7).trim().toUpperCase();
  25. }
  26. if (httpParam.startsWith( "data:")) {
  27. data = httpParam.substring(httpParam.indexOf( "data:") + 5).trim();
  28. }
  29. }
  30. // param valid
  31. if (url == null || url.trim().length() == 0) {
  32. XxlJobLogger.log( "url[" + url + "] invalid.");
  33. return ReturnT.FAIL;
  34. }
  35. if (method == null || !Arrays.asList( "GET", "POST").contains(method)) {
  36. XxlJobLogger.log( "method[" + method + "] invalid.");
  37. return ReturnT.FAIL;
  38. }
  39. // request
  40. HttpURLConnection connection = null;
  41. BufferedReader bufferedReader = null;
  42. try {
  43. // connection
  44. URL realUrl = new URL(url);
  45. connection = (HttpURLConnection) realUrl.openConnection();
  46. // connection setting
  47. connection.setRequestMethod(method);
  48. connection.setDoOutput( true);
  49. connection.setDoInput( true);
  50. connection.setUseCaches( false);
  51. connection.setReadTimeout( 5 * 1000);
  52. connection.setConnectTimeout( 3 * 1000);
  53. connection.setRequestProperty( "connection", "Keep-Alive");
  54. connection.setRequestProperty( "Content-Type", "application/json;charset=UTF-8");
  55. connection.setRequestProperty( "Accept-Charset", "application/json;charset=UTF-8");
  56. // do connection
  57. connection.connect();
  58. // data
  59. if (data != null && data.trim().length() > 0) {
  60. DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
  61. dataOutputStream.write(data.getBytes( "UTF-8"));
  62. dataOutputStream.flush();
  63. dataOutputStream.close();
  64. }
  65. // valid StatusCode
  66. int statusCode = connection.getResponseCode();
  67. if (statusCode != 200) {
  68. throw new RuntimeException( "Http Request StatusCode(" + statusCode + ") Invalid.");
  69. }
  70. // result
  71. bufferedReader = new BufferedReader( new InputStreamReader(connection.getInputStream(), "UTF-8"));
  72. StringBuilder result = new StringBuilder();
  73. String line;
  74. while ((line = bufferedReader.readLine()) != null) {
  75. result.append(line);
  76. }
  77. String responseMsg = result.toString();
  78. XxlJobLogger.log(responseMsg);
  79. return ReturnT.SUCCESS;
  80. } catch (Exception e) {
  81. XxlJobLogger.log(e);
  82. return ReturnT.FAIL;
  83. } finally {
  84. try {
  85. if (bufferedReader != null) {
  86. bufferedReader.close();
  87. }
  88. if (connection != null) {
  89. connection.disconnect();
  90. }
  91. } catch (Exception e2) {
  92. XxlJobLogger.log(e2);
  93. }
  94. }
  95. }
  • 1

就是一个简单的http请求构成的链接,所以除了简单任务和分片任务之外,其它的这些花里胡哨的任务感觉意义不是很大,很多人都会觉得这些任务没多大的实用性。

5、xxl-job架构设计

调度与任务解耦

在Quartz中,调度逻辑和任务代码是耦合在一起的。而xxl-job把调度的动作抽象和独立出来,形成“调度中心”公共平台。调度中心只负责发起调度请求,平台自身并不承担业务逻辑。将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性。

这张架构图有点小问题,这个是2.1.0版本的架构,执行器通过xxl-rpc来告诉调度中心任务的执行情况,不过现在已经换成了http方式进行通知,但是官网并没有提供最新的架构图,凑合着看吧。

调度中心负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;

调度中心支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,Glue开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

执行器负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;接收“调度中心”的执行请求、终止请求和日志请求等。从整体来看,xxl-job架构依赖较少,功能强大,简约而不简单,方便部署,易于使用。

全异步化设计

xxl-job系统中业务逻辑在远程执行器执行,触发流程全异步化设计。相比直接在调度中心内部执行业务逻辑,极大的降低了调度线程占用时间;

异步调度:调度中心每次任务触发时仅发送一次调度请求,该调度请求首先推送“异步调度队列”,然后异步推送给远程执行器。
异步执行:执行器会将请求存入“异步执行队列”并且立即响应调度中心,异步运行。
轻量级设计:xxl-job调度中心中每个Job逻辑非常 “轻”,在全异步化的基础上,单个Job一次运行平均耗时基本在 “10ms” 之内,基本为一次请求的网络开销。因此,可以保证使用有限的线程支撑大量的Job并发运行;

得益于上述3点优化,理论上默认配置下的调度中心,单机能够支撑5000任务并发运行稳定运行;实际场景中,由于调度中心与执行器网络ping延迟不同、DB读写耗时不同、任务调度密集程度不同,会导致任务量上限会上下波动。如若需要支撑更多的任务量,可以通过调大调度线程数、降低调度中心与执行器ping延迟和提升机器配置几种方式优化。

均衡调度

调度中心在集群部署时会自动进行任务平均分配,触发组件每次获取与线程池数量(调度中心支持自定义调度线程池大小)相关数量的任务,避免大量任务集中在单个调度中心集群节点。简单来讲,其实就是【路由策略】中的那几个选项:轮询、随机、一致性哈希、LSU、LRU、分片.....这也是为什么新版的xxl-job将原先的Quartz依赖移除,重新写了个调度模型,就是出于这种更多样性更加灵活的调度设计考虑。

6、xxl-job深入剖析

执行器启动与注册

当执行器集群部署的时候,调度器需要为任务执行选择执行器。所以,执行器在启动的时候,必须先注册到调度中心,再保存在数据库。执行器的注册与发现有两种方式:
1、一种是执行器启动的时候,主动到调度中心注册,并定时发送心跳,保持续约。执行器正常关闭时,也主动告知调度中心注销掉。这种方式叫做主动注册。如果执行器宕机或者网络出问题了,调度中心就不知道执行器的情况,如果把任务路由给一个不可用的执行器执行,就会导致任务执行失败;
2、调度中心本身也需要不断地对执行器进行探活。调度中心会启动一个专门的后台线程,定时调用执行器接口,如果发现异常就下线掉;

可以从执行器的源码去验证一下:首先,一个Spring Boot的项目启动从哪里入手?从配置类XxlJobConfig出发,这里用到了配置的参数。配置类定义了一个XxlJobSpringExecutor,会在启动扫描配置类的时候创建执行器。XxlJobSpringExecutor继承了XxlJobExecutor。父类实现了SmartInitializingSingleton接口,在对象初始化的时候会调用afterSingletonsInstantiated()方法,这里面父类的start()方法做了几件事:

// 初始化日志路径
XxlJobFileAppender.initLogPath(logPath);

// 创建调度器的客户端
initAdminBizList(adminAddresses, accessToken);

// 初始化日志清理线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);

// 初始化Trigger回调线程
TriggerCallbackThread.getInstance().start();

// 初始化执行器服务器
initEmbedServer(address, ip, port, appname, accessToken);

initAdminBizList创建调度器客户端,是执行器用来连接调度器的。Trigger回调线程用来处理任务执行完毕后的回调,这个最后再说,为什么需要回调。从initEmbedServer方法进入执行器的创建,到embedServer.start。叫做embedServer是因为Spring Boot里面是用的内置的Tomcat启动的。

embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);

在这个start方法里面,最后有一个thread.start(),也就是调用了线程的run方法。线程是上面new出来的。在run方法里面,创建了一个名字叫bizThreadPool 的ThreadPoolExecutor,也就是业务线程的线程池。

ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor()

然后启动了一个Netty包的ServerBootstrap,然后启动服务器。

ServerBootstrap bootstrap = new ServerBootstrap();

在这里面要把执行器注册到调度中心:

startRegistry(appname, address);

到了ExecutorRegistryThread,在start方法里面最后启动了这个线程:

ExecutorRegistryThread.getInstance().start(appname, address);

registryThread.start();

也就是执行了这个创建的线程的run方法。首先拿到调度器的列表,它有可能是集群部署的。

for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {.....}

然后挨个注册上去,调用的是AdminBizClient的registry方法(这个类是core包里面的):

public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout,         registryParam, String.class);
}

调用了HTTP的接口,实际地址是: http://127.0.0.1:7391/xxl-job-admin/ api/registry

在旧的版本中用的是XXL-RPC,后来改成了Restful的API。 请求的是com.xxl.job.admin.controller.JobApiController的api方法,这里有一个分支:

if ("registry".equals(uri)) {
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        return adminBiz.registry(registryParam);
}

这个时候会调用到AdminBizImpl的registryUpdate方法:

xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

这个接口方法是没有实现类的——其实就是MyBatis的Mapper,把执行器保存到数据库。

XxlJobRegistryMapper.xml

<insert id="registrySave" >
        INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`,         `update_time`) VALUES( #{registryGroup} , #{registryKey} , #{registryValue}, #{updateTime})
</insert>

后台线程探活这一块,在调度器的代码中,后面再分析:

JobRegistryMonitorHelper.getInstance().start();

调度器启动与任务执行

执行器启动好以后,工人就准备干活了,接下来就看一下指挥官上岗以后是怎么指挥工人的。实际上是先启动调度器再启动执行器,但是因为调度的流程涉及到执行器,所以才先分析了执行器。下面看看调度器是如何启动的,任务是如何得到执行的。

SpringBoot的工程,一样从配置类XxlJobAdminConfig入手。它实现了InitializingBean接口,会在初始化的时候调用afterPropertiesSet方法:

public void afterPropertiesSet() throws Exception {
        adminConfig = this;
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
}

init方法里面做了几件事情:

// 任务注册监控器
JobRegistryMonitorHelper.getInstance().start();

// 任务调度失败的监控器,失败重试,失败邮件发送
JobFailMonitorHelper.getInstance().start();

// 任务结果丢失处理
JobLosedMonitorHelper.getInstance().start();

// trigger pool启动
JobTriggerPoolHelper.toStart();

// log report启动
JobLogReportHelper.getInstance().start();

// start-schedule
JobScheduleHelper.getInstance().start();

JobRegistryMonitorHelper做的事情是不停地更新注册表,把超时的执行器剔除。每隔30秒执行一次:TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

JobTriggerPoolHelper创建了两个线程池,一个快的线程池,一个慢的线程池。这里主要关注的是调度器(指挥官)如何启动的,进入JobScheduleHelper的start方法,这段方法总体上看起来是这样的:

public void start() {
        // schedule thread
        scheduleThread = new Thread(…...);
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

        // ring thread
        ringThread = new Thread(…...);
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
}

也就是创建并且启动了两个后台线程,一个是调度线程,一个是时间轮线程。先从第一个线程开始说起。

调度器线程

这里创建了一个scheduleThread线程,后面调用了start方法,也就是会进入run方法。 scheduleThread的run方法中,先随机睡眠4-5秒,为什么?为了防止执行器集中启动出现过多的资源竞争:TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );

然后计算预读取的任务数,这里默认是6000个。

int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

后面是一个while循环,也就是调度器重复不断地在做的事情。

获取任务锁

第一步是获取数据库的排他锁,因为所有的节点连接到的数据库是同一个实例,所以这里是一个分布式环境的锁。也就是后面的过程是互斥的,如果有多个调度器的服务,同一时间只能有一个调度器在获取任务信息:

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );

获取的是job_lock表的lock_name=schedule_lock这一行数据的行锁。 如果加锁没有成功,说明其他调度中心在加载任务了,只能等其他节点提交事务或者回滚事务,释放锁以后才能获取锁。获取锁成功后查询任务:

<select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
        SELECT <include refid="Base_Column_List" />
        FROM xxl_job_info AS t
        WHERE t.trigger_status = 1
        and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
        ORDER BY id ASC
        LIMIT #{pagesize}
</select>

这个SQL是从任务表查询状态是1,并且下次触发时间小于{maxNextTime}的任务;{maxNextTime}=nowTime(当前时间) + PRE_READ_MS(5秒),也就是说查询5秒钟之内需要触发的任务。

调度任务

这里根据任务的触发时间分成了三种情况, 假设任务的下次触发时间(TriggerNextTime)是9点0分30秒,2秒钟触发一次。

第一种情况就是当前时间已经是9点0分35秒以后了,如果nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS,也就是触发时间已经过期5秒以上,那就不能调度了(misfire了),让它到下次触发的时间再跑,这里只需要更新下次触发时间。什么时候会超时?比如你的查询非常慢,或者你查询到等待触发的任务以后,debug停在上面很久才走到时间判断。  

第二种情况(正常情况):nowTime > jobInfo.getTriggerNextTime(),已经过了触发时间,但是没有超过5秒,时间是9点0分30秒到9点0分35秒之间。 这里要做的事情有四步:1、触发任务;2、更新下次触发时间;3、丢入时间轮;4、触发完了再把时间更新为下次更新时间;重点有两个,触发的时候做了什么,丢入时间轮做了什么。

第三种情况:还没到9点0分30秒。先丢入时间轮,再刷新一下下次触发时间,因为还没触发,实际上时间没变。所以,这里我们要重点关注一下,任务触发的时候,是怎么触发的。丢入时间轮,又是一个什么操作。

任务触发

从JobTriggerPoolHelper的trigger方法进入,又到了JobTriggerPoolHelper的addTrigger方法。 设计了两个线程池:fastTriggerPool和slowTriggerPool,如果1分钟内过期了10次,就使用慢的线程池:

ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
}

相当于做了一个线程池隔离,即使有很多慢的任务,也只能把慢任务的线程池耗光。什么样的任务会使用慢的线程池来执行呢?JobTriggerPoolHelper中addTrigger的末尾:如果这次执行超过了500ms,就给它标记一下,超过10次,它就要被丢到下等舱了。

if (cost > 500) {       // ob-timeout threshold 500ms
        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new         AtomicInteger(1));
        if (timeoutCount != null) {
                timeoutCount.incrementAndGet();
        }
}

线程池选择好以后,execute一下,也就是分配线程来执行触发任务。 进入XxlJobTrigger的trigger方法:

XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

在XxlJobTrigger的trigger方法中,先拿到任务信息,如果方法参数failRetryCount>0,就用参数值,否则用Job定义的failRetryCount。这里传进来的是-1。

XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);

拿到失败重试次数和组别:

int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

先不考虑广播分片的情况,分片的原理后面再分析。直接走到末尾的else,processTrigger:

processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);

前面是获取一些参数,然后记录日志,初始化Trigger参数。然后获取路由,把结果放入routeAddressResult。如果是广播分片,所有的节点都要参与负载,否则要根据策略获取执行器地址。不同的路由策略,获取路由的方式也不一样,这里是典型的策略模式:

routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

路由参数翻译详细含义
FIRST第一个固定选择第一个机器
LAST最后一个固定选择最后一个机器
ROUND轮询依次选择执行
RANDOM随机随机选择在线的机器
CONSISTENT_HASH一致性HASH每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上
LEAST_FREQUENTLY_USEDLRU最不经常使用使用频率最低的机器优先被选举
LEAST_RECENTLY_USEDLRU最近最久未使用最久未使用的机器优先被选举
FAILOVER故障转移按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
BUSYOVER忙碌转移按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
SHARDING_BROADCAST分片广播广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务

如果没有启动执行器,那就拿不到执行器地址。 拿到执行器地址以后,runExecutor触发远程的执行器: triggerResult = runExecutor(triggerParam, address);

这里调用的是ExecutorBizClient的run方法:

public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout,         triggerParam, String.class);
}

这里就调用了执行器的远程接口(http://192.168.44.1:9999/run),执行器接收到调用请求怎么处理后面再说。

回顾一下,上面说的是第二种情况:

一共四步,第一步结束了:
1、触发任务;
2、更新下次触发时间;
3、丢入时间轮;
4、触发完了再把时间更新为下次更新时间;
第2步和第4步非常简单,都是操作数据库。第3步,丢入时间轮,什么是时间轮?为什么要丢入时间轮?

时间轮 

要回答这个问题,我们先从Java中最原始的任务调度的方法说起。 给你一批任务(假设有1000个任务),都是不同的时间执行的,时间精确到秒,你怎么实现对所有的任务的调度?第一种思路是启动一个线程,每秒钟对所有的任务进行遍历,找出执行时间跟当前时间匹配的,执行它。如果任务数量太大,遍历和比较所有任务会比较浪费时间。

第二个思路,把这些任务进行排序,执行时间近(先触发)的放在前面。 JDK包里面自带了一个Timer工具类(java.util包下),可以实现延时任务(例如30分钟以后触发),也可以实现周期性任务(例如每1小时触发一次)。它的本质是一个优先队列(TaskQueue),和一个执行任务的线程(TimerThread)。

public class Timer {
    private final TaskQueue queue = new TaskQueue();
    private final TimerThread thread = new TimerThread(queue);
    public Timer(String name, boolean isDaemon) {
        thread.setName(name);
        thread.setDaemon(isDaemon);
        thread.start();
    }
}

在这个优先队列中,最先需要执行的任务排在优先队列的第一个。然后 TimerThread 不断地拿第一个任务的执行时间和当前时间做对比。如果时间到了先看看这个任务是不是周期性执行的任务,如果是则修改当前任务时间为下次执行的时间,如果不是周期性任务则将任务从优先队列中移除。最后执行任务。但是Timer是单线程的,在很多场景下不能满足业务需求。 在JDK1.5之后,引入了一个支持多线程的任务调度工具ScheduledThreadPoolExecutor用来替代Timer,它是几种常用的线程池之一。看看构造函数,里面是一个延迟队列DelayedWorkQueue,也是一个优先队列。

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

优先队列的插入和删除的时间复杂度是O(logn),当数据量大的时候,频繁的入堆出堆性能不是很好。 这里考虑对所有的任务进行分组,把相同执行时刻的任务放在一起。比如这里,数组里面的一个下标就代表1秒钟。它就会变成一个数组加链表的数据结构。分组以后遍历和比较的时间会减少一些。但是还是有问题,如果任务数量非常大,而且时间都不一样,或者有执行时间非常遥远的任务,那这个数组长度是不是要非常地长?比如有个任务2个月之后执行,从现在开始计算,它的下标是5253120。所以长度肯定不能是无限的,只能是固定长度的。比如固定长度是60,一个格子代表1秒(现在叫做一个bucket槽),一圈可以表示60秒。遍历的线程只要一个格子一个格子的获取任务,并且执行就OK了。固定长度的数组怎么用来表示超出最大长度的时间呢?可以用循环数组。比如一个循环数组长度60,可以表示60秒。60秒以后执行的任务怎么放进去?只要除以60,用得到的余数,放到对应的格子就OK了。比如90%60=30,它放在第30个格子。这里就有了轮次的概念,第90秒的任务是第二轮的时候才执行。

这时候,时间轮的概念已经出来了。 如果任务数量太多,相同时刻执行的任务很多,会导致链表变得非常长。这里我们可以进一步对这个时间轮做一个改造,做一个多层的时间轮。比如:最内层60个格子,每个格子1秒;外层60个格子,每个格子1分;再外层24个格子,每个格子1小时。最内层走一圈,外层走一格。这时候时间轮就跟时钟更像了。随着时间流动,任务会降级,外层的任务会慢慢地向内层移动。时间轮任务插入和删除时间复杂度都为O(1),应用范围非常广泛,更适合任务数很大的延时场景,Dubbo、Netty、Kafka中都有实现。

xxl-job中的时间轮是怎么实现的?回到JobScheduleHelper的start方法:

int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
pushTimeRing(ringSecond, jobInfo.getId());

ringSecond是0-59的秒数值(millionSeconds是毫秒数)。把它想象成一个表盘的秒针指数。放入时间轮的这一段代码:

// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);

这个ringData是一个ConcurrentHashMap,key是Integer,放的是ringSecond(0-59)。Value是List<Integer>,里面放的是jobId。到这里为止,JobScheduleHelper的start方法的前一半就分析完了。接下来是ringThread线程,看看时间轮的任务是怎么拿出来执行的。

时间轮线程ringThread

在初始化的时候先对齐秒数:休眠当前秒数模以1000的余数,意思是下一个正秒运行。

TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );

然后进入一个while循环。获取当前秒数,避免处理耗时太长,跨过刻度,向前校验一个刻度。

// 根据当前秒数刻度和前一个刻度进行时间轮的任务获取
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);

for (int i = 0; i < 2; i++) {
    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
    if (tmpData != null) {
        ringItemData.addAll(tmpData);
    }
}

(nowSecond+60-k)%60跟nowSecond-k的结果一模一样,也就是当前秒数,和前一秒。比如当前秒数是40,就获取40和39的任务。从ringData里面拿出来,放进ringItemData,这里面存的是这两秒需要触发的所有任务的jobId。

接下来就是触发任务了:
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);

又调用了JobTriggerPoolHelper的addTrigger。 在XxlJobTrigger的trigger方法中,调用了processTrigger,又调用了runExecutor:

runResult = executorBiz.run(triggerParam);

这里实现类是ExecutorBizClient,发起了一个HTTP的请求。

public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout,         triggerParam, String.class);
}

最终的URL地址是执行器的9999端口:http://192.168.44.1:9999/run,跟上面一样。也就是说,放进时间轮等待触发的任务,也会通过远程请求,让执行器执行任务。

执行器处理远程调用,回调

在业务实例这边,执行器启动9999端口监听的时候,在EmbedHttpServerHandler的channelRead0方法中,会创建线程池bizThreadPool,process方法处理URI的访问。

if ("/run".equals(uri)) {
        TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
        return executorBiz.run(triggerParam);
}

这个时候调用的是core包的ExecutorBizImpl的run方法。 第一步,先拿到任务的JobThread(表示有没有线程正在执行这个JobId的任务):

JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;

如果有线程,再拿到jobHandler。什么是jobHandler? 在SpringBoot的工程里面,jobHandler就是加了@XxlJob注解的任务方法(一个任务一个方法)。其他的四个框架中的使用,需要自己编写Handler(任务类)继承IJobHandler。这个IjobHandler接口意思跟Quartz里面的Job接口是一样的,这里面必须要覆盖父类的execute方法。

中间是对于jobThread和jobHandler的判断。对于bean、GROOVY、其他脚本类型的任务,处理不一样。基本原则就是必须要有一个Handler,而且跟之前的Handler必须相同。 如果当前任务正在运行(根据JobId能够找到JobThread),需要根据配置的策略采取不同的措施,比如:

1、DISCARD_LATER(丢弃后续调度):如果当前线程阻塞,后续任务不再执行,直接返回失败(阻塞就不再执行了)。

if (jobThread.isRunningOrHasQueue()) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy         effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}

2、COVER_EARLY(覆盖之前调度):创建一个移除原因,新建一个线程去执行后续任务(杀掉当前线程)。

if (jobThread.isRunningOrHasQueue()) {
    removeOldReason = "block strategy effect:" +         ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
    jobThread = null;
}

3、SERIAL_EXECUTION(单机串行,默认):对当前线程不做任何处理,并在当前线程的队列里增加一个执行任务(一次只执行一个任务)。最后,调用JobThread的pushTriggerQueue方法把Trigger放入队列。

ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

这个队列是什么?TriggerQueue是什么?LinkedBlockingQueue。 执行器中单个任务处理线程一次只能执行一个任务。 JobThread在创建的时候就会启动,启动就会进入run方法的死循环,不断地从队列里面拿任务: triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);

最终调用到Handler(任务类)的execute方法: return handler.execute(triggerParamTmp.getExecutorParams());

在finally方法中调用了回调方法,告知调度器执行结果:

if(triggerParam != null) {
        // callback handler info
        if (!toStop) {
        // commonm
                TriggerCallbackThread.pushCallBack(new                 HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(),                 executeResult));
        } else {
                // is killed
                ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason                 + " [job running, killed]");
                TriggerCallbackThread.pushCallBack(new                 HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(),                 stopResult));
        }
}

放入一个队列。在TriggerCallbackThread的后台线程的run方法里面,调用doCallback方法,连接到调度器,写入调度结果。

总结一下整体调度流程:

1、调度中心获取任务锁,查询任务,根据情况触发或者放入时间轮;

2、触发任务需要先获取路由地址,然后调用执行器接口;

3、执行器接收到调用请求,通过JobThread执行任务,并且回调(callback)调度器的接口 ;

任务分片原理 

拿到分片参数:sharding param。这个sharding param还记得是什么么?最好是设计一个跟业务无关的分片字段,加上索引用它来获取数据的分片信息用来分割。如果只有两个,并且都是数字,把他们转换为整形;如果是广播任务,则在所有节点上processTrigger。

手动触发执行一次任务

@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
        // force cover job param
        if (executorParam == null) {
                executorParam = "";
        }

        JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam,         addressList);
        return ReturnT.SUCCESS;
}

之后就跟调度器自动触发任务的流程一样了。

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树首页概览 115367 人正在系统学习中
文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树首页概览140293 人正在系统学习中
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/96039
推荐阅读
相关标签
  

闽ICP备14008679号