当前位置:   article > 正文

java定时任务使用_java 定时任务

java 定时任务

单体定时任务

1 java.util.Timer

1.1 基本使用

1.1.2 1s 之后执行的定时任务
        // 创建一个定时任务
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                System.out.println("TimerTask执行时间: " + new Date() + "n" +
                        "线程名称: " + Thread.currentThread().getName());
            }
        };
        System.out.println("当前时间: " + new Date() + "n" +
                "线程名称: " + Thread.currentThread().getName());
        // 初始化任务队列
        Timer timer = new Timer("Timer");
        long delay = 1000L;
        // 提交任务
        timer.schedule(task, delay);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
1.1.3 指定时间执行
		CalcServiceTimerThread taskThread = new CalcServiceTimerThread("定时执行任务");
        //设置执行时间
        Calendar calendar = Calendar.getInstance();
        int year = calendar.get(Calendar.YEAR);
        int month = calendar.get(Calendar.MONTH);
        int day = calendar.get(Calendar.DAY_OF_MONTH);
        //定制每天的23:00:00执行,
        calendar.set(year, month, day, 23, 00, 00);
        Date date = calendar.getTime();
        Timer timer = new Timer();
        System.out.println("nowData:" + LocalDateTime.now());
        System.out.println("taskExecuteDateTime:" + date);
        //每天的date时刻执行task, 仅执行一次
        timer.schedule(taskThread, date);
        //每天的date时刻执行task,每隔2秒重复执行
        //long period = 2 * 1000;
        //timer.schedule(taskThread, date, period);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

1.2 基本组成

在Java中,Timer类用于调度任务的执行。以下是Timer类的基本组成:

  1. **Timer对象:**创建Timer对象是使用Timer类的主要入口点。Timer对象允许你调度和管理任务的执行。
    在这里插入图片描述

  2. **TimerTask对象:**TimerTask是一个抽象类,用于定义要执行的任务。你需要创建一个继承自TimerTask的子类,并实现其中的run()方法,该方法包含了具体的任务逻辑。
    在这里插入图片描述

  3. **schedule()方法:**Timer类中的schedule()方法用于安排任务的执行。它有多个重载形式,允许你指定任务、延迟时间、重复间隔等。

  4. **TimerTask队列:**Timer类内部维护一个TimerTask队列,用于存储要执行的任务。当调用schedule()方法安排任务时,任务会被添加到队列中。

  5. **Timer线程:**Timer类内部有一个单独的线程,负责按照设定的时间调度任务的执行。该线程会从TimerTask队列中取出任务,并在指定的时间点执行任务的run()方法。

  6. **取消任务:**Timer类提供了cancel()方法,用于取消尚未执行的任务。当调用cancel()方法时,所有未执行的任务将被移除。

使用Timer类时,你可以创建一个Timer对象,然后通过调用schedule()方法来安排任务的执行。任务可以是TimerTask的子类,通过重写run()方法来实现具体的逻辑。Timer类会在后台运行一个线程,按照设定的时间执行任务。

请注意,Java中的Timer类在较新的版本中已被推荐使用ScheduledExecutorService类来替代,因为ScheduledExecutorService提供了更灵活和可靠的任务调度机制。

1.3 Timer 中schedule相关构造方法

schedule()方法的解释说明

  • schedule(TimerTask task, Date time):

此方法用于安排在指定的时间执行指定的任务。任务将在参数time所表示的时间点执行,即在指定的日期和时间触发任务。

  • schedule(TimerTask task, long delay):

此方法用于安排在指定延迟之后执行指定的任务。任务将在当前时间的基础上延迟指定的毫秒数delay后执行。

  • schedule(TimerTask task, long delay, long period):

此方法用于安排在指定延迟之后开始重复执行指定的任务。任务将在当前时间的基础上延迟指定的毫秒数delay后执行,然后每隔指定的毫秒数period重复执行一次。

  • schedule(TimerTask task, Date firstTime, long period):

此方法用于安排在指定时间开始重复执行指定的任务。任务将在参数firstTime所表示的时间点执行,然后每隔指定的毫秒数period重复执行一次。

scheduleAtFixedRate 构造方法 --> 安排任务以固定的速率重复执行

  • scheduleAtFixedRate(TimerTask task, long delay, long period):

此方法用于安排任务以固定的速率重复执行。任务将在当前时间的基础上延迟指定的毫秒数delay后执行,然后每隔指定的毫秒数period重复执行一次。

  • scheduleAtFixedRate(TimerTask task, Date firstTime, long period):

此方法用于安排任务以固定的速率重复执行。任务将在参数firstTime所表示的时间点执行,然后每隔指定的毫秒数period重复执行一次。

scheduleAtFixedRate 和 schedule 区别

  1. 调度策略:
    • schedule()方法:按照固定的延迟时间间隔来执行任务。任务的下一次执行时间是相对于上一次任务的实际完成时间来计算的,即无论任务执行的时间是否超过了延迟时间间隔,下一次任务都会按照设定的延迟时间间隔来执行。
    • scheduleAtFixedRate()方法:按照固定的时间间隔来执行任务。任务的下一次执行时间是相对于上一次任务的开始时间来计算的,即任务的执行时间不会影响下一次任务的执行时间。
  2. 可能的间隔调整:
    • schedule()方法:在任务执行时间超过延迟时间间隔时,下一次任务的执行会立即开始,即任务的执行时间可能会影响后续任务的执行时间间隔。
    • scheduleAtFixedRate()方法:在任务执行时间超过时间间隔时,下一次任务的执行不会立即开始,而是等待固定的时间间隔后再执行。无论任务的执行时间如何,都会按照设定的时间间隔执行任务。

schedule()方法以指定的时间点触发任务执行,并在每次任务执行完毕后重新计算下一次执行时间,而scheduleAtFixedRate()方法以固定的速率重复执行任务,不会重新计算下一次执行时间
在这里插入图片描述

1.4 Timer的底层实现

Timer 内部使用一个叫做 TaskQueue 的类存放定时任务,它是一个基于最小堆 实现的优先级队列。

TaskQueue 会按照任务距离下一次执行时间的大小将任务排序,保证在堆顶的任务最先执行

				TimerTask task;
                // 标记任务是否应该执行
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    // 如果队列为空,且newTasksMayBeScheduled为true,此时等待任务加入
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    // 如果队列为空,且newTasksMayBeScheduled为false,说明此时线程应该退出
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        // 当前时间 >= 目标执行时间,说明任务可执行,设置taskFired = true
                        if (taskFired = (executionTime<=currentTime)) {
                            // period == 0 说明是非周期任务,先从队列移除
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            }
                            // 周期任务,会根据period重设执行时间,再加入到队列中
                            else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    // 任务为不需执行状态,则等待
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                // 任务需要执行,则调用task的run方法执行,这里执行的其实就是调用方创建task时候run方法的逻辑
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

1.5 优缺点比较以及使用场景

优点缺点
简单易用,API 直观单线程执行任务,一个任务执行时间过长会影响其他任务的执行
支持延迟执行和周期性执行任务不适合大量并发任务的场景,性能较差
内部使用单个线程调度任务任务执行期间,不能并发执行其他任务
提供了方便的任务取消功能不适合需要精确时间调度的场景
在单个线程中顺序执行任务不提供任务执行时间的统计和监控功能
可以指定任务的优先级不支持任务依赖关系的管理

使用场景:

  • 简单的定时任务调度:当需要执行简单的定时任务,例如定时触发某个事件、周期性地执行某个操作等,而不需要复杂的任务管理和并发控制时,可以使用 Timer。
  • 单线程环境:由于 Timer 是单线程执行任务的,适用于单线程环境下的定时任务调度。
  • 较少的任务并发量:Timer 不适合高并发场景,因为它使用单个线程执行所有的定时任务,当任务量较大或任务执行时间较长时,可能导致任务堆积和性能问题。
  • 不需要灵活的任务管理:Timer 提供了基本的延迟执行和周期性执行功能,但缺少任务优先级、依赖关系等高级任务管理功能。

2 ScheduledExecutorService

2.1 基本使用

2.1.1 延迟一秒周期执行五次
		TimerTask repeatedTask = new TimerTask() {
            @Override
            public void run() {
                System.out.println("当前时间: " + new Date() + "n" +
                        "线程名称: " + Thread.currentThread().getName());
            }
        };
        System.out.println("当前时间: " + new Date() + "n" +
                "线程名称: " + Thread.currentThread().getName());
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
        long delay = 1000L;
        long period = 1000L;
        executor.scheduleAtFixedRate(repeatedTask, delay, period, TimeUnit.MILLISECONDS);
        Thread.sleep(delay + period * 5);
        executor.shutdown();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
2.1.2 三种常见用法
       ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        // 定时任务1:每隔1秒打印一次当前时间
        Runnable task1 = () -> {
            System.out.println("Task 1: Current time - " + LocalDateTime.now());
        };
        // 定时任务2:延迟3秒后执行,然后每隔2秒打印一次当前时间
        Runnable task2 = () -> {
            System.out.println("Task 2: Current time - " + LocalDateTime.now());
        };
        // 定时任务3:每隔5秒打印一次 "Hello, World!"
        Runnable task3 = () -> {
            System.out.println("Task 3: Hello, World!");
        };
        // 执行定时任务1,每隔1秒执行一次
        executor.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
        // 执行定时任务2,延迟3秒后执行,然后每隔2秒执行一次
        executor.scheduleWithFixedDelay(task2, 3, 2, TimeUnit.SECONDS);
        // 执行定时任务3,每隔5秒执行一次
        executor.scheduleAtFixedRate(task3, 0, 5, TimeUnit.SECONDS);
        // 主线程休眠10秒
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 关闭定时任务执行器
        executor.shutdown();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

2.2 两种构造方法的比较

  1. scheduleWithFixedDelay 方法:
    • 这个方法会在每次任务执行完成后,等待指定的延迟时间,然后再执行下一次任务。
    • 如果任务的执行时间超过了延迟时间,下一次任务的执行会等待前一个任务执行完成,并加上延迟时间。
    • 这种方式可以保证每次任务之间的间隔是固定的,不受任务执行时间的影响。
  2. scheduleAtFixedRate 方法:
    • 这个方法会按照指定的固定周期执行任务,不考虑任务的执行时间。
    • 无论任务的执行时间是多少,下一次任务都会在固定周期之后立即执行。
    • 如果任务的执行时间超过了固定周期,下一次任务会立即执行,不会等待上一个任务执行完成。

如果你希望任务之间有固定的时间间隔,并且希望任务的执行时间不会对间隔造成影响,可以使用 scheduleAtFixedRate 方法。如果你希望每次任务执行完成后都等待一定的延迟时间,再执行下一次任务,可以使用 scheduleWithFixedDelay 方法。
在这里插入图片描述

2.3 运作原理

ScheduledThreadPoolExecutor 是 Java 中的一个线程池实现,它可以用于执行定时任务。它的原理是基于线程池和定时调度器的结合,在 ScheduledThreadPoolExecutor 中,定时调度器的实现主要依赖于 DelayedWorkQueue 这个内部任务队列和一个专门的调度线程。

详细:

  1. 任务队列:ScheduledThreadPoolExecutor 内部维护了一个 DelayedWorkQueue 对象作为任务队列。该队列是一个按照任务的执行时间点进行排序的优先队列,确保按照执行时间点的顺序来执行任务。
  2. 提交任务:当一个定时任务被提交到 ScheduledThreadPoolExecutor 时,它会被包装成一个 ScheduledFutureTask 对象并插入到任务队列中。ScheduledFutureTask 继承自 FutureTask,它除了包含任务的执行逻辑,还包含任务的执行时间点。
  3. 调度线程:ScheduledThreadPoolExecutor 内部有一个专门的调度线程,负责从任务队列中获取并执行任务。该线程会不断循环执行以下步骤:
    • 获取任务:调度线程从任务队列中获取最近的任务,该任务的执行时间点已到达或超过。
    • 执行任务:调度线程执行任务的 run() 方法,完成任务的具体操作。
    • 重新调度周期性任务:对于周期性的定时任务,调度线程会根据任务的执行时间和周期重新计算下一次执行的时间,并将任务重新插入到任务队列中
public void run() {
    boolean periodic = isPeriodic();
    // 如果不能在当前状态下运行,那么就要取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果只是延时任务,那么就调用run方法,运行任务。
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期定时任务,调用runAndReset方法,运行任务。
    // 这个方法不会改变任务的状态,所以可以反复执行。
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置周期任务下一次执行的开始时间time
        setNextRunTime();
        // 重新执行任务outerTask
        reExecutePeriodic(outerTask);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.4 优缺点以及使用场景

优点缺点
支持高并发需要手动管理线程池大小
灵活的任务管理对任务的执行时间无法做出实时调整
可以处理任务的异常需要自行处理任务的优先级和依赖关系
提供延迟执行和周期性执行需要额外的线程池资源
可以控制线程池的大小

常见使用场景:

  • 定时任务调度:ScheduledThreadPoolExecutor 提供了强大的定时任务调度功能,可以用于执行定时触发的任务或周期性执行的任务。
  • 并发任务处理:由于 ScheduledThreadPoolExecutor 是基于线程池的实现,可以同时处理多个任务,并发执行任务。
  • 动态调度和取消任务:可以根据需要动态地添加、取消或重新调度任务,灵活控制任务的执行。
  • 异步任务执行:可以提交异步任务到线程池执行,通过定时任务调度功能控制任务的执行时间。

需要注意的是,ScheduledThreadPoolExecutor 在使用时需要根据具体场景进行合理的线程池大小配置,避免线程过多或过少的情况发生。另外,当任务的执行时间发生变化时,ScheduledThreadPoolExecutor 无法实时调整任务的执行时间,因此在对任务执行时间要求较高的场景下,需谨慎使用。

attention: Timer 和 ScheduledExecutorService 都是不支持CRON 表达式

3 cron 表达式的简单介绍

Cron表达式是一种时间表达式,用于指定在何时执行特定任务。它由6个字段组成,表示任务的执行时间规则。Cron表达式广泛应用于任务调度系统、定时任务管理以及日程提醒等领域。本文将介绍Cron表达式的基本语法和用法,并探讨其发展历程。

Cron表达式发展历程

Cron表达式最初起源于Unix系统,最早用于调度系统任务。随着时间的推移,Cron表达式在各种

应用领域得到广泛应用,并在不同的编程语言和框架中实现了相应的解析和调度器。

随着需求的增加,Cron表达式的功能不断扩展,例如增加了更多的特殊符号、支持年份字段等。同时,一些成熟的任务调度框架如Quartz、Spring Task等也提供了更强大和灵活的Cron表达式的支持。

在现代的开发中,Cron表达式已成为定时任务调度的标准,被广泛使用于各种应用和系统中,例如定时数据备份、定时报表生成、定时任务执行等。

Cron表达式语法

Cron表达式由6个字段组成,分别表示秒、分、小时、日期、月份和星期几。每个字段都可以使用特定的符号和通配符来定义时间范围。

字段允许值允许的特殊符号
0-59, - * /
0-59, - * /
小时0-23, - * /
日期1-31, - * ? / L W
月份1-12 或 JAN-DEC, - * /
星期几0-6 或 SUN-SAT, - * ? / L #

特殊符号的含义如下:

  • *:匹配任意值
  • -:定义范围,如 3-5 表示 3、4、5
  • ,:列举多个值,如 MON,WED,FRI 表示星期一、星期三和星期五
  • /:指定增量,如 0/15 表示从0开始,每隔15秒触发一次
  • ?:只能用于日期和星期几,表示不指定具体值
  • L:用于日期和星期几,表示最后一个
  • W:用于日期,表示最近的工作日
  • #:用于星期几,表示第几个,如 2#3 表示第三个星期二

Cron表达式使用示例

以下是一些常见的Cron表达式使用示例:

  • 0 0 12 * * ?:每天中午12点触发
  • 0 15 10 ? * *:每天上午10:15触发
  • 0 0/5 14,18 * * ?:每天下午2点到6点,每隔5分钟触发一次
  • 0 0 6,19 * * MON-FRI:周一至周五的早上6点和晚上7点触发
  • 0 0/2 * * * ?:每隔2分钟触发一次
  • 0 30 23 L * ?:每月最后一天的23:30触发

常用工具

q2: https://cron.qqe2.com/

CronTabGuru: https://crontab.guru/

4 Spring Task

4.1 发展历程

年份里程碑事件
2003Spring Framework 1.0发布,首次引入了基本的任务调度功能。
2006Spring Framework 2.0发布,引入了TaskExecutor和TaskScheduler接口,提供了更强大的任务执行和调度功能。
2009Spring Framework 3.0发布,通过注解支持定时任务的定义和配置,大大简化了任务调度的开发和管理。
2014Spring Framework 4.0发布,进一步改进了定时任务的支持,包括更灵活的任务调度和异步执行的能力。
2018Spring Framework 5.0发布,引入了新的Reactive模块,提供了响应式编程的支持,并扩展了任务调度的能力。
2021Spring Framework 5.3发布,进一步改进了任务调度的性能和稳定性,增加了对长期运行任务的支持。

4.2 基本使用

4.2.1 基于@Scheduled注解
  • 使用@EnableScheduling注解开启定时任务,该注解放在启动类上。

    @SpringBootApplication
    // 开始定时任务
    @EnableScheduling
    public class ScheduleWorkApplication {
        public static void main(String[] args) {
            SpringApplication.run(ScheduleWorkApplication.class, args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 使用@Scheduled注解定义定时任务

    /**
     * fixedRate:固定速率执行。每5秒执行一次。
     */ 
    @Scheduled(fixedRate = 5000)
    public void reportCurrentTimeWithFixedRate() {
        log.info("SpringTask--Current Thread : {}", Thread.currentThread().getName());
        log.info("SpringTask--Fixed Rate Task : The time is now {}", dateFormat.format(new Date()));
    }
    
    /**
     * fixedDelay:固定延迟执行。距离上一次调用成功后2秒才执。
     */
    @Scheduled(fixedDelay = 2000)
    public void reportCurrentTimeWithFixedDelay() {
        try {
            TimeUnit.SECONDS.sleep(3);
            log.info("SpringTask--Fixed Delay Task : The time is now {}", dateFormat.format(new Date()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * initialDelay:初始延迟。任务的第一次执行将延迟5秒,然后将以5秒的固定间隔执行。
     */
    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    public void reportCurrentTimeWithInitialDelay() {
        log.info("SpringTask--Fixed Rate Task with Initial Delay : The time is now {}", dateFormat.format(new Date()));
    }
    
    /**
     * cron:使用Cron表达式。 每分钟的1,2秒运行
     */
    @Scheduled(cron = "1-2 * * * * ? ")
    public void reportCurrentTimeWithCronExpression() {
        log.info("SpringTask--Cron Expression: The time is now {}", dateFormat.format(new Date()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

@Scheduled注解是Spring Framework中用于定义定时任务的注解之一,它提供了四种任务调度策略,对应四个参数:

  1. fixedRate:固定频率调度

    • 参数:@Scheduled(fixedRate = 5000)
    • 表示任务开始后,每隔指定的时间(单位:毫秒)执行一次,无论上一次任务是否执行完成。
  2. fixedDelay:固定延迟调度

    • 参数:@Scheduled(fixedDelay = 5000)
    • 表示任务执行完成后,延迟指定的时间(单位:毫秒)后再次执行。
  3. initialDelay:初始延迟调度

    • 参数:@Scheduled(initialDelay = 5000)
    • 表示首次任务执行前的延迟时间(单位:毫秒)。
  4. cron:Cron表达式调度

    • 参数:@Scheduled(cron = "0 0 8 * * ?")
    • 使用Cron表达式来定义任务的执行时间规则,可以精确到秒级别,提供更灵活的任务调度方式。
4.2.2实现接口方式:
  • 实现SchedulingConfigurer接口,并重写configureTasks方法。示例代码:
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    
    @Configuration
    public class MyTaskConfig implements SchedulingConfigurer {
    
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.addFixedRateTask(() -> {
                // 执行任务逻辑
            }, 5000); // 每隔5秒执行一次
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 在Spring Boot的主类上添加@EnableScheduling注解,启用Spring的任务调度功能。
4.2.3 XML配置方式:
  • application.propertiesapplication.yml文件中配置定时任务的调度策略。示例代码(使用application.properties配置文件):
    # 定时任务配置
    spring.task.scheduling.pool.size=5
    spring.task.scheduling.thread-name-prefix=my-task-
    
    • 1
    • 2
    • 3
  • 创建定时任务的Bean,并使用@Scheduled注解标记方法。示例代码:
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyTask {
    
        @Scheduled(fixedRate = 5000) // 每隔5秒执行一次
        public void doTask() {
            // 执行任务逻辑
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
  • 在Spring Boot的主类上添加@EnableScheduling注解,启用Spring的任务调度功能。

以上是Spring Boot中Spring Task的几种常见使用方式和相应的示例。根据实际需求,可以选择合适的方式来编写和管理定时任务。

4.3 实现原理

@EnableScheduling注解
该注解的代码源码很简单,其核心在于引入了一个SchedulingConfiguration.class配置类

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({SchedulingConfiguration.class}) //导入配置类
@Documented
public @interface EnableScheduling {
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

SchedulingConfiguration.class配置类的源码也很简单,只是创建了一个ScheduledAnnotationBeanPostProcessor实例。从名字上看,该实例一种BeanPostProcessor(后置处理器)。

后置处理器作用是在Bean对象在实例化和依赖注入完毕后,在显示调用初始化方法的前后添加我们自己的逻辑。注意是Bean实例化完毕后及依赖注入完成后触发的。

@Configuration
@Role(2)
public class SchedulingConfiguration {
    public SchedulingConfiguration() {
    }

    @Bean(
        name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"}
    )
    
     //创建一个ScheduledAnnotationBeanPostProcessor实例
    @Role(2)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

综上,@EnableScheduling注解是通过创建一个ScheduledAnnotationBeanPostProcessor实例实现开启定时任务的。

ScheduledAnnotationBeanPostProcessor类
查看源码可知,该类实现一堆接口,此处不再逐个介绍这些接口,主要分析该类实现开启定时任务的实现逻辑。

public class ScheduledAnnotationBeanPostProcessor implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor, Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware, SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
    ....
}
  • 1
  • 2
  • 3

postProcessAfterInitialization方法
该方法位于ScheduledAnnotationBeanPostProcessor类中,主要作用是找出全部被@Scheduled注解标记的方法,并调用processScheduled方法进行下一步处理。

public Object postProcessAfterInitialization(Object bean, String beanName) {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    	// 1 找出全部被`@Scheduled`注解标记的方法
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (method) -> {
                Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
                return !scheduledMethods.isEmpty() ? scheduledMethods : null;
            });
            // 2 如果类中没有使用 @Scheduled注解,则将其加入不再访问名单
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
                }
            } else {
                //3 该类中有方法被@Scheduled注解标注,则使用processScheduled处理该方法
                annotatedMethods.forEach((method, scheduledMethods) -> {
                    scheduledMethods.forEach((scheduled) -> {
                        this.processScheduled(scheduled, method, bean);
                    });
                });
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods);
                }
            }
        }
    
        return bean;
    }

// ..
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

processScheduled方法
该方法位于ScheduledAnnotationBeanPostProcessor类中,该类主要作用是

1)检测被@Scheduled注解是否有参数
2)被@Scheduled注解标注的方法是否是无参且无返回值
3)使用ScheduledTaskRegistrar注册定时任务,后加入任务列表

public class ScheduledAnnotationBeanPostProcessor implements xxx{
    
 //ScheduledTaskRegistrar这个类为Spring容器定时任务注册中心,并使用taskScheduler执行定时任务
 //默认情况下,使用单线程执行,可配置为多线程。
 private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
    
 protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            //1 被@Scheduled注解标记的方法必须无参
            Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
            Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
            boolean processedSchedule = false;
            //2 被@Scheduled注解标记的方法使用 cron、fixedDelay、fixedRate三者之一
            String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
            

            // 3 定义一个set  存储待调度的任务
            Set<ScheduledTask> tasks = new LinkedHashSet(4);
            long initialDelay = scheduled.initialDelay();
            String initialDelayString = scheduled.initialDelayString();
           
    		//4.1  将cron类型的任务 放入set
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
            }
    		
    		//4.2 将fixedDelay类型的任务 放入set
            long fixedDelay = scheduled.fixedDelay();
            if (fixedDelay >= 0L) {
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
    		
            //4.3 将fixedDelayString类型的任务 放入set
            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
    
            //4.4 将fixedRate类型的任务 放入set
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0L) {
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
    		
            //4.5 将fixedRateString类型的任务 放入set
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) { 
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); 
            }
    		
            //5 最后将 task 注册到 scheduledTasks 中
            ((Set)registeredTasks).addAll(tasks);
            }
    }

// ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

ScheduledTaskRegistrar类
该类实现了三个接口,其中

InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。
DisposableBean接口和InitializingBean接口一样,为bean提供了释放资源方法的方式,它只包括destroy方法,凡是继承该接口的类,在bean被销毁之前都会执行该方法。
ScheduledTaskHolder接口定义返回当前实例的任务列表。

public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {

	/**
	 *InitializingBean接口定义的方法
	 * Calls {@link #scheduleTasks()} at bean construction time.
	 */
	@Override
	public void afterPropertiesSet() {
		scheduleTasks();
	}
	
	/**
	 * 执行定时任务
	 * Schedule all registered tasks against the underlying
	 * {@linkplain #setTaskScheduler(TaskScheduler) task scheduler}.
	 */
	@SuppressWarnings("deprecation")
	protected void scheduleTasks() {
		if (this.taskScheduler == null) {
	        //默认使用单线程调度器
			this.localExecutor = Executors.newSingleThreadScheduledExecutor();
	        //真正调度的为 ConcurrentTaskScheduler实例
			this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
		}
	    
	    //添加四种执行调度 此处只留一个 其他省略
		if (this.triggerTasks != null) {
			for (TriggerTask task : this.triggerTasks) {
				addScheduledTask(scheduleTriggerTask(task));
			}
		}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

4.4 Spring Task优缺点

优点缺点
简单易用:Spring Task提供了简单易用的注解方式来定义定时任务,无需引入额外的依赖和配置。功能相对有限:相较于其他定时任务框架,Spring Task的功能相对较为简单,对于复杂的任务调度需求可能不够灵活。
集成方便:Spring Task是基于Spring Framework的一部分,无需额外的集成步骤,可直接在Spring Boot项目中使用。单机环境:Spring Task适用于单机环境下的任务调度,对于分布式环境的任务调度需求,需要额外的处理。
线程池支持:Spring Task支持使用线程池来执行定时任务,可以控制任务的并发性和线程池参数。缺少监控和管理:相较于专门的任务调度框架,Spring Task缺少一些监控和管理功能,如任务执行状态的监控和任务的动态管理。
集成Spring生态系统:Spring Task与其他Spring组件无缝集成,可以方便地使用依赖注入、AOP等特性。高精度调度:Spring Task的调度精度可能受到底层操作系统的影响,无法做到非常精确的调度。

总的来说,Spring Task是一种简单、方便的定时任务实现方式,适用于简单的任务调度需求,并且与Spring生态系统集成良好。但对于复杂的任务调度需求、分布式环境下的任务调度或需要更高精度的调度等场景,可能需要考虑其他更专业的任务调度框架。

5 时间轮

简介

时间轮简单来说就是一个环形的队列(底层一般基于数组实现),队列中的每一个元素(时间格)都可以存放一个定时任务列表。

时间轮中的每个时间格代表了时间轮的基本时间跨度或者说时间精度,假如时间一秒走一个时间格的话,那么这个时间轮的最高精度就是 1 秒(也就是说 3 s 和 3.9s 会在同一个时间格中)。

下图是一个有 12 个时间格的时间轮,转完一圈需要 12 s。当我们需要新建一个 3s 后执行的定时任务,只需要将定时任务放在下标为 3 的时间格中即可。当我们需要新建一个 9s 后执行的定时任务,只需要将定时任务放在下标为 9 的时间格中即可。
在这里插入图片描述

简单实现

 private static final int TIME_SLOT_COUNT = 12;  // 时间轮的时间格数量
    private static final long TIME_SLOT_DURATION = 1000;  // 每个时间格的时间跨度,单位为毫秒
    private static final long MAX_DELAY = TIME_SLOT_COUNT * TIME_SLOT_DURATION;  // 最大延迟时间

    private ScheduledExecutorService executorService;
    private List<List<Runnable>> timeSlots;  // 时间格中的任务列表
    private int currentIndex;  // 当前时间格的索引

    public TimeWheelDemo() {
        executorService = Executors.newSingleThreadScheduledExecutor();
        timeSlots = new ArrayList<>(TIME_SLOT_COUNT);
        for (int i = 0; i < TIME_SLOT_COUNT; i++) {
            timeSlots.add(new ArrayList<>());
        }
        currentIndex = 0;
    }

    public void start() {
        executorService.scheduleAtFixedRate(this::tick, TIME_SLOT_DURATION, TIME_SLOT_DURATION, TimeUnit.MILLISECONDS);
    }

    public void schedule(Runnable task, long delay) {
        if (delay >= MAX_DELAY) {
            // 超过最大延迟时间,直接执行任务
            executorService.execute(task);
        } else {
            int slotIndex = (currentIndex + (int) (delay / TIME_SLOT_DURATION)) % TIME_SLOT_COUNT;
            timeSlots.get(slotIndex).add(task);
        }
    }

    private void tick() {
        List<Runnable> currentSlot = timeSlots.get(currentIndex);
        for (Runnable task : currentSlot) {
            executorService.execute(task);
        }
        currentSlot.clear();
        currentIndex = (currentIndex + 1) % TIME_SLOT_COUNT;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

时间轮算法优缺点

优点缺点
高效性:时间轮算法利用环形队列和索引计算,执行定时任务的效率较高。单一节点:时间轮算法适用于单体应用,无法直接扩展到分布式环境中。
简单实现:相对于其他复杂的调度算法,时间轮算法的实现相对简单,易于理解和维护。精度受限:时间轮算法的调度精度受到时间格的大小限制,无法做到非常精确的调度。
高并发支持:时间轮算法利用多线程执行定时任务,可以支持高并发的任务调度需求。时间格数量限制:时间轮算法的性能和精度与时间格的数量相关,需要根据实际情况合理调整。
可靠性:时间轮算法基于环形队列,循环执行任务,不会因为系统时间的变化或任务耗时导致任务调度出错。对任务依赖:时间轮算法适用于独立的定时任务,对于有任务间依赖关系或复杂调度逻辑的场景较为有限。

总的来说,时间轮算法在单体应用中实现定时任务具有高效性、简单实现和高并发支持的优点。然而,它也有一些局限性,如精度受限、单一节点的限制以及对任务依赖较为有限。在选择使用时间轮算法之前,需要根据具体需求和场景综合考虑其优缺点。

6 单体中定时任务对比

下表对比了Java中Timer、ScheduledExecutorService、Spring Task和时间轮算法四种实现定时任务的特点、优缺点以及适用场景的技术选型考虑:

特点TimerScheduledExecutorServiceSpring Task时间轮算法
实现方式单线程实现线程池实现基于线程池的任务调度环形队列存储任务
并发支持
功能丰富度有限丰富适中适中
可靠性低(单线程)
调度精度低(受系统时间影响)
分布式支持
复杂性简单简单
适用场景单线程、简单任务并发任务、复杂调度需求单体应用、简单调度需求单体应用、高并发调度需求

根据不同的需求和场景,可以根据以下考虑进行技术选型:

  • Timer:适用于简单的单线程任务调度,无并发要求,且任务调度需求相对简单的场景。
  • ScheduledExecutorService:适用于并发任务调度和复杂的调度需求,具有较高的可靠性和调度精度。
  • Spring Task:适用于单体应用中的简单调度需求,与Spring生态系统集成良好,易于使用和管理。
  • 时间轮算法:适用于单体应用中的高并发调度需求,具有高并发支持和较高的可靠性,适用于时间格精度要求不高的场景。

需要根据具体的业务需求、并发性要求、调度复杂性和精度要求等因素进行综合考虑,选择最合适的技术来实现定时任务。

分布式定时任务

Quartz

简介

Quartz是一个功能强大的开源任务调度框架,用于在Java应用程序中实现定时任务的调度和执行。它提供了灵活的调度配置和管理机制,可以满足各种定时任务的需求。Quartz支持集群部署,可以在分布式环境下实现高可用性和负载均衡。

技术发展历史

Quartz最早由Terracotta团队开发,后来成为开源项目,现在由Quartz项目组维护。Quartz的第一个版本发布于2001年,经过多年的发展和迭代,已经成为业界广泛使用的定时任务调度框架之一。Quartz在实践中得到了广泛应用,被众多企业和开发者所采用。

核心模块介绍

Quartz包含以下核心模块:

  1. Scheduler:调度器是Quartz的核心组件,负责加载和执行任务。它可以配置调度规则、触发器和任务,并提供了丰富的调度操作和管理方法。

  2. Job和JobDetail:Job是需要定时执行的任务的抽象,通过实现Job接口定义具体的任务逻辑。JobDetail则包含了Job的相关信息,如任务名称、分组等。

  3. Trigger:触发器定义了任务的调度规则,包括触发时间、重复次数、间隔时间等。Quartz提供了多种类型的触发器,如SimpleTrigger、CronTrigger等,以满足不同的调度需求。

  4. JobStore:JobStore负责任务的持久化和存储,可以将任务信息存储在内存、数据库或其他存储介质中。Quartz提供了多种实现,如RAMJobStore、JDBCJobStore等。

  5. Listener:Quartz提供了监听器机制,可以通过监听器在任务调度的不同阶段进行自定义操作。例如,可以监听任务的执行前后、调度器的启动和关闭等事件。

简单实例

以下是一个简单的Quartz示例,演示了如何配置和使用Quartz进行定时任务调度:

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzDemo {

    public static void main(String[] args) throws SchedulerException {
        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        Scheduler scheduler = schedulerFactory.getScheduler();

        // 创建JobDetail
        JobDetail jobDetail = JobBuilder.newJob(MyJob.class)
                .withIdentity("myJob", "group1")
                .build();

        // 创建Trigger
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("myTrigger", "group1")
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5))
                .build();

        // 将JobDetail和Trigger注册到调度器
        scheduler.scheduleJob(jobDetail

, trigger);

        // 启动调度器
        scheduler.start();
    }

    public static class MyJob implements Job {
        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            System.out.println("Executing my job...");
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

在上面的示例中,我们创建了一个简单的Job,并配置了一个每5秒执行一次的Trigger。然后将JobDetail和Trigger注册到调度器,最后启动调度器,即可实现定时任务的调度和执行。

持久化实现

Quartz的集群模式指的是一个集群下多个节点管理同一批任务的调度**,通过共享数据库的方式实现,保证同一个任务到达触发时间的时候,只有一台机器去执行该任务。**每个节点部署一个单独的quartz实例,相互之间没有直接数据通信。

创建相应的表
Table NameDescription
QRTZ_CALENDARS存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS存储程序的悲观锁的信息
QRTZ_JOB_DETAILS存储每一个已配置的Job的详细信息
QRTZ_JOB_LISTENERS存储有关已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERSTrigger作为Blob类型存储
QRTZ_TRIGGER_LISTENERS存储已配置的TriggerListener的信息
QRTZ_TRIGGERS存储已配置的Trigger的信息
配置相关的quartz
org.quartz.scheduler.instanceName=SsmScheduler
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=10
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true

org.quartz.jobStore.useProperties=true
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=2000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate

org.quartz.jobStore.dataSource=qzDS
org.quartz.dataSource.qzDS.connectionProvider.class=cn.wdy.schedulequartz.config.DruidConnectionProvider
org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz-database?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
org.quartz.dataSource.qzDS.user=quartz-database
org.quartz.dataSource.qzDS.password=quartz-database
org.quartz.dataSource.qzDS.maxConnection=5
org.quartz.dataSource.qzDS.validationQuery=select 0 from dual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

实例现象

同时运行两个项目,有着相同的定时任务,如果一个项目中断,则另外一个项目中定时任务会继续执行
在这里插入图片描述
在这里插入图片描述

Quartz实现分布式的原理简述:

  1. JobStore持久化机制:Quartz通过JobStore来持久化任务和触发器的信息。在分布式环境中,可以选择使用数据库作为JobStore,所有节点共享同一个数据库实例。这样,不同节点之间可以通过数据库实现任务信息的同步和共享。
  2. 集群模式:Quartz提供了集群模式来支持分布式环境。在集群模式下,多个节点可以共同组成一个Quartz集群,每个节点都能独立地触发和执行任务。节点之间通过共享数据库中的任务信息来实现数据的同步和共享。集群模式下的节点可以动态加入或离开,实现弹性伸缩。
  3. 锁机制:在分布式环境下,需要使用锁机制来保证任务和触发器的操作的原子性和一致性。Quartz使用锁来控制对任务和触发器的访问,确保同一时间只有一个节点能够操作某个任务或触发器。常用的锁机制包括数据库行级锁或分布式锁。
  4. 心跳机制:为了检测节点的存活状态和负载情况,Quartz提供了心跳机制。每个节点会周期性地发送心跳信号,告知其他节点它的存活状态和负载情况。其他节点可以根据心跳信息来调整任务的分配策略,实现负载均衡和高可用性。

Elastic-job

简介

分布式任务调度是在分布式系统中对任务进行调度和执行的一种技术。它能够解决任务调度的并发性、高可用性和扩展性等问题。ElasticJob是一个开源的分布式任务调度框架,为分布式任务调度提供了便捷的解决方案。本文将介绍ElasticJob的技术发展历史、核心模块、简单的示例和分布式实现的原理。

技术发展历史

ElasticJob最初由当当网开发并开源,目前已成为Apache软件基金会的顶级项目。自推出以来,ElasticJob得到了广泛的应用和发展,并且不断增加新功能和改进。它在分布式任务调度领域取得了良好的声誉,并成为了许多企业的首选框架之一。

核心模块介绍

ElasticJob由以下几个核心模块组成:

  1. Job:任务模块,定义具体的任务逻辑。开发者需要实现Job接口并编写任务执行逻辑。

  2. LiteJobConfiguration:任务配置模块,用于配置任务的相关参数,如任务名称、调度时间表达式、任务分片等。

  3. JobScheduler:任务调度模块,负责将任务分发给执行器,并进行任务调度和管理。

  4. JobExecutor:任务执行器模块,负责执行具体的任务逻辑。

  5. JobRegistry:任务注册中心模块,用于注册和发现可用的任务执行器。

简单示例

以下是一个简单的ElasticJob示例,用于演示一个基于Java的分布式定时任务的实现:

public class MyJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        // 执行具体的任务逻辑
        System.out.println("执行任务:" + shardingContext.getJobName());
    }
}

public class ElasticJobDemo {
    public static void main(String[] args) {
        // 创建任务配置
        LiteJobConfiguration jobConfiguration = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder("myJob", "0/5 * * * * ?", 2).build(),
                MyJob.class.getCanonicalName()
        )).build();

        // 创建任务调度器
        JobScheduler jobScheduler = new JobScheduler(
                new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic-job-demo")),
                jobConfiguration
        );

        // 启动任务调度器
        jobScheduler.init();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

以上示例定义了一个名为MyJob的任务,它实现了SimpleJob接口,并在execute方法中定义了具体的任务逻辑。在ElasticJobDemo类中,我们创建了任务配置和任务调度器,并启动了任务调度器。该示

例将每5秒执行一次任务。

实例现象

现有一个批量处理的业务,数据量较大,现将计算任务分发给节点进行执行

    String taskId = context.getTaskId();
        String shardingParameter = context.getShardingParameter();
        log.info("端口:{},定时任务开始:taskId:{},shardingParameter:{}", port, taskId, shardingParameter);
        log.info("~~~~Thread ID: {}, " +
                        "作业分片总数: {}, " +
                        "当前分片项:{}." +
                        "当前参数: {}," +
                        "作业名称:{}" +
                        "作业自定义参数:{}",
                Thread.currentThread().getId(),
                context.getShardingTotalCount(),
                context.getShardingItem(),
                context.getShardingParameter(),
                context.getJobName(),
                context.getJobParameter()
        );
        // 根据传参的不同,来进行分批处理相应业务
        List<Person> people = personService.queryPersonBySex(Integer.valueOf(shardingParameter));
        if (!CollectionUtils.isEmpty(people)) {
            for (Person person : people) {
                personService.update(person);
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

在这里插入图片描述

在这里插入图片描述

分布式实现原理

ElasticJob的分布式实现基于分片(Sharding)的概念。任务被分成多个片段,并由不同的任务执行器执行。具体实现的原理如下:

  1. 任务分片:根据任务配置中的分片参数,将任务划分成多个片段。每个片段独立执行,避免单一节点负载过重。

  2. 任务分发:任务调度器通过任务注册中心获取可用的任务执行器,并将任务片段分发给它们。任务调度器负责任务分发和调度策略的管理。

  3. 任务执行:每个任务执行器独立执行分配给它的任务片段。它们通过心跳机制向任务调度器汇报任务执行情况,并接收新的任务分配。

  4. 分布式协调:任务调度器和任务执行器之间通过分布式协调机制实现任务的分发和同步。常见的分布式协调工具有Zookeeper和Redis等。

通过以上机制,ElasticJob实现了分布式任务的调度和执行。它能够根据任务配置灵活地进行任务分片和分发,并提供高可用性和负载均衡的分布式任务调度方案。

xxl-job

简介

XXL-Job:是大众点评的分布式任务调度平台,是一个轻量级分布式任务调度平台, 其核心设计目标是开发迅速、学习简单、轻量级、易扩展

大众点评目前已接入XXL-JOB,该系统在内部已调度约100万次,表现优异。

目前已有多家公司接入xxl-job,包括比较知名的大众点评,京东,优信二手车,360金融 (360),联想集团 (联想),易信 (网易)等等

技术发展历史

XXL-Job最初由小马哥(马士兵)开发并开源,目前由XXL-Media维护。自推出以来,XXL-Job经过多个版本的迭代和改进,成为了国内较为知名的分布式任务调度平台。它被广泛应用于各行各业的分布式系统中,具有良好的稳定性和性能。

核心模块介绍

XXL-Job包含以下核心模块:

  1. 调度中心:负责任务调度的管理和分发。它提供了Web界面供用户进行任务配置、管理和监控。

  2. 执行器:负责接收调度中心分发的任务,并执行任务的具体逻辑。执行器可以独立部署在多个节点上,实现任务的分布式执行。

  3. 日志存储:用于存储任务执行的日志信息,方便用户查看任务执行情况和排查问题。

  4. 调度器:负责按照任务配置的调度策略,将任务分发给可用的执行器节点。

  5. 分布式任务锁:用于解决分布式环境下的任务并发问题,保证同一时间只有一个执行器节点执行任务。

简单入门介绍

1.1 下载源码

源码下载地址:

https://github.com/xuxueli/xxl-job

https://gitee.com/xuxueli0323/xxl-job

2.1 初始化调度数据库

请下载项目源码并解压,获取 “调度数据库初始化SQL脚本” 并执行即可。

“调度数据库初始化SQL脚本” 位置为:

/xxl-job/doc/db/tables_xxl_job.sql
  • 1
2.2 编译源码

解压源码,按照maven格式将源码导入IDE, 使用maven进行编译即可,源码结构如下:
在这里插入图片描述

2.3 配置部署调度中心
2.3.1 调度中心配置

修改xxl-job-admin项目的配置文件application.properties,把数据库账号密码配置上

### web
server.port=8080
server.servlet.context-path=/xxl-job-admin

### actuator
management.server.servlet.context-path=/actuator
management.health.mail.enabled=false

### resources
spring.mvc.servlet.load-on-startup=0
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/

### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########

### mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model

### xxl-job, datasource
spring.datasource.url=jdbc:mysql://192.168.202.200:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=WolfCode_2017
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

### datasource-pool
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=30
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=HikariCP
spring.datasource.hikari.max-lifetime=900000
spring.datasource.hikari.connection-timeout=10000
spring.datasource.hikari.connection-test-query=SELECT 1
spring.datasource.hikari.validation-timeout=1000

### xxl-job, email
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.from=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory

### xxl-job, access token
xxl.job.accessToken=default_token

### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")
xxl.job.i18n=zh_CN

## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

### xxl-job, log retention days
xxl.job.logretentiondays=30
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
2.3.2 部署项目

运行XxlJobAdminApplication程序即可.

调度中心访问地址: http://localhost:8080/xxl-job-admin

默认登录账号 “admin/123456”, 登录后运行界面如下图所示。
在这里插入图片描述

至此“调度中心”项目已经部署成功。

2.4 配置部署执行器项目
2.4.1 添加Maven依赖

创建SpringBoot项目并且添加如下依赖:

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
2.4.2 执行器配置

在配置文件中添加如下配置:

### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=default_token
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=127.0.0.1
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
2.4.3 添加执行器配置

创建XxlJobConfig配置对象:

@Configuration
public class XxlJobConfig {
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.address}")
    private String address;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
2.4.4 添加任务处理类

添加任务处理类,交给Spring容器管理,在处理方法上贴上@XxlJob注解

@Component
public class SimpleXxlJob {
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
        System.out.println("执行定时任务,执行时间:"+new Date());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
2.5 运行HelloWorld程序
2.5.1 任务配置&触发执行

登录调度中心,在任务管理中新增任务,配置内容如下:
在这里插入图片描述

新增后界面如下:
在这里插入图片描述

接着启动定时调度任务
在这里插入图片描述

2.5.2 查看日志

在调度中心的调度日志中就可以看到,任务的执行结果.
在这里插入图片描述

管控台也可以看到任务的执行信息.
在这里插入图片描述

2.5.3 GLUE模式(Java)

任务以源码方式维护在调度中心,支持通过Web IDE在线更新,实时编译和生效,因此不需要指定JobHandler。

( “GLUE模式(Java)” 运行模式的任务实际上是一段继承自IJobHandler的Java类代码,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务.

添加Service

@Service
public class HelloService {
    public void methodA(){
        System.out.println("执行MethodA的方法");
    }
    public void methodB(){
        System.out.println("执行MethodB的方法");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

添加任务配置
在这里插入图片描述

通过GLUE IDE在线编辑代码
在这里插入图片描述


编写内容如下:

package com.xxl.job.service.handler;

import cn.wolfcode.xxljobdemo.service.HelloService;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.beans.factory.annotation.Autowired;

public class DemoGlueJobHandler extends IJobHandler {
    @Autowired
    private HelloService helloService;
    @Override
    public void execute() throws Exception {
        helloService.methodA();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

启动并执行程序

2.6 执行器集群
2.6.1 集群环境搭建

在IDEA中设置SpringBoot项目运行开启多个集群
在这里插入图片描述

启动两个SpringBoot程序,需要修改Tomcat端口和执行器端口

  • Tomcat端口8081程序的命令行参数如下:

    -Dserver.port=8081 -Dxxl.job.executor.port=8991
    
    • 1
  • Tomcat端口8082程序的命令行参数如下:

    -Dserver.port=8082 -Dxxl.job.executor.port=8992
    
    • 1

在任务管理中,修改路由策略,修改成轮询
在这里插入图片描述

重新启动,我们可以看到效果是,定时任务会在这两台机器中进行轮询的执行

  • 8081端口的控制台日志如下:

在这里插入图片描述

  • 8082端口的控制台日志如下:
    在这里插入图片描述
2.6.2 调度路由算法讲解

当执行器集群部署时,提供丰富的路由策略,包括:

  1. FIRST(第一个):固定选择第一个机器

  2. LAST(最后一个):固定选择最后一个机器;

  3. ROUND(轮询):依次的选择在线的机器发起调度

  4. RANDOM(随机):随机选择在线的机器;

  5. CONSISTENT_HASH(一致性HASH):

    每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。

  6. LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;

  7. LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;

  8. FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;

  9. BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;

  10. SHARDING_BROADCAST(分片广播):

    广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

3. 分片功能讲解
3.1 案例需求讲解

需求:我们现在实现这样的需求,在指定节假日,需要给平台的所有用户去发送祝福的短信.

3.1.1 初始化数据

在数据库中导入xxl_job_demo.sql数据

3.1.2 集成Druid&MyBatis

添加依赖

<!--MyBatis驱动-->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

添加配置

spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=root
spring.datasource.password=WolfCode_2017
  • 1
  • 2
  • 3
  • 4
  • 5

添加实体类

@Setter@Getter
public class UserMobilePlan {
    private Long id;//主键
    private String username;//用户名
    private String nickname;//昵称
    private String phone;//手机号码
    private String info;//备注
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

添加Mapper处理类

@Mapper
public interface UserMobilePlanMapper {
    @Select("select * from t_user_mobile_plan")
    List<UserMobilePlan> selectAll();
}
  • 1
  • 2
  • 3
  • 4
  • 5
3.1.3 业务功能实现

任务处理方法实现

@XxlJob("sendMsgHandler")
public void sendMsgHandler() throws Exception{
    List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll();
    System.out.println("任务开始时间:"+new Date()+",处理任务数量:"+userMobilePlans.size());
    Long startTime = System.currentTimeMillis();
    userMobilePlans.forEach(item->{
        try {
            //模拟发送短信动作
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    System.out.println("任务结束时间:"+new Date());
    System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

任务配置信息
在这里插入图片描述

3.2 分片概念讲解

比如我们的案例中有2000+条数据,如果不采取分片形式的话,任务只会在一台机器上执行,这样的话需要20+秒才能执行完任务.

如果采取分片广播的形式的话,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

获取分片参数方式:

// 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用 
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
  • 1
  • 2
  • 3

通过这两个参数,我们可以通过求模取余的方式,分别查询,分别执行,这样的话就可以提高处理的速度.

之前2000+条数据只在一台机器上执行需要20+秒才能完成任务,分片后,有两台机器可以共同完成2000+条数据,每台机器处理1000+条数据,这样的话只需要10+秒就能完成任务

3.3 案例改造成任务分片

Mapper增加查询方法

@Mapper
public interface UserMobilePlanMapper {
    @Select("select * from t_user_mobile_plan where mod(id,#{shardingTotal})=#{shardingIndex}")
    List<UserMobilePlan> selectByMod(@Param("shardingIndex") Integer shardingIndex,@Param("shardingTotal")Integer shardingTotal);
    @Select("select * from t_user_mobile_plan")
    List<UserMobilePlan> selectAll();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

任务类方法

@XxlJob("sendMsgShardingHandler")
public void sendMsgShardingHandler() throws Exception{
    System.out.println("任务开始时间:"+new Date());
    int shardTotal = XxlJobHelper.getShardTotal();
    int shardIndex = XxlJobHelper.getShardIndex();
    List<UserMobilePlan> userMobilePlans = null;
    if(shardTotal==1){
        //如果没有分片就直接查询所有数据
        userMobilePlans = userMobilePlanMapper.selectAll();
    }else{
        userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex,shardTotal);
    }
    System.out.println("处理任务数量:"+userMobilePlans.size());
    Long startTime = System.currentTimeMillis();
    userMobilePlans.forEach(item->{
        try {
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    System.out.println("任务结束时间:"+new Date());
    System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

任务设置
在这里插入图片描述

power-job

简介

Power-Job是一款开源的分布式任务调度平台,旨在解决分布式系统中任务调度的需求。它提供了可靠、高效的任务调度和执行功能,支持分布式部署和扩展。Power-Job具有简单易用、稳定可靠等特点,广泛应用于各种分布式系统中。

项目结构说明

├── LICENSE
├── powerjob-client // powerjob-client,普通Jar包,提供 OpenAPI
├── powerjob-common // 各组件的公共依赖,开发者无需感知
├── powerjob-remote // 内部通讯层框架,开发者无需感知
├── powerjob-server // powerjob-server,基于SpringBoot实现的调度服务器
├── powerjob-worker // powerjob-worker, 普通Jar包,接入powerjob-server的应用需要依赖该Jar包
├── powerjob-worker-agent // powerjob-agent,可执行Jar文件,可直接接入powerjob-server的代理应用
├── powerjob-worker-samples // 教程项目,包含了各种Java处理器的编写样例
├── powerjob-worker-spring-boot-starter // powerjob-worker 的 spring-boot-starter ,spring boot 应用可以通用引入该依赖一键接入 powerjob-server
├── powerjob-official-processors // 官方处理器,包含一系列常用的 Processor,依赖该 jar 包即可使用
├── others
└── pom.xml

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

基本概念

分组概念:
  • appName:应用名称,建议与用户实际接入 PowerJob 的应用名称保持一致,用于业务分组与隔离一个 appName 等于一个业务集群,也就是实际的一个 Java 项目
核心概念:
  • 任务(Job):描述了需要被 PowerJob 调度的任务信息,包括任务名称、调度时间、处理器信息等。

  • 任务实例( JobInstance,简称 Instance):任务(Job)被调度执行后会生成任务实例(Instance),任务实例记录了任务的运行时信息(任务与任务实例的关系类似于类与对象的关系)。

  • 作业(Task):任务实例的执行单元,一个 JobInstance 存在至少一个 Task,具体规则如下:

    • 单机任务(STANDALONE):一个 JobInstance 对应一个 Task
    • 广播任务(BROADCAST):一个 JobInstance 对应 N 个 Task,N为集群机器数量,即每一台机器都会生成一个 Task
    • Map/MapReduce任务:一个 JobInstance 对应若干个 Task,由开发者手动 map 产生
  • 工作流(Workflow):由 DAG(有向无环图)描述的一组任务(Job),用于任务编排。

  • 工作流实例(WorkflowInstance):工作流被调度执行后会生成工作流实例,记录了工作流的运行时信息。

扩展概念
  • JVM 容器:以 Maven 工程项目的维度组织一堆 Java 文件(开发者开发的众多 Java 处理器),可以通过前端网页动态发布并被执行器加载,具有极强的扩展能力和灵活性。
  • OpenAPI:允许开发者通过接口来完成手工的操作,让系统整体变得更加灵活。开发者可以基于 API 便捷地扩展 PowerJob 原有的功能。
  • 轻量级任务:单机执行且不需要以固定频率或者固定延迟执行的任务 (>= v4.2.1)
  • 重量级任务:非单机执行或者以固定频率/延迟执行的任务 (>= v4.2.1)
定时任务类型
  • API:该任务只会由 powerjob-client 中提供的 OpenAPI 接口触发,server 不会主动调度。
  • CRON:该任务的调度时间由 CRON 表达式指定。
  • 固定频率:秒级任务,每隔多少毫秒运行一次,功能与 java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate 相同。
  • 固定延迟:秒级任务,延迟多少毫秒运行一次,功能与 java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay 相同。
  • 工作流:该任务只会由其所属的工作流调度执行,server 不会主动调度该任务。如果该任务不属于任何一个工作流,该任务就不会被调度。

备注:固定延迟和固定频率任务统称秒级任务,这两种任务无法被停止,只有任务被关闭或删除时才能真正停止任务

简单使用教程

下载项目

git clone https://github.com/PowerJob/PowerJob.git

在这里插入图片描述

数据库配置

创建数据库

CREATE DATABASE IF NOT EXISTS powerjob-daily DEFAULT CHARSET utf8mb4[外链图片转存失败,源站可
在这里插入图片描述

启动服务

启动此类:tech.powerjob.server.PowerJobServerApplication

在这里插入图片描述

注册应用

注册应用。应用名:icreate-demo;密码:123456
在这里插入图片描述

应用登录

在这里插入图片描述

编写示例代码

进入示例工程(powerjob-worker-samples),修改配置文件连接powerjob-server并编写自己的处理器代码。

1.修改配置文件

修改 powerjob-worker-samples 的 application.properties,将 powerjob.worker.app-name 改为刚刚在控制台注册的名称。
在这里插入图片描述

2. 编写自己的处理器

​ 随便找个地方新建类,继承你想要使用的处理器(各个处理器的介绍可见官方文档,文档非常详细)

在这里插入图片描述

3 启动程序

在这里插入图片描述

4 编写对应任务

在这里插入图片描述

5 启动任务
在这里插入图片描述

定时任务对比,与选型场景

项目QuartzElastic-JobXXL-JOBSchedulerX
定时调度CronCronCronCronFixed_DelayFixed_RateOne_TimeOpenAPI
任务编排不支持不支持不支持支持,可以通过图形化配置,并且任务间可传递数据。
分布式跑批不支持静态分片广播广播静态分片动态分片(MapReduce)
多语言JavaJava脚本任务Java脚本任务Java脚本任务HTTP任务K8s Job
可观测历史记录运行日志(不支持搜索)监控大盘历史记录运行日志(支持搜索)监控大盘操作记录查看堆栈链路追踪
可运维启用、禁用任务启用、禁用任务手动运行任务停止任务启用、禁用任务手动运行任务停止任务标记成功重刷历史数据
报警监控邮件邮件邮件webhook短信电话
性能每次调度通过DB抢锁,对DB压力大。ZooKeeper是性能瓶颈。由Master节点调度,Master节点压力大。可水平扩展,支持海量任务调度。
QuartZxxl-jobSchedulerX 2.0PowerJob
定时类型CRONCRONCRON、固定频率、固定延迟、OpenAPICRON、固定频率、固定延迟、OpenAPI
任务类型内置Java内置Java、GLUE Java、Shell、Python等脚本内置Java、外置Java(FatJar)、Shell、Python等脚本内置Java、外置Java(容器)、Shell、Python等脚本
分布式任务静态分片MapReduce 动态分片MapReduce 动态分片
在线任务治理不支持支持支持支持
日志白屏化不支持支持不支持支持
调度方式及性能基于数据库锁,有性能瓶颈基于数据库锁,有性能瓶颈不详无锁化设计,性能强劲无上限
报警监控邮件短信邮件,提供接口允许开发者扩展
系统依赖MySQL、Oracle…)MySQL人民币任意 Spring Data Jpa支持的关系型数据库(MySQL、Oracle…)
DAG 工作流不支持支持支持

------------------- | ------------------------------------------------- | ------------------------------------------------------------ |
| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | CRON、固定频率、固定延迟、OpenAPI |
| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置Java(FatJar)、Shell、Python等脚本 | 内置Java、外置Java(容器)、Shell、Python等脚本 |
| 分布式任务 | 无 | 静态分片 | MapReduce 动态分片 | MapReduce 动态分片 |
| 在线任务治理 | 不支持 | 支持 | 支持 | 支持 |
| 日志白屏化 | 不支持 | 支持 | 不支持 | 支持 |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | 无锁化设计,性能强劲无上限 |
| 报警监控 | 无 | 邮件 | 短信 | 邮件,提供接口允许开发者扩展 |
| 系统依赖 | MySQL、Oracle…) | MySQL | 人民币 | 任意 Spring Data Jpa支持的关系型数据库(MySQL、Oracle…) |
| DAG 工作流 | 持 | 不支持 | 支持 | 支持 |

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/95984
推荐阅读
相关标签
  

闽ICP备14008679号