当前位置:   article > 正文

XxlJob + 线程池 + CountDownLatch 导致的任务堵塞_xxljob任务堆积

xxljob任务堆积

背景:系统需要通过xxlJob定时请求外部接口抓取数据,因系统性能问题采用线程池应对并发情况。

核心代码如下:

  1. public void job(){
  2. //业务
  3. List<String> manualContentUrls = Collections.synchronizedList(Lists.newArrayList());
  4. List<String> originVideoUrls = Collections.synchronizedList(coContentPlanDTO.getOriginVideoUrls());
  5. List<String> titles = Collections.synchronizedList(new ArrayList<>());
  6. CountDownLatch countDownLatch = new CountDownLatch(originVideoUrls.size());
  7. //下载执行
  8. for (String originVideoUrl : originVideoUrls) {
  9. threadPool.submit(() -> {
  10. run(originVideoUrl,manualContentUrls,coContentPlanDTO,url,titles);
  11. //计数器减一
  12. countDownLatch.countDown();
  13. });
  14. }
  15. //主线程阻塞
  16. countDownLatch.await();
  17. //业务
  18. }
  19. public void run(String originVideoUrl,
  20. List<String> manualContentUrls,
  21. CoContentPlanDTO coContentPlanDTO,
  22. Map<String, String> url,
  23. List<String> titles){
  24. if (StringUtils.isBlank(originVideoUrl)) {
  25. return;
  26. }
  27. Result<AutoPlanDownInfoDTO> autoPlanDownInfoDTOResult = fileProcessService.downLoadFile4AutoPlan(originVideoUrl);
  28. if (autoPlanDownInfoDTOResult.isFail()) {
  29. //流程节点扭转-----下载失败
  30. UpdateContentPlanFlowDTO failDTO = FlowProcessUtils.buildFlowDTO(PlanFlowPhaseEnums.DOWNLOAD.getCode(), BaseEventEnums.FAIL.getCode(), coContentPlanDTO.getId());
  31. failDTO.setStatus(CoContentPlanStatusEnum.HANDLEING);
  32. Result<String> failFlowResult = FlowProcessUtils.remoteInvoke(url.get("updateFlowNode"), JSONObject.toJSONString(failDTO));
  33. if (failFlowResult.isFail()) {
  34. log.error("[视频下载流程]------执行成功, flow节点扭转失败, 准备退出定时任务循环, 失败的计划id:{}, 失败原因:{}", coContentPlanDTO.getId(), failFlowResult.getMessage());
  35. return;
  36. }
  37. }
  38. AutoPlanDownInfoDTO autoPlanDownInfoDTO = autoPlanDownInfoDTOResult.getData();
  39. String ossUrl = autoPlanDownInfoDTO.getOssUrl();
  40. if (StringUtils.isNotBlank(ossUrl)) {
  41. manualContentUrls.add(ossUrl);
  42. }
  43. titles.add(autoPlanDownInfoDTO.getTitle());
  44. }
           

发现xxlJob任务堆积,日志显示调度成功,但执行器就是没有执行,且没有结果回调

初步分析是xxlJob任务线程被阻塞了,因为我们调度器的阻塞策略配置为:单机串行

对三种阻塞处理策略的理解 (1)单机串行:新的调度任务进入执行器后,该调度任务进入FIFO队列,以串行方式执行。 (2)丢弃后续调度:新的调度任务进入执行器后,若存在相同的正在运行的调度任务,本次调度请求将会被丢弃并被标记为失败。 (3)覆盖之前调度:新的调度任务进入执行器后,若存在相同的正在运行的调度任务,将会终止当前正在运行的调度任务,并清空队列,然后运行本次新的调度任务

一个任务被阻塞导致后面大量任务堆积,长时间不管有可能会出现OOM

开始问题分析,先看日志是那条job日志被阻塞,发现最后一条任务执行的日志线程id为:JobThread-25-1683799794248

再通过jstack -l $pid 查看线程日志,看一下这条线程现在在干嘛,这里可以看到我们的业务代码,线程具体阻塞再哪一行代码,还有阻塞的原因,明显是CountDownLatch这个并发工具搞的鬼。

回到我们的业务代码,发现里面用到了CountDownLatch.await()方法,这里简单介绍一下并发工具类CountDownLatch和常用方法

CountDownLatch 概念

CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。

CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

CountDownLatch 常用方法说明

CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。

await();//阻塞当前线程,将当前线程加入阻塞队列。

await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,

countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。

可以发现我们业务代码块就是用到了CountDownLatch.await()通过判断计数器的数量来阻塞job主线程,所以基本可以把bug定位到计数器减1(countDownLatch.countDown())前一块的业务代码

通过一段时间的业务代码分析发现,countDownLatch.countDown()执行的线程有可能会发生空指针报错,但是却并没有在日志里发现有关空指针的错误(这就很奇怪),空指针报错导致子线程销毁,外面的job主线程的计数器没办法进行减1操作,所以导致job主线程的计数器永远不会归零,一直受到CountDownLatch.await()的阻塞。。。。

解决办法,将业务代码块整个try catch final抱住,countDownLatch.countDown()执行计数器减一操作放在final块儿里,这样无论代码是否报错都会将计数器的数量减一,这样就可以保证计数器必然可以归零的情况,也避免了主线程一直被阻塞的情况。

改完后再重现一下,发现确实是空指针导致xxlJob任务阻塞,问题完美解决

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/546361
推荐阅读
相关标签
  

闽ICP备14008679号