赞
踩
// 创建一个定时任务
TimerTask task = new TimerTask() {
@Override
public void run() {
System.out.println("TimerTask执行时间: " + new Date() + "n" +
"线程名称: " + Thread.currentThread().getName());
}
};
System.out.println("当前时间: " + new Date() + "n" +
"线程名称: " + Thread.currentThread().getName());
// 初始化任务队列
Timer timer = new Timer("Timer");
long delay = 1000L;
// 提交任务
timer.schedule(task, delay);
CalcServiceTimerThread taskThread = new CalcServiceTimerThread("定时执行任务"); //设置执行时间 Calendar calendar = Calendar.getInstance(); int year = calendar.get(Calendar.YEAR); int month = calendar.get(Calendar.MONTH); int day = calendar.get(Calendar.DAY_OF_MONTH); //定制每天的23:00:00执行, calendar.set(year, month, day, 23, 00, 00); Date date = calendar.getTime(); Timer timer = new Timer(); System.out.println("nowData:" + LocalDateTime.now()); System.out.println("taskExecuteDateTime:" + date); //每天的date时刻执行task, 仅执行一次 timer.schedule(taskThread, date); //每天的date时刻执行task,每隔2秒重复执行 //long period = 2 * 1000; //timer.schedule(taskThread, date, period);
在Java中,Timer类用于调度任务的执行。以下是Timer类的基本组成:
**Timer对象:**创建Timer对象是使用Timer类的主要入口点。Timer对象允许你调度和管理任务的执行。
**TimerTask对象:**TimerTask是一个抽象类,用于定义要执行的任务。你需要创建一个继承自TimerTask的子类,并实现其中的run()方法,该方法包含了具体的任务逻辑。
**schedule()方法:**Timer类中的schedule()方法用于安排任务的执行。它有多个重载形式,允许你指定任务、延迟时间、重复间隔等。
**TimerTask队列:**Timer类内部维护一个TimerTask队列,用于存储要执行的任务。当调用schedule()方法安排任务时,任务会被添加到队列中。
**Timer线程:**Timer类内部有一个单独的线程,负责按照设定的时间调度任务的执行。该线程会从TimerTask队列中取出任务,并在指定的时间点执行任务的run()方法。
**取消任务:**Timer类提供了cancel()方法,用于取消尚未执行的任务。当调用cancel()方法时,所有未执行的任务将被移除。
使用Timer类时,你可以创建一个Timer对象,然后通过调用schedule()方法来安排任务的执行。任务可以是TimerTask的子类,通过重写run()方法来实现具体的逻辑。Timer类会在后台运行一个线程,按照设定的时间执行任务。
请注意,Java中的Timer类在较新的版本中已被推荐使用ScheduledExecutorService类来替代,因为ScheduledExecutorService提供了更灵活和可靠的任务调度机制。
schedule()方法的解释说明:
此方法用于安排在指定的时间执行指定的任务。任务将在参数time
所表示的时间点执行,即在指定的日期和时间触发任务。
此方法用于安排在指定延迟之后执行指定的任务。任务将在当前时间的基础上延迟指定的毫秒数delay
后执行。
此方法用于安排在指定延迟之后开始重复执行指定的任务。任务将在当前时间的基础上延迟指定的毫秒数delay
后执行,然后每隔指定的毫秒数period
重复执行一次。
此方法用于安排在指定时间开始重复执行指定的任务。任务将在参数firstTime
所表示的时间点执行,然后每隔指定的毫秒数period
重复执行一次。
scheduleAtFixedRate 构造方法 --> 安排任务以固定的速率重复执行
此方法用于安排任务以固定的速率重复执行。任务将在当前时间的基础上延迟指定的毫秒数delay
后执行,然后每隔指定的毫秒数period
重复执行一次。
此方法用于安排任务以固定的速率重复执行。任务将在参数firstTime
所表示的时间点执行,然后每隔指定的毫秒数period
重复执行一次。
scheduleAtFixedRate 和 schedule 区别
schedule()
方法:按照固定的延迟时间间隔来执行任务。任务的下一次执行时间是相对于上一次任务的实际完成时间来计算的,即无论任务执行的时间是否超过了延迟时间间隔,下一次任务都会按照设定的延迟时间间隔来执行。scheduleAtFixedRate()
方法:按照固定的时间间隔来执行任务。任务的下一次执行时间是相对于上一次任务的开始时间来计算的,即任务的执行时间不会影响下一次任务的执行时间。schedule()
方法:在任务执行时间超过延迟时间间隔时,下一次任务的执行会立即开始,即任务的执行时间可能会影响后续任务的执行时间间隔。scheduleAtFixedRate()
方法:在任务执行时间超过时间间隔时,下一次任务的执行不会立即开始,而是等待固定的时间间隔后再执行。无论任务的执行时间如何,都会按照设定的时间间隔执行任务。schedule()
方法以指定的时间点触发任务执行,并在每次任务执行完毕后重新计算下一次执行时间,而scheduleAtFixedRate()
方法以固定的速率重复执行任务,不会重新计算下一次执行时间
Timer
内部使用一个叫做 TaskQueue
的类存放定时任务,它是一个基于最小堆 实现的优先级队列。
TaskQueue
会按照任务距离下一次执行时间的大小将任务排序,保证在堆顶的任务最先执行
TimerTask task; // 标记任务是否应该执行 boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty // 如果队列为空,且newTasksMayBeScheduled为true,此时等待任务加入 while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); // 如果队列为空,且newTasksMayBeScheduled为false,说明此时线程应该退出 if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; // 当前时间 >= 目标执行时间,说明任务可执行,设置taskFired = true if (taskFired = (executionTime<=currentTime)) { // period == 0 说明是非周期任务,先从队列移除 if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } // 周期任务,会根据period重设执行时间,再加入到队列中 else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } // 任务为不需执行状态,则等待 if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } // 任务需要执行,则调用task的run方法执行,这里执行的其实就是调用方创建task时候run方法的逻辑 if (taskFired) // Task fired; run it, holding no locks task.run();
优点 | 缺点 |
---|---|
简单易用,API 直观 | 单线程执行任务,一个任务执行时间过长会影响其他任务的执行 |
支持延迟执行和周期性执行任务 | 不适合大量并发任务的场景,性能较差 |
内部使用单个线程调度任务 | 任务执行期间,不能并发执行其他任务 |
提供了方便的任务取消功能 | 不适合需要精确时间调度的场景 |
在单个线程中顺序执行任务 | 不提供任务执行时间的统计和监控功能 |
可以指定任务的优先级 | 不支持任务依赖关系的管理 |
使用场景:
TimerTask repeatedTask = new TimerTask() {
@Override
public void run() {
System.out.println("当前时间: " + new Date() + "n" +
"线程名称: " + Thread.currentThread().getName());
}
};
System.out.println("当前时间: " + new Date() + "n" +
"线程名称: " + Thread.currentThread().getName());
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
long delay = 1000L;
long period = 1000L;
executor.scheduleAtFixedRate(repeatedTask, delay, period, TimeUnit.MILLISECONDS);
Thread.sleep(delay + period * 5);
executor.shutdown();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); // 定时任务1:每隔1秒打印一次当前时间 Runnable task1 = () -> { System.out.println("Task 1: Current time - " + LocalDateTime.now()); }; // 定时任务2:延迟3秒后执行,然后每隔2秒打印一次当前时间 Runnable task2 = () -> { System.out.println("Task 2: Current time - " + LocalDateTime.now()); }; // 定时任务3:每隔5秒打印一次 "Hello, World!" Runnable task3 = () -> { System.out.println("Task 3: Hello, World!"); }; // 执行定时任务1,每隔1秒执行一次 executor.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS); // 执行定时任务2,延迟3秒后执行,然后每隔2秒执行一次 executor.scheduleWithFixedDelay(task2, 3, 2, TimeUnit.SECONDS); // 执行定时任务3,每隔5秒执行一次 executor.scheduleAtFixedRate(task3, 0, 5, TimeUnit.SECONDS); // 主线程休眠10秒 try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } // 关闭定时任务执行器 executor.shutdown();
scheduleWithFixedDelay
方法:
scheduleAtFixedRate
方法:
如果你希望任务之间有固定的时间间隔,并且希望任务的执行时间不会对间隔造成影响,可以使用 scheduleAtFixedRate
方法。如果你希望每次任务执行完成后都等待一定的延迟时间,再执行下一次任务,可以使用 scheduleWithFixedDelay
方法。
ScheduledThreadPoolExecutor
是 Java 中的一个线程池实现,它可以用于执行定时任务。它的原理是基于线程池和定时调度器的结合,在 ScheduledThreadPoolExecutor
中,定时调度器的实现主要依赖于 DelayedWorkQueue
这个内部任务队列和一个专门的调度线程。
详细:
ScheduledThreadPoolExecutor
内部维护了一个 DelayedWorkQueue
对象作为任务队列。该队列是一个按照任务的执行时间点进行排序的优先队列,确保按照执行时间点的顺序来执行任务。ScheduledThreadPoolExecutor
时,它会被包装成一个 ScheduledFutureTask
对象并插入到任务队列中。ScheduledFutureTask
继承自 FutureTask
,它除了包含任务的执行逻辑,还包含任务的执行时间点。ScheduledThreadPoolExecutor
内部有一个专门的调度线程,负责从任务队列中获取并执行任务。该线程会不断循环执行以下步骤:
run()
方法,完成任务的具体操作。public void run() { boolean periodic = isPeriodic(); // 如果不能在当前状态下运行,那么就要取消任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 如果只是延时任务,那么就调用run方法,运行任务。 else if (!periodic) ScheduledFutureTask.super.run(); // 如果是周期定时任务,调用runAndReset方法,运行任务。 // 这个方法不会改变任务的状态,所以可以反复执行。 else if (ScheduledFutureTask.super.runAndReset()) { // 设置周期任务下一次执行的开始时间time setNextRunTime(); // 重新执行任务outerTask reExecutePeriodic(outerTask); } }
优点 | 缺点 |
---|---|
支持高并发 | 需要手动管理线程池大小 |
灵活的任务管理 | 对任务的执行时间无法做出实时调整 |
可以处理任务的异常 | 需要自行处理任务的优先级和依赖关系 |
提供延迟执行和周期性执行 | 需要额外的线程池资源 |
可以控制线程池的大小 |
常见使用场景:
ScheduledThreadPoolExecutor
提供了强大的定时任务调度功能,可以用于执行定时触发的任务或周期性执行的任务。ScheduledThreadPoolExecutor
是基于线程池的实现,可以同时处理多个任务,并发执行任务。需要注意的是,ScheduledThreadPoolExecutor
在使用时需要根据具体场景进行合理的线程池大小配置,避免线程过多或过少的情况发生。另外,当任务的执行时间发生变化时,ScheduledThreadPoolExecutor
无法实时调整任务的执行时间,因此在对任务执行时间要求较高的场景下,需谨慎使用。
attention: Timer 和 ScheduledExecutorService 都是不支持CRON 表达式
Cron表达式是一种时间表达式,用于指定在何时执行特定任务。它由6个字段组成,表示任务的执行时间规则。Cron表达式广泛应用于任务调度系统、定时任务管理以及日程提醒等领域。本文将介绍Cron表达式的基本语法和用法,并探讨其发展历程。
Cron表达式最初起源于Unix系统,最早用于调度系统任务。随着时间的推移,Cron表达式在各种
应用领域得到广泛应用,并在不同的编程语言和框架中实现了相应的解析和调度器。
随着需求的增加,Cron表达式的功能不断扩展,例如增加了更多的特殊符号、支持年份字段等。同时,一些成熟的任务调度框架如Quartz、Spring Task等也提供了更强大和灵活的Cron表达式的支持。
在现代的开发中,Cron表达式已成为定时任务调度的标准,被广泛使用于各种应用和系统中,例如定时数据备份、定时报表生成、定时任务执行等。
Cron表达式由6个字段组成,分别表示秒、分、小时、日期、月份和星期几。每个字段都可以使用特定的符号和通配符来定义时间范围。
字段 | 允许值 | 允许的特殊符号 |
---|---|---|
秒 | 0-59 | , - * / |
分 | 0-59 | , - * / |
小时 | 0-23 | , - * / |
日期 | 1-31 | , - * ? / L W |
月份 | 1-12 或 JAN-DEC | , - * / |
星期几 | 0-6 或 SUN-SAT | , - * ? / L # |
特殊符号的含义如下:
*
:匹配任意值-
:定义范围,如 3-5
表示 3、4、5,
:列举多个值,如 MON,WED,FRI
表示星期一、星期三和星期五/
:指定增量,如 0/15
表示从0开始,每隔15秒触发一次?
:只能用于日期和星期几,表示不指定具体值L
:用于日期和星期几,表示最后一个W
:用于日期,表示最近的工作日#
:用于星期几,表示第几个,如 2#3
表示第三个星期二以下是一些常见的Cron表达式使用示例:
0 0 12 * * ?
:每天中午12点触发0 15 10 ? * *
:每天上午10:15触发0 0/5 14,18 * * ?
:每天下午2点到6点,每隔5分钟触发一次0 0 6,19 * * MON-FRI
:周一至周五的早上6点和晚上7点触发0 0/2 * * * ?
:每隔2分钟触发一次0 30 23 L * ?
:每月最后一天的23:30触发q2: https://cron.qqe2.com/
CronTabGuru: https://crontab.guru/
年份 | 里程碑事件 |
---|---|
2003 | Spring Framework 1.0发布,首次引入了基本的任务调度功能。 |
2006 | Spring Framework 2.0发布,引入了TaskExecutor和TaskScheduler接口,提供了更强大的任务执行和调度功能。 |
2009 | Spring Framework 3.0发布,通过注解支持定时任务的定义和配置,大大简化了任务调度的开发和管理。 |
2014 | Spring Framework 4.0发布,进一步改进了定时任务的支持,包括更灵活的任务调度和异步执行的能力。 |
2018 | Spring Framework 5.0发布,引入了新的Reactive模块,提供了响应式编程的支持,并扩展了任务调度的能力。 |
2021 | Spring Framework 5.3发布,进一步改进了任务调度的性能和稳定性,增加了对长期运行任务的支持。 |
使用@EnableScheduling
注解开启定时任务,该注解放在启动类上。
@SpringBootApplication
// 开始定时任务
@EnableScheduling
public class ScheduleWorkApplication {
public static void main(String[] args) {
SpringApplication.run(ScheduleWorkApplication.class, args);
}
}
使用@Scheduled
注解定义定时任务
/** * fixedRate:固定速率执行。每5秒执行一次。 */ @Scheduled(fixedRate = 5000) public void reportCurrentTimeWithFixedRate() { log.info("SpringTask--Current Thread : {}", Thread.currentThread().getName()); log.info("SpringTask--Fixed Rate Task : The time is now {}", dateFormat.format(new Date())); } /** * fixedDelay:固定延迟执行。距离上一次调用成功后2秒才执。 */ @Scheduled(fixedDelay = 2000) public void reportCurrentTimeWithFixedDelay() { try { TimeUnit.SECONDS.sleep(3); log.info("SpringTask--Fixed Delay Task : The time is now {}", dateFormat.format(new Date())); } catch (InterruptedException e) { e.printStackTrace(); } } /** * initialDelay:初始延迟。任务的第一次执行将延迟5秒,然后将以5秒的固定间隔执行。 */ @Scheduled(initialDelay = 5000, fixedRate = 5000) public void reportCurrentTimeWithInitialDelay() { log.info("SpringTask--Fixed Rate Task with Initial Delay : The time is now {}", dateFormat.format(new Date())); } /** * cron:使用Cron表达式。 每分钟的1,2秒运行 */ @Scheduled(cron = "1-2 * * * * ? ") public void reportCurrentTimeWithCronExpression() { log.info("SpringTask--Cron Expression: The time is now {}", dateFormat.format(new Date())); }
@Scheduled
注解是Spring Framework中用于定义定时任务的注解之一,它提供了四种任务调度策略,对应四个参数:
fixedRate:固定频率调度
@Scheduled(fixedRate = 5000)
fixedDelay:固定延迟调度
@Scheduled(fixedDelay = 5000)
initialDelay:初始延迟调度
@Scheduled(initialDelay = 5000)
cron:Cron表达式调度
@Scheduled(cron = "0 0 8 * * ?")
SchedulingConfigurer
接口,并重写configureTasks
方法。示例代码:import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
@Configuration
public class MyTaskConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addFixedRateTask(() -> {
// 执行任务逻辑
}, 5000); // 每隔5秒执行一次
}
}
@EnableScheduling
注解,启用Spring的任务调度功能。application.properties
或application.yml
文件中配置定时任务的调度策略。示例代码(使用application.properties
配置文件):# 定时任务配置
spring.task.scheduling.pool.size=5
spring.task.scheduling.thread-name-prefix=my-task-
@Scheduled
注解标记方法。示例代码:import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class MyTask {
@Scheduled(fixedRate = 5000) // 每隔5秒执行一次
public void doTask() {
// 执行任务逻辑
}
}
@EnableScheduling
注解,启用Spring的任务调度功能。以上是Spring Boot中Spring Task的几种常见使用方式和相应的示例。根据实际需求,可以选择合适的方式来编写和管理定时任务。
@EnableScheduling注解
该注解的代码源码很简单,其核心在于引入了一个SchedulingConfiguration.class配置类
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({SchedulingConfiguration.class}) //导入配置类
@Documented
public @interface EnableScheduling {
}
SchedulingConfiguration.class配置类的源码也很简单,只是创建了一个ScheduledAnnotationBeanPostProcessor实例。从名字上看,该实例一种BeanPostProcessor(后置处理器)。
后置处理器作用是在Bean对象在实例化和依赖注入完毕后,在显示调用初始化方法的前后添加我们自己的逻辑。注意是Bean实例化完毕后及依赖注入完成后触发的。
@Configuration @Role(2) public class SchedulingConfiguration { public SchedulingConfiguration() { } @Bean( name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"} ) //创建一个ScheduledAnnotationBeanPostProcessor实例 @Role(2) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledAnnotationBeanPostProcessor(); } }
综上,@EnableScheduling注解是通过创建一个ScheduledAnnotationBeanPostProcessor实例实现开启定时任务的。
ScheduledAnnotationBeanPostProcessor类
查看源码可知,该类实现一堆接口,此处不再逐个介绍这些接口,主要分析该类实现开启定时任务的实现逻辑。
public class ScheduledAnnotationBeanPostProcessor implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor, Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware, SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
....
}
postProcessAfterInitialization方法
该方法位于ScheduledAnnotationBeanPostProcessor类中,主要作用是找出全部被@Scheduled注解标记的方法,并调用processScheduled方法进行下一步处理。
public Object postProcessAfterInitialization(Object bean, String beanName) { Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); // 1 找出全部被`@Scheduled`注解标记的方法 if (!this.nonAnnotatedClasses.contains(targetClass)) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (method) -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class); return !scheduledMethods.isEmpty() ? scheduledMethods : null; }); // 2 如果类中没有使用 @Scheduled注解,则将其加入不再访问名单 if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (this.logger.isTraceEnabled()) { this.logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass()); } } else { //3 该类中有方法被@Scheduled注解标注,则使用processScheduled处理该方法 annotatedMethods.forEach((method, scheduledMethods) -> { scheduledMethods.forEach((scheduled) -> { this.processScheduled(scheduled, method, bean); }); }); if (this.logger.isDebugEnabled()) { this.logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; } // ..
processScheduled方法
该方法位于ScheduledAnnotationBeanPostProcessor类中,该类主要作用是
1)检测被@Scheduled注解是否有参数
2)被@Scheduled注解标注的方法是否是无参且无返回值
3)使用ScheduledTaskRegistrar注册定时任务,后加入任务列表
public class ScheduledAnnotationBeanPostProcessor implements xxx{ //ScheduledTaskRegistrar这个类为Spring容器定时任务注册中心,并使用taskScheduler执行定时任务 //默认情况下,使用单线程执行,可配置为多线程。 private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar(); protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { //1 被@Scheduled注解标记的方法必须无参 Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod); boolean processedSchedule = false; //2 被@Scheduled注解标记的方法使用 cron、fixedDelay、fixedRate三者之一 String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; // 3 定义一个set 存储待调度的任务 Set<ScheduledTask> tasks = new LinkedHashSet(4); long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); //4.1 将cron类型的任务 放入set String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } //4.2 将fixedDelay类型的任务 放入set long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0L) { tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } //4.3 将fixedDelayString类型的任务 放入set String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } //4.4 将fixedRate类型的任务 放入set long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0L) { tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } //4.5 将fixedRateString类型的任务 放入set String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } //5 最后将 task 注册到 scheduledTasks 中 ((Set)registeredTasks).addAll(tasks); } } // ...
ScheduledTaskRegistrar类
该类实现了三个接口,其中
InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。
DisposableBean接口和InitializingBean接口一样,为bean提供了释放资源方法的方式,它只包括destroy方法,凡是继承该接口的类,在bean被销毁之前都会执行该方法。
ScheduledTaskHolder接口定义返回当前实例的任务列表。
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean { /** *InitializingBean接口定义的方法 * Calls {@link #scheduleTasks()} at bean construction time. */ @Override public void afterPropertiesSet() { scheduleTasks(); } /** * 执行定时任务 * Schedule all registered tasks against the underlying * {@linkplain #setTaskScheduler(TaskScheduler) task scheduler}. */ @SuppressWarnings("deprecation") protected void scheduleTasks() { if (this.taskScheduler == null) { //默认使用单线程调度器 this.localExecutor = Executors.newSingleThreadScheduledExecutor(); //真正调度的为 ConcurrentTaskScheduler实例 this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } //添加四种执行调度 此处只留一个 其他省略 if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } }
优点 | 缺点 |
---|---|
简单易用:Spring Task提供了简单易用的注解方式来定义定时任务,无需引入额外的依赖和配置。 | 功能相对有限:相较于其他定时任务框架,Spring Task的功能相对较为简单,对于复杂的任务调度需求可能不够灵活。 |
集成方便:Spring Task是基于Spring Framework的一部分,无需额外的集成步骤,可直接在Spring Boot项目中使用。 | 单机环境:Spring Task适用于单机环境下的任务调度,对于分布式环境的任务调度需求,需要额外的处理。 |
线程池支持:Spring Task支持使用线程池来执行定时任务,可以控制任务的并发性和线程池参数。 | 缺少监控和管理:相较于专门的任务调度框架,Spring Task缺少一些监控和管理功能,如任务执行状态的监控和任务的动态管理。 |
集成Spring生态系统:Spring Task与其他Spring组件无缝集成,可以方便地使用依赖注入、AOP等特性。 | 高精度调度:Spring Task的调度精度可能受到底层操作系统的影响,无法做到非常精确的调度。 |
总的来说,Spring Task是一种简单、方便的定时任务实现方式,适用于简单的任务调度需求,并且与Spring生态系统集成良好。但对于复杂的任务调度需求、分布式环境下的任务调度或需要更高精度的调度等场景,可能需要考虑其他更专业的任务调度框架。
时间轮简单来说就是一个环形的队列(底层一般基于数组实现),队列中的每一个元素(时间格)都可以存放一个定时任务列表。
时间轮中的每个时间格代表了时间轮的基本时间跨度或者说时间精度,假如时间一秒走一个时间格的话,那么这个时间轮的最高精度就是 1 秒(也就是说 3 s 和 3.9s 会在同一个时间格中)。
下图是一个有 12 个时间格的时间轮,转完一圈需要 12 s。当我们需要新建一个 3s 后执行的定时任务,只需要将定时任务放在下标为 3 的时间格中即可。当我们需要新建一个 9s 后执行的定时任务,只需要将定时任务放在下标为 9 的时间格中即可。
private static final int TIME_SLOT_COUNT = 12; // 时间轮的时间格数量 private static final long TIME_SLOT_DURATION = 1000; // 每个时间格的时间跨度,单位为毫秒 private static final long MAX_DELAY = TIME_SLOT_COUNT * TIME_SLOT_DURATION; // 最大延迟时间 private ScheduledExecutorService executorService; private List<List<Runnable>> timeSlots; // 时间格中的任务列表 private int currentIndex; // 当前时间格的索引 public TimeWheelDemo() { executorService = Executors.newSingleThreadScheduledExecutor(); timeSlots = new ArrayList<>(TIME_SLOT_COUNT); for (int i = 0; i < TIME_SLOT_COUNT; i++) { timeSlots.add(new ArrayList<>()); } currentIndex = 0; } public void start() { executorService.scheduleAtFixedRate(this::tick, TIME_SLOT_DURATION, TIME_SLOT_DURATION, TimeUnit.MILLISECONDS); } public void schedule(Runnable task, long delay) { if (delay >= MAX_DELAY) { // 超过最大延迟时间,直接执行任务 executorService.execute(task); } else { int slotIndex = (currentIndex + (int) (delay / TIME_SLOT_DURATION)) % TIME_SLOT_COUNT; timeSlots.get(slotIndex).add(task); } } private void tick() { List<Runnable> currentSlot = timeSlots.get(currentIndex); for (Runnable task : currentSlot) { executorService.execute(task); } currentSlot.clear(); currentIndex = (currentIndex + 1) % TIME_SLOT_COUNT; }
优点 | 缺点 |
---|---|
高效性:时间轮算法利用环形队列和索引计算,执行定时任务的效率较高。 | 单一节点:时间轮算法适用于单体应用,无法直接扩展到分布式环境中。 |
简单实现:相对于其他复杂的调度算法,时间轮算法的实现相对简单,易于理解和维护。 | 精度受限:时间轮算法的调度精度受到时间格的大小限制,无法做到非常精确的调度。 |
高并发支持:时间轮算法利用多线程执行定时任务,可以支持高并发的任务调度需求。 | 时间格数量限制:时间轮算法的性能和精度与时间格的数量相关,需要根据实际情况合理调整。 |
可靠性:时间轮算法基于环形队列,循环执行任务,不会因为系统时间的变化或任务耗时导致任务调度出错。 | 对任务依赖:时间轮算法适用于独立的定时任务,对于有任务间依赖关系或复杂调度逻辑的场景较为有限。 |
总的来说,时间轮算法在单体应用中实现定时任务具有高效性、简单实现和高并发支持的优点。然而,它也有一些局限性,如精度受限、单一节点的限制以及对任务依赖较为有限。在选择使用时间轮算法之前,需要根据具体需求和场景综合考虑其优缺点。
下表对比了Java中Timer、ScheduledExecutorService、Spring Task和时间轮算法四种实现定时任务的特点、优缺点以及适用场景的技术选型考虑:
特点 | Timer | ScheduledExecutorService | Spring Task | 时间轮算法 |
---|---|---|---|---|
实现方式 | 单线程实现 | 线程池实现 | 基于线程池的任务调度 | 环形队列存储任务 |
并发支持 | 低 | 高 | 高 | 高 |
功能丰富度 | 有限 | 丰富 | 适中 | 适中 |
可靠性 | 低(单线程) | 高 | 高 | 高 |
调度精度 | 低(受系统时间影响) | 高 | 高 | 中 |
分布式支持 | 无 | 无 | 无 | 无 |
复杂性 | 简单 | 中 | 简单 | 中 |
适用场景 | 单线程、简单任务 | 并发任务、复杂调度需求 | 单体应用、简单调度需求 | 单体应用、高并发调度需求 |
根据不同的需求和场景,可以根据以下考虑进行技术选型:
需要根据具体的业务需求、并发性要求、调度复杂性和精度要求等因素进行综合考虑,选择最合适的技术来实现定时任务。
Quartz是一个功能强大的开源任务调度框架,用于在Java应用程序中实现定时任务的调度和执行。它提供了灵活的调度配置和管理机制,可以满足各种定时任务的需求。Quartz支持集群部署,可以在分布式环境下实现高可用性和负载均衡。
Quartz最早由Terracotta团队开发,后来成为开源项目,现在由Quartz项目组维护。Quartz的第一个版本发布于2001年,经过多年的发展和迭代,已经成为业界广泛使用的定时任务调度框架之一。Quartz在实践中得到了广泛应用,被众多企业和开发者所采用。
Quartz包含以下核心模块:
Scheduler:调度器是Quartz的核心组件,负责加载和执行任务。它可以配置调度规则、触发器和任务,并提供了丰富的调度操作和管理方法。
Job和JobDetail:Job是需要定时执行的任务的抽象,通过实现Job接口定义具体的任务逻辑。JobDetail则包含了Job的相关信息,如任务名称、分组等。
Trigger:触发器定义了任务的调度规则,包括触发时间、重复次数、间隔时间等。Quartz提供了多种类型的触发器,如SimpleTrigger、CronTrigger等,以满足不同的调度需求。
JobStore:JobStore负责任务的持久化和存储,可以将任务信息存储在内存、数据库或其他存储介质中。Quartz提供了多种实现,如RAMJobStore、JDBCJobStore等。
Listener:Quartz提供了监听器机制,可以通过监听器在任务调度的不同阶段进行自定义操作。例如,可以监听任务的执行前后、调度器的启动和关闭等事件。
以下是一个简单的Quartz示例,演示了如何配置和使用Quartz进行定时任务调度:
import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; public class QuartzDemo { public static void main(String[] args) throws SchedulerException { SchedulerFactory schedulerFactory = new StdSchedulerFactory(); Scheduler scheduler = schedulerFactory.getScheduler(); // 创建JobDetail JobDetail jobDetail = JobBuilder.newJob(MyJob.class) .withIdentity("myJob", "group1") .build(); // 创建Trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("myTrigger", "group1") .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5)) .build(); // 将JobDetail和Trigger注册到调度器 scheduler.scheduleJob(jobDetail , trigger); // 启动调度器 scheduler.start(); } public static class MyJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { System.out.println("Executing my job..."); } } }
在上面的示例中,我们创建了一个简单的Job,并配置了一个每5秒执行一次的Trigger。然后将JobDetail和Trigger注册到调度器,最后启动调度器,即可实现定时任务的调度和执行。
Quartz的集群模式指的是一个集群下多个节点管理同一批任务的调度**,通过共享数据库的方式实现,保证同一个任务到达触发时间的时候,只有一台机器去执行该任务。**每个节点部署一个单独的quartz实例,相互之间没有直接数据通信。
Table Name | Description |
---|---|
QRTZ_CALENDARS | 存储Quartz的Calendar信息 |
QRTZ_CRON_TRIGGERS | 存储CronTrigger,包括Cron表达式和时区信息 |
QRTZ_FIRED_TRIGGERS | 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息 |
QRTZ_PAUSED_TRIGGER_GRPS | 存储已暂停的Trigger组的信息 |
QRTZ_SCHEDULER_STATE | 存储少量的有关Scheduler的状态信息,和别的Scheduler实例 |
QRTZ_LOCKS | 存储程序的悲观锁的信息 |
QRTZ_JOB_DETAILS | 存储每一个已配置的Job的详细信息 |
QRTZ_JOB_LISTENERS | 存储有关已配置的JobListener的信息 |
QRTZ_SIMPLE_TRIGGERS | 存储简单的Trigger,包括重复次数、间隔、以及已触的次数 |
QRTZ_BLOG_TRIGGERS | Trigger作为Blob类型存储 |
QRTZ_TRIGGER_LISTENERS | 存储已配置的TriggerListener的信息 |
QRTZ_TRIGGERS | 存储已配置的Trigger的信息 |
org.quartz.scheduler.instanceName=SsmScheduler org.quartz.scheduler.instanceId=AUTO org.quartz.scheduler.wrapJobExecutionInUserTransaction=false org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount=10 org.quartz.threadPool.threadPriority=5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true org.quartz.jobStore.useProperties=true org.quartz.jobStore.tablePrefix=QRTZ_ org.quartz.jobStore.misfireThreshold=60000 org.quartz.jobStore.isClustered=true org.quartz.jobStore.clusterCheckinInterval=2000 org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.dataSource=qzDS org.quartz.dataSource.qzDS.connectionProvider.class=cn.wdy.schedulequartz.config.DruidConnectionProvider org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz-database?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true org.quartz.dataSource.qzDS.user=quartz-database org.quartz.dataSource.qzDS.password=quartz-database org.quartz.dataSource.qzDS.maxConnection=5 org.quartz.dataSource.qzDS.validationQuery=select 0 from dual
同时运行两个项目,有着相同的定时任务,如果一个项目中断,则另外一个项目中定时任务会继续执行
Quartz实现分布式的原理简述:
分布式任务调度是在分布式系统中对任务进行调度和执行的一种技术。它能够解决任务调度的并发性、高可用性和扩展性等问题。ElasticJob是一个开源的分布式任务调度框架,为分布式任务调度提供了便捷的解决方案。本文将介绍ElasticJob的技术发展历史、核心模块、简单的示例和分布式实现的原理。
ElasticJob最初由当当网开发并开源,目前已成为Apache软件基金会的顶级项目。自推出以来,ElasticJob得到了广泛的应用和发展,并且不断增加新功能和改进。它在分布式任务调度领域取得了良好的声誉,并成为了许多企业的首选框架之一。
ElasticJob由以下几个核心模块组成:
Job:任务模块,定义具体的任务逻辑。开发者需要实现Job接口并编写任务执行逻辑。
LiteJobConfiguration:任务配置模块,用于配置任务的相关参数,如任务名称、调度时间表达式、任务分片等。
JobScheduler:任务调度模块,负责将任务分发给执行器,并进行任务调度和管理。
JobExecutor:任务执行器模块,负责执行具体的任务逻辑。
JobRegistry:任务注册中心模块,用于注册和发现可用的任务执行器。
以下是一个简单的ElasticJob示例,用于演示一个基于Java的分布式定时任务的实现:
public class MyJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { // 执行具体的任务逻辑 System.out.println("执行任务:" + shardingContext.getJobName()); } } public class ElasticJobDemo { public static void main(String[] args) { // 创建任务配置 LiteJobConfiguration jobConfiguration = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration( JobCoreConfiguration.newBuilder("myJob", "0/5 * * * * ?", 2).build(), MyJob.class.getCanonicalName() )).build(); // 创建任务调度器 JobScheduler jobScheduler = new JobScheduler( new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic-job-demo")), jobConfiguration ); // 启动任务调度器 jobScheduler.init(); } }
以上示例定义了一个名为MyJob
的任务,它实现了SimpleJob
接口,并在execute
方法中定义了具体的任务逻辑。在ElasticJobDemo
类中,我们创建了任务配置和任务调度器,并启动了任务调度器。该示
例将每5秒执行一次任务。
现有一个批量处理的业务,数据量较大,现将计算任务分发给节点进行执行
String taskId = context.getTaskId(); String shardingParameter = context.getShardingParameter(); log.info("端口:{},定时任务开始:taskId:{},shardingParameter:{}", port, taskId, shardingParameter); log.info("~~~~Thread ID: {}, " + "作业分片总数: {}, " + "当前分片项:{}." + "当前参数: {}," + "作业名称:{}" + "作业自定义参数:{}", Thread.currentThread().getId(), context.getShardingTotalCount(), context.getShardingItem(), context.getShardingParameter(), context.getJobName(), context.getJobParameter() ); // 根据传参的不同,来进行分批处理相应业务 List<Person> people = personService.queryPersonBySex(Integer.valueOf(shardingParameter)); if (!CollectionUtils.isEmpty(people)) { for (Person person : people) { personService.update(person); } }
ElasticJob的分布式实现基于分片(Sharding)的概念。任务被分成多个片段,并由不同的任务执行器执行。具体实现的原理如下:
任务分片:根据任务配置中的分片参数,将任务划分成多个片段。每个片段独立执行,避免单一节点负载过重。
任务分发:任务调度器通过任务注册中心获取可用的任务执行器,并将任务片段分发给它们。任务调度器负责任务分发和调度策略的管理。
任务执行:每个任务执行器独立执行分配给它的任务片段。它们通过心跳机制向任务调度器汇报任务执行情况,并接收新的任务分配。
分布式协调:任务调度器和任务执行器之间通过分布式协调机制实现任务的分发和同步。常见的分布式协调工具有Zookeeper和Redis等。
通过以上机制,ElasticJob实现了分布式任务的调度和执行。它能够根据任务配置灵活地进行任务分片和分发,并提供高可用性和负载均衡的分布式任务调度方案。
XXL-Job:是大众点评的分布式任务调度平台,是一个轻量级分布式任务调度平台, 其核心设计目标是开发迅速、学习简单、轻量级、易扩展
大众点评目前已接入XXL-JOB,该系统在内部已调度约100万次,表现优异。
目前已有多家公司接入xxl-job,包括比较知名的大众点评,京东,优信二手车,360金融 (360),联想集团 (联想),易信 (网易)等等
XXL-Job最初由小马哥(马士兵)开发并开源,目前由XXL-Media维护。自推出以来,XXL-Job经过多个版本的迭代和改进,成为了国内较为知名的分布式任务调度平台。它被广泛应用于各行各业的分布式系统中,具有良好的稳定性和性能。
XXL-Job包含以下核心模块:
调度中心:负责任务调度的管理和分发。它提供了Web界面供用户进行任务配置、管理和监控。
执行器:负责接收调度中心分发的任务,并执行任务的具体逻辑。执行器可以独立部署在多个节点上,实现任务的分布式执行。
日志存储:用于存储任务执行的日志信息,方便用户查看任务执行情况和排查问题。
调度器:负责按照任务配置的调度策略,将任务分发给可用的执行器节点。
分布式任务锁:用于解决分布式环境下的任务并发问题,保证同一时间只有一个执行器节点执行任务。
源码下载地址:
https://github.com/xuxueli/xxl-job
https://gitee.com/xuxueli0323/xxl-job
请下载项目源码并解压,获取 “调度数据库初始化SQL脚本” 并执行即可。
“调度数据库初始化SQL脚本” 位置为:
/xxl-job/doc/db/tables_xxl_job.sql
解压源码,按照maven格式将源码导入IDE, 使用maven进行编译即可,源码结构如下:
修改xxl-job-admin
项目的配置文件application.properties
,把数据库账号密码配置上
### web server.port=8080 server.servlet.context-path=/xxl-job-admin ### actuator management.server.servlet.context-path=/actuator management.health.mail.enabled=false ### resources spring.mvc.servlet.load-on-startup=0 spring.mvc.static-path-pattern=/static/** spring.resources.static-locations=classpath:/static/ ### freemarker spring.freemarker.templateLoaderPath=classpath:/templates/ spring.freemarker.suffix=.ftl spring.freemarker.charset=UTF-8 spring.freemarker.request-context-attribute=request spring.freemarker.settings.number_format=0.########## ### mybatis mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource spring.datasource.url=jdbc:mysql://192.168.202.200:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root spring.datasource.password=WolfCode_2017 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.minimum-idle=10 spring.datasource.hikari.maximum-pool-size=30 spring.datasource.hikari.auto-commit=true spring.datasource.hikari.idle-timeout=30000 spring.datasource.hikari.pool-name=HikariCP spring.datasource.hikari.max-lifetime=900000 spring.datasource.hikari.connection-timeout=10000 spring.datasource.hikari.connection-test-query=SELECT 1 spring.datasource.hikari.validation-timeout=1000 ### xxl-job, email spring.mail.host=smtp.qq.com spring.mail.port=25 spring.mail.username=xxx@qq.com spring.mail.from=xxx@qq.com spring.mail.password=xxx spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory ### xxl-job, access token xxl.job.accessToken=default_token ### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en") xxl.job.i18n=zh_CN ## xxl-job, triggerpool max size xxl.job.triggerpool.fast.max=200 xxl.job.triggerpool.slow.max=100 ### xxl-job, log retention days xxl.job.logretentiondays=30
运行XxlJobAdminApplication
程序即可.
调度中心访问地址: http://localhost:8080/xxl-job-admin
默认登录账号 “admin/123456”, 登录后运行界面如下图所示。
至此“调度中心”项目已经部署成功。
创建SpringBoot项目并且添加如下依赖:
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>
在配置文件中添加如下配置:
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册; xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin ### 执行器通讯TOKEN [选填]:非空时启用; xxl.job.accessToken=default_token ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册 xxl.job.executor.appname=xxl-job-executor-sample ### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。 xxl.job.executor.address= ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务"; xxl.job.executor.ip=127.0.0.1 ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口; xxl.job.executor.port=9999 ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径; xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler ### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能; xxl.job.executor.logretentiondays=30
创建XxlJobConfig
配置对象:
@Configuration public class XxlJobConfig { @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
添加任务处理类,交给Spring容器管理,在处理方法上贴上@XxlJob
注解
@Component
public class SimpleXxlJob {
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
System.out.println("执行定时任务,执行时间:"+new Date());
}
}
登录调度中心,在任务管理中新增任务,配置内容如下:
新增后界面如下:
接着启动定时调度任务
在调度中心的调度日志中就可以看到,任务的执行结果.
管控台也可以看到任务的执行信息.
任务以源码方式维护在调度中心,支持通过Web IDE在线更新,实时编译和生效,因此不需要指定JobHandler。
( “GLUE模式(Java)” 运行模式的任务实际上是一段继承自IJobHandler的Java类代码,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务.
添加Service
@Service
public class HelloService {
public void methodA(){
System.out.println("执行MethodA的方法");
}
public void methodB(){
System.out.println("执行MethodB的方法");
}
}
添加任务配置
通过GLUE IDE在线编辑代码
编写内容如下:
package com.xxl.job.service.handler;
import cn.wolfcode.xxljobdemo.service.HelloService;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.beans.factory.annotation.Autowired;
public class DemoGlueJobHandler extends IJobHandler {
@Autowired
private HelloService helloService;
@Override
public void execute() throws Exception {
helloService.methodA();
}
}
启动并执行程序
在IDEA中设置SpringBoot项目运行开启多个集群
启动两个SpringBoot程序,需要修改Tomcat端口和执行器端口
Tomcat端口8081程序的命令行参数如下:
-Dserver.port=8081 -Dxxl.job.executor.port=8991
Tomcat端口8082程序的命令行参数如下:
-Dserver.port=8082 -Dxxl.job.executor.port=8992
在任务管理中,修改路由策略,修改成轮询
重新启动,我们可以看到效果是,定时任务会在这两台机器中进行轮询的执行
当执行器集群部署时,提供丰富的路由策略,包括:
FIRST(第一个):固定选择第一个机器
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):依次的选择在线的机器发起调度
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):
每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):
广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
需求:我们现在实现这样的需求,在指定节假日,需要给平台的所有用户去发送祝福的短信.
在数据库中导入xxl_job_demo.sql
数据
添加依赖
<!--MyBatis驱动--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.2.0</version> </dependency> <!--mysql驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--lombok依赖--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.10</version> </dependency>
添加配置
spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=root
spring.datasource.password=WolfCode_2017
添加实体类
@Setter@Getter
public class UserMobilePlan {
private Long id;//主键
private String username;//用户名
private String nickname;//昵称
private String phone;//手机号码
private String info;//备注
}
添加Mapper处理类
@Mapper
public interface UserMobilePlanMapper {
@Select("select * from t_user_mobile_plan")
List<UserMobilePlan> selectAll();
}
任务处理方法实现
@XxlJob("sendMsgHandler") public void sendMsgHandler() throws Exception{ List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll(); System.out.println("任务开始时间:"+new Date()+",处理任务数量:"+userMobilePlans.size()); Long startTime = System.currentTimeMillis(); userMobilePlans.forEach(item->{ try { //模拟发送短信动作 TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("任务结束时间:"+new Date()); System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒"); }
任务配置信息
比如我们的案例中有2000+条数据,如果不采取分片形式的话,任务只会在一台机器上执行,这样的话需要20+秒才能执行完任务.
如果采取分片广播的形式的话,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
获取分片参数方式:
// 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
通过这两个参数,我们可以通过求模取余的方式,分别查询,分别执行,这样的话就可以提高处理的速度.
之前2000+条数据只在一台机器上执行需要20+秒才能完成任务,分片后,有两台机器可以共同完成2000+条数据,每台机器处理1000+条数据,这样的话只需要10+秒就能完成任务
Mapper增加查询方法
@Mapper
public interface UserMobilePlanMapper {
@Select("select * from t_user_mobile_plan where mod(id,#{shardingTotal})=#{shardingIndex}")
List<UserMobilePlan> selectByMod(@Param("shardingIndex") Integer shardingIndex,@Param("shardingTotal")Integer shardingTotal);
@Select("select * from t_user_mobile_plan")
List<UserMobilePlan> selectAll();
}
任务类方法
@XxlJob("sendMsgShardingHandler") public void sendMsgShardingHandler() throws Exception{ System.out.println("任务开始时间:"+new Date()); int shardTotal = XxlJobHelper.getShardTotal(); int shardIndex = XxlJobHelper.getShardIndex(); List<UserMobilePlan> userMobilePlans = null; if(shardTotal==1){ //如果没有分片就直接查询所有数据 userMobilePlans = userMobilePlanMapper.selectAll(); }else{ userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex,shardTotal); } System.out.println("处理任务数量:"+userMobilePlans.size()); Long startTime = System.currentTimeMillis(); userMobilePlans.forEach(item->{ try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("任务结束时间:"+new Date()); System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒"); }
任务设置
Power-Job是一款开源的分布式任务调度平台,旨在解决分布式系统中任务调度的需求。它提供了可靠、高效的任务调度和执行功能,支持分布式部署和扩展。Power-Job具有简单易用、稳定可靠等特点,广泛应用于各种分布式系统中。
├── LICENSE
├── powerjob-client // powerjob-client,普通Jar包,提供 OpenAPI
├── powerjob-common // 各组件的公共依赖,开发者无需感知
├── powerjob-remote // 内部通讯层框架,开发者无需感知
├── powerjob-server // powerjob-server,基于SpringBoot实现的调度服务器
├── powerjob-worker // powerjob-worker, 普通Jar包,接入powerjob-server的应用需要依赖该Jar包
├── powerjob-worker-agent // powerjob-agent,可执行Jar文件,可直接接入powerjob-server的代理应用
├── powerjob-worker-samples // 教程项目,包含了各种Java处理器的编写样例
├── powerjob-worker-spring-boot-starter // powerjob-worker 的 spring-boot-starter ,spring boot 应用可以通用引入该依赖一键接入 powerjob-server
├── powerjob-official-processors // 官方处理器,包含一系列常用的 Processor,依赖该 jar 包即可使用
├── others
└── pom.xml
任务(Job):描述了需要被 PowerJob 调度的任务信息,包括任务名称、调度时间、处理器信息等。
任务实例( JobInstance,简称 Instance):任务(Job)被调度执行后会生成任务实例(Instance),任务实例记录了任务的运行时信息(任务与任务实例的关系类似于类与对象的关系)。
作业(Task):任务实例的执行单元,一个 JobInstance 存在至少一个 Task,具体规则如下:
工作流(Workflow):由 DAG(有向无环图)描述的一组任务(Job),用于任务编排。
工作流实例(WorkflowInstance):工作流被调度执行后会生成工作流实例,记录了工作流的运行时信息。
备注:固定延迟和固定频率任务统称秒级任务,这两种任务无法被停止,只有任务被关闭或删除时才能真正停止任务
git clone https://github.com/PowerJob/PowerJob.git
创建数据库
CREATE DATABASE IF NOT EXISTS powerjob-daily DEFAULT CHARSET utf8mb4
[外链图片转存失败,源站可
启动此类:tech.powerjob.server.PowerJobServerApplication
注册应用。应用名:icreate-demo;密码:123456
应用登录
进入示例工程(powerjob-worker-samples),修改配置文件连接powerjob-server并编写自己的处理器代码。
修改 powerjob-worker-samples 的 application.properties,将 powerjob.worker.app-name 改为刚刚在控制台注册的名称。
随便找个地方新建类,继承你想要使用的处理器(各个处理器的介绍可见官方文档,文档非常详细)
5 启动任务
项目 | Quartz | Elastic-Job | XXL-JOB | SchedulerX |
---|---|---|---|---|
定时调度 | Cron | Cron | Cron | CronFixed_DelayFixed_RateOne_TimeOpenAPI |
任务编排 | 不支持 | 不支持 | 不支持 | 支持,可以通过图形化配置,并且任务间可传递数据。 |
分布式跑批 | 不支持 | 静态分片 | 广播 | 广播静态分片动态分片(MapReduce) |
多语言 | Java | Java脚本任务 | Java脚本任务 | Java脚本任务HTTP任务K8s Job |
可观测 | 无 | 弱 | 历史记录运行日志(不支持搜索)监控大盘 | 历史记录运行日志(支持搜索)监控大盘操作记录查看堆栈链路追踪 |
可运维 | 无 | 启用、禁用任务 | 启用、禁用任务手动运行任务停止任务 | 启用、禁用任务手动运行任务停止任务标记成功重刷历史数据 |
报警监控 | 无 | 邮件 | 邮件 | 邮件webhook短信电话 |
性能 | 每次调度通过DB抢锁,对DB压力大。 | ZooKeeper是性能瓶颈。 | 由Master节点调度,Master节点压力大。 | 可水平扩展,支持海量任务调度。 |
QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | |
---|---|---|---|---|
定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | CRON、固定频率、固定延迟、OpenAPI |
任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置Java(FatJar)、Shell、Python等脚本 | 内置Java、外置Java(容器)、Shell、Python等脚本 |
分布式任务 | 无 | 静态分片 | MapReduce 动态分片 | MapReduce 动态分片 |
在线任务治理 | 不支持 | 支持 | 支持 | 支持 |
日志白屏化 | 不支持 | 支持 | 不支持 | 支持 |
调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | 无锁化设计,性能强劲无上限 |
报警监控 | 无 | 邮件 | 短信 | 邮件,提供接口允许开发者扩展 |
系统依赖 | MySQL、Oracle…) | MySQL | 人民币 | 任意 Spring Data Jpa支持的关系型数据库(MySQL、Oracle…) |
DAG 工作流 | 持 | 不支持 | 支持 | 支持 |
------------------- | ------------------------------------------------- | ------------------------------------------------------------ |
| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | CRON、固定频率、固定延迟、OpenAPI |
| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置Java(FatJar)、Shell、Python等脚本 | 内置Java、外置Java(容器)、Shell、Python等脚本 |
| 分布式任务 | 无 | 静态分片 | MapReduce 动态分片 | MapReduce 动态分片 |
| 在线任务治理 | 不支持 | 支持 | 支持 | 支持 |
| 日志白屏化 | 不支持 | 支持 | 不支持 | 支持 |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | 无锁化设计,性能强劲无上限 |
| 报警监控 | 无 | 邮件 | 短信 | 邮件,提供接口允许开发者扩展 |
| 系统依赖 | MySQL、Oracle…) | MySQL | 人民币 | 任意 Spring Data Jpa支持的关系型数据库(MySQL、Oracle…) |
| DAG 工作流 | 持 | 不支持 | 支持 | 支持 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。