当前位置:   article > 正文

探索CompletableFuture:高效异步编程的利器_competablefuture之批量处理

competablefuture之批量处理

目录

一、CompletableFuture基本功能安利

二、CompletableFuture使用介绍

(一)任务创建使用

1.supplyAsync创建带有返回值的异步任务

2.runAsync创建没有返回值的异步任务

(二)异步回调使用

1.异步回调:thenApply和thenApplyAsync

2.异步回调:thenCompose和thenComposeAsync

3.异步回调:thenAccept和thenAcceptAsync

4.异步回调:thenRun和thenRunAsync

(三)多任务组合使用

1.多任务组合:thenCombine

2.多任务组合:thenAcceptBoth

3.多任务组合:runAfterBoth

4.多任务组合:applyToEither

5.多任务组合:acceptEither

6.多任务组合:runAfterEither

7.多任务组合:allOf

8.多任务组合:anyOf

(四)结果处理使用

1.结果处理:whenComplete

2.结果处理:exceptionally

3.结果处理:handle

(五)方法混合使用举例

1.多个方法组合使用

2.并发处理批量任务

A.模拟规定校验商品敏感词信息归类

B.模拟从一个上传的商品信息中提炼商品内含的文本数据

C.模拟处理一个商品信息中的敏感词命中情况统计,异步并发

D.模拟并发处理批量任务

E.结果展示

参考文章


干货分享,感谢您的阅读!

随着现代软件系统的复杂性和用户需求的多样化,异步编程成为了提升系统性能和响应速度的重要手段。在Java领域,CompletableFuture作为Java 8引入的新特性,提供了强大的异步编程能力,极大地简化了多线程和并发任务的处理。本文将探讨CompletableFuture的基本功能和使用方法,介绍如何利用其提升程序的并发性能和代码的可维护性。

其原理相关内容见:CompletableFuture回调机制的设计与实现_张彦峰ZYF的博客-CSDN博客

一、CompletableFuture基本功能安利

CompletableFuture是JDK8中的新特性,主要用于对JDK5中加入的Future的补充,实现了CompletionStage和Future接口。其基本功能主要如下图,作为一项利器在我们的平时开发中频繁使用。

一般使用上我们主要集中在任务创建、异步回调、多任务组合、结果处理和结果获取,本文主要针对各部分主要功能进行基本的安利使用,同时附上一些源码的文章供分析。

二、CompletableFuture使用介绍

假设我们有一个敏感词系统,可以针对各种信息进行基本的敏感词验证,其核心主要包括三部分功能:文本清洗、敏感词验证、敏感词干预生效。

  1. 第一步,文本清洗针对验证的文本将直接去除其中的特殊符号、表情包、隐藏符号、中文简体繁体转换等内容,只保留文本中含有的中英文和数字信息。
  2. 第二步,清洗后,将清洗文本与实际相关的词库进行验证来查看是否命中相关敏感词,并给出词库命中的敏感词信息。
  3. 第三步,如果存在词库命中的敏感词,同时考虑各词是否有加白以及生效规则(只对某区域生效等),从而确定最终的命中信息。

以上是业务基背景,相关可以采用责任链管道模式实现可见责任链模式(以及变种管道模式)的应用案例_张彦峰ZYF的博客-CSDN博客_责任链模式应用场景责任链在实际开发中的应用还是比较多的,特别是在营销订购系统、审核流转换处理、任务流程处理系统等系统中,其实我们在开发中往往主要应用的主要无非是以下三个场景(起码以我的平时开发的角度来看):一是无需太关心责任链中各处理流的顺序的简单使用;二是需要关注处理顺序,按责任链条延续处理,每个处理节点均可对请求进行节点的处理, 或将其传递给链上的下个处理节点;三是在处理中和纯的责任链模式在链上只会有一个处理器用于处理业务数据存在差异,需要进行管道模式采用多个处理器都会处理业务数据。针对以上场景进行业务举例和代码书写https://blog.csdn.net/xiaofeng10330111/article/details/123956717?spm=1001.2014.3001.5501其中含有基本业务流程图可方便理解。

(一)任务创建使用

1.supplyAsync创建带有返回值的异步任务

  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  2. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

我们限定创建用于清洗用户文本有返回值的异步任务,采用以下两种方式进行展示其基本创建:

  • 使用默认线程池(ForkJoinPool.commonPool())异步任务创建-有返回值的异步任务supplyAsync
  • 自定义线程池异步任务创建-有返回值的异步任务supplyAsync

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 异步任务创建:有返回值的异步任务supplyAsync
  14. * 使用默认线程池 + 自定义线程池
  15. */
  16. @Test
  17. public void testSupplyAsync() throws ExecutionException, InterruptedException {
  18. log.info("异步任务创建-有返回值的异步任务supplyAsync:使用默认线程池(ForkJoinPool.commonPool())");
  19. CompletableFuture<String> contentCleanTaskByDefault = CompletableFuture.supplyAsync(() -> {
  20. log.info("异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】");
  21. ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
  22. ContentInfoContext.builder()
  23. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  24. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  25. .contentAttr(ContentAttr.builder().build()).build());
  26. return contentCleanResContext.getCleanContent();
  27. });
  28. log.info("异步任务获取用户文本清洗结果:【{}】", contentCleanTaskByDefault.get());
  29. log.info("异步任务创建-有返回值的异步任务supplyAsync:自定义线程池");
  30. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  31. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  32. private AtomicInteger threadCount = new AtomicInteger(1);
  33. @Override
  34. public Thread newThread(Runnable r) {
  35. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  36. }
  37. });
  38. CompletableFuture<String> contentCleanTaskByDefine = CompletableFuture.supplyAsync(() -> {
  39. log.info("异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】");
  40. ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
  41. ContentInfoContext.builder()
  42. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  43. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  44. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
  45. return contentCleanResContext.getCleanContent();
  46. }, threadPoolExecutor);
  47. log.info("异步任务获取用户文本清洗结果:【{}】", contentCleanTaskByDefine.get());
  48. }
  49. }

测试结果展示:

重点关注打印日志“异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】”以及各自结果“异步任务获取用户文本清洗结果:【中国张彦峰外卖】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 10:47:48,349 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 64] - 异步任务创建-有返回值的异步任务supplyAsync:使用默认线程池(ForkJoinPool.commonPool())
  2. 2023-01-22 10:47:48,354 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 66] - 异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】
  3. 2023-01-22 10:47:48,356 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=null, source=null, sourceId=null, bizType=null))
  4. 2023-01-22 10:47:48,398 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 34 ms to scan 1 urls, producing 4 keys and 18 values
  5. 2023-01-22 10:47:48,434 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 34 ms to scan 1 urls, producing 4 keys and 13 values
  6. 2023-01-22 10:47:48,466 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 31 ms to scan 1 urls, producing 3 keys and 9 values
  7. 2023-01-22 10:47:48,877 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=null, source=null, sourceId=null, bizType=null))
  8. 2023-01-22 10:47:48,878 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 75] - 异步任务获取用户文本清洗结果:【中国张彦峰外卖】
  9. 2023-01-22 10:47:48,878 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 77] - 异步任务创建-有返回值的异步任务supplyAsync:自定义线程池
  10. 2023-01-22 10:47:48,879 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 88] - 异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】
  11. 2023-01-22 10:47:48,880 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  12. 2023-01-22 10:47:48,882 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  13. 2023-01-22 10:47:48,882 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 96] - 异步任务获取用户文本清洗结果:【中国张彦峰外卖】

2.runAsync创建没有返回值的异步任务

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

我们限定创建用于清洗用户文本无返回值的异步任务,采用以下两种方式进行展示其基本创建:

  • 使用默认线程池(ForkJoinPool.commonPool())异步任务创建-无返回值的异步任务supplyAsync
  • 自定义线程池异步任务创建-无返回值的异步任务supplyAsync

因为无返回值,所以一般在处理中,异步任务对用户文本清洗并刷新数据到指定缓存,并"异步通知进行消息转发,触达各消费端进行后续清理

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. /**
  11. * 异步任务创建:没有返回值的异步任务runAsync
  12. * 使用默认线程池 + 自定义线程池
  13. */
  14. @Test
  15. public void testRunAsync() throws ExecutionException, InterruptedException {
  16. log.info("异步任务创建-有返回值的异步任务runAsync:使用默认线程池(ForkJoinPool.commonPool())");
  17. CompletableFuture<Void> contentCleanTaskByDefault = CompletableFuture.runAsync(() -> {
  18. log.info("异步任务对用户文本清洗并刷新数据");
  19. log.info("异步通知进行消息转发,触达各消费端进行后续清理");
  20. });
  21. log.info("异步任务创建-有返回值的异步任务runAsync:自定义线程池");
  22. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  23. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  24. private AtomicInteger threadCount = new AtomicInteger(1);
  25. @Override
  26. public Thread newThread(Runnable r) {
  27. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  28. }
  29. });
  30. CompletableFuture<Void> contentCleanTaskByDefine = CompletableFuture.runAsync(() -> {
  31. log.info("异步任务对用户文本清洗并刷新数据");
  32. log.info("异步通知进行消息转发,触达各消费端进行后续清理");
  33. }, threadPoolExecutor);
  34. }
  35. }

测试结果展示:

  1. 2023-01-22 10:56:35,672 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 105] - 异步任务创建-有返回值的异步任务runAsync:使用默认线程池(ForkJoinPool.commonPool())
  2. 2023-01-22 10:56:35,676 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 111] - 异步任务创建-有返回值的异步任务runAsync:自定义线程池
  3. 2023-01-22 10:56:35,676 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 107] - 异步任务对用户文本清洗并刷新数据
  4. 2023-01-22 10:56:35,677 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 108] - 异步通知进行消息转发,触达各消费端进行后续清理
  5. 2023-01-22 10:56:35,679 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 122] - 异步任务对用户文本清洗并刷新数据
  6. 2023-01-22 10:56:35,680 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 123] - 异步通知进行消息转发,触达各消费端进行后续清理

重点关注打印日志“异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】”以及各自结果“异步通知进行消息转发,触达各消费端进行后续清理”,管道相关处理日志暂时忽略。

(二)异步回调使用

1.异步回调:thenApply和thenApplyAsync

  1. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  2. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  3. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)

我们限定两个任务来完成用户文本的清洗和词库命中情况,采用以下两种方式进行展示,注意这两个方法是有返回值的:

  • 异步回调-thenApply使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  • 异步回调-thenApplyAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况,采用自定义线程池

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 异步回调:thenApply和thenApplyAsync
  14. * thenApply接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的Future对象。
  15. * <p>
  16. * 使用thenApply方法时子任务与父任务使用的是同一个线程
  17. * thenApplyAsync在子任务中是另起一个线程执行任务,可以自定义线程池
  18. */
  19. @Test
  20. public void testThenApply() throws ExecutionException, InterruptedException {
  21. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  22. sensitivePipelineExecutor.getContentCleanRes(
  23. ContentInfoContext.builder()
  24. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  25. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  26. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  27. CompletableFuture<SensitveHitContext> sensitveHitRes1 = contentCleanRes.thenApply((contentCleanResInfo) ->
  28. sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo));
  29. log.info("异步回调-thenApply使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
  30. log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanRes.get().getCleanContent());
  31. log.info("任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【{}】",
  32. sensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  33. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  34. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  35. private AtomicInteger threadCount = new AtomicInteger(1);
  36. @Override
  37. public Thread newThread(Runnable r) {
  38. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  39. }
  40. });
  41. CompletableFuture<SensitveHitContext> sensitveHitRes2 = CompletableFuture.supplyAsync(() ->
  42. sensitivePipelineExecutor.getContentCleanRes(
  43. ContentInfoContext.builder()
  44. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  45. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  46. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()), threadPoolExecutor)
  47. .thenApplyAsync((contentCleanResInfo) ->
  48. sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo), threadPoolExecutor);
  49. log.info("异步回调-thenApplyAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
  50. log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanRes.get().getCleanContent());
  51. log.info("任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【{}】",
  52. sensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  53. }
  54. }

测试结果展示:

重点关注打印日志“任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】”以及“任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 外卖, 腾讯, 酒精, 南京]】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:02:55,076 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 144] - 异步回调-thenApply使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  2. 2023-01-22 11:02:55,079 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 11:02:55,122 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 38 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 11:02:55,156 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 31 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 11:02:55,188 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 32 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 11:02:55,565 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 11:02:55,566 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  8. 2023-01-22 11:02:55,566 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 145] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
  9. 2023-01-22 11:02:55,580 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  10. 2023-01-22 11:02:55,581 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 146] - 任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】
  11. 2023-01-22 11:02:55,583 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  12. 2023-01-22 11:02:55,583 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 166] - 异步回调-thenApplyAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  13. 2023-01-22 11:02:55,583 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 167] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
  14. 2023-01-22 11:02:55,585 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  15. 2023-01-22 11:02:55,586 [thread-processor-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  16. 2023-01-22 11:02:55,586 [thread-processor-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  17. 2023-01-22 11:02:55,587 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 168] - 任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】

2.异步回调:thenCompose和thenComposeAsync

  1. public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
  2. public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
  3. public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

我们限定两个任务来完成用户文本的清洗和词库命中情况,采用以下两种方式进行展示,注意这两个方法是有返回值的:

  • 异步回调-thenCompose使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  • 异步回调-thenComposeAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 异步回调:thenCompose和thenComposeAsync
  14. * thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果。
  15. * <p>
  16. * thenApply转换的是泛型中的类型,返回的是同一个CompletableFuture
  17. * thenCompose将内部的CompletableFuture调用展开来并使用上一个CompletableFutre调用的结果在下一步的CompletableFuture调用中进行运算,是生成一个新的CompletableFuture。
  18. */
  19. @Test
  20. public void testThenCompose() throws ExecutionException, InterruptedException {
  21. CompletableFuture<SensitveHitContext> sensitveHitRes1 = CompletableFuture.supplyAsync(() ->
  22. sensitivePipelineExecutor.getContentCleanRes(
  23. ContentInfoContext.builder()
  24. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  25. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  26. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build())).thenCompose(new Function<ContentCleanResContext, CompletionStage<SensitveHitContext>>() {
  27. @Override
  28. public CompletionStage<SensitveHitContext> apply(ContentCleanResContext contentCleanResInfo) {
  29. return CompletableFuture.supplyAsync(new Supplier<SensitveHitContext>() {
  30. @Override
  31. public SensitveHitContext get() {
  32. return sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
  33. }
  34. });
  35. }
  36. });
  37. log.info("异步回调-thenCompose使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
  38. log.info("【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【{}】",
  39. sensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  40. CompletableFuture<SensitveHitContext> sensitveHitRes2 = CompletableFuture.supplyAsync(() ->
  41. sensitivePipelineExecutor.getContentCleanRes(
  42. ContentInfoContext.builder()
  43. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  44. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  45. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()))
  46. .thenComposeAsync(new Function<ContentCleanResContext, CompletionStage<SensitveHitContext>>() {
  47. @Override
  48. public CompletionStage<SensitveHitContext> apply(ContentCleanResContext contentCleanResInfo) {
  49. return CompletableFuture.supplyAsync(new Supplier<SensitveHitContext>() {
  50. @Override
  51. public SensitveHitContext get() {
  52. return sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
  53. }
  54. });
  55. }
  56. });
  57. log.info("异步回调-thenComposeAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
  58. log.info("【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【{}】",
  59. sensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  60. }
  61. }

测试结果展示:

重点关注打印日志“【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【[张彦峰, 外卖, 腾讯, 酒精, 南京]】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:06:38,047 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 197] - 异步回调-thenCompose使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  2. 2023-01-22 11:06:38,050 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 11:06:38,138 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 68 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 11:06:38,196 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 54 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 11:06:38,254 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 56 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 11:06:38,774 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 11:06:38,776 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  8. 2023-01-22 11:06:38,787 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  9. 2023-01-22 11:06:38,789 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 198] - 【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【[张彦峰, 腾讯, 酒精, 南京]】
  10. 2023-01-22 11:06:38,790 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  11. 2023-01-22 11:06:38,792 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 218] - 异步回调-thenComposeAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  12. 2023-01-22 11:06:38,794 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  13. 2023-01-22 11:06:38,797 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  14. 2023-01-22 11:06:38,798 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  15. 2023-01-22 11:06:38,800 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 219] - 【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【[张彦峰, 腾讯, 酒精, 南京]】

3.异步回调:thenAccept和thenAcceptAsync

  1. public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
  2. public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
  3. public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

我们限定两个任务来完成用户文本的清洗和词库命中情况,采用以下两种方式进行展示,注意这两个方法是没有返回值的:

  • 异步回调-thenAccept使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  • 异步回调-thenAcceptAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况,使用自定义线程池

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 异步回调:thenAccept和thenAcceptAsync
  14. * 函数式接口Consumer,这个接口只有输入,没有返回值。
  15. * <p>
  16. * thenAccep方法时子任务与父任务使用的是同一个线程
  17. * henAccepAsync在子任务中可能是另起一个线程执行任务
  18. *
  19. * @throws ExecutionException
  20. * @throws InterruptedException
  21. */
  22. @Test
  23. public void testThenAccept() throws ExecutionException, InterruptedException {
  24. log.info("异步回调-thenAccept使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
  25. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  26. sensitivePipelineExecutor.getContentCleanRes(
  27. ContentInfoContext.builder()
  28. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  29. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  30. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  31. log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanRes.get().getCleanContent());
  32. CompletableFuture<Void> sensitveHitRes = contentCleanRes.thenAccept((contentCleanResInfo) -> {
  33. SensitveHitContext sensitveHitContext = sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
  34. log.info("任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【{}】",
  35. sensitveHitContext.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  36. });
  37. log.info("异步回调-thenAcceptAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
  38. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  39. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  40. private AtomicInteger threadCount = new AtomicInteger(1);
  41. @Override
  42. public Thread newThread(Runnable r) {
  43. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  44. }
  45. });
  46. CompletableFuture<Void> sensitveHitRes2 = CompletableFuture.supplyAsync(() -> {
  47. ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
  48. ContentInfoContext.builder()
  49. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  50. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  51. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
  52. log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanResContext.getCleanContent());
  53. return contentCleanResContext;
  54. }, threadPoolExecutor)
  55. .thenAcceptAsync((contentCleanResInfo) -> {
  56. SensitveHitContext sensitveHitContext = sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
  57. log.info("任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【{}】",
  58. sensitveHitContext.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  59. }, threadPoolExecutor);
  60. }
  61. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】”和“任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:10:42,946 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 235] - 异步回调-thenAccept使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  2. 2023-01-22 11:10:42,953 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 11:10:43,001 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 41 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 11:10:43,038 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 36 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 11:10:43,073 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 34 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 11:10:43,500 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 11:10:43,501 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 242] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
  8. 2023-01-22 11:10:43,502 [main] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  9. 2023-01-22 11:10:43,515 [main] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  10. 2023-01-22 11:10:43,516 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 246] - 任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】
  11. 2023-01-22 11:10:43,516 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 250] - 异步回调-thenAcceptAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
  12. 2023-01-22 11:10:43,518 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  13. 2023-01-22 11:10:43,519 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  14. 2023-01-22 11:10:43,520 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 266] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
  15. 2023-01-22 11:10:43,520 [thread-processor-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  16. 2023-01-22 11:10:43,521 [thread-processor-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  17. 2023-01-22 11:10:43,521 [thread-processor-2] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 272] - 任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】

4.异步回调:thenRun和thenRunAsync

  1. public CompletableFuture<Void> thenRun(Runnable action)
  2. public CompletableFuture<Void> thenRunAsync(Runnable action)
  3. public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

我们限定两个任务来完成用户文本的清洗和词库命中情况,采用以下两种方式进行展示,注意这两个方法是没有入参也没有返回值的:

  • 异步回调-thenRun使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作
  • 异步回调-thenRunAsync使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 异步回调:thenRun和thenRunAsync
  14. * thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。
  15. * thenRun会在上一阶段 CompletableFuture计算完成的时候执行一个Runnable,而Runnable并不使用该CompletableFuture计算的结果。
  16. * <p>
  17. * thenRun方法时子任务与父任务使用的是同一个线程
  18. * thenRunAsync在子任务中可能是另起一个线程执行任务
  19. *
  20. * @throws ExecutionException
  21. * @throws InterruptedException
  22. */
  23. @Test
  24. public void testThenRun() throws ExecutionException, InterruptedException {
  25. log.info("异步回调-thenRun使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作");
  26. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  27. sensitivePipelineExecutor.getContentCleanRes(
  28. ContentInfoContext.builder()
  29. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  30. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  31. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  32. log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanRes.get().getCleanContent());
  33. CompletableFuture<Void> notifyRes1 = contentCleanRes.thenRun(() -> {
  34. log.info("任务二:通知相关消费者告知已清洗完毕,可执行后续操作!");
  35. });
  36. log.info("异步回调-thenRunAsync使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作");
  37. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  38. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  39. private AtomicInteger threadCount = new AtomicInteger(1);
  40. @Override
  41. public Thread newThread(Runnable r) {
  42. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  43. }
  44. });
  45. CompletableFuture<Void> notifyRes2 = CompletableFuture.supplyAsync(() -> {
  46. ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
  47. ContentInfoContext.builder()
  48. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  49. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  50. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
  51. log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanResContext.getCleanContent());
  52. return contentCleanResContext;
  53. }
  54. , threadPoolExecutor)
  55. .thenRunAsync(() -> {
  56. log.info("任务二:通知相关消费者告知已清洗完毕,可执行后续操作!");
  57. }, threadPoolExecutor);
  58. }
  59. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】”和“任务二:通知相关消费者告知已清洗完毕,可执行后续操作!”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:15:06,816 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 290] - 异步回调-thenRun使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作
  2. 2023-01-22 11:15:06,829 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 11:15:06,885 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 50 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 11:15:06,953 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 66 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 11:15:07,035 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 80 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 11:15:07,780 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 11:15:07,780 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 297] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
  8. 2023-01-22 11:15:07,781 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 299] - 任务二:通知相关消费者告知已清洗完毕,可执行后续操作!
  9. 2023-01-22 11:15:07,781 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 302] - 异步回调-thenRunAsync使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作
  10. 2023-01-22 11:15:07,783 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  11. 2023-01-22 11:15:07,786 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  12. 2023-01-22 11:15:07,787 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 318] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
  13. 2023-01-22 11:15:07,788 [thread-processor-2] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 324] - 任务二:通知相关消费者告知已清洗完毕,可执行后续操作!

(三)多任务组合使用

1.多任务组合:thenCombine

  1. public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  2. public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  3. public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖,该方法有具体返回值。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多任务组合:thenCombine
  14. * 合并两个线程任务的结果,并进一步处理,该方法有返回值。
  15. */
  16. @Test
  17. public void testThenCombine() throws ExecutionException, InterruptedException {
  18. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  19. sensitivePipelineExecutor.getContentCleanRes(
  20. ContentInfoContext.builder()
  21. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  22. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  23. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  24. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  25. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  26. private AtomicInteger threadCount = new AtomicInteger(1);
  27. @Override
  28. public Thread newThread(Runnable r) {
  29. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  30. }
  31. });
  32. CompletableFuture<SensitveHitContext> complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  33. try {
  34. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  35. } catch (Exception e) {
  36. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  37. }
  38. }, threadPoolExecutor);
  39. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  40. complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  41. CompletableFuture<SensitveHitContext> regularSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  42. try {
  43. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
  44. } catch (Exception e) {
  45. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  46. }
  47. }, threadPoolExecutor);
  48. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  49. regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  50. CompletableFuture<String> result = complianceSensitveHitRes.thenCombineAsync(regularSensitveHitRes, (res1, res2) -> {
  51. if (CollectionUtils.isEmpty(res1.getHitWords()) || CollectionUtils.isEmpty(res2.getHitWords())) {
  52. /*只要有一个词库反馈不命中就认为商品可以售卖*/
  53. return "当前商品可以售卖";
  54. }
  55. List<SensitiveWord> hitWords = Lists.newArrayList();
  56. hitWords.addAll(res1.getHitWords());
  57. hitWords.addAll(res2.getHitWords());
  58. return "当前商品不可以售卖,商品信息中包含敏感词" + hitWords.stream().map(SensitiveWord::getSensitive).collect(Collectors.toList());
  59. });
  60. log.info("组合任务一和二结论:{}", result.get());
  61. }
  62. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】”,以及整合结论“组合任务一和二结论:当前商品不可以售卖,商品信息中包含敏感词[张彦峰, 酒精, 南京]”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:30:49,847 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  2. 2023-01-22 11:30:49,893 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 45 ms to scan 1 urls, producing 4 keys and 18 values
  3. 2023-01-22 11:30:49,931 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 37 ms to scan 1 urls, producing 4 keys and 13 values
  4. 2023-01-22 11:30:49,967 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 35 ms to scan 1 urls, producing 3 keys and 9 values
  5. 2023-01-22 11:30:50,401 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  6. 2023-01-22 11:30:50,404 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 357] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
  7. 2023-01-22 11:30:50,415 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 366] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】
  8. 2023-01-22 11:30:50,416 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 378] - 组合任务一和二结论:当前商品不可以售卖,商品信息中包含敏感词[张彦峰, 酒精, 南京]

2.多任务组合:thenAcceptBoth

  1. public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
  2. public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
  3. public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖异步处理商品上架,否则对商品直接下架处理并告知下架理由为存在敏感词,该方法没有返回值,相关操作建议进行异步操作处理。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多任务组合:thenAcceptBoth
  14. * 无返回值
  15. * 当两个CompletionStage都正常完成计算的时候,就会执行提供的action消费两个异步的结果
  16. */
  17. @Test
  18. public void testThenAcceptBoth() throws ExecutionException, InterruptedException {
  19. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  20. sensitivePipelineExecutor.getContentCleanRes(
  21. ContentInfoContext.builder()
  22. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  23. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  24. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  25. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  26. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  27. private AtomicInteger threadCount = new AtomicInteger(1);
  28. @Override
  29. public Thread newThread(Runnable r) {
  30. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  31. }
  32. });
  33. CompletableFuture<SensitveHitContext> complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  34. try {
  35. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  36. } catch (Exception e) {
  37. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  38. }
  39. }, threadPoolExecutor);
  40. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  41. complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  42. CompletableFuture<SensitveHitContext> regularSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  43. try {
  44. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
  45. } catch (Exception e) {
  46. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  47. }
  48. }, threadPoolExecutor);
  49. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  50. regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  51. CompletableFuture<Void> result = complianceSensitveHitRes.thenAcceptBoth(regularSensitveHitRes, (res1, res2) -> {
  52. if (CollectionUtils.isEmpty(res1.getHitWords()) || CollectionUtils.isEmpty(res2.getHitWords())) {
  53. /*只要有一个词库反馈不命中就认为商品可以售卖*/
  54. log.info("组合任务一和二结论异步处理商品上架");
  55. return;
  56. }
  57. List<SensitiveWord> hitWords = Lists.newArrayList();
  58. hitWords.addAll(res1.getHitWords());
  59. hitWords.addAll(res2.getHitWords());
  60. log.info("组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【{}】", hitWords.stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  61. });
  62. }
  63. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】”,以及整合结论“组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【[张彦峰, 酒精, 南京]】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:35:13,204 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  2. 2023-01-22 11:35:13,253 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 49 ms to scan 1 urls, producing 4 keys and 18 values
  3. 2023-01-22 11:35:13,307 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 51 ms to scan 1 urls, producing 4 keys and 13 values
  4. 2023-01-22 11:35:13,362 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 53 ms to scan 1 urls, producing 3 keys and 9 values
  5. 2023-01-22 11:35:14,232 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  6. 2023-01-22 11:35:14,236 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 411] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
  7. 2023-01-22 11:35:14,260 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 420] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】
  8. 2023-01-22 11:35:14,261 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 431] - 组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【[张彦峰, 酒精, 南京]】

3.多任务组合:runAfterBoth

  1. public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
  2. public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
  3. public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)

我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖异步处理商品上架,否则对商品直接下架处理并告知下架理由为存在敏感词,该方法没有入参也没有返回值,相关操作建议进行异步保存和异步操作处理。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多任务组合:runAfterBoth
  14. * 没有入参,也没有返回值
  15. * 两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
  16. */
  17. @Test
  18. public void testRunAfterBoth() throws ExecutionException, InterruptedException {
  19. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  20. sensitivePipelineExecutor.getContentCleanRes(
  21. ContentInfoContext.builder()
  22. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  23. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  24. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  25. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  26. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  27. private AtomicInteger threadCount = new AtomicInteger(1);
  28. @Override
  29. public Thread newThread(Runnable r) {
  30. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  31. }
  32. });
  33. CompletableFuture<SensitveHitContext> complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  34. try {
  35. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  36. } catch (Exception e) {
  37. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  38. }
  39. }, threadPoolExecutor);
  40. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】,结果暂存redis",
  41. complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  42. CompletableFuture<SensitveHitContext> regularSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  43. try {
  44. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
  45. } catch (Exception e) {
  46. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  47. }
  48. }, threadPoolExecutor);
  49. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】,结果暂存redis",
  50. regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  51. CompletableFuture<Void> result = complianceSensitveHitRes.runAfterBoth(regularSensitveHitRes, () -> {
  52. log.info("组合任务一和二从缓存中获取暂存数据,以下为模拟");
  53. if (CollectionUtils.isEmpty(SensitveHitContext.builder().build().getHitWords()) ||
  54. CollectionUtils.isEmpty(SensitveHitContext.builder().build().getHitWords())) {
  55. /*只要有一个词库反馈不命中就认为商品可以售卖*/
  56. log.info("组合任务一和二结论异步处理商品上架");
  57. return;
  58. }
  59. List<SensitiveWord> hitWords = Lists.newArrayList();
  60. hitWords.addAll(SensitveHitContext.builder().build().getHitWords());
  61. hitWords.addAll(SensitveHitContext.builder().build().getHitWords());
  62. log.info("组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【{}】", hitWords.stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  63. });
  64. }
  65. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】,结果暂存redis”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[]】,结果暂存redis”,以及整合结论“组合任务一和二结论异步处理商品上架”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:41:12,483 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  2. 2023-01-22 11:41:12,520 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 37 ms to scan 1 urls, producing 4 keys and 18 values
  3. 2023-01-22 11:41:12,551 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 30 ms to scan 1 urls, producing 4 keys and 13 values
  4. 2023-01-22 11:41:12,584 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 32 ms to scan 1 urls, producing 3 keys and 9 values
  5. 2023-01-22 11:41:13,210 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  6. 2023-01-22 11:41:13,218 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 465] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】,结果暂存redis
  7. 2023-01-22 11:41:13,241 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 474] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[]】,结果暂存redis
  8. 2023-01-22 11:41:13,243 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 477] - 组合任务一和二从缓存中获取暂存数据,以下为模拟
  9. 2023-01-22 11:41:13,244 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 481] - 组合任务一和二结论异步处理商品上架

4.多任务组合:applyToEither

  1. public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
  2. public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
  3. public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖, 该方式下我们只关心最先返回的命中结果就进行后续操作,一般要求业务两边的校验处理结果是保持一致的,本处只做模拟使用,注意该方式有返回值。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多任务组合:applyToEither
  14. * 该方法有返回值
  15. * 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。
  16. */
  17. @Test
  18. public void testApplyToEither() throws ExecutionException, InterruptedException {
  19. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  20. sensitivePipelineExecutor.getContentCleanRes(
  21. ContentInfoContext.builder()
  22. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  23. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  24. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  25. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  26. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  27. private AtomicInteger threadCount = new AtomicInteger(1);
  28. @Override
  29. public Thread newThread(Runnable r) {
  30. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  31. }
  32. });
  33. CompletableFuture<SensitveHitContext> complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  34. try {
  35. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  36. } catch (Exception e) {
  37. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  38. }
  39. }, threadPoolExecutor);
  40. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  41. complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  42. CompletableFuture<SensitveHitContext> regularSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  43. try {
  44. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
  45. } catch (Exception e) {
  46. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  47. }
  48. }, threadPoolExecutor);
  49. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}",
  50. regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  51. CompletableFuture<String> result = complianceSensitveHitRes.applyToEither(regularSensitveHitRes, res -> {
  52. if (CollectionUtils.isEmpty(res.getHitWords())) {
  53. /*只要有一个词库反馈不命中就认为商品可以售卖*/
  54. return "当前商品可以售卖";
  55. }
  56. return "当前商品不可以售卖,商品信息中包含敏感词" + res.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList());
  57. });
  58. log.info("组合任务一和二处理结论:{}", result.get());
  59. }
  60. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]”,以及整合结论“组合任务一和二处理结论:当前商品不可以售卖,商品信息中包含敏感词[张彦峰]”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:46:18,735 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  2. 2023-01-22 11:46:18,803 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 68 ms to scan 1 urls, producing 4 keys and 18 values
  3. 2023-01-22 11:46:18,865 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 58 ms to scan 1 urls, producing 4 keys and 13 values
  4. 2023-01-22 11:46:18,919 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 52 ms to scan 1 urls, producing 3 keys and 9 values
  5. 2023-01-22 11:46:19,668 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  6. 2023-01-22 11:46:19,671 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 521] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
  7. 2023-01-22 11:46:19,682 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 530] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]
  8. 2023-01-22 11:46:19,682 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 539] - 组合任务一和二处理结论:当前商品不可以售卖,商品信息中包含敏感词[张彦峰]

5.多任务组合:acceptEither

  1. public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
  2. public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
  3. public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖, 该方式下我们只关心最先返回的命中结果就进行后续操作,一般要求业务两边的校验处理结果是保持一致的,本处只做模拟使用,注意该方式没有返回值,处理结果需要异步操作。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多任务组合:acceptEither
  14. * 将已经完成任务的执行结果作为方法入参,但是无返回值
  15. * 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。
  16. */
  17. @Test
  18. public void testAcceptEither() throws ExecutionException, InterruptedException {
  19. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  20. sensitivePipelineExecutor.getContentCleanRes(
  21. ContentInfoContext.builder()
  22. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  23. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  24. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  25. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  26. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  27. private AtomicInteger threadCount = new AtomicInteger(1);
  28. @Override
  29. public Thread newThread(Runnable r) {
  30. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  31. }
  32. });
  33. CompletableFuture<SensitveHitContext> complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  34. try {
  35. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  36. } catch (Exception e) {
  37. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  38. }
  39. }, threadPoolExecutor);
  40. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  41. complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  42. CompletableFuture<SensitveHitContext> regularSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  43. try {
  44. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
  45. } catch (Exception e) {
  46. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  47. }
  48. }, threadPoolExecutor);
  49. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}",
  50. regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  51. CompletableFuture<Void> result = complianceSensitveHitRes.acceptEither(regularSensitveHitRes, (res) -> {
  52. if (CollectionUtils.isEmpty(res.getHitWords())) {
  53. /*只要有一个词库反馈不命中就认为商品可以售卖*/
  54. log.info("组合任务一和二结论异步处理商品上架");
  55. return;
  56. }
  57. log.info("组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【{}】", res.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  58. });
  59. }
  60. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]”,以及整合结论“组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【[张彦峰]】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:50:15,466 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  2. 2023-01-22 11:50:15,510 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 44 ms to scan 1 urls, producing 4 keys and 18 values
  3. 2023-01-22 11:50:15,548 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 37 ms to scan 1 urls, producing 4 keys and 13 values
  4. 2023-01-22 11:50:15,593 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 44 ms to scan 1 urls, producing 3 keys and 9 values
  5. 2023-01-22 11:50:15,977 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  6. 2023-01-22 11:50:15,980 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 572] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
  7. 2023-01-22 11:50:15,990 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 581] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]
  8. 2023-01-22 11:50:15,991 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 589] - 组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【[张彦峰]】

6.多任务组合:runAfterEither

  1. public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
  2. public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
  3. public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)

我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖, 该方式下我们只关心最先返回的命中结果就进行后续操作,一般要求业务两边的校验处理结果是保持一致的,本处只做模拟使用,注意该方式没有入参没有返回值,处理结果需要异步操作。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多任务组合:runAfterEither
  14. * 没有入参,也没有返回值
  15. * 两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
  16. */
  17. @Test
  18. public void testRunAfterEither() throws ExecutionException, InterruptedException {
  19. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  20. sensitivePipelineExecutor.getContentCleanRes(
  21. ContentInfoContext.builder()
  22. .content("中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  23. .cleanContent("中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  24. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  25. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  26. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  27. private AtomicInteger threadCount = new AtomicInteger(1);
  28. @Override
  29. public Thread newThread(Runnable r) {
  30. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  31. }
  32. });
  33. CompletableFuture<SensitveHitContext> complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  34. try {
  35. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  36. } catch (Exception e) {
  37. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  38. }
  39. }, threadPoolExecutor);
  40. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】,将结果存入redis中",
  41. complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  42. CompletableFuture<SensitveHitContext> regularSensitveHitRes = CompletableFuture.supplyAsync(() -> {
  43. try {
  44. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
  45. } catch (Exception e) {
  46. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  47. }
  48. }, threadPoolExecutor);
  49. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】,将结果存入redis中",
  50. regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  51. CompletableFuture<Void> future = complianceSensitveHitRes.runAfterEither(regularSensitveHitRes, () -> {
  52. log.info("组合任务一和二从缓存中获取暂存数据,以下为模拟:直接按指定内容获取当前缓存中的结果");
  53. if (CollectionUtils.isEmpty(SensitveHitContext.builder().build().getHitWords())) {
  54. /*只要有一个词库反馈不命中就认为商品可以售卖*/
  55. log.info("组合任务一和二结论,从缓存中获取结果不存在敏感词直接异步处理商品上架");
  56. return;
  57. }
  58. log.info("组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【{}】", "張彥峰");
  59. });
  60. }
  61. }

测试结果展示:

重点关注打印日志“content=中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[]】,将结果存入redis中”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[]】,将结果存入redis中”,以及整合结论“组合任务一和二结论,从缓存中获取结果不存在敏感词直接异步处理商品上架”,管道相关处理日志暂时忽略。

  1. 2023-01-22 11:54:33,692 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  2. 2023-01-22 11:54:33,772 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 80 ms to scan 1 urls, producing 4 keys and 18 values
  3. 2023-01-22 11:54:33,820 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 47 ms to scan 1 urls, producing 4 keys and 13 values
  4. 2023-01-22 11:54:33,858 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 37 ms to scan 1 urls, producing 3 keys and 9 values
  5. 2023-01-22 11:54:34,226 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  6. 2023-01-22 11:54:34,228 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 623] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[]】,将结果存入redis中
  7. 2023-01-22 11:54:34,237 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 632] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[]】,将结果存入redis中
  8. 2023-01-22 11:54:34,237 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 635] - 组合任务一和二从缓存中获取暂存数据,以下为模拟:直接按指定内容获取当前缓存中的结果
  9. 2023-01-22 11:54:34,237 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 638] - 组合任务一和二结论,从缓存中获取结果不存在敏感词直接异步处理商品上架

7.多任务组合:allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

我们限定用户文本的清洗完成清洗,然后异步查看词库情况,按一下两种情况进行分析

  • 情况一:当给定的多个异步任务都正常完成后,返回一个新的 CompletableFuture,给定 CompletableFuture 的结果不会反映在返回的 CompletableFuture 中,但可以通过单独检查给定任务来获得结果
  • 情况二:当任何一个异步任务异常完成,则返回的CompletableFuture 也会异常完成,并且将该异步任务的异常作为其原因
  • 情况三:当存在多个异常完成时,则返回排在前面的异步任务的异常信息。

具体代码如下,各自验证结果分开验证:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多任务组合:allOf
  14. * <p>
  15. * 实现多 CompletableFuture 的同时返回
  16. * CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
  17. */
  18. @Test
  19. public void testAllOf() throws ExecutionException, InterruptedException {
  20. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  21. sensitivePipelineExecutor.getContentCleanRes(
  22. ContentInfoContext.builder()
  23. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  24. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  25. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  26. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  27. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  28. private AtomicInteger threadCount = new AtomicInteger(1);
  29. @Override
  30. public Thread newThread(Runnable r) {
  31. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  32. }
  33. });
  34. log.info("情况一:当给定的多个异步任务都正常完成后,返回一个新的 CompletableFuture," +
  35. "给定 CompletableFuture 的结果不会反映在返回的 CompletableFuture 中,但可以通过单独检查给定任务来获得结果");
  36. CompletableFuture<SensitveHitContext> complianceSensitveHitRes1 = CompletableFuture.supplyAsync(() -> {
  37. try {
  38. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  39. } catch (Exception e) {
  40. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  41. }
  42. }, threadPoolExecutor);
  43. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  44. complianceSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  45. CompletableFuture<SensitveHitContext> regularSensitveHitRes1 = CompletableFuture.supplyAsync(() -> {
  46. try {
  47. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
  48. } catch (Exception e) {
  49. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  50. }
  51. }, threadPoolExecutor);
  52. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  53. regularSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  54. CompletableFuture<SensitveHitContext> thesaurusSensitveHitRes1 = CompletableFuture.supplyAsync(() -> {
  55. try {
  56. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.THESAURUS).handle(contentCleanRes.get());
  57. } catch (Exception e) {
  58. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  59. }
  60. }, threadPoolExecutor);
  61. log.info("任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【{}】",
  62. thesaurusSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  63. CompletableFuture<Void> result1 = CompletableFuture.allOf(complianceSensitveHitRes1, regularSensitveHitRes1, thesaurusSensitveHitRes1);
  64. log.info("多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【{}】,正则敏感词库【{}】,业务自身词库【{}】",
  65. complianceSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  66. regularSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  67. thesaurusSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  68. log.info("情况二:当任何一个异步任务异常完成,则返回的CompletableFuture 也会异常完成,并且将该异步任务的异常作为其原因。");
  69. CompletableFuture<SensitveHitContext> complianceSensitveHitRes2 = CompletableFuture.supplyAsync(() -> {
  70. try {
  71. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  72. } catch (Exception e) {
  73. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  74. }
  75. }, threadPoolExecutor);
  76. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  77. complianceSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  78. CompletableFuture<SensitveHitContext> regularSensitveHitRes2 = CompletableFuture.supplyAsync(() -> {
  79. throw new RuntimeException("正则词库管控处理查看数据数据命中敏感词情况异常暂停");
  80. }, threadPoolExecutor);
  81. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  82. regularSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  83. CompletableFuture<SensitveHitContext> thesaurusSensitveHitRes2 = CompletableFuture.supplyAsync(() -> {
  84. try {
  85. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.THESAURUS).handle(contentCleanRes.get());
  86. } catch (Exception e) {
  87. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  88. }
  89. }, threadPoolExecutor);
  90. log.info("任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【{}】",
  91. thesaurusSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  92. CompletableFuture<Void> result2 = CompletableFuture.allOf(complianceSensitveHitRes2, regularSensitveHitRes2, thesaurusSensitveHitRes2);
  93. log.info("多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【{}】,正则敏感词库【{}】,业务自身词库【{}】",
  94. complianceSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  95. regularSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  96. thesaurusSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  97. log.info("情况三:当存在多个异常完成时,则返回排在前面的异步任务的异常信息。");
  98. CompletableFuture<SensitveHitContext> complianceSensitveHitRes3 = CompletableFuture.supplyAsync(() -> {
  99. throw new RuntimeException("企业合规管控处理查看数据数据命中敏感词情况异常暂停");
  100. }, threadPoolExecutor);
  101. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  102. complianceSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  103. CompletableFuture<SensitveHitContext> regularSensitveHitRes3 = CompletableFuture.supplyAsync(() -> {
  104. throw new RuntimeException("正则合规管控处理查看数据数据命中敏感词情况异常暂停");
  105. }, threadPoolExecutor);
  106. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  107. regularSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  108. CompletableFuture<SensitveHitContext> thesaurusSensitveHitRes3 = CompletableFuture.supplyAsync(() -> {
  109. throw new RuntimeException("业务自身词库处理查看数据数据命中敏感词情况异常暂停");
  110. }, threadPoolExecutor);
  111. log.info("任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【{}】",
  112. thesaurusSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  113. CompletableFuture<Void> result3 = CompletableFuture.allOf(complianceSensitveHitRes3, regularSensitveHitRes3, thesaurusSensitveHitRes3);
  114. log.info("多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【{}】,正则敏感词库【{}】,业务自身词库【{}】",
  115. complianceSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  116. regularSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  117. thesaurusSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  118. }
  119. }

测试结果展示:

情况一验证结果:重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”、“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]”和“任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【[腾讯]】”,以及整合结论“多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【[张彦峰]】,正则敏感词库【[酒精, 南京]】,业务自身词库【[腾讯]】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 12:04:07,169 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 669] - 情况一:当给定的多个异步任务都正常完成后,返回一个新的 CompletableFuture,给定 CompletableFuture 的结果不会反映在返回的 CompletableFuture 中,但可以通过单独检查给定任务来获得结果
  2. 2023-01-22 12:04:07,171 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 12:04:07,200 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 29 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 12:04:07,228 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 27 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 12:04:07,255 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 26 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 12:04:07,610 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 12:04:07,613 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 678] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
  8. 2023-01-22 12:04:07,623 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 687] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】
  9. 2023-01-22 12:04:07,624 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 696] - 任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【[腾讯]】
  10. 2023-01-22 12:04:07,625 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 699] - 多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【[张彦峰]】,正则敏感词库【[酒精, 南京]】,业务自身词库【[腾讯]】

情况二验证结果:重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”、“java.util.concurrent.ExecutionException: java.lang.RuntimeException: 正则词库管控处理查看数据数据命中敏感词情况异常暂停”,管道相关处理日志暂时忽略。

  1. 2023-01-22 12:05:22,159 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 704] - 情况二:当任何一个异步任务异常完成,则返回的CompletableFuture 也会异常完成,并且将该异步任务的异常作为其原因。
  2. 2023-01-22 12:05:22,162 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 12:05:22,198 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 36 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 12:05:22,232 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 32 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 12:05:22,259 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 26 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 12:05:22,634 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 12:05:22,637 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 712] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
  8. java.util.concurrent.ExecutionException: java.lang.RuntimeException: 正则词库管控处理查看数据数据命中敏感词情况异常暂停
  9. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
  10. at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
  11. at org.zyf.javabasic.java8.CompletableFutureTest.testAllOf(CompletableFutureTest.java:718)
  12. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  13. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  14. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  15. at java.lang.reflect.Method.invoke(Method.java:498)
  16. at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
  17. at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  18. at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
  19. at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  20. at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
  21. at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
  22. at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
  23. at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
  24. at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
  25. at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
  26. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
  27. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
  28. at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
  29. at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
  30. at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
  31. at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
  32. at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
  33. at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
  34. at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
  35. at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
  36. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
  37. at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
  38. at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
  39. at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
  40. at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
  41. at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
  42. at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
  43. at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
  44. Caused by: java.lang.RuntimeException: 正则词库管控处理查看数据数据命中敏感词情况异常暂停
  45. at org.zyf.javabasic.java8.CompletableFutureTest.lambda$testAllOf$44(CompletableFutureTest.java:715)
  46. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  47. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  48. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  49. at java.lang.Thread.run(Thread.java:748)

情况三验证结果:重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“java.util.concurrent.ExecutionException: java.lang.RuntimeException: 企业合规管控处理查看数据数据命中敏感词情况异常暂停”,管道相关处理日志暂时忽略。

  1. 2023-01-22 12:06:23,329 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 734] - 情况三:当存在多个异常完成时,则返回排在前面的异步任务的异常信息。
  2. 2023-01-22 12:06:23,332 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. java.util.concurrent.ExecutionException: java.lang.RuntimeException: 企业合规管控处理查看数据数据命中敏感词情况异常暂停
  4. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
  5. at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
  6. at org.zyf.javabasic.java8.CompletableFutureTest.testAllOf(CompletableFutureTest.java:739)
  7. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  8. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  9. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  10. at java.lang.reflect.Method.invoke(Method.java:498)
  11. at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
  12. at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  13. at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
  14. at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  15. at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
  16. at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
  17. at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
  18. at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
  19. at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
  20. at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
  21. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
  22. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
  23. at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
  24. at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
  25. at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
  26. at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
  27. at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
  28. at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
  29. at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
  30. at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
  31. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
  32. at org.junit.runners.Suite.runChild(Suite.java:128)
  33. at org.junit.runners.Suite.runChild(Suite.java:27)
  34. at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
  35. at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
  36. at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
  37. at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
  38. at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
  39. at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
  40. at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
  41. at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
  42. at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
  43. at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
  44. at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
  45. at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
  46. at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
  47. Caused by: java.lang.RuntimeException: 企业合规管控处理查看数据数据命中敏感词情况异常暂停
  48. at org.zyf.javabasic.java8.CompletableFutureTest.lambda$testAllOf$43(CompletableFutureTest.java:736)
  49. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  50. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  51. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  52. at java.lang.Thread.run(Thread.java:748)

8.多任务组合:anyOf

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

我们限定用户文本的清洗完成清洗,然后异步查看词库情况。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多任务组合:anyOf
  14. * <p>
  15. * 多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture
  16. * CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。
  17. */
  18. @Test
  19. public void testAnyOf() throws ExecutionException, InterruptedException {
  20. CompletableFuture<ContentCleanResContext> contentCleanRes = CompletableFuture.supplyAsync(() ->
  21. sensitivePipelineExecutor.getContentCleanRes(
  22. ContentInfoContext.builder()
  23. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  24. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  25. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
  26. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  27. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  28. private AtomicInteger threadCount = new AtomicInteger(1);
  29. @Override
  30. public Thread newThread(Runnable r) {
  31. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  32. }
  33. });
  34. CompletableFuture<SensitveHitContext> complianceSensitveHitRes1 = CompletableFuture.supplyAsync(() -> {
  35. try {
  36. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
  37. } catch (Exception e) {
  38. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  39. }
  40. }, threadPoolExecutor);
  41. log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  42. complianceSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  43. CompletableFuture<SensitveHitContext> regularSensitveHitRes1 = CompletableFuture.supplyAsync(() -> {
  44. try {
  45. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
  46. } catch (Exception e) {
  47. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  48. }
  49. }, threadPoolExecutor);
  50. log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
  51. regularSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  52. CompletableFuture<SensitveHitContext> thesaurusSensitveHitRes1 = CompletableFuture.supplyAsync(() -> {
  53. try {
  54. return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.THESAURUS).handle(contentCleanRes.get());
  55. } catch (Exception e) {
  56. return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
  57. }
  58. }, threadPoolExecutor);
  59. log.info("任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【{}】",
  60. thesaurusSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  61. CompletableFuture<Object> result1 = CompletableFuture.anyOf(complianceSensitveHitRes1, regularSensitveHitRes1, thesaurusSensitveHitRes1);
  62. log.info("多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【{}】,正则敏感词库【{}】,业务自身词库【{}】,allOf结果【{}】",
  63. complianceSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  64. regularSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  65. thesaurusSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
  66. ((SensitveHitContext) result1.get()).getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
  67. }
  68. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”、“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]”和“任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【[腾讯]】”,以及整合结论“多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【[张彦峰]】,正则敏感词库【[酒精, 南京]】,业务自身词库【[腾讯]】,allOf结果【[张彦峰]】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 12:07:14,047 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  2. 2023-01-22 12:07:14,089 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 42 ms to scan 1 urls, producing 4 keys and 18 values
  3. 2023-01-22 12:07:14,126 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 35 ms to scan 1 urls, producing 4 keys and 13 values
  4. 2023-01-22 12:07:14,156 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 29 ms to scan 1 urls, producing 3 keys and 9 values
  5. 2023-01-22 12:07:14,529 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  6. 2023-01-22 12:07:14,532 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 789] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
  7. 2023-01-22 12:07:14,543 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 798] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】
  8. 2023-01-22 12:07:14,544 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 807] - 任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【[腾讯]】
  9. 2023-01-22 12:07:14,545 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 810] - 多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【[张彦峰]】,正则敏感词库【[酒精, 南京]】,业务自身词库【[腾讯]】,allOf结果【[张彦峰]】

(四)结果处理使用

1.结果处理:whenComplete

  1. public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
  2. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
  3. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

我们限定对用户文本清洗后,异步进行通知,对原返回结果不做干预,有异常则中断,注意该方法是没有返回值的。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 结果处理:whenComplete
  14. * <p>
  15. * 一般与exceptionally()配合使用,获取前一个异步线程的结果和异常
  16. * 不论上一个阶段是正常/异常完成(即不会对阶段的原来结果产生影响);类似于 try..catch..finanlly 中 finally 代码块,无论是否发生异常都将会执行的。
  17. * 当某个任务执行完成后,会将执行结果或者执行期间抛出的异常传递给回调方法:
  18. * 如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,
  19. */
  20. @Test
  21. public void testWhenComplete() throws ExecutionException, InterruptedException {
  22. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  23. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  24. private AtomicInteger threadCount = new AtomicInteger(1);
  25. @Override
  26. public Thread newThread(Runnable r) {
  27. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  28. }
  29. });
  30. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  31. log.info("异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】");
  32. ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
  33. ContentInfoContext.builder()
  34. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  35. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  36. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
  37. return contentCleanResContext.getCleanContent() + 12/0;
  38. }, threadPoolExecutor).whenCompleteAsync((res, excption) -> {
  39. log.info("异步任务成功执行,进行异步通知和保存,结果是:{},异常是:", res, excption);
  40. }, threadPoolExecutor);
  41. log.info("异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】清洗结果:【{}】", future.get());
  42. }
  43. }

测试结果展示:

重点关注打印日志“异步任务成功执行,进行异步通知和保存,结果是:null,异常是:java.lang.ArithmeticException: / by zero”,管道相关处理日志暂时忽略。

  1. 2023-01-22 12:13:46,401 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 837] - 异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】
  2. 2023-01-22 12:13:46,404 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 12:13:46,444 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 36 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 12:13:46,485 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 39 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 12:13:46,515 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 30 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 12:13:46,918 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 12:13:46,921 [thread-processor-2] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 845] - 异步任务成功执行,进行异步通知和保存,结果是:null,异常是:
  8. java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
  9. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
  10. at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
  11. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
  12. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  13. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  14. at java.lang.Thread.run(Thread.java:748)
  15. Caused by: java.lang.ArithmeticException: / by zero
  16. at org.zyf.javabasic.java8.CompletableFutureTest.lambda$testWhenComplete$50(CompletableFutureTest.java:843)
  17. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  18. ... 3 common frames omitted
  19. java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
  20. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
  21. at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
  22. at org.zyf.javabasic.java8.CompletableFutureTest.testWhenComplete(CompletableFutureTest.java:847)
  23. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  24. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  25. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  26. at java.lang.reflect.Method.invoke(Method.java:498)
  27. at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
  28. at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  29. at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
  30. at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  31. at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
  32. at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
  33. at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
  34. at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
  35. at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
  36. at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
  37. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
  38. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
  39. at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
  40. at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
  41. at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
  42. at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
  43. at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
  44. at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
  45. at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
  46. at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
  47. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
  48. at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
  49. at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
  50. at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
  51. at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
  52. at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
  53. at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
  54. at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
  55. Caused by: java.lang.ArithmeticException: / by zero
  56. at org.zyf.javabasic.java8.CompletableFutureTest.lambda$testWhenComplete$50(CompletableFutureTest.java:843)
  57. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  58. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  59. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  60. at java.lang.Thread.run(Thread.java:748)

2.结果处理:exceptionally

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

我们限定对用户文本清洗后,异步进行通知,对原返回结果无异常正常返回,存在异常做干预做默认兜底返回,注意该方法是有返回值。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 结果处理:exceptionally
  14. * <p>
  15. * 一般与whenComplete()配合使用,异常捕获范围包含前面的所有异步线程
  16. * 异常的结果处理(上一个阶段异常完成才会被执行);
  17. * 使用方式类似于 try catch中的catch代码块中异常处理;
  18. * 当某个任务执行异常时将抛出的异常作为参数传递到回调方法中,如果该任务正常执行,exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果。
  19. */
  20. @Test
  21. public void testExceptionally() throws ExecutionException, InterruptedException {
  22. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  23. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  24. private AtomicInteger threadCount = new AtomicInteger(1);
  25. @Override
  26. public Thread newThread(Runnable r) {
  27. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  28. }
  29. });
  30. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  31. log.info("异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】");
  32. ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
  33. ContentInfoContext.builder()
  34. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  35. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  36. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
  37. return contentCleanResContext.getCleanContent() + 12/0;
  38. }, threadPoolExecutor).exceptionally(excption -> {
  39. /*可以感知异常,同时返回默认数据*/
  40. log.info("执行发生异常,返回默认数据,并异步通知相关业务方,异常信息为:" ,excption);
  41. return "";
  42. });
  43. log.info("异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】清洗结果:【{}】", future.get());
  44. }
  45. }

测试结果展示:

重点关注打印日志“执行发生异常,返回默认数据,并异步通知相关业务方,异常信息为:java.lang.ArithmeticException: / by zero”和“异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】清洗结果:【】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 12:17:31,305 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 870] - 异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】
  2. 2023-01-22 12:17:31,309 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 12:17:31,353 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 35 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 12:17:31,392 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 38 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 12:17:31,428 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 34 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 12:17:31,939 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 12:17:31,942 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 879] - 执行发生异常,返回默认数据,并异步通知相关业务方,异常信息为:
  8. java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
  9. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
  10. at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
  11. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
  12. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  13. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  14. at java.lang.Thread.run(Thread.java:748)
  15. Caused by: java.lang.ArithmeticException: / by zero
  16. at org.zyf.javabasic.java8.CompletableFutureTest.lambda$testExceptionally$52(CompletableFutureTest.java:876)
  17. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  18. ... 3 common frames omitted
  19. 2023-01-22 12:17:31,942 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 882] - 异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】清洗结果:【】

3.结果处理:handle

  1. public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
  2. public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
  3. public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

我们限定对用户文本清洗后,异步进行通知,对原返回结果可以做干预,有异常则可以采用默认返回处理,注意该方法是有返回值的,个人建议实际返回结果需要对内容和异常做处理的信息建议使用改方式。

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 结果处理:handle
  14. * <p>
  15. * 一般与whenComplete()配合使用,异常捕获范围包含前面的所有异步线程
  16. * 产出型方法,即可以对正常完成的结果进行转换,也可以对异常完成的进行补偿一个结果,即可以改变阶段的现状。
  17. * 跟whenComplete基本一致,区别在于handle的回调方法有返回值,
  18. * 且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关。
  19. */
  20. @Test
  21. public void testHandle() throws ExecutionException, InterruptedException {
  22. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  23. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  24. private AtomicInteger threadCount = new AtomicInteger(1);
  25. @Override
  26. public Thread newThread(Runnable r) {
  27. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  28. }
  29. });
  30. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  31. log.info("异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】");
  32. ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
  33. ContentInfoContext.builder()
  34. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  35. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
  36. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
  37. return contentCleanResContext.getCleanContent();
  38. }, threadPoolExecutor).handle((res, excption) -> {
  39. /*异步方法执行完的后续处理*/
  40. if (Objects.nonNull(excption)) {
  41. log.info("执行发生异常,返回默认数据,异常信息为:" + excption);
  42. return "";
  43. }
  44. log.info("异步任务成功执行,上一步的结果是:" + res);
  45. return res + "(用于测试)";
  46. });
  47. log.info("异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】清洗结果:【{}】", future.get());
  48. }
  49. }

测试结果展示:

重点关注打印日志“异步任务成功执行,上一步的结果是:中国张彦峰外卖”和“异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】清洗结果:【中国张彦峰外卖(用于测试)】”,管道相关处理日志暂时忽略,同时异常的处理此处不展示(有兴趣可按照上面的方式增加异常进行查看)。

  1. 2023-01-22 12:21:33,625 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 905] - 异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】
  2. 2023-01-22 12:21:33,628 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 12:21:33,675 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 38 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 12:21:33,707 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 31 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 12:21:33,737 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 29 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 12:21:34,186 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 12:21:34,187 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 918] - 异步任务成功执行,上一步的结果是:中国张彦峰外卖
  8. 2023-01-22 12:21:34,187 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 921] - 异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】清洗结果:【中国张彦峰外卖(用于测试)】

(五)方法混合使用举例

1.多个方法组合使用

回到刚开始的敏感词系统,将核心功能:文本清洗、敏感词验证、敏感词干预生效在同一个服务中连贯处理:

  • 第一步,文本清洗针对验证的文本将直接去除其中的特殊符号、表情包、隐藏符号、中文简体繁体转换等内容,只保留文本中含有的中英文和数字信息。
  • 第二步,清洗后,将清洗文本与实际相关的词库进行验证来查看是否命中相关敏感词,并给出词库命中的敏感词信息。
  • 第三步,如果存在词库命中的敏感词,同时考虑各词是否有加白以及生效规则(只对某区域生效等),从而确定最终的命中信息。

原文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】中对应[中国,张彦峰,外卖,腾讯, 酒精, 南京]都是各个词库要求的敏感词,但是对于[中国,外卖]是针对业务做的加白处理等,所以实际生效敏感词是[张彦峰, 腾讯, 酒精, 南京]

使用举例代码:

  1. /**
  2. * @author yanfengzhang
  3. * @description CompletableFuture使用
  4. * @date 2022/12/29 23:45
  5. */
  6. @RunWith(SpringRunner.class)
  7. @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  8. @Slf4j
  9. public class CompletableFutureTest {
  10. @Autowired
  11. private SensitivePipelineExecutor sensitivePipelineExecutor;
  12. /**
  13. * 多个方法组合使用
  14. * <p>
  15. * 测试组合敏感词处理逻辑
  16. */
  17. @Test
  18. public void testSensitiveDeal() throws ExecutionException, InterruptedException {
  19. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  20. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  21. private AtomicInteger threadCount = new AtomicInteger(1);
  22. @Override
  23. public Thread newThread(Runnable r) {
  24. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  25. }
  26. });
  27. log.info("开始对用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】进行敏感词分析");
  28. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  29. return sensitivePipelineExecutor.getContentCleanRes(
  30. ContentInfoContext.builder()
  31. .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  32. .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
  33. .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
  34. }, threadPoolExecutor)
  35. .thenComposeAsync(new Function<ContentCleanResContext, CompletionStage<SensitveHitContext>>() {
  36. @Override
  37. public CompletionStage<SensitveHitContext> apply(ContentCleanResContext contentCleanResInfo) {
  38. return CompletableFuture.supplyAsync(new Supplier<SensitveHitContext>() {
  39. @Override
  40. public SensitveHitContext get() {
  41. return sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
  42. }
  43. });
  44. }
  45. }, threadPoolExecutor)
  46. .thenApplyAsync((sensitveHitContext) ->
  47. sensitivePipelineExecutor.getSensitveEffectiveRes(sensitveHitContext), threadPoolExecutor)
  48. .handle((res, excption) -> {
  49. /*异步方法执行完的后续处理*/
  50. if (Objects.nonNull(excption)) {
  51. log.info("执行发生异常,返回默认数据,异常信息为:" + excption);
  52. return null;
  53. }
  54. log.info("异步任务成功执行,上一步的结果是:" + res);
  55. return res.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()).toString();
  56. });
  57. log.info("异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词为:【{}】", future.get());
  58. }
  59. }

测试结果展示:

重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”,以及对应处理结果“异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词为:【[张彦峰, 腾讯, 酒精, 南京]】”,管道相关处理日志暂时忽略。

  1. 2023-01-22 12:24:38,526 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 941] - 开始对用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】进行敏感词分析
  2. 2023-01-22 12:24:38,532 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  3. 2023-01-22 12:24:38,576 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 38 ms to scan 1 urls, producing 4 keys and 18 values
  4. 2023-01-22 12:24:38,608 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 31 ms to scan 1 urls, producing 4 keys and 13 values
  5. 2023-01-22 12:24:38,642 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 33 ms to scan 1 urls, producing 3 keys and 9 values
  6. 2023-01-22 12:24:39,048 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
  7. 2023-01-22 12:24:39,050 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  8. 2023-01-22 12:24:39,064 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
  9. 2023-01-22 12:24:39,065 [thread-processor-3] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【模型实例(敏感词命中)构建上下文】, context=SensitveHitContext(hasHit=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), hitWords=[SensitiveWord(sensitive=张彦峰, sensitiveId=23, kind=4), SensitiveWord(sensitive=腾讯, sensitiveId=23, kind=2), SensitiveWord(sensitive=酒精, sensitiveId=11, kind=5), SensitiveWord(sensitive=南京, sensitiveId=11, kind=5)])
  10. 2023-01-22 12:24:39,068 [thread-processor-3] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【模型实例(敏感词命中)构建上下文】, context=SensitveHitContext(hasHit=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), hitWords=[SensitiveWord(sensitive=张彦峰, sensitiveId=23, kind=4), SensitiveWord(sensitive=腾讯, sensitiveId=23, kind=2), SensitiveWord(sensitive=酒精, sensitiveId=11, kind=5), SensitiveWord(sensitive=南京, sensitiveId=11, kind=5)])
  11. 2023-01-22 12:24:39,068 [thread-processor-3] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 968] - 异步任务成功执行,上一步的结果是:SensitveEffectiveContext(isHit=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, hitWords=[SensitiveWord(sensitive=张彦峰, sensitiveId=23, kind=4), SensitiveWord(sensitive=腾讯, sensitiveId=23, kind=2), SensitiveWord(sensitive=酒精, sensitiveId=11, kind=5), SensitiveWord(sensitive=南京, sensitiveId=11, kind=5)], whitedWords=[], complianceIgnoreWords=[], ruleIgnoreWords=[])
  12. 2023-01-22 12:24:39,069 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 971] - 异步任务获取用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词为:【[张彦峰, 腾讯, 酒精, 南京]】

2.并发处理批量任务

假设线上可以对于运营批量新建的商品数据内容进行敏感词分析,针对批量创建的各个商品针对其相关信息(商品名称/商品描述/分类和分类描述/规格名称/属性和属性值/套餐/商品图片特效/商品普通属性和属性值/商家名称/商家公告等)进行敏感词监控并给出对应的结果报告,显然商品数据居多这样的内容建议采用多线程批发处理,此时异步的优势就体现了,具体实现我们按以下方式进行模拟实现:

A.模拟规定校验商品敏感词信息归类

  1. /**
  2. * @author yanfengzhang
  3. * @description 校验商品敏感词信息归类
  4. * @date 2023/1/4 23:00
  5. */
  6. public enum SensitiveValidateField {
  7. NAME(1, "商品名称"),
  8. DESCRIPTION(2, "商品描述"),
  9. TAG_NAME(3, "分类和分类描述"),
  10. SPEC(4, "规格名称"),
  11. ATTR_NAME(5, "属性和属性值"),
  12. COMBO_MEAL(6, "套餐"),
  13. SPECIAL_EFFECT_PIC(7, "商品图片特效"),
  14. CUSTOM_ATTR_NAME(8, "自定义属性"),
  15. ORDINARY_ATTR_NAME(9, "商品普通属性和属性值"),
  16. SP_NAME(10, "标品名称"),
  17. POI_NAME(11, "商家名称"),
  18. POI_ANNOUNCEMENT(12, "商家公告");
  19. /**
  20. * 文本归类(商品名称、商品描述、商家公告、商家名称、经营描述、代言信息等)
  21. */
  22. private Integer type;
  23. /**
  24. * 文本归类描述
  25. */
  26. private String desc;
  27. public Integer getType() {
  28. return type;
  29. }
  30. public String getDesc() {
  31. return desc;
  32. }
  33. SensitiveValidateField(Integer code) {
  34. this.type = code;
  35. }
  36. SensitiveValidateField(Integer code, String desc) {
  37. this.desc = desc;
  38. this.type = code;
  39. }
  40. /**
  41. * 根据文本归类type获取对应的文本归类信息
  42. *
  43. * @param type 文本归类type
  44. * @return
  45. */
  46. public static SensitiveValidateField getEnumById(Integer type) {
  47. for (SensitiveValidateField sensitiveValidateField : SensitiveValidateField.values()) {
  48. if (sensitiveValidateField.getType().equals(type)) {
  49. return sensitiveValidateField;
  50. }
  51. }
  52. return null;
  53. }
  54. /**
  55. * 根据文本归类信息描述获取对应的文本归类信息
  56. *
  57. * @param desc 文本归类描述
  58. * @return
  59. */
  60. public static SensitiveValidateField getEnumByDesc(String desc) {
  61. for (SensitiveValidateField sensitiveValidateField : SensitiveValidateField.values()) {
  62. if (sensitiveValidateField.getDesc().equals(desc)) {
  63. return sensitiveValidateField;
  64. }
  65. }
  66. return null;
  67. }
  68. /**
  69. * 判断文本归类type是否在指定范围
  70. *
  71. * @param type 文本归类type
  72. * @return true-在指定范围
  73. */
  74. public static boolean isSensitiveValidateField(Integer type) {
  75. if (null == type) {
  76. return false;
  77. }
  78. for (SensitiveValidateField tempEnum : SensitiveValidateField.values()) {
  79. if (tempEnum.getType().equals(type)) {
  80. return true;
  81. }
  82. }
  83. return false;
  84. }
  85. }

B.模拟从一个上传的商品信息中提炼商品内含的文本数据

  1. /**
  2. * 模拟从一个上传的商品信息中提炼商品内含的文本数据
  3. *
  4. * @return
  5. */
  6. private List<ContentInfoContext> getContentInfosBySpu() {
  7. List<ContentInfoContext> contentInfoContexts = Lists.newArrayList();
  8. List<String> spuInfoList = Lists.newArrayList("張彥峰", "肯德基", "外賣", "腾讯", "禁药", "酒精南京",
  9. "18252066688", "饿了吗", "廉政勤政", "中国", "巴黎圣母院", "莫沫南路", "辉瑞P药", "捷倍安", "极速达", "老城一埚",
  10. "星即送", "连花清温", "美乐滋", "山茱萸", "欧美齐", "长江鱼", "人气榜第一", "蟹礼券", "茶ta颜悦色", "可食用黄金",
  11. "安培开席", "摇钱树", "特丁通", "草甘膦", "叫只鸡", "贱男消食片", "维信识别", "龙闩花甲", "鮑家糕点", "至尊至比萨",
  12. "胖大哥肉蟹煲", "OBLIGI韩式炸鸡", "星芭芭", "欢乐柠檬", "水多活好", "胸胸烈火", "比基妮", "成人艺术", "仿真手枪", "机关枪");
  13. for (int i = 0; i < 12; i++) {
  14. String contentInfo = spuInfoList.get(getRandomNumberInRange(0, spuInfoList.size() - 1)) + spuInfoList.get(getRandomNumberInRange(0, spuInfoList.size() - 1))
  15. + spuInfoList.get(getRandomNumberInRange(0, spuInfoList.size() - 1));
  16. contentInfoContexts.add(ContentInfoContext.builder()
  17. .content(contentInfo)
  18. .cleanContent(contentInfo)
  19. .contentAttr(ContentAttr.builder()
  20. .belong(SensitiveValidateField.getEnumById(i + 1))
  21. .bizType(BizType.E_COMMERCE.getType())
  22. .cityCode(110010).build()).build());
  23. }
  24. return contentInfoContexts;
  25. }
  26. private static int getRandomNumberInRange(int min, int max) {
  27. Random r = new Random();
  28. return r.ints(min, (max + 1)).limit(1).findFirst().getAsInt();
  29. }

C.模拟处理一个商品信息中的敏感词命中情况统计,异步并发

  1. /**
  2. * 模拟处理一个商品信息中的敏感词命中情况统计
  3. */
  4. private String containsSensitiveWordSpu(SpuSensitiveDealCommand spuSensitiveDealCommand) {
  5. /*1.模拟从一个上传的商品信息中提炼商品内含的文本数据*/
  6. List<ContentInfoContext> spuContentInfos = getContentInfosBySpu();
  7. /*2.对商品各项数据进行校验检查*/
  8. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  9. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  10. private AtomicInteger threadCount = new AtomicInteger(1);
  11. @Override
  12. public Thread newThread(Runnable r) {
  13. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  14. }
  15. });
  16. int isHit = 1;
  17. int exceptionHit = 2;
  18. List<CompletableFuture<Map<Integer, String>>> spuSensitiveDealRes = spuContentInfos.stream().map(contentInfoContext -> {
  19. return CompletableFuture.supplyAsync(() -> {
  20. return sensitivePipelineExecutor.getContentCleanRes(contentInfoContext);
  21. }, threadPoolExecutor)
  22. .thenComposeAsync(new Function<ContentCleanResContext, CompletionStage<SensitveHitContext>>() {
  23. @Override
  24. public CompletionStage<SensitveHitContext> apply(ContentCleanResContext contentCleanResInfo) {
  25. return CompletableFuture.supplyAsync(new Supplier<SensitveHitContext>() {
  26. @Override
  27. public SensitveHitContext get() {
  28. return sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
  29. }
  30. });
  31. }
  32. }, threadPoolExecutor)
  33. .thenApplyAsync((sensitveHitContext) ->
  34. sensitivePipelineExecutor.getSensitveEffectiveRes(sensitveHitContext), threadPoolExecutor)
  35. .handle((res, excption) -> {
  36. Map<Integer, String> hitdetail = Maps.newHashMap();
  37. StringBuilder hitInfoRes = new StringBuilder();
  38. hitInfoRes.append("【").append(contentInfoContext.getContentAttr().getBelong().getDesc())
  39. .append("】[").append(contentInfoContext.getContent()).append("]");
  40. /*异步方法执行完的后续处理*/
  41. if (Objects.nonNull(excption)) {
  42. log.info("执行发生异常,返回默认数据,异常信息为:" + excption);
  43. hitInfoRes.append("校验敏感词异常:").append(excption.getMessage());
  44. hitdetail.put(exceptionHit, hitInfoRes.toString());
  45. return hitdetail;
  46. }
  47. if (!res.getIsHit()) {
  48. return hitdetail;
  49. }
  50. log.info("异步任务成功执行,上一步的结果是:" + res);
  51. hitInfoRes.append("命中敏感词:").append(res.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()).toString());
  52. hitdetail.put(isHit, hitInfoRes.toString());
  53. return hitdetail;
  54. });
  55. }).collect(Collectors.toList());
  56. /*3.等待所有并行线程完成,结束单词单词商品处理*/
  57. CompletableFuture.allOf(spuSensitiveDealRes.toArray(new CompletableFuture[]{})).join();
  58. threadPoolExecutor.shutdown();
  59. /*4.整合单商品最终结果*/
  60. List<String> validHits = Lists.newArrayList();
  61. List<String> exceptionHits = Lists.newArrayList();
  62. spuSensitiveDealRes.stream().forEach(hitInfo -> {
  63. Map<Integer, String> hitdetail;
  64. try {
  65. hitdetail = hitInfo.get();
  66. } catch (Exception e) {
  67. return;
  68. }
  69. if (MapUtils.isEmpty(hitdetail)) {
  70. return;
  71. }
  72. if (StringUtils.isNotBlank(hitdetail.get(isHit))) {
  73. validHits.add(hitdetail.get(isHit));
  74. }
  75. if (StringUtils.isNotBlank(hitdetail.get(exceptionHit))) {
  76. exceptionHits.add(hitdetail.get(exceptionHit));
  77. }
  78. });
  79. /*4.1 如果没有命中敏感词以及异常情况直接返回*/
  80. String spuName = spuContentInfos.stream().filter(contentInfoContext -> contentInfoContext.getContentAttr().getBelong() == SensitiveValidateField.NAME)
  81. .findFirst().get().getContent();
  82. StringBuilder hitInfoRes = new StringBuilder();
  83. if (CollectionUtils.isEmpty(validHits) && CollectionUtils.isEmpty(exceptionHits)) {
  84. hitInfoRes.append("商品【").append(spuName).append("】未命中敏感词");
  85. return hitInfoRes.toString();
  86. }
  87. /*4.2 统计其中有效命中信息*/
  88. if (CollectionUtils.isNotEmpty(validHits)) {
  89. hitInfoRes.append("商品【").append(spuName).append("】敏感词命中情况统计:\n");
  90. validHits.stream().forEach(hitInfo -> {
  91. hitInfoRes.append(hitInfo).append("\n");
  92. });
  93. }
  94. /*4.3 统计其中异常命中信息*/
  95. if (CollectionUtils.isNotEmpty(exceptionHits)) {
  96. hitInfoRes.append("商品【").append(spuName).append("】敏感词异常处理情况统计:\n");
  97. exceptionHits.stream().forEach(hitInfo -> {
  98. hitInfoRes.append(hitInfo).append("\n");
  99. });
  100. }
  101. return hitInfoRes.toString();
  102. }

D.模拟并发处理批量任务

  1. /**
  2. * 并发处理批量任务
  3. * <p>
  4. * 测试组合敏感词处理逻辑
  5. */
  6. @Test
  7. public void testSensitiveBatchDeal() throws ExecutionException, InterruptedException {
  8. /*定义单个商品处理任务*/
  9. class SpuSentiveTask implements Callable<String> {
  10. private final SpuSensitiveDealCommand command;
  11. public SpuSentiveTask(SpuSensitiveDealCommand command) {
  12. this.command = command;
  13. }
  14. @Override
  15. public String call() throws Exception {
  16. return containsSensitiveWordSpu(command);
  17. }
  18. }
  19. /*模拟批量商品 假设前端传入十个商品,我们将十个商品并发执行*/
  20. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
  21. 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
  22. private AtomicInteger threadCount = new AtomicInteger(1);
  23. @Override
  24. public Thread newThread(Runnable r) {
  25. return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
  26. }
  27. });
  28. List<SpuSentiveTask> spuSentiveTasks = Lists.newArrayList();
  29. for (int i = 0; i < 10; i++) {
  30. spuSentiveTasks.add(new SpuSentiveTask(new SpuSensitiveDealCommand()));
  31. }
  32. List<Future<String>> spuSensitiveBatchRes = threadPoolExecutor.invokeAll(spuSentiveTasks);
  33. threadPoolExecutor.shutdown();
  34. if (CollectionUtils.isEmpty(spuSensitiveBatchRes)) {
  35. log.info("本批次商品数据中不存在敏感词!");
  36. return;
  37. }
  38. final int[] i = {0};
  39. spuSensitiveBatchRes.forEach(spuSensitiveRes -> {
  40. try {
  41. log.info("商品编号{}对应敏感词校验情况如下:", i[0]);
  42. log.info(spuSensitiveRes.get());
  43. i[0]++;
  44. } catch (Exception e) {
  45. log.error("商品编号{}对应敏感词校验情况提取失败", i[0]);
  46. i[0]++;
  47. }
  48. });
  49. }

E.结果展示

原日志因数据太大不在展示,重点关注日志随机选取如下,管道相关处理日志暂时忽略(因为是模拟很多原始数据是随机产生的名称,可以将本问源码直接运行查看最新的随机数据即可,以下是某次随机验证结果):

  1. 商品编号0对应敏感词校验情况如下:
  2. 商品【酒精南京18252066688腾讯】敏感词命中情况统计:
  3. 【商品名称】[酒精南京18252066688腾讯]命中敏感词:[腾讯, 酒精, 南京]
  4. 【分类和分类描述】[肯德基酒精南京茶ta颜悦色]命中敏感词:[肯德基, 酒精, 南京]
  5. 【规格名称】[胖大哥肉蟹煲连花清温酒精南京]命中敏感词:[酒精, 南京]
  6. 【属性和属性值】[极速达人气榜第一禁药]命中敏感词:[禁药]
  7. 【套餐】[腾讯长江鱼机关枪]命中敏感词:[腾讯]
  8. 【标品名称】[酒精南京辉瑞P药安培开席]命中敏感词:[酒精, 南京]
  9. 【商家名称】[比基妮特丁通酒精南京]命中敏感词:[酒精, 南京]
  10. 商品编号1对应敏感词校验情况如下:
  11. 商品【人气榜第一酒精南京成人艺术】敏感词命中情况统计:
  12. 【商品名称】[人气榜第一酒精南京成人艺术]命中敏感词:[酒精, 南京]
  13. 【分类和分类描述】[捷倍安水多活好腾讯]命中敏感词:[腾讯]
  14. 【套餐】[張彥峰贱男消食片仿真手枪]命中敏感词:[张彦峰]
  15. 【商家公告】[极速达蟹礼券禁药]命中敏感词:[禁药]
  16. 商品编号2对应敏感词校验情况如下:
  17. 商品【胸胸烈火茶ta颜悦色饿了吗】敏感词命中情况统计:
  18. 【规格名称】[摇钱树肯德基山茱萸]命中敏感词:[肯德基]
  19. 【商品普通属性和属性值】[肯德基贱男消食片禁药]命中敏感词:[肯德基, 禁药]
  20. 商品编号3对应敏感词校验情况如下:
  21. 商品【草甘膦巴黎圣母院维信识别】敏感词命中情况统计:
  22. 【分类和分类描述】[禁药美乐滋欢乐柠檬]命中敏感词:[禁药]
  23. 【自定义属性】[腾讯龙闩花甲草甘膦]命中敏感词:[腾讯]
  24. 【商品普通属性和属性值】[張彥峰长江鱼外賣]命中敏感词:[张彦峰]
  25. 商品编号4对应敏感词校验情况如下:
  26. 商品【贱男消食片18252066688巴黎圣母院】敏感词命中情况统计:
  27. 【分类和分类描述】[中国肯德基茶ta颜悦色]命中敏感词:[肯德基]
  28. 【规格名称】[腾讯安培开席贱男消食片]命中敏感词:[腾讯]
  29. 【属性和属性值】[禁药美乐滋极速达]命中敏感词:[禁药]
  30. 【商家名称】[成人艺术星即送腾讯]命中敏感词:[腾讯]
  31. 商品编号5对应敏感词校验情况如下:
  32. 商品【机关枪張彥峰人气榜第一】敏感词命中情况统计:
  33. 【商品名称】[机关枪張彥峰人气榜第一]命中敏感词:[张彦峰]
  34. 【商品描述】[禁药贱男消食片叫只鸡]命中敏感词:[禁药]
  35. 【属性和属性值】[中国張彥峰老城一埚]命中敏感词:[张彦峰]
  36. 【商品图片特效】[比基妮酒精南京胖大哥肉蟹煲]命中敏感词:[酒精, 南京]
  37. 【标品名称】[中国星芭芭腾讯]命中敏感词:[腾讯]

参考文章

1.CompletableFuture 使用介绍_Javadoop

2.CompletableFuture使用详解_头未秃的博客-CSDN博客_completablefuture 使用

3.CompletableFuture基本使用_中国一动的博客-CSDN博客_completablefuture使用

4.CompletableFuture使用详解(全网看这一篇就行)_代码搬运工阿新的博客-CSDN博客

5.CompletableFuture使用详解_sermonlizhi的博客-CSDN博客_completablefuture

6.CompletableFuture用法详解_LallanaLee的博客-CSDN博客_completablefuture用法

7.CompletableFuture原理解析 | 老司机撩Java

8.图解CompletableFuture源码_weixin_38592881的博客-CSDN博客

9.深入解读CompletableFuture源码与原理_CoderBruis的博客-CSDN博客_future.isdown()

10.https://blog.csdn.net/qq_52791485/article/details/126980976

11.CompletableFuture源码解析_pngyul的博客-CSDN博客_tryfire

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/948952
推荐阅读
相关标签
  

闽ICP备14008679号