赞
踩
名称 | 作用 |
---|---|
JobRepository | 用于注册 Job 的容器 |
JobLauncher | 启动 Job 接口 |
Job | 实际要执行的任务,由一个或多个 Step 构成 |
Step | 每个 Step 都包含 ItemReader、ItemProcessor 和 ItemWriter |
ItemReader | 用来读取数据的接口 |
ItemProcessor | 用来读取数据的接口 |
ItemWriter | 用来输出数据的接口 |
@Configuration @EnableBatchProcessing public class CsvBatchConfig { @Bean public ItemReader<Person> reader() throws Exception { …… return reader; } @Bean public ItemProcessor<Person, Person> processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 processor.setValidator(csvBeanValidator()); //2 return processor; } @Bean public ItemWriter<Person> writer(DataSource dataSource) {//1 …… return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { …… } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { …… return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() .listener(csvJobListener()) //2 .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person,Person> processor) { return stepBuilderFactory .get("step1") .<Person, Person>chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator<Person> csvBeanValidator() { return new CsvBeanValidator<Person>(); } }
public class CsvJobListener implements JobExecutionListener { // 自定义监听器类 } // 注册监听器 @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) .end() .listener(csvJobListener()) //绑定监听器 .build(); }
public class CsvItemProcessor extends ValidatingItemProcessor<Person> {
@Override
public Person process(Person item) throws ValidationException {
super.process(item);
// 具体处理数据的代码
return item;
}
}
// 参数绑定
String path = "people.csv";
jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", path)
.toJobParameters();
jobLauncher.run(importJob,jobParameters);
// 定义 Bean
@Bean
@StepScope
public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception {
FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
reader.setResource(new ClassPathResource(pathToFile));
return reader;
}
spring.batch.job.name= #启动时要执行的任务,默认全部
spring.batch.job.enabled=true # 是否自动执行自定义的 Job,默认是
spring.batch.initializer.enabled=true # 是否初始化 Spring Batch 的数据库,默认是
spring.batch.schems=
spring.batch.table-prefix= #设置 Spring batch 的数据库表的前缀
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.3.0.M2</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.pyc</groupId> <artifactId>batch</artifactId> <version>0.0.1-SNAPSHOT</version> <name>batch</name> <description>Demo project for Spring Boot</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <dependencies> <!-- hibernate-validator 依赖,数据校验用--> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> </dependency> <dependency> <groupId>com.oracle</groupId> <artifactId>ojdbc6</artifactId> <version>11.2.0.2.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> <exclusions> <exclusion> <groupId>org.hsqldb</groupId> <artifactId>hsqldb</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
spring.datasource.driverClassName=oracle.jdbc.OracleDriver
spring.datasource.url=jdbc\:oracle\:thin\:@localhost\:1521\:xe
spring.datasource.username=pyc
spring.datasource.password=pyc
spring.batch.job.enabled=true
logging.file=log.log
logging.level.org.springframework.web = DEBUG
pyc,21,汉族,汕尾
ycy,21,汉族,肇庆
yqy,67,汉族,余姚
lb,100,古汉族,江东
qf,100,古汉族,长安
create table PERSON
(
id NUMBER not null primary key,
name varchar(20),
age number,
nation varchar(20),
address varchar(20)
);
package com.pyc.batch.domain; import javax.validation.constraints.Size; public class Person { // 用 JSR-303 注解校验数据 @Size(max = 4,min = 2) private String name; private int age; private String nation; private String address; public String getName() { return name; } public void setName(String name) { this.name = name; } public void setAge(int age) { this.age = age; } public int getAge() { return age; } public void setAddress(String address) { this.address = address; } public String getAddress() { return address; } public void setNation(String nation) { this.nation = nation; } public String getNation() { return nation; } }
package com.pyc.batch.mybatch; import com.pyc.batch.domain.Person; import org.springframework.batch.item.validator.ValidatingItemProcessor; import org.springframework.batch.item.validator.ValidationException; public class CsvItemProcessor extends ValidatingItemProcessor<Person> { @Override public Person process(Person item) throws ValidationException { super.process(item); if(item.getNation().equals("汉族")){ item.setNation("01"); }else { item.setNation("02"); } return item; } }
package com.pyc.batch.mybatch; import org.springframework.batch.item.validator.ValidationException; import org.springframework.batch.item.validator.Validator; import org.springframework.beans.factory.InitializingBean; import javax.validation.ConstraintViolation; import javax.validation.Validation; import javax.validation.ValidatorFactory; import java.util.Set; public class CsvBeanValidator<T> implements Validator<T>, InitializingBean { private javax.validation.Validator validator; @Override public void afterPropertiesSet() throws Exception { ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); } @Override public void validate(T t) throws ValidationException { Set<ConstraintViolation<T>> constraintViolations = validator.validate(t); if(constraintViolations.size()>0){ StringBuilder message = new StringBuilder(); for(ConstraintViolation<T> constraintViolation : constraintViolations){ message.append(constraintViolation.getMessage()).append("\n"); } throw new ValidationException(message.toString()); } } }
package com.pyc.batch.mybatch; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; public class CsvJobListener implements JobExecutionListener { long startTime; long endTime; @Override public void beforeJob(JobExecution jobExecution) { startTime = System.currentTimeMillis(); System.out.println("任务处理开始"); } @Override public void afterJob(JobExecution jobExecution) { endTime = System.currentTimeMillis(); System.out.println("任务处理结束"); System.out.println("耗时:" + (endTime - startTime) + "ms"); } }
@Configuration @EnableBatchProcessing public class CsvBatchConfig { @Bean public ItemReader<Person> reader() throws Exception { FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //1 reader.setResource(new ClassPathResource("people.csv")); //2 reader.setLineMapper(new DefaultLineMapper<Person>() {{ //3 setLineTokenizer(new DelimitedLineTokenizer() {{ setNames(new String[] { "name","age", "nation" ,"address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }}); }}); return reader; } @Bean public ItemProcessor<Person, Person> processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 processor.setValidator(csvBeanValidator()); //2 return processor; } @Bean public ItemWriter<Person> writer(DataSource dataSource) {//1 JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); //2 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>()); String sql = "insert into person " + "(id,name,age,nation,address) " + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)"; writer.setSql(sql); //3 writer.setDataSource(dataSource); return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("oracle"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() .listener(csvJobListener()) //2 .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person,Person> processor) { return stepBuilderFactory .get("step1") .<Person, Person>chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator<Person> csvBeanValidator() { return new CsvBeanValidator<Person>(); } }
package com.pyc.batch.mybatch; import javax.sql.DataSource; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.validator.Validator; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.transaction.PlatformTransactionManager; import com.pyc.batch.domain.Person; //@Configuration @EnableBatchProcessing public class TriggerBatchConfig { @Bean @StepScope public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception { FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //1 reader.setResource(new ClassPathResource(pathToFile)); //2 reader.setLineMapper(new DefaultLineMapper<Person>() {{ //3 setLineTokenizer(new DelimitedLineTokenizer() {{ setNames(new String[] { "name","age", "nation" ,"address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }}); }}); return reader; } @Bean public ItemProcessor<Person, Person> processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 processor.setValidator(csvBeanValidator()); //2 return processor; } @Bean public ItemWriter<Person> writer(DataSource dataSource) {//1 JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); //2 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>()); String sql = "insert into person " + "(id,name,age,nation,address) " + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)"; writer.setSql(sql); //3 writer.setDataSource(dataSource); return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("oracle"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() .listener(csvJobListener()) //2 .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person,Person> processor) { return stepBuilderFactory .get("step1") .<Person, Person>chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator<Person> csvBeanValidator() { return new CsvBeanValidator<Person>(); } }
package com.pyc.batch.web; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class DemoController { @Autowired JobLauncher jobLauncher; @Autowired Job importJob; public JobParameters jobParameters; @RequestMapping("/read") public String imp(String fileName) throws Exception{ String path = fileName+".csv"; jobParameters = new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .addString("input.file.name", path) .toJobParameters(); jobLauncher.run(importJob,jobParameters); return "ok"; } }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。