当前位置:   article > 正文

Java 使用线程池执行大数据量统计任务_java 大批量数据 多线程执行 得到汇总结果

java 大批量数据 多线程执行 得到汇总结果

最近需要对每周生成的日志表进行处理,并且输出结果到另一张表。日志表少的有300万,多有的有上千万条记录。因此打算用多线程来处理数据。在使用线程池时,几个注意点:

1、在入口的地方,直接新建一个线程为执行,然后返回结果,后续通过日志表来跟踪;

2、设置独立的线程名规则,区分自动生成的线程名;

3、直接使用ThreadPoolExecutor,而不是借用Executors类生成;

4、利用Future的阻塞特性来控制全部线程执行结束的时间点;

5、考虑是否有必要增加中断执行的机制;

6、考虑能合成批量操作的地方尽量合成批量操作。

代码参考:

  1. //1.计算线程数
  2. int threadNum = totalCount / StatConstant.SPLIT_NUM;
  3. if (threadNum * StatConstant.SPLIT_NUM < totalCount) {
  4. threadNum++;
  5. }
  6. //2.发起线程
  7. List<Future<Integer>> futureList = new ArrayList<>();
  8. ThreadFactory threadFactory = new ThreadFactoryBuilder()
  9. .setNameFormat("LogHandlerThread-%d")
  10. .build();
  11. ExecutorService executorService = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(threadNum), threadFactory);
  12. for (int i = 0; i < threadNum; i++) {
  13. int begin = i * StatConstant.SPLIT_NUM;
  14. int end = (i + 1) * StatConstant.SPLIT_NUM;
  15. if (i == threadNum - 1) {
  16. end = totalCount;
  17. }
  18. Future<Integer> future = executorService.submit(new LogHandlerThread(begin, end, weekNo, applicationContext));
  19. futureList.add(future);
  20. }
  21. //3.记录线程结果
  22. boolean finalResult = true;
  23. for (int i = 0; i < futureList.size(); i++) {
  24. try {
  25. Future<Integer> future = futureList.get(i);
  26. Integer result = future.get();
  27. handleCount += ((result == null) ? 0 : result);
  28. } catch (Exception e) {
  29. weekLog.setMessage(weekLog.getMessage() + "###" + "(ThreadNum=" + i + ")" + e.getMessage());
  30. finalResult = false;
  31. }
  32. }
  33. executorService.shutdown();
  34. //4.执行其他任务...
  35.     public class LogHandlerThread implements Callable<Integer> {
  36.         public LogHandlerThread(Integer begin, Integer end, String weekNo, ApplicationContext applicationContext) {
  37.           //初始..
  38.         }
  39.         @Override
  40.         public Integer call() {
  41.           //执行..
  42.         }
  43.      }

期间还碰上,死锁的问题(org.springframework.dao.DeadlockLoserDataAccessException: PreparedStatementCallback; SQL [INSERT IGNORE INTO tb(...) VALUES (..)]; Deadlock found when trying to get lock; try restarting transaction;),本来想通过批量初始来提高性能,但是表在更新的时候,如果是锁同样的行记录,确实容易出现死锁,没有太好的办法,说明业务逻辑上可能需要适当的调整来规避这种多线程冲突的情况,优先通过优化设计来解决冲突。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/721445
推荐阅读
相关标签
  

闽ICP备14008679号