The requirement is as follows:
Read data from a file, process it with business logic, and store the result in a database, while avoiding OOM (Out of Memory). Two straightforward approaches are shown below: reading the file in fixed-size batches, and streaming it line by line.
// Approach 1: read the file in fixed-size batches and write each batch with a JDBC batch update
import java.io.*;
import java.util.*;
import java.nio.file.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class FileDataProcessor {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private static final int BATCH_SIZE = 1000; // number of records handled per batch

    public void processFile(String filePath) {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath))) {
            String line;
            List<String> batch = new ArrayList<>();
            while ((line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() >= BATCH_SIZE) {
                    processBatch(batch);
                    batch.clear();
                }
            }
            // Process the final batch
            if (!batch.isEmpty()) {
                processBatch(batch);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void processBatch(List<String> batch) {
        List<ProcessedData> processedDataList = new ArrayList<>();
        for (String line : batch) {
            // Business processing logic
            ProcessedData processedData = processLine(line);
            processedDataList.add(processedData);
        }
        saveToDatabase(processedDataList);
    }

    private ProcessedData processLine(String line) {
        // Sample business processing logic
        ProcessedData processedData = new ProcessedData();
        processedData.setField(line);
        return processedData;
    }

    private void saveToDatabase(List<ProcessedData> processedDataList) {
        String sql = "INSERT INTO processed_data (field) VALUES (?)";
        List<Object[]> batchArgs = new ArrayList<>();
        for (ProcessedData data : processedDataList) {
            batchArgs.add(new Object[] { data.getField() });
        }
        jdbcTemplate.batchUpdate(sql, batchArgs);
    }

    public static class ProcessedData {
        private String field;

        public String getField() { return field; }
        public void setField(String field) { this.field = field; }
    }
}
// Approach 2 (alternative version of the same processor): stream the file line by line with Files.lines
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class FileDataProcessor {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void processFile(String filePath) {
        try (Stream<String> lines = Files.lines(Paths.get(filePath))) {
            lines.forEach(line -> {
                // Business processing logic
                ProcessedData processedData = processLine(line);
                saveToDatabase(processedData);
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private ProcessedData processLine(String line) {
        // Sample business processing logic
        ProcessedData processedData = new ProcessedData();
        processedData.setField(line);
        return processedData;
    }

    private void saveToDatabase(ProcessedData processedData) {
        String sql = "INSERT INTO processed_data (field) VALUES (?)";
        jdbcTemplate.update(sql, processedData.getField());
    }

    public static class ProcessedData {
        private String field;

        public String getField() { return field; }
        public void setField(String field) { this.field = field; }
    }
}
3.1 Configure Spring Batch
First, add the dependencies:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
3.2 Create the Batch configuration class
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public FlatFileItemReader<InputData> reader() {
        FlatFileItemReader<InputData> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource("path/to/your/input/file.csv"));
        reader.setLineMapper(new DefaultLineMapper<InputData>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames("field1", "field2"); // column names of the CSV file
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<InputData>() {{
                setTargetType(InputData.class);
            }});
        }});
        return reader;
    }

    @Bean
    public ItemProcessor<InputData, ProcessedData> processor() {
        return inputData -> {
            ProcessedData processedData = new ProcessedData();
            processedData.setField(inputData.getField1() + "-" + inputData.getField2());
            return processedData;
        };
    }

    @Bean
    public ItemWriter<ProcessedData> writer() {
        JdbcBatchItemWriter<ProcessedData> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.setSql("INSERT INTO processed_data (field) VALUES (:field)");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public Job importUserJob(Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(ItemReader<InputData> reader,
                      ItemProcessor<InputData, ProcessedData> processor,
                      ItemWriter<ProcessedData> writer) {
        return stepBuilderFactory.get("step1")
                .<InputData, ProcessedData>chunk(1000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
In the code above we define a configuration class named BatchConfig, which contains the configuration for reading the CSV file, processing the data, and writing it to the database. Note that you need to replace path/to/your/input/file.csv with the actual path of your CSV file, and adjust the SQL statement in the writer method to match your database table structure.
3.3 Define the data model
public class InputData {
    private String field1;
    private String field2;

    // Getters and setters
    public String getField1() { return field1; }
    public void setField1(String field1) { this.field1 = field1; }
    public String getField2() { return field2; }
    public void setField2(String field2) { this.field2 = field2; }
}

public class ProcessedData {
    private String field;

    // Getters and setters
    public String getField() { return field; }
    public void setField(String field) { this.field = field; }
}
3.4 Configure the data source
Configure your database connection information in the application.properties file:
spring.datasource.url=jdbc:mysql://localhost:3306/your_database
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Spring Batch specific properties
spring.batch.jdbc.initialize-schema=always
Note that you need to replace your_database, your_username, and your_password with your actual database information.
3.5 Run the batch job
Launch the batch job through a CommandLineRunner:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        jobLauncher.run(job, new JobParameters());
    }
}
Running the Spring Boot application with this configuration starts the Spring Batch job, which reads the data from the file, processes it, and stores it in the database.
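One point worth noting, as a suggestion rather than part of the original example: jobLauncher.run(job, new JobParameters()) uses the same (empty) identifying parameters on every launch, so a second run against an already completed JobInstance will be rejected by Spring Batch. A minimal sketch of a workaround is shown below, adding a unique timestamp parameter; it assumes the extra import org.springframework.batch.core.JobParametersBuilder, and the parameter name runTime is only an illustration. If you launch the job manually like this, you may also want to set spring.batch.job.enabled=false so that Spring Boot does not additionally auto-run the job at startup.

    @Override
    public void run(String... args) throws Exception {
        // Add a unique identifying parameter so that each launch creates a new JobInstance;
        // re-running a completed job with identical (empty) JobParameters would otherwise fail.
        JobParameters params = new JobParametersBuilder()
                .addLong("runTime", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(job, params);
    }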
import java.io.File;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Component;

@Component
public class CsvReader implements ItemReader<InputData> {

    private final File file;
    private LineIterator iterator;

    public CsvReader(File file) {
        this.file = file;
    }

    @Override
    public InputData read() throws Exception {
        // Open the file lazily on the first call
        if (iterator == null) {
            iterator = FileUtils.lineIterator(file, StandardCharsets.UTF_8.name());
        }
        if (!iterator.hasNext()) {
            iterator.close(); // no more lines: close the file and signal the end of input
            return null;
        }
        String line = iterator.nextLine();
        // Parse the CSV line and populate an InputData object
        InputData inputData = new InputData();
        // ...
        return inputData;
    }
}
In the code above we create a class named CsvReader, which implements the ItemReader interface and reads data from a CSV file. Note that you need to parse each CSV line and build the InputData object according to your own CSV format and InputData class, as sketched below.
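As a minimal sketch of what that parsing might look like, assuming a two-column, comma-separated line that maps onto the InputData class defined earlier (the delimiter and column order are assumptions, not part of the original article), a helper method inside CsvReader could be:

    // Hypothetical parsing logic for a line such as "valueA,valueB".
    // Adjust the delimiter, column order, and validation to match your file.
    private InputData parseLine(String line) {
        String[] parts = line.split(",", -1); // -1 keeps trailing empty columns
        InputData inputData = new InputData();
        inputData.setField1(parts.length > 0 ? parts[0].trim() : null);
        inputData.setField2(parts.length > 1 ? parts[1].trim() : null);
        return inputData;
    }

The read() method above would then call parseLine(line) in place of the placeholder comment.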
Java 8 introduced the Stream API, which provides a concise way to work with collections of data. You can use the Stream API to process each line of the file.
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.stream.Stream;
import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Component;

@Component
public class LineReader implements ItemReader<String> {

    private final String filePath;
    private Stream<String> lines;
    private Iterator<String> iterator;

    public LineReader(String filePath) {
        this.filePath = filePath;
    }

    @Override
    public String read() throws Exception {
        // Open the stream lazily and iterate over it, one line per read() call
        if (lines == null) {
            lines = Files.lines(Paths.get(filePath));
            iterator = lines.iterator();
        }
        if (iterator.hasNext()) {
            return iterator.next();
        }
        lines.close(); // release the underlying file handle
        return null;   // signal the end of input
    }
}
In the code above we create a class named LineReader, which implements the ItemReader interface and reads data from the file line by line. Note that you need to construct the LineReader with your actual file path.
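Since Spring cannot inject a plain String path into the @Component constructor by itself, one possible way to wire the reader is to register it as a bean and read the path from configuration. A minimal sketch, assuming a property key input.file.path (the key name is an assumption); if you register it this way, the @Component annotation on LineReader can be dropped:

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ReaderConfig {

        // Hypothetical bean definition; "input.file.path" is an example property key.
        @Bean
        public LineReader lineReader(@Value("${input.file.path}") String filePath) {
            return new LineReader(filePath);
        }
    }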
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

import com.google.common.collect.Lists;   // Guava, for Lists.partition
import lombok.SneakyThrows;               // Lombok
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;

public class FileProcessor {

    @SneakyThrows
    public static void readInApacheIOWithThreadPool() {
        // Create a thread pool with at most 10 threads and a queue capacity of 100
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));

        // Read the file line by line using Apache Commons IO
        try (LineIterator fileContents = FileUtils.lineIterator(
                new File("temp/test.txt"), StandardCharsets.UTF_8.name())) {
            List<String> lines = new ArrayList<>();
            while (fileContents.hasNext()) {
                String nextLine = fileContents.nextLine();
                lines.add(nextLine);
                // Once 100,000 lines have been read, split them into two lists of
                // 50,000 lines each and hand them to the async threads
                if (lines.size() == 100000) {
                    processLinesInBatches(lines, threadPoolExecutor);
                    lines.clear(); // drop the lines that have already been handed off
                }
            }
            // Process the remaining lines
            if (!lines.isEmpty()) {
                processTask(lines);
            }
        } finally {
            threadPoolExecutor.shutdown();
        }
    }

    private static void processLinesInBatches(List<String> lines, ThreadPoolExecutor threadPoolExecutor)
            throws InterruptedException, ExecutionException {
        List<List<String>> partitions = Lists.partition(lines, 50000);
        List<Future<?>> futureList = new ArrayList<>();
        for (List<String> partition : partitions) {
            Future<?> future = threadPoolExecutor.submit(() -> processTask(partition));
            futureList.add(future);
        }
        // Wait for all tasks to finish before reading more data, to avoid OOM
        for (Future<?> future : futureList) {
            future.get();
        }
    }

    private static void processTask(List<String> lines) {
        for (String line : lines) {
            // Simulate the business processing
            try {
                TimeUnit.MILLISECONDS.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}
In the code above, once 100,000 lines have accumulated in memory, they are split into two tasks handed to asynchronous threads, each task processing 50,000 lines. The main thread then calls Future#get() and waits for those asynchronous tasks to finish before it continues reading, which keeps the amount of buffered data bounded.
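Blocking on Future#get() is one way to apply backpressure. Another option, shown here only as a sketch and not taken from the original article, is to let the thread pool push back on its own: with a bounded queue and the CallerRunsPolicy rejection handler, the reading thread runs a task itself whenever the pool is saturated, so data cannot pile up without limit. The class name BackpressurePool is just an illustration:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BackpressurePool {

        // A bounded pool whose rejection policy makes the submitting (reading) thread
        // run the task itself when the queue is full, which throttles the reader.
        public static ThreadPoolExecutor create() {
            return new ThreadPoolExecutor(
                    10, 10, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(100),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }
    }

With such a pool, submit() no longer fails with RejectedExecutionException when the queue is full (unless the pool has been shut down); the calling thread simply executes the task itself, slowing the reader down instead of buffering more data.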
Note: constants such as BATCH_SIZE and LINES_PER_FILE (used in the example below) need to be tuned to your actual workload.
Also, if the amount of data to process is very large, you can consider splitting the file into smaller parts and then processing each part in parallel with multiple threads.
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Stream;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LargeFileProcessor {

    private static final Logger logger = LogManager.getLogger(LargeFileProcessor.class);
    private static final int LINES_PER_FILE = 100000;
    private static final int BATCH_SIZE = 1000;

    public static void main(String[] args) {
        try {
            splitFileAndRead("temp/test.txt");
        } catch (Exception e) {
            logger.error("Failed to process large file", e);
        }
    }

    public static void splitFileAndRead(String largeFileName) throws Exception {
        // First split the large file into smaller files
        List<File> fileList = splitLargeFile(largeFileName);

        // Create a thread pool with at most 10 threads and a queue capacity of 100
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

        List<Future<?>> futureList = new ArrayList<>();
        for (File file : fileList) {
            Future<?> future = threadPoolExecutor.submit(() -> {
                try (Stream<String> inputStream = Files.lines(file.toPath(), StandardCharsets.UTF_8);
                     Connection conn = getConnection()) {
                    List<String> batch = new ArrayList<>();
                    inputStream.forEach(line -> {
                        batch.add(line);
                        if (batch.size() == BATCH_SIZE) {
                            insertBatch(conn, batch);
                            batch.clear();
                        }
                    });
                    if (!batch.isEmpty()) {
                        insertBatch(conn, batch);
                    }
                } catch (IOException | SQLException e) {
                    logger.error("Error processing file: " + file.getName(), e);
                }
            });
            futureList.add(future);
        }

        for (Future<?> future : futureList) {
            future.get(); // wait for all tasks to finish
        }
        threadPoolExecutor.shutdown();
    }

    private static List<File> splitLargeFile(String largeFileName) throws IOException {
        List<String> lines = new ArrayList<>();
        List<File> files = new ArrayList<>();
        int num = 1;
        try (LineIterator fileContents = FileUtils.lineIterator(
                new File(largeFileName), StandardCharsets.UTF_8.name())) {
            while (fileContents.hasNext()) {
                String nextLine = fileContents.nextLine();
                lines.add(nextLine);
                if (lines.size() == LINES_PER_FILE) {
                    createSmallFile(lines, num++, files);
                }
            }
        }
        if (!lines.isEmpty()) {
            createSmallFile(lines, num, files);
        }
        return files;
    }

    private static void createSmallFile(List<String> lines, int num, List<File> files) throws IOException {
        Path filePath = Files.createTempFile("smallfile_" + num, ".txt");
        Files.write(filePath, lines, StandardCharsets.UTF_8);
        files.add(filePath.toFile());
        lines.clear(); // clear the list so it can be reused for the next chunk
    }

    private static void insertBatch(Connection conn, List<String> batch) {
        String insertSQL = "INSERT INTO my_table (column1, column2) VALUES (?, ?)";
        try (PreparedStatement pstmt = conn.prepareStatement(insertSQL)) {
            for (String line : batch) {
                String[] parts = line.split(",");
                pstmt.setString(1, parts[0]);
                pstmt.setString(2, parts[1]);
                pstmt.addBatch();
            }
            pstmt.executeBatch();
            conn.commit();
        } catch (SQLException e) {
            logger.error("Error inserting batch into database", e);
        }
    }

    private static Connection getConnection() throws SQLException {
        String jdbcUrl = "jdbc:mysql://localhost:3306/mydatabase";
        String username = "username";
        String password = "password";
        Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
        conn.setAutoCommit(false); // commit transactions manually
        return conn;
    }
}
The code above first splits the large file into several smaller files and then processes those small files in parallel with a thread pool; each thread handles one small file and inserts its data into the database in batches.