当前位置:   article > 正文

spring-boot-starter-quartz集群实践

spring-boot-starter-quartz

前情提要】由于项目需要,需要一个定时任务集群,故此有了这个spring-boot-starter-quartz集群的实践。springboot的版本为:2.1.6.RELEASE;quartz的版本为:2.3.1.假如这里一共有两个定时任务的节点,它们的代码完全一样。


壹.jar包依赖

  1. <properties>
  2. <java.version>1.8</java.version>
  3. </properties>
  4. <dependencies>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-quartz</artifactId>
  12. </dependency>
  13. <dependency>
  14. <groupId>mysql</groupId>
  15. <artifactId>mysql-connector-java</artifactId>
  16. <scope>runtime</scope>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-starter-jdbc</artifactId>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.projectlombok</groupId>
  24. <artifactId>lombok</artifactId>
  25. <optional>true</optional>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-test</artifactId>
  30. <scope>test</scope>
  31. </dependency>
  32. </dependencies>

这里选择将定时任务的数据入库,避免数据直接存在内存中,因应用重启造成的数据丢失和做集群控制。

贰、项目配置

  1. spring:
  2. server:
  3. port: 8080
  4. servlet:
  5. context-path: /lovin
  6. datasource:
  7. url: jdbc:mysql://127.0.0.1:3306/training?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
  8. username: root
  9. password: root
  10. driver-class-name: com.mysql.cj.jdbc.Driver
  11. quartz:
  12. job-store-type: jdbc #数据库方式
  13. jdbc:
  14. initialize-schema: never #不初始化表结构
  15. properties:
  16. org:
  17. quartz:
  18. scheduler:
  19. instanceId: AUTO #默认主机名和时间戳生成实例ID,可以是任何字符串,但对于所有调度程序来说,必须是唯一的 对应qrtz_scheduler_state INSTANCE_NAME字段
  20. #instanceName: clusteredScheduler #quartzScheduler
  21. jobStore:
  22. class: org.quartz.impl.jdbcjobstore.JobStoreTX #持久化配置
  23. driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #我们仅为数据库制作了特定于数据库的代理
  24. useProperties: false #以指示JDBCJobStore将JobDataMaps中的所有值都作为字符串,因此可以作为名称 - 值对存储而不是在BLOB列中以其序列化形式存储更多复杂的对象。从长远来看,这是更安全的,因为您避免了将非String类序列化为BLOB的类版本问题。
  25. tablePrefix: qrtz_ #数据库表前缀
  26. misfireThreshold: 60000 #在被认为“失火”之前,调度程序将“容忍”一个Triggers将其下一个启动时间通过的毫秒数。默认值(如果您在配置中未输入此属性)为60000(60秒)。
  27. clusterCheckinInterval: 5000 #设置此实例“检入”*与群集的其他实例的频率(以毫秒为单位)。影响检测失败实例的速度。
  28. isClustered: true #打开群集功能
  29. threadPool: #连接池
  30. class: org.quartz.simpl.SimpleThreadPool
  31. threadCount: 10
  32. threadPriority: 5
  33. threadsInheritContextClassLoaderOfInitializingThread: true

这里需要注意的是两个节点的端口号应该不一致,避免冲突

叁、实现一个Job

  1. [@Slf4j](https://my.oschina.net/slf4j)
  2. public class Job extends QuartzJobBean {
  3. [@Override](https://my.oschina.net/u/1162528)
  4. protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
  5. // 获取参数
  6. JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
  7. // 业务逻辑 ...
  8. log.info("------springbootquartzonejob执行"+jobDataMap.get("name").toString()+"###############"+jobExecutionContext.getTrigger());
  9. }

其中的日志输出是为了便于观察任务执行情况

肆、封装定时任务操作

  1. [@Service](https://my.oschina.net/service)
  2. public class QuartzService {
  3. @Autowired
  4. private Scheduler scheduler;
  5. @PostConstruct
  6. public void startScheduler() {
  7. try {
  8. scheduler.start();
  9. } catch (SchedulerException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. /**
  14. * 增加一个job
  15. *
  16. * @param jobClass
  17. * 任务实现类
  18. * @param jobName
  19. * 任务名称
  20. * @param jobGroupName
  21. * 任务组名
  22. * @param jobTime
  23. * 时间表达式 (这是每隔多少秒为一次任务)
  24. * @param jobTimes
  25. * 运行的次数 (<0:表示不限次数)
  26. * @param jobData
  27. * 参数
  28. */
  29. public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
  30. int jobTimes, Map jobData) {
  31. try {
  32. // 任务名称和组构成任务key
  33. JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
  34. .build();
  35. // 设置job参数
  36. if(jobData!= null && jobData.size()>0){
  37. jobDetail.getJobDataMap().putAll(jobData);
  38. }
  39. // 使用simpleTrigger规则
  40. Trigger trigger = null;
  41. if (jobTimes < 0) {
  42. trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
  43. .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
  44. .startNow().build();
  45. } else {
  46. trigger = TriggerBuilder
  47. .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
  48. .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
  49. .startNow().build();
  50. }
  51. scheduler.scheduleJob(jobDetail, trigger);
  52. } catch (SchedulerException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. /**
  57. * 增加一个job
  58. *
  59. * @param jobClass
  60. * 任务实现类
  61. * @param jobName
  62. * 任务名称(建议唯一)
  63. * @param jobGroupName
  64. * 任务组名
  65. * @param jobTime
  66. * 时间表达式 (如:0/5 * * * * ? )
  67. * @param jobData
  68. * 参数
  69. */
  70. public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) {
  71. try {
  72. // 创建jobDetail实例,绑定Job实现类
  73. // 指明job的名称,所在组的名称,以及绑定job类
  74. // 任务名称和组构成任务key
  75. JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
  76. .build();
  77. // 设置job参数
  78. if(jobData!= null && jobData.size()>0){
  79. jobDetail.getJobDataMap().putAll(jobData);
  80. }
  81. // 定义调度触发规则
  82. // 使用cornTrigger规则
  83. // 触发器key
  84. Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
  85. .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
  86. .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
  87. // 把作业和触发器注册到任务调度中
  88. scheduler.scheduleJob(jobDetail, trigger);
  89. } catch (Exception e) {
  90. e.printStackTrace();
  91. }
  92. }
  93. /**
  94. * 修改 一个job的 时间表达式
  95. *
  96. * @param jobName
  97. * @param jobGroupName
  98. * @param jobTime
  99. */
  100. public void updateJob(String jobName, String jobGroupName, String jobTime) {
  101. try {
  102. TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
  103. CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
  104. trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
  105. .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
  106. // 重启触发器
  107. scheduler.rescheduleJob(triggerKey, trigger);
  108. } catch (SchedulerException e) {
  109. e.printStackTrace();
  110. }
  111. }
  112. /**
  113. * 删除任务一个job
  114. *
  115. * @param jobName
  116. * 任务名称
  117. * @param jobGroupName
  118. * 任务组名
  119. */
  120. public void deleteJob(String jobName, String jobGroupName) {
  121. try {
  122. scheduler.deleteJob(new JobKey(jobName, jobGroupName));
  123. } catch (Exception e) {
  124. e.printStackTrace();
  125. }
  126. }
  127. /**
  128. * 暂停一个job
  129. *
  130. * @param jobName
  131. * @param jobGroupName
  132. */
  133. public void pauseJob(String jobName, String jobGroupName) {
  134. try {
  135. JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
  136. scheduler.pauseJob(jobKey);
  137. } catch (SchedulerException e) {
  138. e.printStackTrace();
  139. }
  140. }
  141. /**
  142. * 恢复一个job
  143. *
  144. * @param jobName
  145. * @param jobGroupName
  146. */
  147. public void resumeJob(String jobName, String jobGroupName) {
  148. try {
  149. JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
  150. scheduler.resumeJob(jobKey);
  151. } catch (SchedulerException e) {
  152. e.printStackTrace();
  153. }
  154. }
  155. /**
  156. * 立即执行一个job
  157. *
  158. * @param jobName
  159. * @param jobGroupName
  160. */
  161. public void runAJobNow(String jobName, String jobGroupName) {
  162. try {
  163. JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
  164. scheduler.triggerJob(jobKey);
  165. } catch (SchedulerException e) {
  166. e.printStackTrace();
  167. }
  168. }
  169. /**
  170. * 获取所有计划中的任务列表
  171. *
  172. * @return
  173. */
  174. public List<Map<String, Object>> queryAllJob() {
  175. List<Map<String, Object>> jobList = null;
  176. try {
  177. GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
  178. Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
  179. jobList = new ArrayList<Map<String, Object>>();
  180. for (JobKey jobKey : jobKeys) {
  181. List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
  182. for (Trigger trigger : triggers) {
  183. Map<String, Object> map = new HashMap<>();
  184. map.put("jobName", jobKey.getName());
  185. map.put("jobGroupName", jobKey.getGroup());
  186. map.put("description", "触发器:" + trigger.getKey());
  187. Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
  188. map.put("jobStatus", triggerState.name());
  189. if (trigger instanceof CronTrigger) {
  190. CronTrigger cronTrigger = (CronTrigger) trigger;
  191. String cronExpression = cronTrigger.getCronExpression();
  192. map.put("jobTime", cronExpression);
  193. }
  194. jobList.add(map);
  195. }
  196. }
  197. } catch (SchedulerException e) {
  198. e.printStackTrace();
  199. }
  200. return jobList;
  201. }
  202. /**
  203. * 获取所有正在运行的job
  204. *
  205. * @return
  206. */
  207. public List<Map<String, Object>> queryRunJob() {
  208. List<Map<String, Object>> jobList = null;
  209. try {
  210. List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
  211. jobList = new ArrayList<Map<String, Object>>(executingJobs.size());
  212. for (JobExecutionContext executingJob : executingJobs) {
  213. Map<String, Object> map = new HashMap<String, Object>();
  214. JobDetail jobDetail = executingJob.getJobDetail();
  215. JobKey jobKey = jobDetail.getKey();
  216. Trigger trigger = executingJob.getTrigger();
  217. map.put("jobName", jobKey.getName());
  218. map.put("jobGroupName", jobKey.getGroup());
  219. map.put("description", "触发器:" + trigger.getKey());
  220. Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
  221. map.put("jobStatus", triggerState.name());
  222. if (trigger instanceof CronTrigger) {
  223. CronTrigger cronTrigger = (CronTrigger) trigger;
  224. String cronExpression = cronTrigger.getCronExpression();
  225. map.put("jobTime", cronExpression);
  226. }
  227. jobList.add(map);
  228. }
  229. } catch (SchedulerException e) {
  230. e.printStackTrace();
  231. }
  232. return jobList;
  233. }

陆、初始化任务

这里不准备给用户用web界面来配置定时任务,故此采用CommandLineRunner来子啊应用初始化的时候来初始化任务。只需要实现CommandLineRunner的run()方法即可。

  1. @Override
  2. public void run(String... args) throws Exception {
  3. HashMap<String,Object> map = new HashMap<>();
  4. map.put("name",1);
  5. quartzService.deleteJob("job", "test");
  6. quartzService.addJob(Job.class, "job", "test", "0 * * * * ?", map);
  7. map.put("name",2);
  8. quartzService.deleteJob("job2", "test");
  9. quartzService.addJob(Job.class, "job2", "test", "10 * * * * ?", map);
  10. map.put("name",3);
  11. quartzService.deleteJob("job3", "test2");
  12. quartzService.addJob(Job.class, "job3", "test2", "15 * * * * ?", map);
  13. }

柒、测试验证

分别夏侯启动两个应用,然后观察任务执行,以及在运行过程中杀死某个服务,来观察定时任务的执行。 SpringbootquartzoneApplicationSpringbootquartztwoApplication

写在后面的话】下面给出的是所需要脚本的连接地址:脚本下载地址,另外这边又一个自己实现的demo

转载于:https://my.oschina.net/u/3082380/blog/3086240

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/981758
推荐阅读
相关标签
  

闽ICP备14008679号