赞
踩
本文主要介绍了SpringBoot架构下动态定时任务的使用,定时任务表达式配置在数据库中,通过反射执行到目标方法。
Quartz 是一个开源的作业调度框架,支持分布式定时任务,Quartz定时任务据我了解可分为Trigger(触发器)、Job(任务)和Scheduler(调度器),定时任务的逻辑大体为:创建触发器和任务,并将其加入到调度器中。
Quartz 的核心类有以下三部分:
任务 Job : 需要实现的任务类,实现 execute() 方法,执行后完成任务;
触发器 Trigger : 包括 SimpleTrigger 和 CronTrigger;
调度器 Scheduler : 任务调度器,负责基于 Trigger触发器,来执行 Job任务.
Trigger 有五种触发器:
SimpleTrigger 触发器:需要在特定的日期/时间启动,且以指定的间隔时间(单位毫秒)重复执行 n 次任务,如 :在 9:00 开始,每隔1小时,每隔几分钟,每隔几秒钟执行一次 。没办法指定每隔一个月执行一次(每月的时间间隔不是固定值)。
CalendarIntervalTrigger 触发器:指定从某一个时间开始,以一定的时间间隔(单位有秒,分钟,小时,天,月,年,星期)执行的任务。
DailyTimeIntervalTrigger 触发器:指定每天的某个时间段内,以一定的时间间隔执行任务。并且支持指定星期。如:指定每天 9:00 至 18:00 ,每隔 70 秒执行一次,并且只要周一至周五执行。
CronTrigger 触发器:基于日历的任务调度器,即指定星期、日期的某时间执行任务。
NthIncludedDayTrigger 触发器:不同时间间隔的第 n 天执行任务。比如,在每个月的第 15 日处理财务发票记帐,同样设定双休日或者假期。
create table sys_job ( job_id bigint(20) not null auto_increment comment '任务ID', job_name varchar(64) default '' comment '任务名称', job_group varchar(64) default 'DEFAULT' comment '任务组名', invoke_target varchar(500) not null comment '调用目标方法', cron_expression varchar(255) default '' comment 'cron执行表达式', misfire_policy varchar(20) default '3' comment '计划执行错误策略(1立即执行 2执行一次 3放弃执行)', concurrent char(1) default '1' comment '是否并发执行(0允许 1禁止)', status char(1) default '0' comment '状态(0正常 1暂停)', create_by varchar(64) default '' comment '创建者', create_time datetime comment '创建时间', update_by varchar(64) default '' comment '更新者', update_time datetime comment '更新时间', remark varchar(500) default '' comment '备注信息', primary key (job_id, job_name, job_group) ) engine=innodb auto_increment=100 comment = '定时任务调度表'; INSERT INTO `sys_job`(`job_id`, `job_name`, `job_group`, `invoke_target`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (2, '系统默认(有参)', 'DEFAULT', 'com.demo.task.Task.testParams(\'hello\')', '0/15 * * * * ?', '3', '1', '0', 'admin', '2024-01-16 19:07:33', '', NULL, ''); INSERT INTO `sys_job`(`job_id`, `job_name`, `job_group`, `invoke_target`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (3, '系统默认(无参)', 'DEFAULT', 'task.testNoParams()', '0/20 * * * * ?', '3', '1', '0', 'admin', '2024-01-16 19:07:33', '', NULL, ''); create table sys_job_log ( job_log_id bigint(20) not null auto_increment comment '任务日志ID', job_name varchar(64) not null comment '任务名称', job_group varchar(64) not null comment '任务组名', invoke_target varchar(500) not null comment '调用目标字符串', job_message varchar(500) comment '日志信息', status char(1) default '0' comment '执行状态(0正常 1失败)', exception_info varchar(2000) default '' comment '异常信息', create_time datetime comment '创建时间', primary key (job_log_id) ) engine=innodb comment = '定时任务调度日志表';
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>4.1.14</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>
Quartz定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行, 如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。
一般设置都是禁止并发执行
//禁止并发执行 @DisallowConcurrentExecution public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob { @Override protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception { JobInvokeUtil.invokeMethod(sysJob); } } public abstract class AbstractQuartzJob implements Job { private static final Logger log = LoggerFactory.getLogger(AbstractQuartzJob.class); /** * 线程本地变量 */ private static ThreadLocal<Date> threadLocal = new ThreadLocal<>(); @Override public void execute(JobExecutionContext context) throws JobExecutionException { SysJob sysJob = new SysJob(); BeanUtils.copyProperties(context.getMergedJobDataMap().get(ScheduleConstants.TASK_PROPERTIES),sysJob); try { before(context, sysJob); if (sysJob != null) { doExecute(context, sysJob); } after(context, sysJob, null); } catch (Exception e) { log.error("任务执行异常 - :", e); after(context, sysJob, e); } } /** * 执行前 * * @param context 工作执行上下文对象 * @param sysJob 系统计划任务 */ protected void before(JobExecutionContext context, SysJob sysJob) { threadLocal.set(new Date()); } /** * 执行后 * * @param context 工作执行上下文对象 * @param sysJob 系统计划任务 */ protected void after(JobExecutionContext context, SysJob sysJob, Exception e){ Date startTime = threadLocal.get(); threadLocal.remove(); // todo 写入数据库当中 } /** * 执行方法,由子类重载 * * @param context 工作执行上下文对象 * @param sysJob 系统计划任务 * @throws Exception 执行过程中的异常 */ protected abstract void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception; }
@Data public class SysJob implements Serializable { private static final long serialVersionUID = 1L; /** 任务ID */ private Long jobId; /** 任务名称 */ private String jobName; /** 任务组名 */ private String jobGroup; /** 调用目标字符串 */ private String invokeTarget; /** cron执行表达式 */ private String cronExpression; /** cron计划策略 */ // 0=默认,1=立即触发执行,2=触发一次执行,3=不触发立即执行 private String misfirePolicy = ScheduleConstants.MISFIRE_DEFAULT; /** 是否并发执行(0允许 1禁止) */ private String concurrent; /** 任务状态(0正常 1暂停) */ private String status; }
public static void createScheduleJob(Scheduler scheduler, SysJob job) throws SchedulerException, TaskException { Class<? extends Job> jobClass = getQuartzJobClass(job); // 构建job信息 Long jobId = job.getJobId(); String jobGroup = job.getJobGroup(); JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build(); // 表达式调度构建器 CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder); // 按新的cronExpression表达式构建一个新的trigger CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup)) .withSchedule(cronScheduleBuilder).build(); // 放入参数,运行时的方法可以获取 jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job); // 判断是否存在 if (scheduler.checkExists(getJobKey(jobId, jobGroup))){ // 防止创建时存在数据问题 先移除,然后在执行创建操作 scheduler.deleteJob(getJobKey(jobId, jobGroup)); } // 判断任务是否过期 if (StringUtils.isNotNull(CronUtils.getNextExecution(job.getCronExpression()))){ // 执行调度任务 核心代码 scheduler.scheduleJob(jobDetail, trigger); } // 暂停任务 if (job.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue())){ scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup)); } } /** * 获取quartz任务类 * * @param sysJob 执行计划 * @return 具体执行任务类 */ private static Class<? extends Job> getQuartzJobClass(SysJob sysJob){ boolean isConcurrent = "0".equals(sysJob.getConcurrent()); return isConcurrent ? QuartzJobExecution.class : QuartzDisallowConcurrentExecution.class; }
public class JobInvokeUtil { /** * 执行方法 * * @param sysJob 系统任务 */ public static void invokeMethod(SysJob sysJob) throws Exception { String invokeTarget = sysJob.getInvokeTarget(); String beanName = getBeanName(invokeTarget); String methodName = getMethodName(invokeTarget); List<Object[]> methodParams = getMethodParams(invokeTarget); if (!isValidClassName(beanName)) { Object bean = SpringUtils.getBean(beanName); invokeMethod(bean, methodName, methodParams); } else{ Object bean = Class.forName(beanName).getDeclaredConstructor().newInstance(); invokeMethod(bean, methodName, methodParams); } } /** * 调用任务方法 * * @param bean 目标对象 * @param methodName 方法名称 * @param methodParams 方法参数 */ private static void invokeMethod(Object bean, String methodName, List<Object[]> methodParams) throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { if (StringUtils.isNotNull(methodParams) && methodParams.size() > 0) { Method method = bean.getClass().getMethod(methodName, getMethodParamsType(methodParams)); method.invoke(bean, getMethodParamsValue(methodParams)); } else{ Method method = bean.getClass().getMethod(methodName); method.invoke(bean); } } }
@Component("task")
@Slf4j
public class Task {
public void testParams(String params) {
log.info("执行有参方法:" + params);
System.out.println();
}
public void testNoParams() {
log.info("执行无参方法");
}
}
@PostConstruct
public void init() throws SchedulerException, TaskException {
scheduler.clear();
List<SysJob> jobList = jobMapper.selectList(null);
for (SysJob job : jobList) {
ScheduleUtils.createScheduleJob(scheduler, job);
}
}
运行效果:
2024-03-25 14:05:30.020 INFO 11296 — [eduler_Worker-1] com.demo.task.Task : 执行有参方法:hello
2024-03-25 14:05:40.005 INFO 11296 — [eduler_Worker-2] com.demo.task.Task : 执行无参方法
2024-03-25 14:05:45.008 INFO 11296 — [eduler_Worker-3] com.demo.task.Task : 执行有参方法:hello2024-03-25 14:06:00.012 INFO 11296 — [eduler_Worker-4] com.demo.task.Task : 执行有参方法:hello
2024-03-25 14:06:00.014 INFO 11296 — [eduler_Worker-5] com.demo.task.Task : 执行无参方法
public int insertJob(SysJob job) throws SchedulerException, TaskException {
job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
int rows = jobMapper.insert(job);
if (rows > 0) {
ScheduleUtils.createScheduleJob(scheduler, job);
}
return rows;
}
首先自定义一个 JobFactory,通过 AutowireCapableBeanFactory
将创建好的 Job 对象交给 Spring 管理
@Configuration public class CustomJobFactory extends AdaptableJobFactory { @Autowired private AutowireCapableBeanFactory autowireCapableBeanFactory; /** * Create the job instance, populating it with property values taken * from the scheduler context, job data map and trigger data map. * * @param bundle */ @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { Object jobInstance = super.createJobInstance(bundle); autowireCapableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
再创建一个配置类,将自定义的 JobFactory 设置到 Schedule
中
@Configuration public class QuartzConfig { @Autowired private CustomJobFactory customJobFactory; @SneakyThrows @Bean public Scheduler scheduler(){ SchedulerFactory schedulerFactory = new StdSchedulerFactory(); Scheduler scheduler = schedulerFactory.getScheduler(); // 自定义 JobFactory 使得在 Quartz Job 中可以使用 @Autowired scheduler.setJobFactory(customJobFactory); scheduler.start(); return scheduler; } }
本文是基于 Quartz 实现的动态定时任务,有些场景比如任务暂停、任务删除、任务立即执行,参考下面的源码,这里不再赘述了。
项目代码
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。