当前位置:   article > 正文

使用Mybatis批量插入大量数据的实践

使用Mybatis批量插入大量数据的实践

前言

在项目开发过程中,我们经常会有批量插入的需求,例如:定时统计任务

但是受限于MySQL中 max_allowed_packet 参数限制,5.7版本默认值为4M,这显然不太符合我们的需求,当然我们也可以通过修改此值来适应我们业务,今天分享在不修改此值的情况下,如何在客户端优雅的处理此场景

常用基础版

  1. @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
  2. public void submit(List<GroupDayStatistics> dataList) {
  3. // 清理统计数据
  4. mapper.delete();
  5. int batchNum = 1000;
  6. List<Object> tempList = new ArrayList<>(batchNum);
  7. for (int i = 0, size = dataList.size(); i < size; i++) {
  8. tempList.add(dataList.get(i));
  9. if (tempList.size() == 1000 || i + 1 == size) {
  10. mapper.batchInsert(tempList);
  11. tempList.clear();
  12. }
  13. }
  14. }

这种写法有个缺点:其他地方要用都得copy一份过去,麻烦

改进版

  1. @Service
  2. public class BatchInsertUtil<T> {
  3. /**
  4. * 批次新增(删除函数 以及 分片新增都将在同一个事务中进行处理)
  5. *
  6. * @param batchSize
  7. * 批次新增大小
  8. * @param deleteSupplier
  9. * 清理函数
  10. * @param insertFunction
  11. * 批次新增调用函数
  12. * @param data
  13. * 需新增数据
  14. * @return left:删除影响行 right:新增行数
  15. */
  16. @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
  17. public ImmutablePair<Integer, Integer> batchInsert(int batchSize, IntSupplier deleteSupplier,
  18. Function<List<T>, Integer> insertFunction, List<T> data) {
  19. int deleteRow = 0;
  20. // 清理数据
  21. if (Objects.nonNull(deleteSupplier)) {
  22. deleteRow = deleteSupplier.getAsInt();
  23. }
  24. // 分片分事务插入数据
  25. List<List<T>> partition = ListUtils.partition(data, batchSize);
  26. int insertRow = partition.stream().mapToInt(insertFunction::apply).sum();
  27. return new ImmutablePair<>(deleteRow, insertRow);
  28. }
  29. }
使用第一步
  1. // 注入我们的工具类
  2. @Resource
  3. private BatchInsertUtil<需要插入的实体类> batchInsertUtil;
使用第二步
  1. IntSupplier deleteSupplier = () -> mapper.delete();
  2. Function<List<需要插入的实体类>, Integer> insertFunction = arg -> mapper.batchInsert(arg);
  3. batchInsertUtil.batchInsert(1000, deleteSupplier, insertFunction, dataList);

本来到这就结束了,问题很多的小明就说了:

你这个怎么删除与分片新增都在一个事务中,如果我一次性插入的数据过多,这不就是一个大事务了嘛?虽然在一个事务中可以保证原子性,但是我有的场景就是想要他们分别处于不同事务,业务上的一致性我自己保证,你就说能不能做吧!

......

......

......

安排!

进一步优化 

  1. // 这里我们在第二步的基础上引入 编程式事务:
  2. @Resource
  3. private PlatformTransactionManager transactionManager;
  4. /**
  5. * 批次新增(删除函数 以及 分片新增函数都将采取独立事务commit)
  6. *
  7. * @param batchSize
  8. * 批次新增大小
  9. * @param deleteSupplier
  10. * 清理函数
  11. * @param insertFunction
  12. * 批次新增调用函数
  13. * @param data
  14. * 需新增数据
  15. * @return left:删除影响行 right:新增行数
  16. */
  17. @Transactional(rollbackFor = Exception.class, propagation = Propagation.NOT_SUPPORTED)
  18. public ImmutablePair<Integer, Integer> batchInsertAloneTransaction(int batchSize, IntSupplier deleteSupplier,
  19. Function<List<T>, Integer> insertFunction, List<T> data) {
  20. TransactionStatus transactionStatus = buildTransactionStatus();
  21. int deleteRow = 0;
  22. try {
  23. // 清理数据
  24. if (Objects.nonNull(deleteSupplier)) {
  25. deleteRow = deleteSupplier.getAsInt();
  26. }
  27. transactionManager.commit(transactionStatus);
  28. } catch (Exception e) {
  29. transactionManager.rollback(transactionStatus);
  30. }
  31. // 分片分事务插入数据
  32. List<List<T>> partition = ListUtils.partition(data, batchSize);
  33. int insertRow = partition.stream().mapToInt(list -> insertAndCommit(insertFunction, list)).sum();
  34. return new ImmutablePair<>(deleteRow, insertRow);
  35. }
  36. /**
  37. * 新增分片数据并提交事务
  38. *
  39. * @param insertFunction
  40. * 新增函数
  41. * @param dataList
  42. * 分片数据
  43. * @return 新增行数
  44. */
  45. private int insertAndCommit(Function<List<T>, Integer> insertFunction, List<T> dataList) {
  46. TransactionStatus transactionStatus = buildTransactionStatus();
  47. try {
  48. Integer apply = insertFunction.apply(dataList);
  49. transactionManager.commit(transactionStatus);
  50. return apply;
  51. } catch (Exception e) {
  52. transactionManager.rollback(transactionStatus);
  53. throw e;
  54. }
  55. }
  56. /**
  57. * 构建事务状态
  58. *
  59. * @return 结果
  60. */
  61. private TransactionStatus buildTransactionStatus() {
  62. DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
  63. defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
  64. return transactionManager.getTransaction(defaultTransactionDefinition);
  65. }
  66. // 注:分片新增任意一个事务操作失败将不对已提交事务产生任何影响,需自行保证数据在业务上的一致性

结语

现在整个世界都变优雅了!

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/801944
推荐阅读
相关标签
  

闽ICP备14008679号