In enterprise development you will sometimes face scenarios that involve processing fairly large amounts of data, for example importing all rows of one table into another table with a similar structure, reading the contents of one or more files in bulk and writing them to a database, or batch-updating one table's data into another. Spring Batch lets you build batch applications for these scenarios quickly.
Spring Batch provides the reusable features that are essential when processing large volumes of data, including logging/tracing, transaction management, job processing statistics, job restart, skipping, and resource management. For high-volume, high-performance batch jobs it also offers advanced features such as partitioning and remote processing. This greatly simplifies the development of batch applications, frees developers from complex task configuration and management, and lets us focus on the core business logic. In short, with Spring Batch we can implement batch jobs ranging from the simple to the complex and data-intensive.
JobRepository: the container used to register Jobs and persist their metadata
JobLauncher: the interface used to launch Jobs
Job: the actual task to execute, made up of one or more Steps
Step: a processing step, containing an ItemReader, an ItemProcessor and an ItemWriter
ItemReader: the interface used to read data
ItemProcessor: the interface used to process data
ItemWriter: the interface used to write data
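These pieces fit together as follows: a JobLauncher runs a Job, the Job runs its Steps, and each chunk-oriented Step pulls items from an ItemReader, passes them through an ItemProcessor, and hands the processed items to an ItemWriter one chunk at a time. The minimal sketch below is not part of the demo project; it only illustrates the three item contracts, assuming the Spring Batch 4.x interface signatures (where the writer receives a List per chunk):

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;

import java.util.Iterator;
import java.util.List;

public class ItemContractsSketch {

    // ItemReader: returns one item per call, or null to signal that the input is exhausted
    static ItemReader<String> reader(List<String> source) {
        Iterator<String> it = source.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // ItemProcessor: transforms each item (returning null would filter the item out)
    static ItemProcessor<String, String> processor() {
        return String::toUpperCase;
    }

    // ItemWriter: receives one chunk of processed items at a time
    static ItemWriter<String> writer() {
        return items -> items.forEach(System.out::println);
    }
}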
The official site address is as follows:
The actual task processing is defined at the Step level. Within a Step you define how data is read, how it is processed, and how it is written out; of these three phases, the processing phase is where the real data handling happens. The flow of a Step is shown in the figure below:
Reader (Item Reader in the architecture diagram): defines the read operation, including where the input file is located, how each line is split up (for example using ',' as the delimiter), and how the values read are mapped onto the fields of a corresponding object (see the reader sketch after this list)
Processor (Item Processor in the architecture diagram): this is where the data is actually processed; all of the processing logic is defined here
Writer (Item Writer in the architecture diagram): defines the output operation, such as writing the data to a database
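For the file-reading case mentioned above, a reader is typically built with Spring Batch's FlatFileItemReader. The sketch below is only an illustration and is not used in the rest of this demo: it assumes a hypothetical classpath file students.csv whose comma-separated columns are name, age, sex, address and cid, and maps each line onto the Student entity defined later in this post:

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.core.io.ClassPathResource;
import org.zero.example.springbatchdemo.model.Student;

public class StudentCsvReaderSketch {

    public static FlatFileItemReader<Student> studentCsvReader() {
        // map each tokenized line onto the fields of the Student entity by name
        BeanWrapperFieldSetMapper<Student> mapper = new BeanWrapperFieldSetMapper<>();
        mapper.setTargetType(Student.class);

        return new FlatFileItemReaderBuilder<Student>()
                .name("studentCsvReader")                        // reader name, used for restart state
                .resource(new ClassPathResource("students.csv")) // hypothetical input file
                .linesToSkip(1)                                  // skip a header line, if present
                .delimited()
                .delimiter(",")                                  // split each line on commas
                .names("name", "age", "sex", "address", "cid")   // column order in the file
                .fieldSetMapper(mapper)
                .build();
    }
}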
When a batch job needs to work against a database, Spring Batch requires that the tables for storing the batch job's metadata already exist in that database. In the listing below, the tables whose names start with batch are used by Spring Batch to store the metadata produced by each job execution, while the student table serves as the data source for this demo:
The following figure shows the ERD model of all six tables and their relationships (taken from the official documentation):
Execute the official metadata schema SQL script against the database:
-- do not edit this file
-- BATCH_JOB_INSTANCE: holds all information related to a JobInstance
-- JOB_INSTANCE_ID is assigned from BATCH_JOB_SEQ
-- JOB_NAME must match the job name in the Spring configuration
-- JOB_KEY is an MD5 hash of the job parameters; because of this column, a job that has already
-- completed successfully will throw JobInstanceAlreadyCompleteException if it is run again with the same parameters
CREATE TABLE BATCH_JOB_INSTANCE (
  JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY,
  VERSION BIGINT,
  JOB_NAME VARCHAR(100) NOT NULL,
  JOB_KEY VARCHAR(32) NOT NULL,
  constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION: holds all information related to a JobExecution object
CREATE TABLE BATCH_JOB_EXECUTION (
  JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
  VERSION BIGINT,
  JOB_INSTANCE_ID BIGINT NOT NULL,
  CREATE_TIME DATETIME NOT NULL,
  START_TIME DATETIME DEFAULT NULL,
  END_TIME DATETIME DEFAULT NULL,
  STATUS VARCHAR(10),
  EXIT_CODE VARCHAR(2500),
  EXIT_MESSAGE VARCHAR(2500),
  LAST_UPDATED DATETIME,
  JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
  constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
    references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_PARAMS: holds all information related to a JobParameters object
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
  JOB_EXECUTION_ID BIGINT NOT NULL,
  TYPE_CD VARCHAR(6) NOT NULL,
  KEY_NAME VARCHAR(100) NOT NULL,
  STRING_VAL VARCHAR(250),
  DATE_VAL DATETIME DEFAULT NULL,
  LONG_VAL BIGINT,
  DOUBLE_VAL DOUBLE PRECISION,
  IDENTIFYING CHAR(1) NOT NULL,
  constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION: holds all information related to a StepExecution object
CREATE TABLE BATCH_STEP_EXECUTION (
  STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
  VERSION BIGINT NOT NULL,
  STEP_NAME VARCHAR(100) NOT NULL,
  JOB_EXECUTION_ID BIGINT NOT NULL,
  START_TIME DATETIME NOT NULL,
  END_TIME DATETIME DEFAULT NULL,
  STATUS VARCHAR(10),
  COMMIT_COUNT BIGINT,
  READ_COUNT BIGINT,
  FILTER_COUNT BIGINT,
  WRITE_COUNT BIGINT,
  READ_SKIP_COUNT BIGINT,
  WRITE_SKIP_COUNT BIGINT,
  PROCESS_SKIP_COUNT BIGINT,
  ROLLBACK_COUNT BIGINT,
  EXIT_CODE VARCHAR(2500),
  EXIT_MESSAGE VARCHAR(2500),
  LAST_UPDATED DATETIME,
  constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION_CONTEXT: holds the ExecutionContext information associated with a Step
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
  STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
  SHORT_CONTEXT VARCHAR(2500) NOT NULL,
  SERIALIZED_CONTEXT TEXT,
  constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
    references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_CONTEXT: holds the ExecutionContext information associated with a Job
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
  JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
  SHORT_CONTEXT VARCHAR(2500) NOT NULL,
  SERIALIZED_CONTEXT TEXT,
  constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
  ID BIGINT NOT NULL,
  UNIQUE_KEY CHAR(1) NOT NULL,
  constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
  ID BIGINT NOT NULL,
  UNIQUE_KEY CHAR(1) NOT NULL,
  constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
  ID BIGINT NOT NULL,
  UNIQUE_KEY CHAR(1) NOT NULL,
  constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);

CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) NOT NULL,
  `age` int(11) NOT NULL,
  `sex` varchar(20) NOT NULL,
  `address` varchar(100) NOT NULL,
  `cid` int(11) NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8;
The data in the student table:
Check the dependencies highlighted in the red boxes below:
Click Finish to complete the project creation:
The final dependencies of the project:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/springbatch?serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&autoReconnect=true
    hikari:
      password: password
      username: root
  jpa:
    open-in-view: true
    show-sql: true
    hibernate:
      ddl-auto: update
    database: mysql
  # prevent the configured jobs from running automatically at application startup
  batch:
    job:
      enabled: false
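As an aside, instead of running the metadata schema script by hand as we did above, Spring Boot can create the batch metadata tables automatically on startup. This is optional and not required for the demo; the sketch below uses the Spring Boot 2.x property name (newer Boot versions moved it to spring.batch.jdbc.initialize-schema):

spring:
  batch:
    # create Spring Batch's metadata tables automatically if they do not exist
    initialize-schema: always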
package org.zero.example.springbatchdemo.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;

/**
 * Entity class for the student table
 *
 * @author 01
 * @date 2019-02-24
 **/
@Data
@Entity
@Table(name = "student")
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    private String name;

    private Integer age;

    private String sex;

    private String address;

    private Integer cid;
}

The batch job and the scheduled task involve multi-threading, so we configure a Spring thread pool, as shown below:
package org.zero.example.springbatchdemo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Configuration of the task thread pool executor
 *
 * @author 01
 * @date 2019-02-24
 **/
@Configuration
public class ExecutorConfiguration {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("Data-Job");

        return threadPoolTaskExecutor;
    }
}

Next, implement a job listener; the batch job calls the listener's methods before and after it executes:
package org.zero.example.springbatchdemo.task.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * A simple Job listener
 *
 * @author 01
 * @date 2019-02-24
 **/
@Slf4j
@Component
public class JobListener implements JobExecutionListener {
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
    private long startTime;

    @Autowired
    public JobListener(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.threadPoolTaskExecutor = threadPoolTaskExecutor;
    }

    /**
     * Executed before the job starts
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("job before " + jobExecution.getJobParameters());
    }

    /**
     * Executed after the job finishes
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("JOB STATUS : {}", jobExecution.getStatus());
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("JOB FINISHED");
            threadPoolTaskExecutor.destroy();
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            log.info("JOB FAILED");
        }
        log.info("Job Cost Time : {}/ms", (System.currentTimeMillis() - startTime));
    }
}

Now configure a basic Job; the Job is where the actual batch processing takes place. A Job usually consists of one or more Steps (much like a workflow), and a Step usually consists of three parts: reading data (ItemReader), processing data (ItemProcessor) and writing data (ItemWriter). The code is as follows:
package org.zero.example.springbatchdemo.task.job;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zero.example.springbatchdemo.model.Student;
import org.zero.example.springbatchdemo.task.listener.JobListener;

import javax.persistence.EntityManagerFactory;

/**
 * Configuration of a basic Job
 *
 * @author 01
 * @date 2019-02-24
 **/
@Slf4j
@Component
public class DataBatchJob {
    /**
     * Job builder factory, used to build Jobs
     */
    private final JobBuilderFactory jobBuilderFactory;

    /**
     * Step builder factory, used to build Steps
     */
    private final StepBuilderFactory stepBuilderFactory;

    /**
     * Entity manager factory, used to access the table data
     */
    private final EntityManagerFactory emf;

    /**
     * Our custom, simple Job listener
     */
    private final JobListener jobListener;

    @Autowired
    public DataBatchJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,
                        EntityManagerFactory emf, JobListener jobListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.emf = emf;
        this.jobListener = jobListener;
    }

    /**
     * A basic Job usually consists of one or more Steps
     */
    public Job dataHandleJob() {
        return jobBuilderFactory.get("dataHandleJob")
                .incrementer(new RunIdIncrementer())
                // start() sets the first Step the Job executes
                .start(handleDataStep())
                // further Steps can be chained with next(), for example:
                // .next(xxxStep())
                // .next(xxxStep())
                // ...
                // register our custom JobListener
                .listener(jobListener)
                .build();
    }

    /**
     * A simple Step consists of three parts:
     * ItemReader    : reads the data
     * ItemProcessor : processes the data
     * ItemWriter    : writes the data
     */
    private Step handleDataStep() {
        return stepBuilderFactory.get("getData")
                // <input type, output type>; chunk is, loosely speaking, similar to a SQL commit:
                // here 100 items are processed before the writer is invoked once
                .<Student, Student>chunk(100)
                // fault tolerance: retry an item up to 3 times when an exception is caught, and skip
                // failing items up to 100 times; beyond these limits the Job stops and is marked as failed
                .faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class)
                // set the ItemReader
                .reader(getDataReader())
                // set the ItemProcessor
                .processor(getDataProcessor())
                // set the ItemWriter
                .writer(getDataWriter())
                .build();
    }

    /**
     * Reads the data
     *
     * @return ItemReader Object
     */
    private ItemReader<? extends Student> getDataReader() {
        // data can be read via JPA, JDBC, JMS and other mechanisms
        JpaPagingItemReader<Student> reader = new JpaPagingItemReader<>();

        try {
            // here we read the data through JPA
            JpaNativeQueryProvider<Student> queryProvider = new JpaNativeQueryProvider<>();
            // a simple native SQL query
            queryProvider.setSqlQuery("SELECT * FROM student");
            // set the entity class
            queryProvider.setEntityClass(Student.class);
            queryProvider.afterPropertiesSet();

            reader.setEntityManagerFactory(emf);
            // number of records to read per page
            reader.setPageSize(3);
            // set the query provider
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();

            // all ItemReader and ItemWriter implementations store their current state in the
            // ExecutionContext before it is committed; if that is not desired, call setSaveState(false)
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return reader;
    }

    /**
     * Processes the data
     *
     * @return ItemProcessor Object
     */
    private ItemProcessor<Student, Student> getDataProcessor() {
        return student -> {
            // simulate processing the data; here "processing" simply means logging the item
            log.info("processor data : " + student.toString());

            return student;
        };
    }

    /**
     * Writes the data
     *
     * @return ItemWriter Object
     */
    private ItemWriter<Student> getDataWriter() {
        return list -> {
            for (Student student : list) {
                // simulate writing the data; to keep the demo simple we do not actually write to the database
                log.info("write data : " + student);
            }
        };
    }
}

There are usually two ways to run a Job:
Register the Job object as a bean in the Spring container. By default Spring Batch runs the Jobs configured in the container once the application has started, and when several Jobs are configured, the ones to run can be selected through the configuration file (a property-based example follows the scheduled-task code below).
Run the Job from a scheduled task, triggering it explicitly with the run method of JobLauncher. The code is as follows:
package org.zero.example.springbatchdemo.task;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.zero.example.springbatchdemo.task.job.DataBatchJob;

/**
 * A simple scheduled task
 *
 * @author 01
 * @date 2019-02-24
 **/
@Slf4j
@Component
public class TimeTask {

    private final JobLauncher jobLauncher;
    private final DataBatchJob dataBatchJob;

    @Autowired
    public TimeTask(JobLauncher jobLauncher, DataBatchJob dataBatchJob) {
        this.jobLauncher = jobLauncher;
        this.dataBatchJob = dataBatchJob;
    }

    // scheduled task, runs every ten seconds
    @Scheduled(cron = "0/10 * * * * ?")
    public void runBatch() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException {
        log.info("Scheduled task triggered...");
        // At least one job parameter has to be supplied when running a job; it ends up in the
        // batch_job_execution_params table. Without a parameter the job will not run, and the same
        // parameter values must not be reused - if the given parameters already exist in the table
        // an exception is thrown - which is why a timestamp is used as the parameter here.
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();

        // obtain the job and run it
        Job job = dataBatchJob.dataHandleJob();
        JobExecution execution = jobLauncher.run(job, jobParameters);
        log.info("Scheduled task finished. Exit Status : {}", execution.getStatus());
    }
}
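For the first approach mentioned above (letting Spring Batch run jobs when the application starts), no scheduler code is needed; the behaviour is driven by configuration. The sketch below uses the Spring Boot 2.x properties and assumes that the Job is exposed as a Spring bean (for example by turning dataHandleJob() into a @Bean method in a configuration class), since only Job beans are picked up at startup:

spring:
  batch:
    job:
      # let Spring Boot launch the configured Job beans automatically at startup
      enabled: true
      # optionally restrict which jobs run at startup (comma-separated job names)
      names: dataHandleJob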

Finally, add the @EnableBatchProcessing and @EnableScheduling annotations to the Spring Boot application class to enable batch processing and scheduling; without them neither the batch job nor the scheduled task will run. The code is as follows:
package org.zero.example.springbatchdemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * :@EnableBatchProcessing enables the configuration of batch jobs
 * :@EnableScheduling enables the configuration of scheduled tasks
 *
 * @author 01
 * @date 2019-02-24
 */
@EnableScheduling
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchDemoApplication.class, args);
    }
}

Start the project and wait ten seconds; the console output below shows that our batch job executed as expected: