当前位置:   article > 正文

SpringBoot-集成Quartz实现持久化定时接口调用任务

SpringBoot-集成Quartz实现持久化定时接口调用任务

一、基本概念

Quartz 是功能强大的开源作业调度库,几乎可以集成到任何 Java 应用程序中,从最小的独立应用程序到最大的电子商务系统。Quartz 可用于创建简单或复杂的计划,以执行数以万计的工作;可以执行您编写的所有内容。

Spring Boot 官方也对 Quartz 调度器进行了集成,Spring boot 官网文档:Quartz Scheduler,Java JDK 也带有 计时器 Timer 以及 定时执行服务 ScheduledExecutorService ,Spring 也提供了 @Scheduled 执行定时任务

如果实际环境中定时任务过多,处理频繁,建议适应第三方封装的调度框架,因为定时器操作底层都是多线程的操作,任务的启动、暂停、恢复、删除、实质是线程的启动、暂停、中断、唤醒等操作。

二、Quartz-scheduler 的核心流程

Scheduler - 调度器

1、Scheduler 用来对 Trigger 和 Job 进行管理,Trigger 和 JobDetail 可以注册到 Scheduler 中,两者在 Scheduler 中都拥有自己的唯一的组(group)和名称(name)用来进行彼此的区分,Scheduler 可以通过任务组和名称来对 Trigger 和 JobDetail 进行管理。

2、每个 Scheduler 都有一个 SchedulerContext,用来保存 Scheduler 的上下文数据,Job 和 Trigger 都可以获取其中的信息。

3、Scheduler 是由 SchedulerFactory 创建,它有两个实现:DirectSchedulerFactory 、StdSchdulerFactory ,前者可以用来在代码里定制 Schduler 参数,后者直接读取 classpath 下的 quartz.properties(不存在就都使用默认值)配置来实例化 Scheduler。

Job - 任务

1、Job 是一个任务接口,开发者可以实现该接口定义自己的任务,JobExecutionContext 中提供了调度上下文的各种信息。

2、Job 中的任务有可能并发执行,例如任务的执行时间过长,而每次触发的时间间隔太短,则会导致任务会被并发执行。如果是并发执行,就需要一个数据库锁去避免一个数据被多次处理。可以在 execute()方法上添加 @DisallowConcurrentExecution 注解解决这个问题。

JobDetail - 任务详情

1、JobDetail 对象是在将 job 注册到 scheduler 时,由客户端程序创建的,它包含 job 的各种属性设置,以及用于存储 job 实例状态信息的 JobDataMap。

2、JobDetail 由 JobBuilder 创建/定义,Quartz 不存储 Job 的实际实例,但是允许通过使用 JobDetail 定义一个实例。

3、Job 有一个与其关联的名称和组,应该在单个 Scheduler 中唯一标识它们。

4、一个 Trigger(触发器) 只能对应一个 Job(任务),但是一个 Job 可以对应多个 Trigger。JobDetal 与 Trigger 一对多

Trigger - 触发器

1、Trigger 用于触发 Job 的执行。TriggerBuilder 用于定义/构建触发器实例。

2、Trigger也有一个相关联的 JobDataMap,用于给Job传递一些触发相关的参数。

3、Quartz自带了各种不同类型的 Trigger,最常用的主要是SimpleTrigger和CronTrigger。

JobDataMap

1、JobDataMap 实现了 JDK 的 Map 接口,可以以 Key-Value 的形式存储数据。

2、JobDetail、Trigger 实现类中都定义 JobDataMap 成员变量及其 getter、setter 方法,可以用来设置参数信息,Job 执行 execute() 方法的时候,JobExecutionContext 可以获取到 JobDataMap 中的信息。

三、实践

1、新建一个quartz-service服务

添加依赖:

  1. <modelVersion>4.0.0</modelVersion>
  2. <groupId>org.example</groupId>
  3. <artifactId>quartz-servicer</artifactId>
  4. <version>1.0.0</version>
  5. <properties>
  6. <maven.compiler.source>11</maven.compiler.source>
  7. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  8. <maven.compiler.target>11</maven.compiler.target>
  9. </properties>
  10. <dependencies>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-web</artifactId>
  14. <version>2.7.5</version>
  15. </dependency>
  16. <!-- 公共依赖 -->
  17. <dependency>
  18. <groupId>com.alibaba</groupId>
  19. <artifactId>fastjson</artifactId>
  20. <version>2.0.19</version>
  21. </dependency>
  22. <!-- quartz依赖 -->
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-quartz</artifactId>
  26. <version>2.5.4</version>
  27. </dependency>
  28. <dependency>
  29. <groupId>mysql</groupId>
  30. <artifactId>mysql-connector-java</artifactId>
  31. <version>8.0.31</version>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.boot</groupId>
  35. <artifactId>spring-boot-starter-jdbc</artifactId>
  36. <version>2.7.3</version>
  37. </dependency>
  38. <dependency>
  39. <groupId>com.alibaba</groupId>
  40. <artifactId>druid-spring-boot-starter</artifactId>
  41. <version>1.2.14</version>
  42. <exclusions>
  43. <exclusion>
  44. <artifactId>spring-boot-autoconfigure</artifactId>
  45. <groupId>org.springframework.boot</groupId>
  46. </exclusion>
  47. </exclusions>
  48. </dependency>
  49. </dependencies>
  50. <build>
  51. <plugins>
  52. <plugin>
  53. <groupId>org.springframework.boot</groupId>
  54. <artifactId>spring-boot-maven-plugin</artifactId>
  55. </plugin>
  56. </plugins>
  57. </build>
  58. 复制代码

2、配置数据源相关链接

quartz需要单据的数据库,所以需要单据创建一个库来给quartz使用,我新建了一个scheduler的库,详细信息官方参考quartz-scheduler/quartz

  1. #1 保存已经触发的触发器状态信息
  2. DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
  3. #2 存放暂停掉的触发器表表
  4. DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
  5. #3 调度器状态表
  6. DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
  7. #4 存储程序的悲观锁的信息(假如使用了悲观锁)
  8. DROP TABLE IF EXISTS QRTZ_LOCKS;
  9. #5 简单的触发器表
  10. DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
  11. #6 存储两种类型的触发器表
  12. DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
  13. #7 定时触发器表
  14. DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
  15. #8 以blob 类型存储的触发器
  16. DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
  17. #9 触发器表
  18. DROP TABLE IF EXISTS QRTZ_TRIGGERS;
  19. #10 job 详细信息表
  20. DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
  21. #11 日历信息表
  22. DROP TABLE IF EXISTS QRTZ_CALENDARS;
  23. #job 详细信息表
  24. CREATE TABLE QRTZ_JOB_DETAILS
  25. (
  26. SCHED_NAME VARCHAR(120) NOT NULL,
  27. JOB_NAME VARCHAR(200) NOT NULL,
  28. JOB_GROUP VARCHAR(200) NOT NULL,
  29. DESCRIPTION VARCHAR(250) NULL,
  30. JOB_CLASS_NAME VARCHAR(250) NOT NULL,
  31. IS_DURABLE VARCHAR(1) NOT NULL,
  32. IS_NONCONCURRENT VARCHAR(1) NOT NULL,
  33. IS_UPDATE_DATA VARCHAR(1) NOT NULL,
  34. REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
  35. JOB_DATA BLOB NULL,
  36. PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
  37. );
  38. #触发器表
  39. CREATE TABLE QRTZ_TRIGGERS
  40. (
  41. SCHED_NAME VARCHAR(120) NOT NULL,
  42. TRIGGER_NAME VARCHAR(200) NOT NULL,
  43. TRIGGER_GROUP VARCHAR(200) NOT NULL,
  44. JOB_NAME VARCHAR(200) NOT NULL,
  45. JOB_GROUP VARCHAR(200) NOT NULL,
  46. DESCRIPTION VARCHAR(250) NULL,
  47. NEXT_FIRE_TIME BIGINT(13) NULL,
  48. PREV_FIRE_TIME BIGINT(13) NULL,
  49. PRIORITY INTEGER NULL,
  50. TRIGGER_STATE VARCHAR(16) NOT NULL,
  51. TRIGGER_TYPE VARCHAR(8) NOT NULL,
  52. START_TIME BIGINT(13) NOT NULL,
  53. END_TIME BIGINT(13) NULL,
  54. CALENDAR_NAME VARCHAR(200) NULL,
  55. MISFIRE_INSTR SMALLINT(2) NULL,
  56. JOB_DATA BLOB NULL,
  57. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  58. FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
  59. REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
  60. );
  61. #简单的触发器表,包括重复次数,间隔,以及已触发的次数
  62. CREATE TABLE QRTZ_SIMPLE_TRIGGERS
  63. (
  64. SCHED_NAME VARCHAR(120) NOT NULL,
  65. TRIGGER_NAME VARCHAR(200) NOT NULL,
  66. TRIGGER_GROUP VARCHAR(200) NOT NULL,
  67. REPEAT_COUNT BIGINT(7) NOT NULL,
  68. REPEAT_INTERVAL BIGINT(12) NOT NULL,
  69. TIMES_TRIGGERED BIGINT(10) NOT NULL,
  70. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  71. FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  72. REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  73. );
  74. #定时触发器表,存储 cron trigger,包括 cron 表达式和时区信息
  75. CREATE TABLE QRTZ_CRON_TRIGGERS
  76. (
  77. SCHED_NAME VARCHAR(120) NOT NULL,
  78. TRIGGER_NAME VARCHAR(200) NOT NULL,
  79. TRIGGER_GROUP VARCHAR(200) NOT NULL,
  80. CRON_EXPRESSION VARCHAR(200) NOT NULL,
  81. TIME_ZONE_ID VARCHAR(80),
  82. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  83. FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  84. REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  85. );
  86. #存储calendarintervaltrigger和dailytimeintervaltrigger两种类型的触发器
  87. CREATE TABLE QRTZ_SIMPROP_TRIGGERS
  88. (
  89. SCHED_NAME VARCHAR(120) NOT NULL,
  90. TRIGGER_NAME VARCHAR(200) NOT NULL,
  91. TRIGGER_GROUP VARCHAR(200) NOT NULL,
  92. STR_PROP_1 VARCHAR(512) NULL,
  93. STR_PROP_2 VARCHAR(512) NULL,
  94. STR_PROP_3 VARCHAR(512) NULL,
  95. INT_PROP_1 INT NULL,
  96. INT_PROP_2 INT NULL,
  97. LONG_PROP_1 BIGINT NULL,
  98. LONG_PROP_2 BIGINT NULL,
  99. DEC_PROP_1 NUMERIC(13,4) NULL,
  100. DEC_PROP_2 NUMERIC(13,4) NULL,
  101. BOOL_PROP_1 VARCHAR(1) NULL,
  102. BOOL_PROP_2 VARCHAR(1) NULL,
  103. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  104. FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  105. REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  106. );
  107. #以blob 类型存储的触发器
  108. CREATE TABLE QRTZ_BLOB_TRIGGERS
  109. (
  110. SCHED_NAME VARCHAR(120) NOT NULL,
  111. TRIGGER_NAME VARCHAR(200) NOT NULL,
  112. TRIGGER_GROUP VARCHAR(200) NOT NULL,
  113. BLOB_DATA BLOB NULL,
  114. PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
  115. FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  116. REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
  117. );
  118. #日历信息表, quartz可配置一个日历来指定一个时间范围
  119. CREATE TABLE QRTZ_CALENDARS
  120. (
  121. SCHED_NAME VARCHAR(120) NOT NULL,
  122. CALENDAR_NAME VARCHAR(200) NOT NULL,
  123. CALENDAR BLOB NOT NULL,
  124. PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
  125. );
  126. #存放暂停掉的触发器表表
  127. CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
  128. (
  129. SCHED_NAME VARCHAR(120) NOT NULL,
  130. TRIGGER_GROUP VARCHAR(200) NOT NULL,
  131. PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
  132. );
  133. # 存储与已触发的 trigger 相关的状态信息,以及相联 job 的执行信息
  134. CREATE TABLE QRTZ_FIRED_TRIGGERS
  135. (
  136. SCHED_NAME VARCHAR(120) NOT NULL,
  137. ENTRY_ID VARCHAR(95) NOT NULL,
  138. TRIGGER_NAME VARCHAR(200) NOT NULL,
  139. TRIGGER_GROUP VARCHAR(200) NOT NULL,
  140. INSTANCE_NAME VARCHAR(200) NOT NULL,
  141. FIRED_TIME BIGINT(13) NOT NULL,
  142. SCHED_TIME BIGINT(13) NOT NULL,
  143. PRIORITY INTEGER NOT NULL,
  144. STATE VARCHAR(16) NOT NULL,
  145. JOB_NAME VARCHAR(200) NULL,
  146. JOB_GROUP VARCHAR(200) NULL,
  147. IS_NONCONCURRENT VARCHAR(1) NULL,
  148. REQUESTS_RECOVERY VARCHAR(1) NULL,
  149. PRIMARY KEY (SCHED_NAME,ENTRY_ID)
  150. );
  151. 3、 调度器状态表
  152. CREATE TABLE QRTZ_SCHEDULER_STATE
  153. (
  154. SCHED_NAME VARCHAR(120) NOT NULL,
  155. INSTANCE_NAME VARCHAR(200) NOT NULL,
  156. LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
  157. CHECKIN_INTERVAL BIGINT(13) NOT NULL,
  158. PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
  159. );
  160. 4、 存储程序的悲观锁的信息(假如使用了悲观锁)
  161. CREATE TABLE QRTZ_LOCKS
  162. (
  163. SCHED_NAME VARCHAR(120) NOT NULL,
  164. LOCK_NAME VARCHAR(40) NOT NULL,
  165. PRIMARY KEY (SCHED_NAME,LOCK_NAME)
  166. );
  167. 复制代码

3、创建配置文件

Quartz 使用一个名为 quartz.properties 的属性文件进行信息配置,必须位于 classpath 下,是 StdSchedulerFactory 用于创建 Scheduler 的默认属性文件。默认情况下 StdSchedulerFactory 从类路径下加载名为 “quartz.properties” 的属性文件,如果失败,则加载 org/quartz 包中的“quartz.properties”文件,因为我需要的是新建一个Scheduler服务,所以直接使用application.yml,配置如下:

  1. datasource:
  2. url: jdbc:mysql://127.0.0.1:3306/scheduler
  3. username: ***
  4. password: ****
  5. driver-class-name: com.mysql.cj.jdbc.Driver
  6. type: com.alibaba.druid.pool.DruidDataSource
  7. #定时配置
  8. quartz:
  9. #相关属性配置
  10. properties:
  11. org:
  12. quartz:
  13. scheduler:
  14. instanceName: local-scheduler-svc
  15. instanceId: AUTO
  16. jobStore:
  17. #表示 quartz 中的所有数据,比如作业和触发器的信息都保存在内存中(而不是数据库中)
  18. class: org.springframework.scheduling.quartz.LocalDataSourceJobStore
  19. # 驱动配置
  20. driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
  21. # 表前缀
  22. tablePrefix: QRTZ_
  23. #是否为集群
  24. isClustered: false
  25. clusterCheckinInterval: 10000
  26. useProperties: false
  27. dataSource: quartzDs
  28. #线程池配置
  29. threadPool:
  30. class: org.quartz.simpl.SimpleThreadPool
  31. #线程数
  32. threadCount: 10
  33. #优先级
  34. threadPriority: 5
  35. #线程继承上下文类加载器的初始化线程
  36. threadsInheritContextClassLoaderOfInitializingThread: true
  37. #数据库方式
  38. job-store-type: JDBC
  39. #初始化表结构
  40. jdbc:
  41. initialize-schema: NEVER
  42. 复制代码

4、新建一个任务实体类JobInfo,用户新建,传递任务信息

jobName:任务名称

jobGroup:任务组

jsonParams:任务执行信息(在用户定时远程调用接口的时候,我们可以接口信息封装到这个Map中)

cron:定时任务的cron表达式

timeZoneId:定制执行任务的时区

triggerTime:定时器时间(目前没用上)

  1. @Data
  2. public class JobInfo {
  3. private String jobName;
  4. private String jobGroup;
  5. private Map<String, Object> jsonParams;
  6. private String cron;
  7. private String timeZoneId;
  8. private Date triggerTime;
  9. }
  10. 复制代码

5、新建一个任务执行类HttpRemoteJob 实现 Job接口,重写execute()方法

execute()里面就是任务的逻辑:

① 使用HttpURLConnection发送网络请求,利用BufferedReader接收请求返回的结果

② 在任务的Description中取出定时请求的接口信息,解析Description,获取请求url

③ 编辑请求信息,通过URL类编辑请求信息,使用HttpURLConnection发送请求,并接收请求返回的状态码,根据状态码,判断是否请求成功,请求成功便通过BufferedReader读取响应信息,返回请求结果

  1. import com.alibaba.fastjson.JSONObject;
  2. import org.quartz.DisallowConcurrentExecution;
  3. import org.quartz.Job;
  4. import org.quartz.JobExecutionContext;
  5. import org.quartz.JobExecutionException;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.util.StringUtils;
  9. import java.io.BufferedReader;
  10. import java.io.InputStreamReader;
  11. import java.net.HttpURLConnection;
  12. import java.net.URL;
  13. import java.util.Objects;
  14. @DisallowConcurrentExecution
  15. public class HttpRemoteJob implements Job {
  16. //日志
  17. private static final Logger log = LoggerFactory.getLogger(HttpRemoteJob.class);
  18. @Override
  19. public void execute(JobExecutionContext context)throws JobExecutionException {
  20. //用于发送网络请求
  21. HttpURLConnection connection = null;
  22. //用于接收请求返回的结果
  23. BufferedReader bufferedReader = null;
  24. //获取任务Description述,之前我们把接口请求的信息放在Description里面了
  25. String jsonParams = context.getJobDetail().getDescription();
  26. if (StringUtils.isEmpty(jsonParams)){
  27. return;
  28. }
  29. //解析Description,获取请求url
  30. JSONObject jsonObj= (JSONObject) JSONObject.parse(jsonParams);
  31. String callUrl = jsonObj.getString("callUrl");
  32. if(StringUtils.isEmpty(callUrl)) {
  33. return;
  34. }
  35. try {
  36. //编辑请求信息
  37. URL realUrl = new URL(callUrl);
  38. connection = (HttpURLConnection) realUrl.openConnection();
  39. connection.setRequestMethod("GET");
  40. connection.setDoOutput(true);
  41. connection.setDoInput(true);
  42. connection.setUseCaches(false);
  43. connection.setReadTimeout(5 * 1000);
  44. connection.setConnectTimeout(3 * 1000);
  45. connection.setRequestProperty("connection", "Keep-Alive");
  46. connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
  47. connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
  48. //发送请求
  49. connection.connect();
  50. //获取请求返回的状态吗
  51. int statusCode = connection.getResponseCode();
  52. if (statusCode != 200){
  53. //请求失败抛出异常
  54. throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
  55. }
  56. //如果返回值正常,数据在网络中是以流的形式得到服务端返回的数据
  57. bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
  58. StringBuilder stringBuilder = new StringBuilder();
  59. String line;
  60. // 从流中读取响应信息
  61. while ((line = bufferedReader.readLine()) != null) {
  62. stringBuilder.append(line);
  63. }
  64. String responseMsg = stringBuilder.toString();
  65. log.info(responseMsg);
  66. } catch (Exception e) {
  67. log.error(e.getMessage());
  68. } finally {
  69. //关闭流与请求连接
  70. try {
  71. if (Objects.nonNull(bufferedReader)){
  72. bufferedReader.close();
  73. }
  74. if (Objects.nonNull(connection)){
  75. connection.disconnect();
  76. }
  77. } catch (Exception e) {
  78. log.error(e.getMessage());
  79. }
  80. }
  81. }
  82. }
  83. 复制代码

@DisallowConcurrentExecution 的作用:

Quartz定时任务默认是并发执行的,不会等待上一次任务执行完毕,只要有间隔时间到就会执行, 如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。@DisallowConcurrentExecution 这个注解是加在Job类上的,是禁止并发执行多个相同定义的JobDetail, , 但并不是不能同时执行多个Job, 而是不能并发执行同一个Job Definition(由JobDetail定义), 但是可以同时执行多个不同的JobDetail。

JobExecutionContext 类可以获取很多任务的信息:

  1. @Override
  2. public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
  3. JobKey jobKey = jobExecutionContext.getJobDetail().getKey();
  4. //工作任务名称
  5. String jobName = jobKey.getName();
  6. //工作任务组名称
  7. String groupName = jobKey.getGroup();
  8. //任务类名称(带路径)
  9. String classPathName = jobExecutionContext.getJobDetail().getJobClass().getName();
  10. //任务类名称
  11. String className = jobExecutionContext.getJobDetail().getJobClass().getSimpleName();
  12. //获取Trigger内容
  13. TriggerKey triggerKey = jobExecutionContext.getTrigger().getKey();
  14. //触发器名称
  15. String triggerName = triggerKey.getName();
  16. //出发组名称(带路径)
  17. String triggerPathName = jobExecutionContext.getTrigger().getClass().getName();
  18. //触发器类名称
  19. String triggerClassName = jobExecutionContext.getTrigger().getClass().getSimpleName();
  20. }
  21. 复制代码

注意:

  1. //解析Description,获取请求url
  2. JSONObject jsonObj= (JSONObject) JSONObject.parse(jsonParams);
  3. String callUrl = jsonObj.getString("callUrl");
  4. 复制代码

之前我们封装JobInfo信息是将Map<String, Object> jsonParams保存接口请求信息,们去解析接口请求的时候也要用callUrl去取,那么在封装JobInfo类时,Map中就需要指定key是callUrl,不然取不到就会报错了。

6、创建定时任务业务层,创建 JobService接口和 JobServiceImpl实现类

JobServiceImpl业务逻辑:

JobInfo携带这我们需要新建任务的信息

① 通过jobName和jobGroup可以查询到任务唯一的jobKey,通过getJobDetail(jobKey),判断是否已有这个任务,有的话先删除在新增这个任务

② 新建一个任务JobDetail,withDescription属性中是指任务描述,JobInfo类中JsonParams属性是一个Map,这里需要将Map格式化一下,不然无法赋给withDescription,这个JsonUtils在下面:

  1. //任务详情
  2. JobDetail jobDetail = JobBuilder.newJob(HttpRemoteJob.class)
  3. .withDescription(JsonUtils.object2Json(jobInfo.getJsonParams())) //任务描述
  4. .withIdentity(jobKey) //指定任务
  5. .build();
  6. 复制代码

③ 创建触发器,触发器有多种类型,需要定时执行就使用cron表达式,创建cron调度器建造器CronScheduleBuilder,再创建Trigger触发器,调度器建造器有很多种,除了我下面用到的简单调度器构造器SimpleTrigger,还有:

CalendarIntervalScheduleBuilder : 每隔一段时间执行一次(年月日)

DailyTimeIntervalScheduleBuilder : 设置年月日中的某些固定日期,可以设置执行总次数

以后我们再单独写一片介绍;

CronScheduleBuilder和SimpleTrigger的区别在于:CronScheduleBuilder是通过cron表达式定时某个时间点或多个时间点定时直接,而SimpleTrigger是周期性执行,着重与时间间隔,着重与周期执行;

④ 将任务添加到Scheduler中

此外还有一些Scheduler的其他方法:

获取任务触发器 :TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);

停止触发器 :scheduler.pauseTrigger(triggerKey);

移除触发器:scheduler.unscheduleJob(triggerKey);

删除任务:scheduler.deleteJob(JobKey.jobKey(jobName,jobGroup));

根据jobName,jobGroup获取jobKey 恢复任务:scheduler.resumeJob(JobKey.jobKey(jobName,jobGroup));

根据jobName,jobGroup获取jobKey 暂停任务: scheduler.pauseJob(JobKey.jobKey(jobName,jobGroup));

根据jobName,jobGroup获取jobKey 立即执行任务: scheduler.triggerJob(JobKey.jobKey(jobName,jobGroup));

在下面的实现类代码中有很好的用例;

JsonUtils工具类

  1. public class JsonUtils {
  2. public static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
  3. private static final ObjectMapper IGNORE_OBJECT_MAPPER = createIgnoreObjectMapper();
  4. private static ObjectMapper createIgnoreObjectMapper() {
  5. ObjectMapper objectMapper = createObjectMapper();
  6. objectMapper.addMixIn(Object.class, DynamicMixIn.class);
  7. return objectMapper;
  8. }
  9. public static ObjectMapper createObjectMapper() {
  10. ObjectMapper objectMapper = new ObjectMapper();
  11. objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
  12. objectMapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
  13. objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  14. objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
  15. objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
  16. objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
  17. objectMapper.registerModule(new JavaTimeModule());
  18. return objectMapper;
  19. }
  20. public static String object2Json(Object o) {
  21. StringWriter sw = new StringWriter();
  22. JsonGenerator gen = null;
  23. try {
  24. gen = new JsonFactory().createGenerator(sw);
  25. OBJECT_MAPPER.writeValue(gen, o);
  26. } catch (IOException e) {
  27. throw new RuntimeException("Cannot serialize object as JSON", e);
  28. } finally {
  29. if (null != gen) {
  30. try {
  31. gen.close();
  32. } catch (IOException e) {
  33. throw new RuntimeException("Cannot serialize object as JSON", e);
  34. }
  35. }
  36. }
  37. return sw.toString();
  38. }
  39. }
  40. 复制代码

JobService接口

  1. import liu.qingxu.domain.JobInfo;
  2. import org.springframework.web.bind.annotation.RequestBody;
  3. /**
  4. * @module
  5. * @author: qingxu.liu
  6. * @date: 2022-11-15 14:37
  7. * @copyright
  8. **/
  9. public interface JobService {
  10. /**
  11. * 新建一个定时任务
  12. * @param jobInfo 任务信息
  13. * @return 任务信息
  14. */
  15. JobInfo save(@RequestBody JobInfo jobInfo);
  16. /**
  17. * 新建一个简单定时任务
  18. * @param jobInfo 任务信息
  19. * @return 任务信息
  20. */
  21. JobInfo simpleSave(@RequestBody JobInfo jobInfo);
  22. /**
  23. * 删除任务
  24. * @param jobName 任务名称
  25. * @param jobGroup 任务组
  26. */
  27. void remove( String jobName,String jobGroup);
  28. /**
  29. * 恢复任务
  30. * @param jobName 任务名称
  31. * @param jobGroup 任务组
  32. */
  33. void resume(String jobName, String jobGroup);
  34. /**
  35. * 暂停任务
  36. * @param jobName 任务名称
  37. * @param jobGroup 任务组
  38. */
  39. void pause(String jobName, String jobGroup);
  40. /**
  41. * 立即执行任务一主要是用于执行一次任务的场景
  42. * @param jobName 任务名称
  43. * @param jobGroup 任务组
  44. */
  45. void trigger(String jobName, String jobGroup);
  46. }
  47. 复制代码

JobServiceImpl实现类

  1. import liu.qingxu.domain.JobInfo;
  2. import liu.qingxu.executors.HttpRemoteJob;
  3. import liu.qingxu.service.JobService;
  4. import liu.qingxu.utils.JsonUtils;
  5. import org.quartz.CronScheduleBuilder;
  6. import org.quartz.JobBuilder;
  7. import org.quartz.JobDetail;
  8. import org.quartz.JobKey;
  9. import org.quartz.Scheduler;
  10. import org.quartz.SchedulerException;
  11. import org.quartz.SimpleTrigger;
  12. import org.quartz.Trigger;
  13. import org.quartz.TriggerBuilder;
  14. import org.quartz.TriggerKey;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.stereotype.Service;
  17. import java.util.Objects;
  18. import java.util.TimeZone;
  19. /**
  20. * @module
  21. * @author: qingxu.liu
  22. * @date: 2022-11-15 14:48
  23. * @copyright
  24. **/
  25. @Service
  26. public class JobServiceImpl implements JobService {
  27. @Autowired
  28. private Scheduler scheduler;
  29. @Override
  30. public JobInfo save(JobInfo jobInfo) {
  31. //查询是否已有相同任务 jobKey可以唯一确定一个任务
  32. JobKey jobKey = JobKey.jobKey(jobInfo.getJobName(), jobInfo.getJobGroup());
  33. try {
  34. JobDetail jobDetail = scheduler.getJobDetail(jobKey);
  35. if (Objects.nonNull(jobDetail)){
  36. scheduler.deleteJob(jobKey);
  37. }
  38. } catch (SchedulerException e) {
  39. e.printStackTrace();
  40. }
  41. //任务详情
  42. JobDetail jobDetail = JobBuilder.newJob(HttpRemoteJob.class)
  43. .withDescription(JsonUtils.object2Json(jobInfo.getJsonParams())) //任务描述
  44. .withIdentity(jobKey) //指定任务
  45. .build();
  46. //根据cron,TimeZone时区,指定执行计划
  47. CronScheduleBuilder builder = CronScheduleBuilder.cronSchedule(jobInfo.getCron())
  48. .inTimeZone(TimeZone.getTimeZone(jobInfo.getTimeZoneId()));
  49. //触发器
  50. Trigger trigger = TriggerBuilder.newTrigger()
  51. .withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup()).startNow()
  52. .withSchedule(builder)
  53. .build();
  54. //添加任务
  55. try {
  56. scheduler.scheduleJob(jobDetail, trigger);
  57. } catch (SchedulerException e) {
  58. System.out.println(e.getMessage());
  59. }
  60. return jobInfo;
  61. }
  62. @Override
  63. public JobInfo simpleSave(JobInfo jobInfo) {
  64. JobKey jobKey = JobKey.jobKey(jobInfo.getJobName(), jobInfo.getJobGroup()); //作业名称及其组名
  65. //判断是否有相同的作业
  66. try {
  67. JobDetail jobDetail = scheduler.getJobDetail(jobKey);
  68. if(jobDetail != null){
  69. scheduler.deleteJob(jobKey);
  70. }
  71. } catch (SchedulerException e) {
  72. e.printStackTrace();
  73. }
  74. //定义作业的详细信息,并设置要执行的作业类名,设置作业名称及其组名
  75. JobDetail jobDetail = JobBuilder.newJob(HttpRemoteJob.class)
  76. .withDescription(JsonUtils.object2Json(jobInfo.getJsonParams()))
  77. .withIdentity(jobKey)
  78. .build()
  79. ;
  80. //简单触发器,着重与时间间隔
  81. SimpleTrigger trigger = (SimpleTrigger) TriggerBuilder.newTrigger()
  82. .withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup())
  83. .startAt(jobInfo.getTriggerTime())
  84. .build();
  85. try {
  86. scheduler.scheduleJob(jobDetail, trigger);
  87. } catch (SchedulerException e) {
  88. System.out.println(e.getMessage());
  89. }
  90. return jobInfo;
  91. }
  92. @Override
  93. public void remove(String jobName, String jobGroup) {
  94. //获取任务触发器
  95. TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
  96. try {
  97. //停止触发器
  98. scheduler.pauseTrigger(triggerKey);
  99. //移除触发器
  100. scheduler.unscheduleJob(triggerKey);
  101. //删除任务
  102. scheduler.deleteJob(JobKey.jobKey(jobName,jobGroup));
  103. } catch (SchedulerException e) {
  104. System.out.println(e.getMessage());
  105. }
  106. }
  107. @Override
  108. public void resume(String jobName, String jobGroup) {
  109. try {
  110. //根据jobName,jobGroup获取jobKey 恢复任务
  111. scheduler.resumeJob(JobKey.jobKey(jobName,jobGroup));
  112. } catch (SchedulerException e) {
  113. System.out.println(e.getMessage());
  114. }
  115. }
  116. @Override
  117. public void pause(String jobName, String jobGroup) {
  118. try {
  119. //根据jobName,jobGroup获取jobKey 暂停任务
  120. scheduler.pauseJob(JobKey.jobKey(jobName,jobGroup));
  121. } catch (SchedulerException e) {
  122. System.out.println(e.getMessage());
  123. }
  124. }
  125. @Override
  126. public void trigger(String jobName, String jobGroup) {
  127. try {
  128. //根据jobName,jobGroup获取jobKey 立即执行任务
  129. scheduler.triggerJob(JobKey.jobKey(jobName,jobGroup));
  130. } catch (SchedulerException e) {
  131. System.out.println(e.getMessage());
  132. }
  133. }
  134. }
  135. 复制代码

到此为止,我的定时调用接口任务已经完成了,现在我们写个Controller来试着调用一下:

我们调用test接口,新建一个任务,让任务每隔5秒调用runTest接口,然后在调用deleteTest接口删除任务;

  1. @RestController
  2. @RequestMapping("/quartz/job")
  3. public class QuartzController {
  4. private final JobService jobService;
  5. public QuartzController(JobService jobService) {
  6. this.jobService = jobService;
  7. }
  8. @GetMapping("/test")
  9. public void test(){
  10. JobInfo jobInfo = new JobInfo();
  11. jobInfo.setJobName("test-job");
  12. jobInfo.setJobGroup("test");
  13. jobInfo.setTimeZoneId("Asia/Shanghai"); //时区指定上海
  14. jobInfo.setCron("0/5 * * * * ? "); //每5秒执行一次
  15. Map<String, Object> params = new HashMap<>();
  16. //添加需要调用的接口信息
  17. String callUrl = "http://127.0.0.1:8080/quartz/job/test/run";
  18. params.put("callUrl", callUrl);
  19. jobInfo.setJsonParams(params);
  20. jobService.save(jobInfo);
  21. }
  22. @GetMapping("/test/run")
  23. public void runTest(){
  24. System.out.println(new Date());
  25. }
  26. @GetMapping("/test/delete")
  27. public void deleteTest(){
  28. jobService.remove("test-job","test");
  29. System.out.println("任务已删除");
  30. }
  31. }
  32. 复制代码

四、验证结果

可以看到runTest接口每隔5秒就被任务调用了一个,这时候我们去看数据库中的scheduler库的QRTZ_JOB_DETAILS表和QRTZ_TRIGGERS表,可以看到我们我们定时任务的信息,表示我们的定时任务持久化成功,这个时候你关掉服务器,再重启任务也会定时去执行runTest接口;

然后我们再调用deleteTest接口删除任务

此时再看去看数据库中的scheduler库的QRTZ_JOB_DETAILS表和QRTZ_TRIGGERS表中就没有之前的任务数据了

end~~~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/981799
推荐阅读
相关标签
  

闽ICP备14008679号