当前位置:   article > 正文

Spring Boot 集成 批处理框架Spring batch_url7.me/kb6v

url7.me/kb6v

Spring Batch是一个轻量级的框架,完全面向Spring的批处理框架,用于企业级大量的数据读写处理系统。以POJO和Spring 框架为基础,包括日志记录/跟踪,事务管理、 作业处理统计工作重新启动、跳过、资源管理等功能。

       业务方案:

1、批处理定期提交。

2、并行批处理:并行处理工作。

3、企业消息驱动处理

4、大规模的并行处理

5、手动或是有计划的重启

6、局部处理:跳过记录(如:回滚)

      技术目标:

1、利用Spring编程模型:使程序员专注于业务处理,让Spring框架管理流程。

2、明确分离批处理的执行环境和应用。

3、提供核心的,共通的接口。

4、提供开箱即用(out of the box)的简单的默认的核心执行接口。

5、提供Spring框架中配置、自定义、和扩展服务。

6、所有存在的核心服务可以很容的被替换和扩展,不影响基础层。

7、提供一个简单的部署模式,利用Maven构建独立的Jar文件。

实现步骤:

1.定义处理对象

2.创建中间转换器 ***ItemProcessor 实现 ItemProcessor<I,O>接口

3.创建工作Job BatchConfiguration 主要处理读数据、处理数据、写数据等操作

4.创建listener job执行监听器

 

 

一般的批处理系统需要处理大量的数据, 内部消化单条记录失败的情况, 还要管理中断,在重启后也不去重复执行已经处理过的部分

 

 Spring Batch单/多处理单元(processors), 以及多个微线程(tasklets)

具体实现过程:

1.引入jar:

  1. <!-- Spring-boot启动项目 -->
  2. <parent>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-parent</artifactId>
  5. <version>1.3.1.RELEASE</version>
  6. </parent>
  7. <properties>
  8. <java.version>1.8</java.version>
  9. </properties>
  10. <!--Spring batch核心包->
  11. <dependencies>
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-batch</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>mysql</groupId>
  18. <artifactId>mysql-connector-java</artifactId>
  19. </dependency>
  20. </dependencies>

2.配置application.properties :配置数据源

  1. ######mysql\u6570\u636E\u6E90#########
  2. spring.datasource.driver-class-name=com.mysql.jdbc.Driver
  3. spring.datasource.password= wbw123456
  4. spring.datasource.url= jdbc:mysql://localhost/mydatabase
  5. spring.datasource.username=root
  6. <a target=_blank href="http://projects.spring.io/spring-batch/">点击打开链接</a>


3.新建实体类
  1. package com.my.gs.batch.processing.domain;
  2. public class Person {
  3. //ID
  4. private Integer personId;
  5. //姓名
  6. private String personName;
  7. //年龄
  8. private String personAge;
  9. //性别
  10. private String personSex;
  11. public Person(){};
  12. public Person( String personName, String personAge,
  13. String personSex) {
  14. this.personName = personName;
  15. this.personAge = personAge;
  16. this.personSex = personSex;
  17. }
  18. public Integer getPersonId() {
  19. return personId;
  20. }
  21. public void setPersonId(Integer personId) {
  22. this.personId = personId;
  23. }
  24. public String getPersonName() {
  25. return personName;
  26. }
  27. public void setPersonName(String personName) {
  28. this.personName = personName;
  29. }
  30. public String getPersonAge() {
  31. return personAge;
  32. }
  33. public void setPersonAge(String personAge) {
  34. this.personAge = personAge;
  35. }
  36. public String getPersonSex() {
  37. return personSex;
  38. }
  39. public void setPersonSex(String personSex) {
  40. this.personSex = personSex;
  41. }
  42. }

4.中间转换器:

  1. package com.my.gs.batch.processing.itemprocessor;
  2. import java.sql.ResultSet;
  3. import java.sql.SQLException;
  4. import java.util.List;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.batch.item.ItemProcessor;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.jdbc.core.JdbcTemplate;
  10. import org.springframework.jdbc.core.RowMapper;
  11. import com.my.gs.batch.processing.domain.Person;
  12. /**
  13. * 中间转换器
  14. * @author wbw
  15. *
  16. */
  17. public class PersonItemProcessor implements ItemProcessor<Person, Person> {
  18. //查询
  19. private static final String GET_PRODUCT = "select * from Person where personName = ?";
  20. private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
  21. @Autowired
  22. private JdbcTemplate jdbcTemplate;
  23. @Override
  24. public Person process(final Person person) throws Exception {
  25. List<Person> personList = jdbcTemplate.query(GET_PRODUCT, new Object[] {person.getPersonName()}, new RowMapper<Person>() {
  26. @Override
  27. public Person mapRow( ResultSet resultSet, int rowNum ) throws SQLException {
  28. Person p = new Person();
  29. p.setPersonName(resultSet.getString(1));
  30. p.setPersonAge(resultSet.getString(2));
  31. p.setPersonSex(resultSet.getString(3));
  32. return p;
  33. }
  34. });
  35. if(personList.size() >0){
  36. log.info("该数据已录入!!!");
  37. }
  38. String sex = null;
  39. if(person.getPersonSex().equals("0")){
  40. sex ="男";
  41. }else{
  42. sex ="女";
  43. }
  44. log.info("转换 (性别:"+person.getPersonSex()+") 为 (" + sex + ")");
  45. final Person transformedPerson = new Person(person.getPersonName(), person.getPersonAge(),sex);
  46. log.info("转换 (" + person + ") 为 (" + transformedPerson + ")");
  47. return transformedPerson;
  48. }
  49. }
5.处理具体工作业务  主要包含三个部分:读数据、处理数据、写数据

  1. package com.my.gs.batch.processing.configuration;
  2. import javax.sql.DataSource;
  3. import org.springframework.batch.core.Job;
  4. import org.springframework.batch.core.JobExecutionListener;
  5. import org.springframework.batch.core.Step;
  6. import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
  7. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  8. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  9. import org.springframework.batch.core.launch.support.RunIdIncrementer;
  10. import org.springframework.batch.item.ItemProcessor;
  11. import org.springframework.batch.item.ItemReader;
  12. import org.springframework.batch.item.ItemWriter;
  13. import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
  14. import org.springframework.batch.item.database.JdbcBatchItemWriter;
  15. import org.springframework.batch.item.file.FlatFileItemReader;
  16. import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
  17. import org.springframework.batch.item.file.mapping.DefaultLineMapper;
  18. import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
  19. import org.springframework.beans.factory.annotation.Qualifier;
  20. import org.springframework.context.annotation.Bean;
  21. import org.springframework.context.annotation.Configuration;
  22. import org.springframework.core.io.ClassPathResource;
  23. import org.springframework.jdbc.core.JdbcTemplate;
  24. import com.my.gs.batch.processing.domain.Person;
  25. import com.my.gs.batch.processing.itemprocessor.PersonItemProcessor;
  26. /**
  27. * 处理具体工作业务 主要包含三个部分:读数据、处理数据、写数据
  28. * @author wbw
  29. *
  30. */
  31. @Configuration
  32. @EnableBatchProcessing
  33. public class PersonBatchConfiguration {
  34. //插入语句
  35. private static final String PERSON_INSERT = "INSERT INTO Person (personName, personAge,personSex) VALUES (:personName, :personAge,:personSex)";
  36. public static final String Person_INSERT = "INSERT INTO Person (id, name,description,quantity) VALUES (:id, :name,:description,:quantity)";
  37. // tag::readerwriterprocessor[] 1.读数据
  38. @Bean
  39. public ItemReader<Person> reader() {
  40. FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
  41. //加载外部文件数据 文件类型:CSV
  42. reader.setResource(new ClassPathResource("sample-data.csv"));
  43. reader.setLineMapper(new DefaultLineMapper<Person>() {{
  44. setLineTokenizer(new DelimitedLineTokenizer() {{
  45. setNames(new String[] { "personName","personAge","personSex" });
  46. }});
  47. setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
  48. setTargetType(Person.class);
  49. }});
  50. }});
  51. return reader;
  52. }
  53. //2.处理数据
  54. @Bean
  55. public PersonItemProcessor processor() {
  56. return new PersonItemProcessor();
  57. }
  58. //3.写数据
  59. @Bean
  60. public ItemWriter<Person> writer(DataSource dataSource) {
  61. JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
  62. writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
  63. writer.setSql(PERSON_INSERT);
  64. writer.setDataSource(dataSource);
  65. return writer;
  66. }
  67. // end::readerwriterprocessor[]
  68. // tag::jobstep[]
  69. @Bean
  70. public Job importUserJob(JobBuilderFactory jobs, @Qualifier("step1")Step s1, JobExecutionListener listener) {
  71. return jobs.get("importUserJob")
  72. .incrementer(new RunIdIncrementer())
  73. .listener(listener)
  74. .flow(s1)
  75. .end()
  76. .build();
  77. }
  78. @Bean
  79. public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,
  80. ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
  81. return stepBuilderFactory.get("step1")
  82. .<Person, Person> chunk(10)
  83. .reader(reader)
  84. .processor(processor)
  85. .writer(writer)
  86. .build();
  87. }
  88. // end::jobstep[]
  89. @Bean
  90. public JdbcTemplate jdbcTemplate(DataSource dataSource) {
  91. return new JdbcTemplate(dataSource);
  92. }
  93. }
6.监听器:用于处理任务执行之后和之前
  1. <pre name="code" class="java">package com.my.gs.batch.processing.listener;
  2. import java.sql.ResultSet;
  3. import java.sql.SQLException;
  4. import java.util.List;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.batch.core.BatchStatus;
  8. import org.springframework.batch.core.JobExecution;
  9. import org.springframework.batch.core.listener.JobExecutionListenerSupport;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.jdbc.core.JdbcTemplate;
  12. import org.springframework.jdbc.core.RowMapper;
  13. import org.springframework.stereotype.Component;
  14. import com.my.gs.batch.processing.domain.Person;
  15. /**
  16. * Job执行监听器
  17. * @author wbw
  18. *
  19. */
  20. @Component
  21. public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
  22. private static final String PERSON_SQL = "SELECT personName, personAge,personSex FROM Person";
  23. private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
  24. @Autowired
  25. private JdbcTemplate jdbcTemplate;
  26. @Autowired
  27. public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
  28. if(this.jdbcTemplate==null){
  29. this.jdbcTemplate = jdbcTemplate;
  30. }
  31. }
  32. @Override
  33. public void afterJob(JobExecution jobExecution) {
  34. if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
  35. log.info("!!! JOB 执行完成!");
  36. List<Person> results = jdbcTemplate.query(PERSON_SQL, new RowMapper<Person>() {
  37. @Override
  38. public Person mapRow(ResultSet rs, int row) throws SQLException {
  39. return new Person(rs.getString(1), rs.getString(2),rs.getString(3));
  40. }
  41. });
  42. log.info("入库条数---------"+results.size());
  43. for (Person person : results) {
  44. log.info("新增 <" + person.getPersonName() + "> 成功!!!!!");
  45. }
  46. }
  47. }
  48. /* (non-Javadoc)
  49. * @see org.springframework.batch.core.listener.JobExecutionListenerSupport#beforeJob(org.springframework.batch.core.JobExecution)
  50. */
  51. @Override
  52. public void beforeJob(JobExecution jobExecution) {
  53. // TODO Auto-generated method stub
  54. super.beforeJob(jobExecution);
  55. }
  56. }

 
7.新建csv文件
<img src="" alt="" />
</pre><pre code_snippet_id="1563739" snippet_file_name="blog_20160124_10_8470765" name="code" class="java">8.启动执行
  1. <pre name="code" class="java">package com.my.batch;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class Application {
  6. public static void main(String[] args) throws Exception {
  7. SpringApplication.run(Application.class, args);
  8. }
  9. }

更多相关资源:http://projects.spring.io/spring-batch/
 
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/262957
推荐阅读
相关标签
  

闽ICP备14008679号