【前情提要】由于项目需要,需要一个定时任务集群,故此有了这个spring-boot-starter-quartz集群的实践。springboot的版本为:2.1.6.RELEASE;quartz的版本为:2.3.1.假如这里一共有两个定时任务的节点,它们的代码完全一样。
壹.jar包依赖
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-quartz</artifactId>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <scope>runtime</scope>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jdbc</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
这里选择将定时任务的数据入库,避免数据直接存在内存中,因应用重启造成的数据丢失和做集群控制。
贰、项目配置
- spring:
- server:
- port: 8080
- servlet:
- context-path: /lovin
- datasource:
- url: jdbc:mysql://127.0.0.1:3306/training?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
- username: root
- password: root
- driver-class-name: com.mysql.cj.jdbc.Driver
- quartz:
- job-store-type: jdbc #数据库方式
- jdbc:
- initialize-schema: never #不初始化表结构
- properties:
- org:
- quartz:
- scheduler:
- instanceId: AUTO #默认主机名和时间戳生成实例ID,可以是任何字符串,但对于所有调度程序来说,必须是唯一的 对应qrtz_scheduler_state INSTANCE_NAME字段
- #instanceName: clusteredScheduler #quartzScheduler
- jobStore:
- class: org.quartz.impl.jdbcjobstore.JobStoreTX #持久化配置
- driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #我们仅为数据库制作了特定于数据库的代理
- useProperties: false #以指示JDBCJobStore将JobDataMaps中的所有值都作为字符串,因此可以作为名称 - 值对存储而不是在BLOB列中以其序列化形式存储更多复杂的对象。从长远来看,这是更安全的,因为您避免了将非String类序列化为BLOB的类版本问题。
- tablePrefix: qrtz_ #数据库表前缀
- misfireThreshold: 60000 #在被认为“失火”之前,调度程序将“容忍”一个Triggers将其下一个启动时间通过的毫秒数。默认值(如果您在配置中未输入此属性)为60000(60秒)。
- clusterCheckinInterval: 5000 #设置此实例“检入”*与群集的其他实例的频率(以毫秒为单位)。影响检测失败实例的速度。
- isClustered: true #打开群集功能
- threadPool: #连接池
- class: org.quartz.simpl.SimpleThreadPool
- threadCount: 10
- threadPriority: 5
- threadsInheritContextClassLoaderOfInitializingThread: true
这里需要注意的是两个节点的端口号应该不一致,避免冲突
叁、实现一个Job
- [@Slf4j](https://my.oschina.net/slf4j)
- public class Job extends QuartzJobBean {
- [@Override](https://my.oschina.net/u/1162528)
- protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- // 获取参数
- JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
- // 业务逻辑 ...
- log.info("------springbootquartzonejob执行"+jobDataMap.get("name").toString()+"###############"+jobExecutionContext.getTrigger());
-
- }
其中的日志输出是为了便于观察任务执行情况
肆、封装定时任务操作
- [@Service](https://my.oschina.net/service)
- public class QuartzService {
- @Autowired
- private Scheduler scheduler;
-
- @PostConstruct
- public void startScheduler() {
- try {
- scheduler.start();
- } catch (SchedulerException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 增加一个job
- *
- * @param jobClass
- * 任务实现类
- * @param jobName
- * 任务名称
- * @param jobGroupName
- * 任务组名
- * @param jobTime
- * 时间表达式 (这是每隔多少秒为一次任务)
- * @param jobTimes
- * 运行的次数 (<0:表示不限次数)
- * @param jobData
- * 参数
- */
- public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
- int jobTimes, Map jobData) {
- try {
- // 任务名称和组构成任务key
- JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
- .build();
- // 设置job参数
- if(jobData!= null && jobData.size()>0){
- jobDetail.getJobDataMap().putAll(jobData);
- }
- // 使用simpleTrigger规则
- Trigger trigger = null;
- if (jobTimes < 0) {
- trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
- .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
- .startNow().build();
- } else {
- trigger = TriggerBuilder
- .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
- .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
- .startNow().build();
- }
- scheduler.scheduleJob(jobDetail, trigger);
- } catch (SchedulerException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 增加一个job
- *
- * @param jobClass
- * 任务实现类
- * @param jobName
- * 任务名称(建议唯一)
- * @param jobGroupName
- * 任务组名
- * @param jobTime
- * 时间表达式 (如:0/5 * * * * ? )
- * @param jobData
- * 参数
- */
- public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) {
- try {
- // 创建jobDetail实例,绑定Job实现类
- // 指明job的名称,所在组的名称,以及绑定job类
- // 任务名称和组构成任务key
- JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
- .build();
- // 设置job参数
- if(jobData!= null && jobData.size()>0){
- jobDetail.getJobDataMap().putAll(jobData);
- }
- // 定义调度触发规则
- // 使用cornTrigger规则
- // 触发器key
- Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
- .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
- .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
- // 把作业和触发器注册到任务调度中
- scheduler.scheduleJob(jobDetail, trigger);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 修改 一个job的 时间表达式
- *
- * @param jobName
- * @param jobGroupName
- * @param jobTime
- */
- public void updateJob(String jobName, String jobGroupName, String jobTime) {
- try {
- TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
- CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
- trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
- .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
- // 重启触发器
- scheduler.rescheduleJob(triggerKey, trigger);
- } catch (SchedulerException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 删除任务一个job
- *
- * @param jobName
- * 任务名称
- * @param jobGroupName
- * 任务组名
- */
- public void deleteJob(String jobName, String jobGroupName) {
- try {
- scheduler.deleteJob(new JobKey(jobName, jobGroupName));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 暂停一个job
- *
- * @param jobName
- * @param jobGroupName
- */
- public void pauseJob(String jobName, String jobGroupName) {
- try {
- JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
- scheduler.pauseJob(jobKey);
- } catch (SchedulerException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 恢复一个job
- *
- * @param jobName
- * @param jobGroupName
- */
- public void resumeJob(String jobName, String jobGroupName) {
- try {
- JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
- scheduler.resumeJob(jobKey);
- } catch (SchedulerException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 立即执行一个job
- *
- * @param jobName
- * @param jobGroupName
- */
- public void runAJobNow(String jobName, String jobGroupName) {
- try {
- JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
- scheduler.triggerJob(jobKey);
- } catch (SchedulerException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 获取所有计划中的任务列表
- *
- * @return
- */
- public List<Map<String, Object>> queryAllJob() {
- List<Map<String, Object>> jobList = null;
- try {
- GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
- Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
- jobList = new ArrayList<Map<String, Object>>();
- for (JobKey jobKey : jobKeys) {
- List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
- for (Trigger trigger : triggers) {
- Map<String, Object> map = new HashMap<>();
- map.put("jobName", jobKey.getName());
- map.put("jobGroupName", jobKey.getGroup());
- map.put("description", "触发器:" + trigger.getKey());
- Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
- map.put("jobStatus", triggerState.name());
- if (trigger instanceof CronTrigger) {
- CronTrigger cronTrigger = (CronTrigger) trigger;
- String cronExpression = cronTrigger.getCronExpression();
- map.put("jobTime", cronExpression);
- }
- jobList.add(map);
- }
- }
- } catch (SchedulerException e) {
- e.printStackTrace();
- }
- return jobList;
- }
-
- /**
- * 获取所有正在运行的job
- *
- * @return
- */
- public List<Map<String, Object>> queryRunJob() {
- List<Map<String, Object>> jobList = null;
- try {
- List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
- jobList = new ArrayList<Map<String, Object>>(executingJobs.size());
- for (JobExecutionContext executingJob : executingJobs) {
- Map<String, Object> map = new HashMap<String, Object>();
- JobDetail jobDetail = executingJob.getJobDetail();
- JobKey jobKey = jobDetail.getKey();
- Trigger trigger = executingJob.getTrigger();
- map.put("jobName", jobKey.getName());
- map.put("jobGroupName", jobKey.getGroup());
- map.put("description", "触发器:" + trigger.getKey());
- Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
- map.put("jobStatus", triggerState.name());
- if (trigger instanceof CronTrigger) {
- CronTrigger cronTrigger = (CronTrigger) trigger;
- String cronExpression = cronTrigger.getCronExpression();
- map.put("jobTime", cronExpression);
- }
- jobList.add(map);
- }
- } catch (SchedulerException e) {
- e.printStackTrace();
- }
- return jobList;
- }
-
陆、初始化任务
这里不准备给用户用web界面来配置定时任务,故此采用CommandLineRunner来子啊应用初始化的时候来初始化任务。只需要实现CommandLineRunner的run()方法即可。
- @Override
- public void run(String... args) throws Exception {
- HashMap<String,Object> map = new HashMap<>();
- map.put("name",1);
- quartzService.deleteJob("job", "test");
- quartzService.addJob(Job.class, "job", "test", "0 * * * * ?", map);
-
- map.put("name",2);
- quartzService.deleteJob("job2", "test");
- quartzService.addJob(Job.class, "job2", "test", "10 * * * * ?", map);
-
- map.put("name",3);
- quartzService.deleteJob("job3", "test2");
- quartzService.addJob(Job.class, "job3", "test2", "15 * * * * ?", map);
-
- }
柒、测试验证
分别夏侯启动两个应用,然后观察任务执行,以及在运行过程中杀死某个服务,来观察定时任务的执行。