当前位置:   article > 正文

Spring Boot集成Spring Batch快速入门Demo_springboot + springbatch

springboot + springbatch

1.什么是Spring Batch?

Spring Batch 是一个轻量级的开源框架,它提供了一种简单的方式来处理大量的数据。它基于Spring框架,提供了一套批处理框架,可以处理各种类型的批处理任务,如ETL、数据导入/导出、报表生成等。Spring Batch提供了一些重要的概念,如Job、Step、ItemReader、ItemProcessor、ItemWriter等,这些概念可以帮助我们构建可重用的批处理应用程序。通过Spring Batch,我们可以轻松地实现批处理的并发、容错、重试等功能,同时也可以方便地与其他Spring组件集成,如Spring Boot、Spring Data等。总之,Spring Batch是一个非常强大、灵活、易于使用的批处理框架,可以帮助我们快速构建高效、可靠的批处理应用程序。

分层架构

spring-batch-layers

可以看到它分为三层,分别是:

  • Application应用层:包含了所有任务batch jobs和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
  • Batch Core核心层:包含启动和管理任务的运行环境类,如JobLauncher等。
  • Batch Infrastructure基础层:上面两层是建立在基础层之上的,包含基础的读入reader写出writer、重试框架等。

主要概念

spring-batch-reference-model

2.2.1 JobRepository

专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以Spring Batch是需要依赖数据库来管理的。

2.2.2 任务启动器JobLauncher

负责启动任务Job

2.2.3 任务Job

Job是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个Job所定义的内容。

111

  上图介绍了Job的一些相关概念:

  • Job:封装处理实体,定义过程逻辑。
  • JobInstanceJob的运行实例,不同的实例,参数不同,所以定义好一个Job后可以通过不同参数运行多次。
  • JobParameters:与JobInstance相关联的参数。
  • JobExecution:代表Job的一次实际执行,可能成功、可能失败。

所以,开发人员要做的事情,就是定义Job

2.2.4 步骤Step

Step是对Job某个过程的封装,一个Job可以包含一个或多个Step,一步步的Step按特定逻辑执行,才代表Job执行完成。

222

  通过定义Step来组装Job可以更灵活地实现复杂的业务逻辑。

2.2.5 输入——处理——输出

所以,定义一个Job关键是定义好一个或多个Step,然后把它们组装好即可。而定义Step有多种方法,但有一种常用的模型就是输入——处理——输出,即Item ReaderItem ProcessorItem Writer。比如通过Item Reader从文件输入数据,然后通过Item Processor进行业务处理和数据转换,最后通过Item Writer写到数据库中去。 Spring Batch为我们提供了许多开箱即用的ReaderWriter,非常方便。

 

2.环境搭建

参照代码仓库mysql模块里面docker目录搭建

3.代码工程

实验目标

如何使用 Spring Boot 创建各种不同类型 Spring Batch Job

pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>springboot-demo</artifactId>
  7. <groupId>com.et</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>SpringBatch</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-starter-web</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-autoconfigure</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-test</artifactId>
  28. <scope>test</scope>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.springframework.boot</groupId>
  32. <artifactId>spring-boot-starter-batch</artifactId>
  33. </dependency>
  34. <dependency>
  35. <groupId>mysql</groupId>
  36. <artifactId>mysql-connector-java</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.boot</groupId>
  40. <artifactId>spring-boot-starter-jdbc</artifactId>
  41. </dependency>
  42. </dependencies>
  43. <build>
  44. <plugins>
  45. <plugin>
  46. <groupId>org.springframework.boot</groupId>
  47. <artifactId>spring-boot-maven-plugin</artifactId>
  48. </plugin>
  49. </plugins>
  50. </build>
  51. </project>

job

第一个简单的任务

  1. package com.et.batch.job;
  2. import org.springframework.batch.core.Job;
  3. import org.springframework.batch.core.Step;
  4. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  5. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  6. import org.springframework.batch.repeat.RepeatStatus;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class FirstJobDemo {
  12. @Autowired
  13. private JobBuilderFactory jobBuilderFactory;
  14. @Autowired
  15. private StepBuilderFactory stepBuilderFactory;
  16. @Bean
  17. public Job firstJob() {
  18. return jobBuilderFactory.get("firstJob")
  19. .start(step())
  20. .build();
  21. }
  22. private Step step() {
  23. return stepBuilderFactory.get("step")
  24. .tasklet((contribution, chunkContext) -> {
  25. System.out.println("execute step....");
  26. return RepeatStatus.FINISHED;
  27. }).build();
  28. }
  29. }

多步骤的job

  1. package com.et.batch.job;
  2. import org.springframework.batch.core.ExitStatus;
  3. import org.springframework.batch.core.Job;
  4. import org.springframework.batch.core.Step;
  5. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  6. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  7. import org.springframework.batch.repeat.RepeatStatus;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.stereotype.Component;
  11. @Component
  12. public class MultiStepJobDemo {
  13. @Autowired
  14. private JobBuilderFactory jobBuilderFactory;
  15. @Autowired
  16. private StepBuilderFactory stepBuilderFactory;
  17. @Bean
  18. public Job multiStepJob() {
  19. /*return jobBuilderFactory.get("multiStepJob")
  20. .start(step1())
  21. .next(step2())
  22. .next(step3())
  23. .build();*/
  24. // control the next step by last Status
  25. return jobBuilderFactory.get("multiStepJob2")
  26. .start(step1())
  27. .on(ExitStatus.COMPLETED.getExitCode()).to(step2())
  28. .from(step2())
  29. .on(ExitStatus.COMPLETED.getExitCode()).to(step3())
  30. .from(step3()).end()
  31. .build();
  32. }
  33. private Step step1() {
  34. return stepBuilderFactory.get("step1")
  35. .tasklet((stepContribution, chunkContext) -> {
  36. System.out.println("execute step1。。。");
  37. return RepeatStatus.FINISHED;
  38. }).build();
  39. }
  40. private Step step2() {
  41. return stepBuilderFactory.get("step2")
  42. .tasklet((stepContribution, chunkContext) -> {
  43. System.out.println("execute step2。。。");
  44. return RepeatStatus.FINISHED;
  45. }).build();
  46. }
  47. private Step step3() {
  48. return stepBuilderFactory.get("step3")
  49. .tasklet((stepContribution, chunkContext) -> {
  50. System.out.println("execute step3。。。");
  51. return RepeatStatus.FINISHED;
  52. }).build();
  53. }
  54. }

多flow控制的job, 创建一个flow对象,包含若干个step

  1. package com.et.batch.job;
  2. import org.springframework.batch.core.Job;
  3. import org.springframework.batch.core.Step;
  4. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  5. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  6. import org.springframework.batch.core.job.builder.FlowBuilder;
  7. import org.springframework.batch.core.job.flow.Flow;
  8. import org.springframework.batch.repeat.RepeatStatus;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.stereotype.Component;
  12. @Component
  13. public class FlowJobDemo {
  14. @Autowired
  15. private JobBuilderFactory jobBuilderFactory;
  16. @Autowired
  17. private StepBuilderFactory stepBuilderFactory;
  18. @Bean
  19. public Job flowJob() {
  20. return jobBuilderFactory.get("flowJob")
  21. .start(flow())
  22. .next(step3())
  23. .end()
  24. .build();
  25. }
  26. private Step step1() {
  27. return stepBuilderFactory.get("step1")
  28. .tasklet((stepContribution, chunkContext) -> {
  29. System.out.println("execute step1。。。");
  30. return RepeatStatus.FINISHED;
  31. }).build();
  32. }
  33. private Step step2() {
  34. return stepBuilderFactory.get("step2")
  35. .tasklet((stepContribution, chunkContext) -> {
  36. System.out.println("execute step2。。。");
  37. return RepeatStatus.FINISHED;
  38. }).build();
  39. }
  40. private Step step3() {
  41. return stepBuilderFactory.get("step3")
  42. .tasklet((stepContribution, chunkContext) -> {
  43. System.out.println("execute step3。。。");
  44. return RepeatStatus.FINISHED;
  45. }).build();
  46. }
  47. private Flow flow() {
  48. return new FlowBuilder<Flow>("flow")
  49. .start(step1())
  50. .next(step2())
  51. .build();
  52. }
  53. }

并发执行的jobs

  1. package com.et.batch.job;
  2. import org.springframework.batch.core.Job;
  3. import org.springframework.batch.core.Step;
  4. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  5. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  6. import org.springframework.batch.core.job.builder.FlowBuilder;
  7. import org.springframework.batch.core.job.flow.Flow;
  8. import org.springframework.batch.repeat.RepeatStatus;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.core.task.SimpleAsyncTaskExecutor;
  12. import org.springframework.stereotype.Component;
  13. @Component
  14. public class SplitJobDemo {
  15. @Autowired
  16. private JobBuilderFactory jobBuilderFactory;
  17. @Autowired
  18. private StepBuilderFactory stepBuilderFactory;
  19. @Bean
  20. public Job splitJob() {
  21. return jobBuilderFactory.get("splitJob")
  22. .start(flow1())
  23. .split(new SimpleAsyncTaskExecutor()).add(flow2())
  24. .end()
  25. .build();
  26. }
  27. private Step step1() {
  28. return stepBuilderFactory.get("step1")
  29. .tasklet((stepContribution, chunkContext) -> {
  30. System.out.println("execute step1。。。");
  31. return RepeatStatus.FINISHED;
  32. }).build();
  33. }
  34. private Step step2() {
  35. return stepBuilderFactory.get("step2")
  36. .tasklet((stepContribution, chunkContext) -> {
  37. System.out.println("execute step2。。。");
  38. return RepeatStatus.FINISHED;
  39. }).build();
  40. }
  41. private Step step3() {
  42. return stepBuilderFactory.get("step3")
  43. .tasklet((stepContribution, chunkContext) -> {
  44. System.out.println("execute step3。。。");
  45. return RepeatStatus.FINISHED;
  46. }).build();
  47. }
  48. private Flow flow1() {
  49. return new FlowBuilder<Flow>("flow1")
  50. .start(step1())
  51. .next(step2())
  52. .build();
  53. }
  54. private Flow flow2() {
  55. return new FlowBuilder<Flow>("flow2")
  56. .start(step3())
  57. .build();
  58. }
  59. }

根据上次运行结果判断是否执行下一步

  1. package com.et.batch.job;
  2. import org.springframework.batch.core.Job;
  3. import org.springframework.batch.core.Step;
  4. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  5. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  6. import org.springframework.batch.repeat.RepeatStatus;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class DeciderJobDemo {
  12. @Autowired
  13. private JobBuilderFactory jobBuilderFactory;
  14. @Autowired
  15. private StepBuilderFactory stepBuilderFactory;
  16. @Autowired
  17. private MyDecider myDecider;
  18. @Bean
  19. public Job deciderJob() {
  20. return jobBuilderFactory.get("deciderJob")
  21. .start(step1())
  22. .next(myDecider)
  23. .from(myDecider).on("weekend").to(step2())
  24. .from(myDecider).on("workingDay").to(step3())
  25. .from(step3()).on("*").to(step4())
  26. .end()
  27. .build();
  28. }
  29. private Step step1() {
  30. return stepBuilderFactory.get("step1")
  31. .tasklet((stepContribution, chunkContext) -> {
  32. System.out.println("execute step1。。。");
  33. return RepeatStatus.FINISHED;
  34. }).build();
  35. }
  36. private Step step2() {
  37. return stepBuilderFactory.get("step2")
  38. .tasklet((stepContribution, chunkContext) -> {
  39. System.out.println("execute step2。。。");
  40. return RepeatStatus.FINISHED;
  41. }).build();
  42. }
  43. private Step step3() {
  44. return stepBuilderFactory.get("step3")
  45. .tasklet((stepContribution, chunkContext) -> {
  46. System.out.println("execute step3。。。");
  47. return RepeatStatus.FINISHED;
  48. }).build();
  49. }
  50. private Step step4() {
  51. return stepBuilderFactory.get("step4")
  52. .tasklet((stepContribution, chunkContext) -> {
  53. System.out.println("execute step4。。。");
  54. return RepeatStatus.FINISHED;
  55. }).build();
  56. }
  57. }

父子嵌套job

  1. package com.et.batch.job;
  2. import org.springframework.batch.core.Job;
  3. import org.springframework.batch.core.Step;
  4. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  5. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  6. import org.springframework.batch.core.launch.JobLauncher;
  7. import org.springframework.batch.core.repository.JobRepository;
  8. import org.springframework.batch.core.step.builder.JobStepBuilder;
  9. import org.springframework.batch.core.step.builder.StepBuilder;
  10. import org.springframework.batch.repeat.RepeatStatus;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.stereotype.Component;
  14. import org.springframework.transaction.PlatformTransactionManager;
  15. @Component
  16. public class NestedJobDemo {
  17. @Autowired
  18. private JobBuilderFactory jobBuilderFactory;
  19. @Autowired
  20. private StepBuilderFactory stepBuilderFactory;
  21. @Autowired
  22. private JobLauncher jobLauncher;
  23. @Autowired
  24. private JobRepository jobRepository;
  25. @Autowired
  26. private PlatformTransactionManager platformTransactionManager;
  27. @Bean
  28. public Job parentJob() {
  29. return jobBuilderFactory.get("parentJob")
  30. .start(childJobOneStep())
  31. .next(childJobTwoStep())
  32. .build();
  33. }
  34. private Step childJobOneStep() {
  35. return new JobStepBuilder(new StepBuilder("childJobOneStep"))
  36. .job(childJobOne())
  37. .launcher(jobLauncher)
  38. .repository(jobRepository)
  39. .transactionManager(platformTransactionManager)
  40. .build();
  41. }
  42. private Step childJobTwoStep() {
  43. return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
  44. .job(childJobTwo())
  45. .launcher(jobLauncher)
  46. .repository(jobRepository)
  47. .transactionManager(platformTransactionManager)
  48. .build();
  49. }
  50. private Job childJobOne() {
  51. return jobBuilderFactory.get("childJobOne")
  52. .start(
  53. stepBuilderFactory.get("childJobOneStep")
  54. .tasklet((stepContribution, chunkContext) -> {
  55. System.out.println("subtask1。。。");
  56. return RepeatStatus.FINISHED;
  57. }).build()
  58. ).build();
  59. }
  60. private Job childJobTwo() {
  61. return jobBuilderFactory.get("childJobTwo")
  62. .start(
  63. stepBuilderFactory.get("childJobTwoStep")
  64. .tasklet((stepContribution, chunkContext) -> {
  65. System.out.println("subtask2。。。");
  66. return RepeatStatus.FINISHED;
  67. }).build()
  68. ).build();
  69. }
  70. }

application.yaml

自动会初始化脚本,只需要建立以恶搞空库就行

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/springbatch
    username: root
    password: 123456

  batch:
    jdbc:
      schema: classpath:org/springframework/batch/core/schema-mysql.sql
    initialize-schema: always #Since Spring Boot 2.5.0 use spring.batch.jdbc.initialize-schema=never

    job:
      enabled: true

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

4.测试

  • 启动Spring Boot应用程序,系统会自动运行job,跑过一次,下次启动不会继续执行
  • 如果要执行定时任务,可以利用spring提供的scheduledTaskRegistrar注册一个定时任务,扫描最新的定时任务,将这些定时任务注册到scheduleFuture中从而实现动态定时任务。

5.引用

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/911351
推荐阅读
相关标签
  

闽ICP备14008679号