赞
踩
在项目开发过程中,我们经常会有批量插入的需求,例如:定时统计任务
但是受限于MySQL中 max_allowed_packet 参数限制,5.7版本默认值为4M,这显然不太符合我们的需求,当然我们也可以通过修改此值来适应我们业务,今天分享在不修改此值的情况下,如何在客户端优雅的处理此场景
- @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
- public void submit(List<GroupDayStatistics> dataList) {
- // 清理统计数据
- mapper.delete();
-
- int batchNum = 1000;
- List<Object> tempList = new ArrayList<>(batchNum);
- for (int i = 0, size = dataList.size(); i < size; i++) {
- tempList.add(dataList.get(i));
- if (tempList.size() == 1000 || i + 1 == size) {
- mapper.batchInsert(tempList);
- tempList.clear();
- }
- }
- }
这种写法有个缺点:其他地方要用都得copy一份过去,麻烦
- @Service
- public class BatchInsertUtil<T> {
- /**
- * 批次新增(删除函数 以及 分片新增都将在同一个事务中进行处理)
- *
- * @param batchSize
- * 批次新增大小
- * @param deleteSupplier
- * 清理函数
- * @param insertFunction
- * 批次新增调用函数
- * @param data
- * 需新增数据
- * @return left:删除影响行 right:新增行数
- */
- @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
- public ImmutablePair<Integer, Integer> batchInsert(int batchSize, IntSupplier deleteSupplier,
- Function<List<T>, Integer> insertFunction, List<T> data) {
- int deleteRow = 0;
- // 清理数据
- if (Objects.nonNull(deleteSupplier)) {
- deleteRow = deleteSupplier.getAsInt();
- }
- // 分片分事务插入数据
- List<List<T>> partition = ListUtils.partition(data, batchSize);
- int insertRow = partition.stream().mapToInt(insertFunction::apply).sum();
- return new ImmutablePair<>(deleteRow, insertRow);
- }
- }

- // 注入我们的工具类
- @Resource
- private BatchInsertUtil<需要插入的实体类> batchInsertUtil;
- IntSupplier deleteSupplier = () -> mapper.delete();
- Function<List<需要插入的实体类>, Integer> insertFunction = arg -> mapper.batchInsert(arg);
- batchInsertUtil.batchInsert(1000, deleteSupplier, insertFunction, dataList);
本来到这就结束了,问题很多的小明就说了:
你这个怎么删除与分片新增都在一个事务中,如果我一次性插入的数据过多,这不就是一个大事务了嘛?虽然在一个事务中可以保证原子性,但是我有的场景就是想要他们分别处于不同事务,业务上的一致性我自己保证,你就说能不能做吧!
......
......
......
安排!
- // 这里我们在第二步的基础上引入 编程式事务:
- @Resource
- private PlatformTransactionManager transactionManager;
-
- /**
- * 批次新增(删除函数 以及 分片新增函数都将采取独立事务commit)
- *
- * @param batchSize
- * 批次新增大小
- * @param deleteSupplier
- * 清理函数
- * @param insertFunction
- * 批次新增调用函数
- * @param data
- * 需新增数据
- * @return left:删除影响行 right:新增行数
- */
- @Transactional(rollbackFor = Exception.class, propagation = Propagation.NOT_SUPPORTED)
- public ImmutablePair<Integer, Integer> batchInsertAloneTransaction(int batchSize, IntSupplier deleteSupplier,
- Function<List<T>, Integer> insertFunction, List<T> data) {
-
- TransactionStatus transactionStatus = buildTransactionStatus();
- int deleteRow = 0;
- try {
- // 清理数据
- if (Objects.nonNull(deleteSupplier)) {
- deleteRow = deleteSupplier.getAsInt();
- }
- transactionManager.commit(transactionStatus);
- } catch (Exception e) {
- transactionManager.rollback(transactionStatus);
- }
- // 分片分事务插入数据
- List<List<T>> partition = ListUtils.partition(data, batchSize);
- int insertRow = partition.stream().mapToInt(list -> insertAndCommit(insertFunction, list)).sum();
- return new ImmutablePair<>(deleteRow, insertRow);
- }
-
- /**
- * 新增分片数据并提交事务
- *
- * @param insertFunction
- * 新增函数
- * @param dataList
- * 分片数据
- * @return 新增行数
- */
- private int insertAndCommit(Function<List<T>, Integer> insertFunction, List<T> dataList) {
- TransactionStatus transactionStatus = buildTransactionStatus();
- try {
- Integer apply = insertFunction.apply(dataList);
- transactionManager.commit(transactionStatus);
- return apply;
- } catch (Exception e) {
- transactionManager.rollback(transactionStatus);
- throw e;
- }
- }
-
- /**
- * 构建事务状态
- *
- * @return 结果
- */
- private TransactionStatus buildTransactionStatus() {
- DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
- defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
- return transactionManager.getTransaction(defaultTransactionDefinition);
- }
-
- // 注:分片新增任意一个事务操作失败将不对已提交事务产生任何影响,需自行保证数据在业务上的一致性

现在整个世界都变优雅了!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。