当前位置:   article > 正文

高并发接口实现、大数据批量插入、修改、删除优化_java批量修改接口

java批量修改接口

Java8实现高并发接口

 /** Pending log requests: saveLog enqueues here and the scheduled logTask takes them off for batch insertion. */
 private LinkedBlockingDeque<Request> queue = new LinkedBlockingDeque<>();
 
 /**
  * One queued log entry together with the future used to hand the batch
  * result back to the thread that submitted it.
  */
 @Data
 class Request{
     // Declared as AppUseLogDto so the field agrees with the constructor parameter
     // and with consumers that read getDto() into an AppUseLogDto variable.
     AppUseLogDto dto;
     // Completed by the batch task once the entry has been processed.
     CompletableFuture<Integer> future;

     /**
      * @param dto    log entry to persist
      * @param future future to complete with the batch result
      */
     public Request(AppUseLogDto dto,CompletableFuture<Integer> future){
         this.dto = dto;
         this.future = future;
     }
 }

 /**
  * Enqueues one log entry for batch processing and blocks until the
  * future attached to it is completed.
  *
  * @param logDto log entry to persist
  * @return the value the future was completed with
  * @throws Exception if the wait is interrupted or the future completes exceptionally
  */
 public int saveLog(UseLogDto logDto) throws Exception {
     CompletableFuture<Integer> future = new CompletableFuture<>();
     // NOTE(review): Request's constructor is declared with AppUseLogDto while this
     // method accepts UseLogDto - confirm which DTO type is intended.
     Request request = new Request(logDto, future);
     queue.add(request);

     // Blocks the calling thread until the batch task completes this future.
     // NOTE(review): there is no timeout; the caller waits as long as the future stays open.
     return future.get();
 }

 /**
  * Scheduled batch flush (cron {@code *&#47;2 * * * * ?}): drains the request queue,
  * fills in department data for each entry, inserts all entries with one
  * batch call and then completes every drained request's future with the
  * batch result (0 when processing failed).
  *
  * @throws Exception declared for compatibility; failures during processing are logged, not rethrown
  */
 @Async
 @Scheduled(cron = "*/2 * * * * ?")
 public void logTask() throws Exception {
     int status = 0;
     // Every request taken off the queue is tracked here from the start, so the
     // finally block can always complete its future - otherwise the submitting
     // thread would stay blocked on future.get().
     List<Request> requests = new ArrayList<>();
     try {
         // drainTo moves whatever is queued in one call; unlike size() + poll() it
         // cannot hand back null when another drain runs at the same time.
         queue.drainTo(requests);
         int size = requests.size();
         log.info("扫描发队列长度:" + size);
         if (size == 0){
             return;
         }
         List<AppUseLogDto> list = new ArrayList<>(size);
         for (Request request : requests) {
             AppUseLogDto dto = request.getDto();
             // Resolve department info for the user, cache first.
             String userId = dto.getUserId();
             EmpUser empUser = cacheEmpUser.get(userId);
             if (empUser == null){
                 List<EmpUser> empUserList = empUserMapper.checkDeptTree(null, userId);
                 if (!CollectionUtils.isEmpty(empUserList)){
                     empUser = empUserList.get(0);
                     // NOTE(review): the cache is read by userId but written by
                     // getStaffNoOld(); confirm these are the same value, otherwise
                     // the lookup above never hits.
                     cacheEmpUser.put(empUser.getStaffNoOld(),empUser);
                 }
             }
             String deptCode = empUser != null ? empUser.getDepartCode() : null;
             String deptName = empUser != null ? empUser.getDepartName() : null;
             dto.setDeptId(deptCode);
             dto.setDeptName(deptName);
             list.add(dto);
         }
         status = appUseLogMapper.batchAdd(list);
     }catch (Exception e){
         // Throwable passed as the last argument so the stack trace is logged;
         // the former "{}" placeholder was never substituted.
         log.error("请求队列处理发生异常:", e);
     }finally {
         // Notify every waiting caller, on success and on failure alike.
         for (Request request : requests) {
             request.getFuture().complete(status);
         }
     }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

多线程批量插入 20 万条数据(约 30 秒完成)

 /**
  * Copies the rows returned by {@code select()} into the history table in
  * batches of 500, each batch running in its own manually managed
  * transaction on a 10-thread pool.
  *
  * <p>The batches are only submitted here; the method does not wait for
  * them to finish, so the return value counts submitted batches, not
  * successful ones. Failures are logged by the worker.
  *
  * @return number of batches submitted, or 0 when there is nothing to copy
  */
 public int batchAddHistory(){
     List<AppUseLogDto> list = select();
     if (CollectionUtils.isEmpty(list)) {
         return 0;
     }
     // Batch size per insert statement / transaction.
     int count = 500;
     int size = list.size();
     // Ceiling division: number of batches needed to cover all rows.
     int batches = (size + count - 1) / count;
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     try {
         for (int i = 0; i < batches; i++) {
             int start = i * count;
             int end = Math.min(start + count, size);
             Callable<Integer> task = () -> {
                 // Open a transaction by hand for this worker thread.
                 DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
                 TransactionStatus status = transactionManager.getTransaction(def);
                 List<String> ids = null;
                 try {
                     // Slice out this batch.
                     List<AppUseLogDto> batch = list.subList(start, end);
                     ids = batch.stream().map(dto -> dto.getId()).collect(Collectors.toList());
                     // Copy the batch from app_use_log into the history table.
                     appUseLogMapper.batchAddHistory(batch);
                     transactionManager.commit(status);
                 } catch (Exception e) {
                     log.error("同步数据失败:",e);
                     log.error("主表app_use_log同步失败,Ids[:" + ids + "]");
                     // A failed commit has already completed the transaction; calling
                     // rollback on it would throw and hide the original error.
                     if (!status.isCompleted()) {
                         transactionManager.rollback(status);
                     }
                     // Keep the original exception as the cause.
                     throw new RuntimeException("主表app_use_log同步失败,Ids[:" + ids + "]", e);
                 }
                 return 1;
             };
             executorService.submit(task);
         }
     } finally {
         // Stop accepting work even if a submit failed; already queued batches still run.
         executorService.shutdown();
     }
     return batches;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

存在数据则更新,不存在数据则新增

 /**
  * Hands the given rows to the task executor, which runs the mapper's
  * saveOrUpdate statement and logs how long it took.
  *
  * @param list rows to upsert; a null or empty list schedules nothing
  * @return always 1
  */
 public int saveOrUpdate(List<MeteReal> list) {
     if (CollectionUtils.isEmpty(list)) {
         return 1;
     }
     task.execute(() -> {
         int rowCount = list.size();
         long startedAt = System.currentTimeMillis();
         meteRealMapper.saveOrUpdate(list);
         long elapsedMillis = System.currentTimeMillis() - startedAt;
         log.info("mysql 执行 on DUPLICATE KEY UPDATE 修改" + rowCount + "条数据,共耗时:" + elapsedMillis + "毫秒");
     });
     return 1;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

saveOrUpdate 方法对应的 MyBatis mapper 实现

 <!--
   Batch upsert into m_signal_real_data: emits one (SignalID, SignalValue, ReportTime)
   tuple per list item in a single INSERT. On a duplicate-key conflict the existing
   row's SignalValue and ReportTime are overwritten with the incoming values.
   NOTE(review): which column(s) carry the unique key is not visible here - presumably
   SignalID; confirm against the table definition.
 -->
 <insert id="saveOrUpdate" parameterType="java.util.List">
     insert into m_signal_real_data(SignalID,SignalValue,ReportTime) values
     <foreach collection="list" item="item" separator=",">
         (#{item.signalId}, #{item.value}, #{item.dateTime})
     </foreach>
     on DUPLICATE KEY UPDATE SignalValue=values(SignalValue),ReportTime=values(ReportTime)
 </insert>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/330076
推荐阅读
相关标签
  

闽ICP备14008679号