Java performance optimization: processing data in batches!

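I. Common scenarios
Scenario 1: How do you optimize importing an Excel file with millions of rows?
Scenario 2: Under high-concurrency writes, how do you respond quickly, process data efficiently, and keep the load on the database as low as possible?
Scenario 3: When consuming a large volume of message-queue messages, how do you process them efficiently, raise consumption throughput, and reduce the message backlog?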

II. Solutions
1) Scenarios 1 and 3: multithreading + batch saves to the database
2) Scenario 2: asynchronous processing + batch saves to the database
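Both solutions start by cutting the data into fixed-size batches. As a rough illustration, here is a minimal, dependency-free sketch of that step; the class `BatchPartition` and its methods are hypothetical and not part of the article's project. Guava's `Lists.partition(list, size)` (Guava is already imported by the service code later in the article) does the same job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: splits a list into consecutive batches of at most batchSize elements. */
public class BatchPartition {

    /** Number of batches needed to cover size elements (ceiling division). */
    public static int chunkCount(int size, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        return (size + batchSize - 1) / batchSize;
    }

    /** Returns subList views of source; source must not be structurally modified while they are in use. */
    public static <T> List<List<T>> partition(List<T> source, int batchSize) {
        List<List<T>> result = new ArrayList<>(chunkCount(source.size(), batchSize));
        for (int start = 0; start < source.size(); start += batchSize) {
            int end = Math.min(start + batchSize, source.size());
            result.add(source.subList(start, end));
        }
        return Collections.unmodifiableList(result);
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            data.add(i);
        }
        List<List<Integer>> batches = partition(data, 1000);
        System.out.println(batches.size());        // 3
        System.out.println(batches.get(2).size()); // 500
    }
}
```

Because the batches are `subList` views, no rows are copied, but the source list must stay unchanged while the batches are being written.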

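Asynchronous processing options (only touched on briefly here; I'll go into detail another time):
1. Send the data to a message queue (recommended: data is almost never lost, though the implementation is somewhat more involved)
2. Asynchronous events with Guava EventBus (simple to implement and performs well, but data can be lost if the service restarts)
3. Asynchronous processing with a thread pool (simple to implement and fine when concurrency is modest, but data can be lost if the service restarts)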
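For option 3, one possible shape is an in-memory buffer that request threads write into and a single background thread flushes in batches. The sketch below assumes a hypothetical `AsyncBatchWriter` class and a `Consumer<List<T>>` sink standing in for a batch save such as `saveBatch`; it is not taken from the article's code. Because the buffer lives in JVM memory, it carries exactly the restart-loss risk noted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical in-memory async batcher. Callers enqueue records and return at once;
 * one background thread hands them to the sink in batches of at most batchSize.
 * Anything still queued is lost if the JVM stops.
 */
public class AsyncBatchWriter<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    private final int batchSize;
    private final long pollMillis;
    private final Consumer<List<T>> sink;   // stand-in for e.g. a saveBatch(list) call
    private final Thread worker;
    private volatile boolean running = true;

    public AsyncBatchWriter(int capacity, int batchSize, long pollMillis, Consumer<List<T>> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.batchSize = batchSize;
        this.pollMillis = pollMillis;
        this.sink = sink;
        this.worker = new Thread(this::drainLoop, "async-batch-writer");
        this.worker.start();
    }

    /** Non-blocking; returns false when closed or when the buffer is full. */
    public boolean submit(T item) {
        return running && queue.offer(item);
    }

    private void drainLoop() {
        List<T> batch = new ArrayList<>(batchSize);
        while (running || !queue.isEmpty()) {
            try {
                T first = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue;                          // idle: re-check the running flag
                }
                batch.add(first);
                queue.drainTo(batch, batchSize - 1);   // take whatever else is already queued
                sink.accept(new ArrayList<>(batch));   // one bulk write per batch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // a real implementation would log the failure and retry or park the batch
            } finally {
                batch.clear();
            }
        }
    }

    /** Stops accepting work and blocks until everything already queued has been flushed. */
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Returning `false` from `submit` when the buffer is full lets the caller choose between writing synchronously, rejecting the request, or falling back to a message queue.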

III. Comparison of several batch-processing approaches

1. Import 1M rows one at a time in a for loop, for example:

    for (StudentDO entity : entityList) {
        save(entity); // one INSERT statement and one database round trip per row
    }

Result: about 900 s.

    2024-03-20 17:35:10.592 INFO [dancechar-base-service,XNIO-1 task-1,4be7f2e99e65a1,1546061704931049472] c.l.d.base.biz.student.service.StudentService.saveStuListWithOne:66- Inserted 1M rows with a for loop!!! Total time: 900s
    2024-03-20 17:35:11.052 INFO [dancechar-base-service,XNIO-1 task-1,4be7f2e99e65a1,1546065486100299776] c.l.dancechar.base.framework.ResponseLogAdvice.beforeBodyWrite:53- Current request result: true
    2024-03-20 17:35:11.113 INFO [dancechar-base-service,XNIO-1 task-1,4be7f2e99e65a1,1546065486356152320] c.l.d.framework.common.trace.TraceWebFilter.doFilter:52- Total request time: 901772ms

2. Import 1M rows in batches with saveBatch:

    saveBatch(entityList); // MyBatis-Plus splits the list into JDBC batches of 1000 rows by default

Result: about 165 s, more than 5x faster than approach 1.

    2024-03-20 18:19:06.704 INFO [dancechar-base-service,XNIO-1 task-2,cad1de9712a53f,1546076540821831680] c.l.d.framework.common.trace.TraceWebFilter.printAccessLog:77- Start request - /sys/student/saveStuListWithBatch, method - POST, body params: {}
    2024-03-20 18:19:06.719 INFO [dancechar-base-service,XNIO-1 task-2,cad1de9712a53f,1546076540842803200] c.l.d.b.biz.student.controller.StudentController.saveStuListWithBatch:51- Batch-inserting 1M rows....
    2024-03-20 18:21:52.258 INFO [dancechar-base-service,XNIO-1 task-2,cad1de9712a53f,1546076540842803200] c.l.d.base.biz.student.service.StudentService.saveStuListWithBatch:77- Batch-inserted 1M rows!!! Total time: 165s
    2024-03-20 18:21:52.664 INFO [dancechar-base-service,XNIO-1 task-2,cad1de9712a53f,1546077236912717824] c.l.dancechar.base.framework.ResponseLogAdvice.beforeBodyWrite:53- Current request result: true
    2024-03-20 18:21:52.678 INFO [dancechar-base-service,XNIO-1 task-2,cad1de9712a53f,1546077236975632384] c.l.d.framework.common.trace.TraceWebFilter.doFilter:52- Total request time: 165981ms

3. Import 1M rows in batches on multiple threads:

    // one task per 1000-row slice; the full version below also waits for every task to finish
    for (List<StudentDO> batch : Lists.partition(entityList, 1000)) {
        threadPoolTaskExecutor.execute(() -> saveBatch(batch));
    }

Result: about 70 s, more than 2x faster than approach 2.

    2024-03-20 18:26:48.625 INFO [dancechar-base-service,XNIO-1 task-3,4bca21714d53a8,1546078181767774208] c.l.d.base.biz.student.service.StudentService.saveStuListWithThreadPoolBatch:112- Batch-inserted 1M rows via thread pool!!! Total time: 70s
    2024-03-20 18:26:48.636 INFO [dancechar-base-service,XNIO-1 task-3,4bca21714d53a8,1546078478309261312] c.l.dancechar.base.framework.ResponseLogAdvice.beforeBodyWrite:53- Current request result: true
    2024-03-20 18:26:48.644 INFO [dancechar-base-service,XNIO-1 task-3,4bca21714d53a8,1546078478342815744] c.l.d.framework.common.trace.TraceWebFilter.doFilter:52- Total request time: 70716ms
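
The speedup factors follow directly from the logged totals for 1,000,000 rows:

$$
\frac{900\ \mathrm{s}}{165\ \mathrm{s}} \approx 5.45,\qquad
\frac{165\ \mathrm{s}}{70\ \mathrm{s}} \approx 2.36,\qquad
\frac{900\ \mathrm{s}}{70\ \mathrm{s}} \approx 12.9
$$

In throughput terms that is roughly $10^6/900 \approx 1{,}111$, $10^6/165 \approx 6{,}061$ and $10^6/70 \approx 14{,}286$ rows per second, so the thread-pool version is about 13x faster than the row-by-row loop overall.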

IV. Code walkthrough

Controller layer

package com.litian.dancechar.base.biz.student.controller;

import com.litian.dancechar.base.biz.student.service.StudentService;
import com.litian.dancechar.framework.common.base.RespResult;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Student endpoints
 *
 * @author kyle0432
 * @date 2024/03/20 16:26
 */
@Api(tags = "Student API")
@RestController
@Slf4j
@RequestMapping(value = "/sys/student/")
public class StudentController {

    @Resource
    private StudentService studentService;

    @ApiOperation(value = "Insert 1M rows one by one", notes = "Insert 1M rows one by one")
    @PostMapping("saveStuListWithOne")
    public RespResult<Boolean> saveStuListWithOne() {
        log.info("Inserting 1M rows one by one....");
        return studentService.saveStuListWithOne();
    }

    @ApiOperation(value = "Batch-insert 1M rows", notes = "Batch-insert 1M rows")
    @PostMapping("saveStuListWithBatch")
    public RespResult<Boolean> saveStuListWithBatch() {
        log.info("Batch-inserting 1M rows....");
        return studentService.saveStuListWithBatch();
    }

    @ApiOperation(value = "Batch-insert rows via a thread pool", notes = "Batch-insert rows via a thread pool")
    @PostMapping("saveStuListWithThreadPoolBatch")
    public RespResult<Boolean> saveStuListWithThreadPoolBatch() {
        log.info("Batch-inserting 1M rows via thread pool....");
        return studentService.saveStuListWithThreadPoolBatch();
    }
}

Service layer

package com.litian.dancechar.base.biz.student.service;

import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.litian.dancechar.base.biz.student.dao.entity.StudentDO;
import com.litian.dancechar.base.biz.student.dao.inf.StudentDao;
import com.litian.dancechar.base.init.ThreadPoolInit;
import com.litian.dancechar.framework.cache.publish.util.MessagePublishUtil;
import com.litian.dancechar.framework.cache.redis.util.RedisHelper;
import com.litian.dancechar.framework.common.base.RespResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Student service
 *
 * @author kyle0432
 * @date 2024/03/20 19:30
 */
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class StudentService extends ServiceImpl<StudentDao, StudentDO> {

    @Resource
    private StudentDao studentDao;
    @Resource
    private RedisHelper redisHelper;
    @Resource
    private MessagePublishUtil messagePublishUtil;

    /**
     * Insert one million rows one at a time.
     */
    public RespResult<Boolean> saveStuListWithOne() {
        StopWatch stopWatch = StopWatch.createStarted();
        List<StudentDO> studentList = buildStudentList(1000000);
        for (StudentDO studentDO : studentList) {
            // one INSERT statement and one database round trip per row
            save(studentDO);
        }
        log.info("Inserted 1M rows with a for loop!!! Total time: {}s", stopWatch.getTime() / 1000);
        return RespResult.success(true);
    }

    /**
     * Batch insert.
     */
    public RespResult<Boolean> saveStuListWithBatch() {
        StopWatch stopWatch = StopWatch.createStarted();
        // build 1,000,000 rows
        List<StudentDO> studentList = buildStudentList(1000000);
        // MyBatis-Plus saveBatch splits the list and inserts it in JDBC batches (1000 rows per batch by default)
        this.saveBatch(studentList);
        log.info("Batch-inserted 1M rows!!! Total time: {}s", stopWatch.getTime() / 1000);
        return RespResult.success(true);
    }

    /**
     * Batch insert via a thread pool.
     * Note: Spring transactions are bound to the calling thread, so the class-level
     * @Transactional does not extend to the pool threads; a failure in one batch
     * does not roll back the batches other threads have already written.
     */
    public RespResult<Boolean> saveStuListWithThreadPoolBatch() {
        StopWatch stopWatch = StopWatch.createStarted();
        List<StudentDO> studentList = this.buildStudentList(1000000);
        int dataSize = studentList.size();
        // 1000 rows per task
        int step = 1000;
        // ceiling division: number of tasks needed to cover all rows
        int totalTasks = (dataSize + step - 1) / step;
        final CountDownLatch countDownLatch = new CountDownLatch(totalTasks);
        for (int j = 0; j < dataSize; j = j + step) {
            final int start = j;
            final int perCount = Math.min(dataSize - start, step);
            ThreadPoolInit.getStudentThreadPoolTaskExecutor().execute(() -> {
                try {
                    log.info("Worker started: start = {}, rows in this batch = {}", start, perCount);
                    saveBatch(studentList.subList(start, start + perCount));
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                } finally {
                    // count down even on failure; otherwise await() below would block forever
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        }
        log.info("Batch-inserted 1M rows via thread pool!!! Total time: {}s", stopWatch.getTime() / 1000);
        return RespResult.success(true);
    }

    private List<StudentDO> buildStudentList(int dataSize) {
        List<StudentDO> studentList = Lists.newArrayList();
        String noPrefix = RandomUtil.randomNumbers(3);
        for (int i = 0; i < dataSize; i++) {
            StudentDO studentDO = new StudentDO();
            studentDO.setNo(noPrefix + i);
            studentList.add(studentDO);
        }
        return studentList;
    }
}
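
One weakness of a bare `CountDownLatch` is that the caller learns nothing about which batches failed. A minimal, framework-free sketch of an alternative uses `CompletableFuture`, so each batch's exception surfaces at `join()`. `ParallelBatchInsert` and the `Consumer<List<T>>` sink (a stand-in for `saveBatch`) are hypothetical names, not from the article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical helper: parallel batch writes that report failures instead of hanging. */
public class ParallelBatchInsert {

    /**
     * Splits rows into slices of batchSize, runs sink on each slice in pool,
     * waits for all of them, and returns how many slices failed.
     */
    public static <T> int insertInParallel(List<T> rows, int batchSize,
                                           ExecutorService pool, Consumer<List<T>> sink) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            // slice is a view backed by rows; it is only read, so rows must not change meanwhile
            List<T> slice = rows.subList(start, Math.min(start + batchSize, rows.size()));
            futures.add(CompletableFuture.runAsync(() -> sink.accept(slice), pool));
        }
        int failed = 0;
        for (CompletableFuture<Void> future : futures) {
            try {
                future.join();            // blocks until this slice has finished
            } catch (CompletionException e) {
                failed++;                 // e.getCause() holds the slice's exception
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<Integer> rows = new ArrayList<>();
            for (int i = 0; i < 10_000; i++) {
                rows.add(i);
            }
            AtomicInteger saved = new AtomicInteger();
            int failed = insertInParallel(rows, 1000, pool, slice -> saved.addAndGet(slice.size()));
            System.out.println(saved.get() + " rows saved, " + failed + " failed batches");
        } finally {
            pool.shutdown();
        }
    }
}
```

In the Spring version each pool thread runs outside the request thread's transaction, so a caller that needs all-or-nothing semantics has to handle that separately, for example by making the import idempotent and re-running only the failed slices.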

Thread pool initialization (separate pools for separate business domains)

package com.litian.dancechar.base.init;

import com.litian.dancechar.framework.common.thread.CustomThreadPoolFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * Thread pool initialization (separate pools for separate business domains)
 *
 * @author kyle0432
 * @date 2024/03/20 23:25
 */
@Component
public class ThreadPoolInit {
    /**
     * Shared general-purpose pool
     */
    private static final ThreadPoolTaskExecutor threadPoolTaskExecutor =
            CustomThreadPoolFactory.newFixedThreadPool(16);
    /**
     * Pool dedicated to student operations
     */
    private static final ThreadPoolTaskExecutor studentThreadPoolTaskExecutor =
            CustomThreadPoolFactory.newFixedThreadPool(8);

    public static ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
        return threadPoolTaskExecutor;
    }

    public static ThreadPoolTaskExecutor getStudentThreadPoolTaskExecutor() {
        return studentThreadPoolTaskExecutor;
    }
}
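
`CustomThreadPoolFactory` is project-internal, so its queue capacity and rejection policy are not visible here. With 8 threads and 1,000 submitted tasks those settings matter: an unbounded queue holds every pending task in memory, while a small bounded queue with the JDK's default `AbortPolicy` makes `execute` throw `RejectedExecutionException`. The following JDK-only sketch (the `BoundedPools` class is hypothetical, not the project's factory) shows one common compromise: a bounded queue plus `CallerRunsPolicy`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory using only the JDK; not the project's CustomThreadPoolFactory. */
public final class BoundedPools {

    private BoundedPools() {
    }

    /**
     * Fixed-size pool with a bounded queue. When the queue is full, the submitting
     * thread runs the task itself (CallerRunsPolicy), which slows producers down
     * instead of throwing RejectedExecutionException.
     */
    public static ThreadPoolExecutor newBoundedFixedPool(String namePrefix, int threads, int queueCapacity) {
        AtomicInteger seq = new AtomicInteger(1);
        ThreadFactory factory = runnable -> new Thread(runnable, namePrefix + "-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                threads, threads,                          // core == max: a fixed-size pool
                0L, TimeUnit.MILLISECONDS,                 // keep-alive is irrelevant when core == max
                new ArrayBlockingQueue<>(queueCapacity),   // bounded, so pending work cannot grow without limit
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

With Spring's `ThreadPoolTaskExecutor`, the same settings map onto `setCorePoolSize`, `setMaxPoolSize`, `setQueueCapacity`, `setThreadNamePrefix` and `setRejectedExecutionHandler`.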

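Disclaimer: This article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and this site assumes no legal liability. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/空白诗007/article/detail/793554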