当前位置:   article > 正文

SpringBoot结合Quartz实现定时任务_springboot定时任务quartz

springboot定时任务quartz

《从零打造项目》系列文章

工具

ORM框架选型

数据库变更管理

定时任务框架

缓存

  • 待更新

安全框架

  • 待更新

开发规范

  • 待更新

前言

需求

假设我们有这样两个需求:

1、用户注册1分钟后给用户发送欢迎通知。

2、每天8点钟给用户发送当天温度通知。

接下来我们就准备实现上述两个需求,关于通知发送就只是简单地控制台输出,没有真正实现该功能。

关于定时任务框架的选择,本文将选用 Quartz 来实现上述需求,下面简单介绍一下 Quartz。

Quartz介绍

Quartz 作为一个优秀的开源调度框架,Quartz 具有以下特点:

  1. 强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;
  2. 灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
  3. 分布式和集群能力,Terracotta 收购后在原来功能基础上作了进一步提升。

另外,作为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。

在 Quartz 体系结构中,有三个组件非常重要:

  • Scheduler :调度器。Scheduler启动Trigger去执行Job。
  • Trigger :触发器。用来定义 Job(任务)触发条件、触发时间,触发间隔,终止时间等。四大类型:SimpleTrigger(简单的触发器)、CornTrigger(Cron表达式触发器)、DateIntervalTrigger(日期触发器)、CalendarIntervalTrigger(日历触发器)。
  • Job :任务。具体要执行的业务逻辑,比如:发送短信、发送邮件、访问数据库、同步数据等。

Quartz集群

Quartz 的存储方式有两种:RAMJobStoreJDBCJobStore。从名字就能看出,存在内存中和存在数据库当中。在默认情况下Quartz将任务调度的运行信息保存在内存中,这种方法提供了最佳的性能,因为内存中数据访问最快。不足之处是缺乏数据的持久性,当程序路途停止或系统崩溃时,所有运行的信息都会丢失。

两者之间的区别如下图所示:

Quartz的两种存储方式区别

JDBCJobStore 存储可以实现 Quartz 集群模式,实际场景下,我们必然需要考虑定时任务的高可用,即选用集群模式。

Quartz 集群架构如下,集群中的每个节点是一个独立的 Quartz 应用,且独立的 Quartz 节点并不与另一节点通信,而是通过相同的数据库表来感知另一 Quartz 应用。简而言之,Quartz 应用、数据库支撑、多节点部署即可搭建起Quartz的应用集群。

Quartz集群架构

**Quartz 集群共用同一个数据库,由数据库中的数据来确定任务是否正在执行,如果该任务正在执行,则其他服务器就不能去执行该调度任务。**Quartz集群的特点如下:

1、持久化

Quartz 可以将调度器 scheduler、触发器 trigger 以及任务 Job 的运行时信息存储至数据库中,采用 JDBCJobStore,如果服务器异常时,可以基于数据库中的存储信息进行任务恢复。

2、高可用性

如果相关服务器节点挂掉的话,集群的其他节点则会继续执行相关任务。

3、伸缩性

如果集群中的节点数过少,导致相关任务无法及时执行,可以增加额外的服务器节点,只需将其他节点上的脚本及配置信息拷贝至新部署的节点上运行即可。

4、负载均衡

Quartz 使用随机的负载均衡算法,任务 job 是以随机的方式由不同的节点上 Scheduler 实例来执行。但当前不存在一个方法指派一个Job到集群中的特定节点。

下面我们就使用 Quartz 来实现定时任务推送。

项目实践

创建一个 Maven 项目,名为 quartz-task。

环境配置

1、在 pom.xml 文件中,引入相关依赖。

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.6.3</version>
  <relativePath/>
</parent>

<properties>
  <java.version>1.8</java.version>
  <fastjson.version>1.2.73</fastjson.version>
  <hutool.version>5.5.1</hutool.version>
  <mysql.version>8.0.19</mysql.version>
  <org.mapstruct.version>1.4.2.Final</org.mapstruct.version>
  <org.projectlombok.version>1.18.20</org.projectlombok.version>
  <druid.version>1.1.18</druid.version>
  <springdoc.version>1.6.9</springdoc.version>
  <liquibase.version>4.16.1</liquibase.version>
</properties>

<dependencies>
  <!-- 实现对 Spring MVC 的自动化配置 -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <!-- 实现对 Quartz 的自动化配置 -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
  </dependency>
  <dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.5.1</version>
  </dependency>
  <dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus</artifactId>
    <version>3.5.1</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>${druid.version}</version>
  </dependency>
  <dependency>
    <groupId>org.liquibase</groupId>
    <artifactId>liquibase-core</artifactId>
    <version>4.16.1</version>
  </dependency>

  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.12</version>
  </dependency>
  <dependency>
    <groupId>org.mapstruct</groupId>
    <artifactId>mapstruct</artifactId>
    <version>${org.mapstruct.version}</version>
  </dependency>
  <dependency>
    <groupId>org.mapstruct</groupId>
    <artifactId>mapstruct-processor</artifactId>
    <version>${org.mapstruct.version}</version>
  </dependency>
  <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>${hutool.version}</version>
  </dependency>
  <dependency>
    <groupId>org.springdoc</groupId>
    <artifactId>springdoc-openapi-ui</artifactId>
    <version>${springdoc.version}</version>
  </dependency>

</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    <plugin>
      <groupId>org.liquibase</groupId>
      <artifactId>liquibase-maven-plugin</artifactId>
      <version>4.16.1</version>
      <configuration>
        <!--properties文件路径,该文件记录了数据库连接信息等-->
        <propertyFile>src/main/resources/application.yml</propertyFile>
        <propertyFileWillOverride>true</propertyFileWillOverride>
      </configuration>
    </plugin>
    <plugin>
      <groupId>com.msdn.hresh</groupId>
      <artifactId>liquibase-changelog-generate</artifactId>
      <version>1.0-SNAPSHOT</version>
      <configuration>
        <sourceFolderPath>src/main/resources/liquibase/changelogs/
        </sourceFolderPath><!-- 当前应用根目录 -->
      </configuration>
    </plugin>
  </plugins>
</build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127

2、添加 application.yml

server:
  port: 8080

spring:
  application:
    name: quartz-task
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/quartz_test_db?serverTimezone=Hongkong&characterEncoding=utf-8&useSSL=false
    username: root
    password: root
  liquibase:
    enabled: true
    change-log: classpath:liquibase/master.xml
    # 记录版本日志表
    database-change-log-table: databasechangelog
    # 记录版本改变lock表
    database-change-log-lock-table: databasechangeloglock
  quartz:
    # 程序结束时会等待quartz相关的内容结束
    wait-for-jobs-to-complete-on-shutdown: true
    # 将任务等保存化到数据库
    job-store-type: jdbc
    # QuartzScheduler启动时覆盖己存在的Job
    overwrite-existing-jobs: false

mybatis:
  mapper-locations: classpath:mapper/*Mapper.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    lazy-loading-enabled: true

changeLogFile: src/main/resources/liquibase/master.xml
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3、关于 Quartz 的配置,可以一并写在 application.yml 中,类似于这样:

spring:
  datasource:
    user:
      url: jdbc:mysql://127.0.0.1:3306/quartz_test_db?useSSL=false&useUnicode=true&characterEncoding=UTF-8
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password:
    quartz:
      url: jdbc:mysql://127.0.0.1:3306/quartz_test_db?useSSL=false&useUnicode=true&characterEncoding=UTF-8
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password:

  # Quartz 的配置,对应 QuartzProperties 配置类
  quartz:
    scheduler-name: clusteredScheduler # Scheduler 名字。默认为 schedulerName
    job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。
    wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
    overwrite-existing-jobs: false # 是否覆盖已有 Job 的配置
    properties: # 添加 Quartz Scheduler 附加属性,更多可以看 http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html 文档
      org:
        quartz:
          # JobStore 相关配置
          jobStore:
            # 数据源名称
            dataSource: quartzDataSource # 使用的数据源
            class: org.quartz.impl.jdbcjobstore.JobStoreTX # JobStore 实现类
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: QRTZ_ # Quartz 表前缀
            isClustered: true # 是集群模式
            clusterCheckinInterval: 1000
            useProperties: false
          # 线程池相关配置
          threadPool:
            threadCount: 25 # 线程池大小。默认为 10 。
            threadPriority: 5 # 线程优先级
            class: org.quartz.simpl.SimpleThreadPool # 线程池类型
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

不过因为 Quartz 配置内容过多,所以单独新建了 quartz.properties。

org.quartz.jobStore.useProperties=true

#在集群中每个实例都必须有一个唯一的instanceId,但是应该有一个相同的instanceName【默认“QuartzScheduler”】【非必须】
org.quartz.scheduler.instanceName=quartzScheduler
# Scheduler实例ID,全局唯一
org.quartz.scheduler.instanceId=AUTO
# 指定scheduler的主线程是否为后台线程,【默认false】【非必须】
org.quartz.scheduler.makeSchedulerThreadDaemon=true
# 触发job时是否需要拥有锁
org.quartz.jobStore.acquireTriggersWithinLock = true

#线程池配置
#线程池类型
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
#线程池大小
org.quartz.threadPool.threadCount=10
#线程优先级
org.quartz.threadPool.threadPriority=5

#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.tablePrefix=qrtz_
# 最大能忍受的触发超时时间(触发器被认定为“misfired”之前),如果超过则认为“失误”【默认60秒】
org.quartz.jobStore.misfireThreshold = 60000
# 配置数据源的名称,在后面配置数据源的时候要用到,
# 例如org.quartz.dataSource.myDS.driver=com.mysql.cj.jdbc.Driver
org.quartz.jobStore.dataSource = myDS

# 集群配置
org.quartz.jobStore.isClustered = true
# 检入到数据库中的频率(毫秒)。检查是否其他的实例到了应当检入的时候未检入这能指出一个失败的实例,
# 且当前Scheduler会以此来接管执行失败并可恢复的Job通过检入操作,Scheduler也会更新自身的状态记录
org.quartz.jobStore.clusterCheckinInterval=5000
# jobStore处理未按时触发的Job的数量
org.quartz.jobStore.maxMisfiresToHandleAtATime=20

# datasource
org.quartz.dataSource.myDS.provider = hikaricp
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.dataSource.myDS.driver=com.mysql.cj.jdbc.Driver
org.quartz.dataSource.myDS.URL=jdbc:mysql://localhost:3306/quartz_test_db?characterEncoding=utf8
org.quartz.dataSource.myDS.user=root
org.quartz.dataSource.myDS.password=root
# 最大连接数
org.quartz.dataSource.myDS.maxConnections = 10
# dataSource用于检测connection是否failed/corrupt的SQL语句
org.quartz.dataSource.myDS.validationQuery = select 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

关于 properties 文件中每个属性的含义,推荐阅读《Quartz配置文件详解&生产配置》。

4、手动在数据库中创建 Quartz 相关表,可以从 Quartz 发行版下载中找到tables_mysql.sql ,或直接从其源代码中找到 。因为我们使用 MySQL ,所以使用 tables_mysql_innodb.sql 脚本。

核心类

1、Quartz 配置类

@Configuration
public class SchedulerConfig {

  @Bean
  public SchedulerFactoryBean scheduler(DataSource dataSource) {
    SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
    schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));
    schedulerFactory.setDataSource(dataSource);
    schedulerFactory.setJobFactory(new SpringBeanJobFactory());
    schedulerFactory.setApplicationContextSchedulerContextKey("applicationContext");
    return schedulerFactory;
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、Quartz 相关实体类

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ScheduleTask {

  // 任务名
  private String jobName;

  // 任务组
  private String groupName;

  // 任务数据
  private String jobData;

  // 任务执行处理类,小写字母开头
  private String jobHandlerClass;

  // 任务执行时间
  private Long jobTime;

  // 任务执行时间,cron时间表达式 (如:0/5 * * * * ? )
  private String jobCronTime;

  // 任务执行次数,(<0:表示不限次数)
  private int jobTimes;

}

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class JobResponse {

  // 任务名
  private String jobName;

  // 任务组
  private String groupName;

  // 任务数据
  private String jobData;

  private String triggerKey;

  private String jobStatus;

  // 任务执行时间,cron时间表达式 (如:0/5 * * * * ? )
  private String jobCronTime;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

3、QuartzService定时器操作

@Service
@RequiredArgsConstructor
public class QuartzTaskService {

  public static final String JOB_DATA_KEY = "jobData";
  public static final String JOB_HANDLER_CLASS_KEY = "jobHandlerClass";
  private final Scheduler scheduler;

  public void createJob(ScheduleTask task) throws SchedulerException {
    JobDetail jobDetail = JobBuilder.newJob().ofType(MessageJob.class)
        .withIdentity(task.getJobName(), task.getGroupName())
        .usingJobData(JOB_DATA_KEY, task.getJobData())
        .usingJobData(JOB_HANDLER_CLASS_KEY, task.getJobHandlerClass())
        .build();
    Trigger trigger;
    if (StrUtil.isNotBlank(task.getJobCronTime())) {
      trigger = TriggerBuilder.newTrigger().forJob(jobDetail)
          .withIdentity(task.getJobName() + "_trigger", task.getGroupName())
          .withSchedule(CronScheduleBuilder.cronSchedule(task.getJobCronTime())).build();
    } else {
      trigger = TriggerBuilder.newTrigger().forJob(jobDetail)
          .withIdentity(task.getJobName() + "_trigger", task.getGroupName())
          .startAt(new Date(task.getJobTime()))
          .build();
    }

    scheduler.scheduleJob(jobDetail, trigger);
  }


  // 修改 一个job的 时间表达式
  @SneakyThrows
  public void updateJob(String jobName, String jobGroupName, String jobTime) {
    TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
    CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
    trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
        .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
    // 重启触发器
    scheduler.rescheduleJob(triggerKey, trigger);
  }

  @SneakyThrows
  public void removeTask(JobKey jobKey) {
    scheduler.deleteJob(jobKey);
  }

  // 暂停一个job
  @SneakyThrows
  public void pauseJob(JobKey jobKey) {
    scheduler.pauseJob(jobKey);
  }

  // 恢复一个job
  @SneakyThrows
  public void resumeJob(JobKey jobKey) {
    scheduler.resumeJob(jobKey);
  }

  // 立即执行一个job
  @SneakyThrows
  public void runJobNow(JobKey jobKey) {
    scheduler.triggerJob(jobKey);
  }

  // 获取所有计划中的任务列表
  public List<JobResponse> queryAllJob() throws SchedulerException {
    List<JobResponse> jobResponses = new ArrayList<>();
    GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
    Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
    for (JobKey jobKey : jobKeys) {
      List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
      for (Trigger trigger : triggers) {
        Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
        JobResponse jobResponse = getJobResponse(jobKey, trigger, triggerState);
        jobResponses.add(jobResponse);
      }
    }
    return jobResponses;
  }

  private JobResponse getJobResponse(JobKey jobKey, Trigger trigger, TriggerState triggerState) {
    JobResponse jobResponse = JobResponse.builder().jobName(jobKey.getName())
        .groupName(jobKey.getGroup()).triggerKey(trigger.getKey().toString()).build();
    jobResponse.setJobStatus(triggerState.name());
    if (trigger instanceof CronTrigger) {
      CronTrigger cronTrigger = (CronTrigger) trigger;
      String cronExpression = cronTrigger.getCronExpression();
      jobResponse.setJobCronTime(cronExpression);
    }
    return jobResponse;
  }

  // 获取所有正在运行的job
  public List<JobResponse> queryRunJob() throws SchedulerException {
    List<JobResponse> jobResponses = new ArrayList<>();
    List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
    for (JobExecutionContext executingJob : executingJobs) {
      JobDetail jobDetail = executingJob.getJobDetail();
      JobKey jobKey = jobDetail.getKey();
      Trigger trigger = executingJob.getTrigger();
      Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
      JobResponse jobResponse = getJobResponse(jobKey, trigger, triggerState);
      jobResponses.add(jobResponse);
    }
    return jobResponses;
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107

4、自定义 Job 类

@Setter
@Slf4j
public class MessageJob implements Job {

  private ApplicationContext applicationContext;

  private String jobData;

  private String jobHandlerClass;

  @SneakyThrows
  @Override
  public void execute(JobExecutionContext context) {
    log.info("quartz job data: " + jobData + ", jobHandlerClass: " + jobHandlerClass);
    MessageHandler messageHandler = (MessageHandler) applicationContext.getBean(jobHandlerClass);
    messageHandler.handlerMessage(jobData);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

5、创建定时任务处理接口

public interface MessageHandler {

  void handlerMessage(String jobData) throws JsonProcessingException;

}
  • 1
  • 2
  • 3
  • 4
  • 5

不同的定时任务对应不同的任务处理类,即实现 MessageHandler 接口。

业务实现

1、UserService,包括用户注册,给用户发送欢迎消息,以及发送天气温度通知。

@Service
@RequiredArgsConstructor
@Slf4j
public class UserService {

  private final ScheduleTaskService scheduleTaskService;
  private final UserMapper userMapper;
  private final UserStruct userStruct;
  private final WeatherService weatherService;

  /**
   * 假设有这样一个业务需求,每当有新用户注册,则1分钟后会给用户发送欢迎通知.
   *
   * @param userRequest 用户请求体
   */
  public void register(UserRequest userRequest) {
    if (Objects.isNull(userRequest) || isBlank(userRequest.getUsername()) ||
        isBlank(userRequest.getPassword())) {
      BusinessException.fail("账号或密码为空!");
    }

    User user = userStruct.toUser(userRequest);
    userMapper.insert(user);

    scheduleTaskService.createTask(user.getUsername());
  }


  public void sayHelloToUser(String username) {
    User user = userMapper.selectByUserName(username);
    String message = "Welcome to Java,I am hresh.";
    log.info(user.getUsername() + " , hello, " + message);
  }


  public void pushWeatherNotification(List<User> users) {
    log.info("执行发送天气通知给用户的任务。。。");
    WeatherInfo weatherInfo = weatherService.getWeather(WeatherConstant.WU_HAN);
    for (User user : users) {
      log.info(user.getUsername() + "----" + weatherInfo.toString());
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

2、ScheduleTaskService 创建定时任务

@Service
@RequiredArgsConstructor
public class ScheduleTaskService {

  private final QuartzTaskService quartzTaskService;
  private final UserMapper userMapper;

  @SneakyThrows
  public void createTask(String username) {
    LocalDateTime scheduleTime = LocalDateTime.now().plusMinutes(1L);
    String jobName = "sayHello";
    String jogGroupName = "group1";
    String jobHandlerClass = "sayHelloHandler";

    ScheduleTask scheduleTask = ScheduleTask.builder()
        .jobName(jobName)
        .groupName(jogGroupName)
        .jobData(new ObjectMapper().writeValueAsString(username))
        .jobHandlerClass(jobHandlerClass)
        .jobTime(DateUtil.toEpochMilli(scheduleTime))
        .build();

    quartzTaskService.createJob(scheduleTask);
  }

  @SneakyThrows
  public void createWeatherNotificationTask(String jobTime) {
    String jobName = "weatherNotification";
    String jogGroupName = "group2";
    String jobHandlerClass = "weatherNotificationHandler";

    List<User> users = userMapper.queryAll();

    ScheduleTask scheduleTask = ScheduleTask.builder()
        .jobName(jobName)
        .groupName(jogGroupName)
        .jobData(JSON.toJSONString(users))
        .jobHandlerClass(jobHandlerClass)
        .jobCronTime(jobTime)
        .build();

    quartzTaskService.createJob(scheduleTask);
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

3、WeatherService,获取天气温度等信息

@Service
@RequiredArgsConstructor
public class WeatherService {

  private final RestTemplate restTemplate;

  public WeatherInfo getWeather(WeatherConstant weatherConstant) {
    String json = restTemplate
        .getForObject("http://t.weather.sojson.com/api/weather/city/" + weatherConstant.getCode()
            , String.class);
    JSONObject jsonObject = JSONObject.parseObject(json);
    Integer status = jsonObject.getInteger("status");

    String currentDay = DateUtil.getDay(LocalDateTime.now());

    if (status == 200) {
      JSONObject data = jsonObject.getJSONObject("data");
      String quality = data.getString("quality");
      String notice = data.getString("ganmao");
      String currentTemperature = data.getString("wendu");

      JSONArray forecast = data.getJSONArray("forecast");
      JSONObject dayInfo = forecast.getJSONObject(0);
      String high = dayInfo.getString("high");
      String low = dayInfo.getString("low");
      String weather = dayInfo.getString("type");
      String windDirection = dayInfo.getString("fx");

      return WeatherInfo.builder().airQuality(quality + "污染").date(currentDay)
          .cityName(weatherConstant.getCityName()).temperature(low + "-" + high).weather(weather)
          .windDirection(windDirection).notice(notice).currentTemperature(currentTemperature)
          .build();
    }
    return null;
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

4、UserController,对外暴露接口

@RestController
@RequiredArgsConstructor
public class UserController {

  private final UserService userService;
  private final ScheduleTaskService scheduleTaskService;

  @PostMapping("/register")
  public Result<Object> register(@RequestBody UserRequest userRequest) {
    userService.register(userRequest);
    return Result.ok();
  }


  @PostMapping("/weather-notification")
  public Result<Object> scheduledSayHello(@RequestParam("jobTime") String jobTime) {
    scheduleTaskService.createWeatherNotificationTask(jobTime);
    return Result.ok();
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

还有一些代码没有展示出来,感兴趣的朋友到时候可以去我的 Github 上看一下项目源码。

测试

为了演示效果,发送天气温度通知,我们暂时设为每2分钟一次。

首先通过 postman 来注册用户

用户注册

可以到数据库中看一下 qrtz_job_details 表中的数据,如下所示:

qrtz_job_details表新增数据

等待一分钟后,控制台会输出如下内容:

Quartz执行定时任务

执行完定时任务后,qrtz_job_details 表中相关数据也会被删除掉。

接着来测试发送天气通知

定时给用户发送天气通知

因为咱们测试的是每两分钟跑一次定时任务,所以 qrtz_job_details 表中会一直存在这么一条数据:

qrtz_job_details表新增数据

问题记录

1、初次启动定时任务报错

Couldn't acquire next trigger: Unknown column 'SCHED_TIME' in 'field list'
  • 1

原因:我们下载的 SQL 文件有问题,在 qrtz_fired_triggers 表的构建语句中缺少 sched_time 字段,完整的 SQL 语句如下:

create table qrtz_fired_triggers
  (
    sched_name varchar(120) not null,
    entry_id varchar(95) not null,
    trigger_name varchar(200) not null,
    trigger_group varchar(200) not null,
    instance_name varchar(200) not null,
    fired_time bigint(13) not null,
    sched_time bigint(13) not null,
    priority integer not null,
    state varchar(16) not null,
    job_name varchar(200) null,
    job_group varchar(200) null,
    is_nonconcurrent varchar(1) null,
    requests_recovery varchar(1) null,
    primary key (sched_name,entry_id)
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2、增加数据源配置过程中遇到的坑:

在 quartz.properties 文件中没有增加 org.quartz.dataSource.myDS.provider = hikaricp 配置时,启动一直报错:

Caused by: org.quartz.SchedulerException: Could not initialize DataSource: qzDS

Caused by: org.quartz.SchedulerException: ConnectionProvider class ‘org.quartz.utils.C3p0PoolingConnectionProvider’ could not be instantiated.

后来增加了 provider: hikaricp 这个配置,启动不报错。

总结

Quartz 框架出现的比较早,后续不少定时框架,或多或少都基于 Quartz 研发的,比如当当网的elastic-job就是基于quartz二次开发之后的分布式调度解决方案。

并且,Quartz 并没有内置 UI 管理控制台,不过你可以使用 quartzui 这个开源项目来解决这个问题。

虽然 Quartz 可以实现我们的需求,但代码入侵比较严重,使用起来比较麻烦,后续我们再研究一下其他的定时任务框架。

感兴趣的朋友可以去我的 Github 下载相关代码,如果对你有所帮助,不妨 Star 一下,谢谢大家支持!

参考文献

Quartz配置文件详解&生产配置

Quartz.NET Configuration Reference

springBoot整合Quartz定时任务(持久化到数据库,开箱即食)

Quartz学习总结之Job存储模式和集群

Quartz应用与集群原理分析

quartz (从原理到应用)详解篇(转)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/981717
推荐阅读
相关标签
  

闽ICP备14008679号