赞
踩
SpringBoot高效批量插入百万数据
前言:我相信很多小伙伴和我一样在初学的时候,面对几万几十万数据插入,不知道如何下手,面对百万级别数据时更是不知所措,我们大部分初学者,很多人都喜欢for循环插入数据,或者是开启多个线程,然后分批使用for循环插入,当我们需要将大量数据存储到数据库中时,传统的逐条插入方式显然效率低下,并且容易导致性能瓶颈。而批量插入是一种更加高效的方式,可以大幅提高数据的插入速度,特别是在数据量较大的情况下。本文将介绍如何使用 Spring Boot 实现高效批量插入百万数据,以解决传统逐条插入方式存在的性能问题。我们将使用不同的插入方式来比较。
1.抛出问题
传统的单条插入存在什么问题:
2.前期准备工作
框架:springboot+mybatis plus +mysql
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.15</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
CREATE TABLE `student` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int DEFAULT NULL,
`addr` varchar(255) DEFAULT NULL,
`addr_Num` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8497107 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
server:
port: 8090
spring:
datasource:
url: jdbc:mysql://localhost:3306/boot_study?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
@TableName(value = "student") @Data public class StudentDO { /** 主键 type:自增 */ @TableId(type = IdType.AUTO) private Integer id; /** 名字 */ private String name; /** 年龄 */ private Integer age; /** 地址 */ private String addr; /** 地址号 @TableField:与表字段映射 */ @TableField(value = "addr_num") private String addrNum; public StudentDO(String name, int age, String addr, String addrNum) { this.name = name; this.age = age; this.addr = addr; this.addrNum = addrNum; } }
@RestController @RequestMapping("/student") public class StudentController { @Autowired private StudentMapper studentMapper; @Autowired private StudentService studentService; @Autowired private SqlSessionFactory sqlSessionFactory; @Autowired private ThreadPoolTaskExecutor taskExecutor; @Autowired private PlatformTransactionManager transactionManager; }
public interface StudentService extends IService<StudentDO> {
}
//实现定义
@Service
public class StudentServiceImpl extends ServiceImpl<StudentMapper, StudentDO> implements StudentService {
}
public interface StudentMapper extends BaseMapper<StudentDO> {
@Insert("<script>" +
"insert into student (name, age, addr, addr_num) values " +
"<foreach collection='studentDOList' item='item' separator=','> " +
"(#{item.name}, #{item.age},#{item.addr}, #{item.addrNum}) " +
"</foreach> " +
"</script>")
public int insertSplice(@Param("studentDOList") List<StudentDO> studentDOList);
}
3.测试示例演示
模拟100万条数据不同方式插入
@GetMapping("/for")
public void insertForData () {
long start = System.currentTimeMillis();
for (int i = 0; i < 1000000 ; i++) {
StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
studentMapper.insert(StudentDO);
}
long end = System.currentTimeMillis();
System.out.println("插入数据耗费时间:"+(end-start));
}
结果:实际上不知道等了多久很慢很慢,总体时间差不多半个多小时,因为这里的for循环进行单条插入时,每次都是在获取连接(Connection)、释放连接和资源关闭等操作上,(如果数据量大的情况下)极其消耗资源,导致时间长。
2. xml拼接foreach sql插入(大量数据不建议)
10万条数据插入数据耗费时间:3.554秒
@GetMapping("/sql")
public void insertSqlData () {
long start = System.currentTimeMillis();
ArrayList<StudentDO> arrayList = new ArrayList<>();
for (int i = 0; i < 100000 ; i++) {
StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
arrayList.add(StudentDO);
}
studentMapper.insertSplice(arrayList);
long end = System.currentTimeMillis();
System.out.println("插入数据耗费时间:"+(end-start));
}
结果:我们在Mapper里面是要insert注解拼接,拼接结果就是将所有的数据集成在一条SQL语句的value值上,其由于提交到服务器上的insert语句少了,相就不需要每次获取连接(Connection)、释放连接和资源关闭,网络负载少了,插入的性能有了提高。但是在数据量大的情况下可能会出现内存溢出、解析SQL语句耗时等情况。
3. mybatis-plus 批量插入saveBatch(推荐)
10万条数据插入数据耗费时间:2.481秒,在数据量不大的情况下和上面差不多
50万条数据插入数据耗费时间:12.473秒
@GetMapping("/batch")
public void insertSaveBatchData () {
long start = System.currentTimeMillis();
ArrayList<StudentDO> arrayList = new ArrayList<>();
for (int i = 0; i < 100000 ; i++) {
StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
arrayList.add(StudentDO);
}
studentService.saveBatch(arrayList);
long end = System.currentTimeMillis();
System.out.println("插入数据耗费时间:"+(end-start));
}
结果:使用MyBatis-Plus实现IService接口中批处理saveBatch()方法,可以很明显的看到性能有了提升,我们可以查看一下源码,它的底层实现原理利用分片处理(batchSize = 1000) + 分批提交事务的操作,来提高插入性能,并没有在连接上消耗性能,MySQLJDBC驱动默认情况下忽略saveBatch()方法中的executeBatch()语句,将需要批量处理的一组SQL语句进行拆散,执行时一条一条给MySQL数据库,造成实际上是分片插入,即与单条插入方式相比,有提高,但是性能未能得到实质性的提高。
@GetMapping("/forSaveBatch") public void insertforSaveBatchData () { //创建批量插入SqlSession SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); StudentMapper studentMapper = sqlSession.getMapper(StudentMapper.class); long start = System.currentTimeMillis(); ArrayList<StudentDO> arrayList = new ArrayList<>(); for (int i = 0; i < 500000 ; i++) { StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号"); studentMapper.insert(StudentDO); } sqlSession.commit(); sqlSession.close(); long end = System.currentTimeMillis(); System.out.println("插入数据耗费时间:"+(end-start)); }
结果:手动开启批处理,手动处理关闭自动提交事务,共用同一个SqlSession之后,for循环单条插入的性能得到实质性的提高;由于同一个SqlSession省去对资源相关操作的耗能、减少对事务处理的时间等,从而极大程度上提高执行效率。
5. ThreadPoolTaskExecutor分割多线程插入(大数据量强烈推荐)
50万条数据插入数据耗费时间:3。536秒,插入速度直接是前面的4倍,是不是很疑惑这样就快了这么多?
原理:多线程批量插入的过程,首先定义了一个线程池(ThreadPoolTaskExecutor),用于管理线程的生命周期和执行任务。然后,我们将要插入的数据列表按照指定的批次大小分割成多个子列表,并开启多个线程来执行插入操作,通过 TransactionManager 获取事务管理器,并使用 TransactionDefinition 定义事务属性。然后,在每个线程中,我们通过 transactionManager.getTransaction() 方法获取事务状态,并在插入操作中使用该状态来管理事务。
在插入操作完成后,我们再根据操作结果调用transactionManager.commit()或 transactionManager.rollback() 方法来提交或回滚事务。在每个线程执行完毕后,都会调用 CountDownLatch 的 countDown() 方法,以便主线程等待所有线程都执行完毕后再返回。
@GetMapping("/threadPoolInsert") public void insertThreadPoolBatchData () { ArrayList<StudentDO> arrayList = new ArrayList<>(); for (int i = 0; i < 500000 ; i++) { StudentDO StudentDO = new StudentDO("张三"+i, i, "武汉"+i, i+"号"); arrayList.add(StudentDO); } int count = arrayList.size(); int pageSize = 10000; int threadNum = count % pageSize == 0 ? count / pageSize: count / pageSize + 1; CountDownLatch downLatch = new CountDownLatch(threadNum); long start = System.currentTimeMillis(); for (int i = 0; i < threadNum; i++) { //开始序号 int startIndex = i * pageSize; //结束序号 int endIndex = Math.min(count, (i+1)*pageSize); //分割list List<StudentDO> StudentDOs = arrayList.subList(startIndex, endIndex); taskExecutor.execute(() -> { DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); TransactionStatus status = transactionManager.getTransaction(definition); try { studentMapper.insertSplice(StudentDOs); transactionManager.commit(status); }catch (Exception e){ transactionManager.rollback(status); e.printStackTrace(); }finally { //执行完后 计数 downLatch.countDown(); } }); } try { //等待 downLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } long end = System.currentTimeMillis(); System.out.println("插入数据耗费时间:"+(end-start)); }
async:
executor:
thread:
core_pool_size: 35
max_pool_size: 35
queue_capacity: 99999
name:
prefix: async-testDB-
@EnableAsync @Configuration public class ExecutorConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置核心线程数 taskExecutor.setCorePoolSize(corePoolSize); //设置最大线程数 taskExecutor.setMaxPoolSize(maxPoolSize); //设置队列容量 taskExecutor.setQueueCapacity(queueCapacity); //设置线程名前缀 taskExecutor.setThreadNamePrefix(namePrefix); //设置拒绝策略 // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return taskExecutor; } }
//接口服务 public interface AsyncService { void executeAsync(List<StudentDO> studentDOList, CountDownLatch countDownLatch); } //实现类 @Service public class AsyncServiceImpl extends ServiceImpl implements AsyncService { @Autowired private StudentService studentService; @Async("asyncServiceExecutor") @Override public void executeAsync(List<StudentDO> studentDOList, CountDownLatch countDownLatch) { try{ //异步线程要做的事情 studentService.saveBatch(studentDOList); }finally { countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放 } } }
@GetMapping("/asyncInsertData") public void asyncInsertData() { List<StudentDO> studentDOList = getTestData(); //测试每100条数据插入开一个线程 long start = System.currentTimeMillis(); List<List<StudentDO>> lists = ListUtil.split(studentDOList, 10000); CountDownLatch countDownLatch = new CountDownLatch(lists.size()); for (List<StudentDO> listSub:lists) { asyncService.executeAsync(listSub,countDownLatch); } try { countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的; // 这样就可以在下面拿到所有线程执行完的集合结果 } catch (Exception e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("插入数据耗费时间:"+(end-start)); } public List<StudentDO> getTestData() { ArrayList<StudentDO> arrayList = new ArrayList<>(); for (int i = 0; i < 500000 ; i++) { StudentDO studentDO = new StudentDO("张三"+i, i, "武汉"+i, i+"号"); arrayList.add(studentDO); } return arrayList; }
50万条数据插入数据耗费时间:2.604秒,这里插入和上面差不多因为他们使用的都是多线程插入
总结:经过上面的示例演示我们心里已经有谱了,知道什么时候该使用哪一种数据插入方式,针对对不同线程数的测试,发现不是线程数越多越好,具体多少合适,通常的算法:CPU核心数量*2 +2 个线程。
实际要根据每个人的电脑配置情况设置合适的线程数,可以根据下面这个公式获取:
int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。