赞
踩
private LinkedBlockingDeque<Request> queue = new LinkedBlockingDeque(); @Data class Request{ UseLogDto dto; CompletableFuture<Integer> future; public Request(AppUseLogDto dto,CompletableFuture<Integer> future){ this.dto = dto; this.future = future; } } public int saveLog(UseLogDto logDto) throws Exception { //并发队列 CompletableFuture<Integer> future = new CompletableFuture(); Request request = new Request(logDto,future); queue.add(request); return future.get();//阻塞:将响应分发到具体的线程 } @Async @Scheduled(cron = "*/2 * * * * ?") public void logTask() throws Exception { int status = 0; //避免jvm回收request,future需要做回调 List<Request> requests = new ArrayList<>(); try { int size = queue.size(); log.info("扫描发队列长度:" + size); if (size == 0){ return; } List<AppUseLogDto> list = new ArrayList<>(); for (int i = 0;i < size;i++){ Request request = queue.poll(); AppUseLogDto dto = request.getDto(); //获取部门信息 String userId = dto.getUserId(); EmpUser empUser = cacheEmpUser.get(userId); if (empUser == null){ List<EmpUser> empUserList = empUserMapper.checkDeptTree(null, userId); if (!CollectionUtils.isEmpty(empUserList)){ empUser = empUserList.get(0); //缓存部门数据 cacheEmpUser.put(empUser.getStaffNoOld(),empUser); } } String deptCode = empUser != null ? empUser.getDepartCode() : null; String deptName = empUser != null ? empUser.getDepartName() : null; dto.setDeptId(deptCode); dto.setDeptName(deptName); list.add(dto); //避免jvm回收request,future需要做回调 requests.add(request); } //响应通知 status = appUseLogMapper.batchAdd(list); }catch (Exception e){ log.error("请求队列处理发生异常:{}",e); }finally { if (requests.size() > 0){ for (Request request : requests) { request.getFuture().complete(status); } } } }
public int batchAddHistory(){ List<AppUseLogDto> list = select(); if (CollectionUtils.isEmpty(list)) { return 0; } //线程池初始参数、mybatis批量插入算法 int count = 500; int size = list.size(); int nThreads = size % count == 0 ? size / count : size / count + 1; ExecutorService executorService = Executors.newFixedThreadPool(10); List<Future<Integer>> futures = new ArrayList<>(); for (int i = 0; i < nThreads; i++) { int start = i * count; int end = (i + 1) * count; Callable<Integer> task = () -> { //手动开启事务 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); TransactionStatus status = transactionManager.getTransaction(def); List<String> ids = null; try { //拆分集合 List<AppUseLogDto> AppUseLogs = list.subList(start, end > size ? size : end); ids = AppUseLogs.stream().map(dto -> dto.getId()).collect(Collectors.toList()); //同步主表app_use_log数据到历史表 appUseLogMapper.batchAddHistory(AppUseLogs); // 提交事务 transactionManager.commit(status); } catch (Exception e) { // 回滚事务 transactionManager.rollback(status); log.error("同步数据失败:",e); log.error("主表app_use_log同步失败,Ids[:" + ids + "]"); throw new RuntimeException("主表app_use_log同步失败,Ids[:" + ids + "]"); } return 1; }; futures.add(executorService.submit(task)); } //关闭线程池 executorService.shutdown(); if (!futures.isEmpty()) { return nThreads; } return 0; }
public int saveOrUpdate(List<MeteReal> list) {
if (!CollectionUtils.isEmpty(list)){
task.execute(() -> {
int size = list.size();
long l = System.currentTimeMillis();
meteRealMapper.saveOrUpdate(list);
long e = System.currentTimeMillis();
log.info( "mysql 执行 on DUPLICATE KEY UPDATE 修改" + size + "条数据,共耗时:" + (e - l) + "毫秒");
});
}
return 1;
}
<insert id="saveOrUpdate" parameterType="java.util.List">
insert into m_signal_real_data(SignalID,SignalValue,ReportTime) values
<foreach collection="list" item="item" separator=",">
(#{item.signalId}, #{item.value}, #{item.dateTime})
</foreach>
on DUPLICATE KEY UPDATE SignalValue=values(SignalValue),ReportTime=values(ReportTime)
</insert>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。