当前位置:   article > 正文

JUC并发编程与源码分析学习笔记(一)

JUC并发编程与源码分析学习笔记(一)

目录

一、JUC教程简介

二、为什么学好用好多线程如此重要

三、start线程开启C源码分析

1、(代码)base目录下:ThreadBaseDemo.java

四、基础概念复习

五、用户守护线程理论

六、用户守护线程代码演示和总结

2、(代码)base目录下:DaemonDemo.java

七、CompletableFuture之Future为什么出现

八、CompletableFuture之引出FutureTask-上集

3、(代码)cf目录下:CompletableFutureDemo.java

九、CompletableFuture之引出FutureTask-中集

十一、CompletableFuture之FutureTask结合线程池提升性能

4、(代码)cf目录下:FutureThreadPoolDemo.java

十二、CompletableFuture之get获取容易阻塞

5、(代码)cf目录下:FutureAPIDemo.java

十三、CompletableFuture之轮询耗费CPU

十四、CompletableFuture之Future异步优化思路

十五、CompletableFuture之CompletionStage源码分析

十六、CompletableFuture之四大静态方法初讲

6、(代码)cf目录下:CompletableFutureBuildDemo.java

十七、CompletableFuture之通用异步编程-上集

7、(代码)cf目录下:CompletableFutureUseDemo.java

十九、CompletableFuture之链式语法和join方法介绍

二十、CompletableFuture之电商比价大厂案例需求分析

二十一、CompletableFuture之电商比价大厂案例编码实战-上集

二十三、CompletableFuture之获得结果和触发计算

8、(代码)cf目录下:CompletableFutureAPIDemo.java

二十四、CompletableFuture之对计算结果进行处理

9、(代码)cf目录下:CompletableFutureAPI2Demo.java

二十五、CompletableFuture之对计算结果进行消费

10、(代码)cf目录下:CompletableFutureAPI3Demo.java

二十六、CompletableFuture之线程池运行选择

11、(代码)cf目录下:CompletableFutureWithThreadPoolDemo.java

二十七、CompletableFuture之对计算速度选用

12、(代码)cf目录下:CompletableFutureFastDemo.java

二十八、CompletableFuture之对计算结果合并

13、(代码)cf目录下:CompletableFutureCombineDemo.java


一、JUC教程简介

在现今的互联网行业,尤其是开发工程师岗位,如果你对高并发、多线程都没有接触和了解,那肯定无法成为真正的高级开发工程师。高并发、多线程技术是目前非常重要的技术壁垒和对高级开发人员的要求,而要成为优秀的高薪程序员,高并发系统的架构设计和多线程硬核编码技能是当下你必须要掌握的

JUC是什么

java.util.concurrent在并发编程中使用的工具包(对JUC知识的高阶内容讲解和实战增强)

前置知识

①、IDEA之lombok插件

②、Java8新特性(Java8语法本身+函数式编程+方法引用+lambda表达式)

③、JUC初级篇

④、JVM:JVM体系结构(参考资料:深入理解Java虚拟机)

二、为什么学好用好多线程如此重要

java.util.concurrent

java.util.concurrent.atomic

java.util.concurrent.locks

软件方便好处

①、面试B格可以高一点点

②、充分利用多核处理器

③、提高程序性能,高并发系统

④、提高程序吞吐量,异步+回调等生产需求

弊端及问题

①、线程安全性问题(i++、集合类安全否)

②、线程锁问题(synchronized是重量级锁(性能差)---提升--->偏向锁、轻量锁)

③、线程性能问题

三、start线程开启C源码分析

* private native void start0();
* native代表JNI本地接口的调用,调的是第三方C语言所编写的底层函数或者是操作
* 系统的底层代码,从操作系统的这个角度,大家都了解。在我们的任务管理器里面,
* 就算没有装JDK,它也有操作系统级别的进程和线程,有进程必然会有线程。
*
* 对于我们的多线程,它跟语言没有太多关系,是操作系统层面给的

1、(代码)base目录下:ThreadBaseDemo.java

  1. package com.nanjing.juc.base;
  2. /**
  3. * private native void start0();
  4. * native代表JNI本地接口的调用,调的是第三方C语言所编写的底层函数或者是操作
  5. * 系统的底层代码,从操作系统的这个角度,大家都了解。在我们的任务管理器里面,
  6. * 就算没有装JDK,它也有操作系统级别的进程和线程,有进程必然会有线程。
  7. *
  8. * 对于我们的多线程,它跟语言没有太多关系,是操作系统层面给的
  9. *
  10. * @author xizheng
  11. * @date 2023-08-07 20:32:10
  12. */
  13. public class ThreadBaseDemo {
  14. public static void main(String[] args) {
  15. //start方法测试
  16. Thread t1 = new Thread(() -> {
  17. },"t1");
  18. t1.start();
  19. }
  20. }

四、基础概念复习

1把锁:synchronized

2个并:

并发(concurrent)

①、是在同一实体上的多个事件

②、是在一台处理器上“同时”处理多个任务

③、同一时刻,其实是只有一个事件在发生

并行(parallel)

①、是在不同实体上的多个事件

②、是在多台处理器上同时处理多个任务

③、同一时刻,大家真的都在做事情,你做你的,我做我的,但是我们都在做

并发vs并行

3个程:

①、进程

简单的说,在系统中运行的一个应用程序就是一个进程,每一个进程都有它自己的内存空间和系统资源。

②、线程

也被称为轻量级进程,在同一个进程内会有1个或多个线程,是大多数操作系统进行时序调度的基本单元

③、管程(重要)

Monitor(监视器),也就是我们平时所说的锁

Monitor其实是一种同步机制,它的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。

JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象

  1. //管程测试
  2. Object o = new Object();
  3. new Thread(() -> {
  4. synchronized (o){
  5. }
  6. },"t1").start();

Monitor对象会和Java对象一同创建并销毁,它底层是由C++语言来实现的。

JVM第3版

执行线程就要求先成功持有管程,然后才能执行方法,最后当方法完成(无论是正常完后曾还是非正常完成)时释放管程。在方法执行期间,执行线程持有了管程,其他任何线程都无法再获取到同一个管程。

五、用户守护线程理论

Java线程分为用户线程和守护线程

①、一般情况下不做特别说明配置,默认都是用户线程

②、用户线程(User Thread)

是系统的工作线程,它会完成这个程序需要完成的业务操作

③、守护线程(Daemon Thread)

Ⅰ、是一种特殊的线程为其它线程服务的,在后台默默地完成一些系统性的服务,比如垃圾回收线程就是最典型的例子

Ⅱ、守护线程作为一个服务线程,没有服务对象就没有必要继续执行了,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了。所以假如当系统只剩下守护线程的时候,java虚拟机会自动退出。

线程的daemon属性

  1. public final boolean isDaemon() {
  2. return this.daemon;
  3. }

true表示是守护线程

false表示是用户线程

六、用户守护线程代码演示和总结

2、(代码)base目录下:DaemonDemo.java

  1. package com.nanjing.juc.base;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * 用户线程和守护线程
  5. *
  6. * @author xizheng
  7. * @date 2023-08-07 21:16:53
  8. */
  9. public class DaemonDemo {
  10. public static void main(String[] args) {//一切方法运行的入口
  11. Thread t1 = new Thread(() -> {
  12. System.out.println(Thread.currentThread().getName()+"\t 开始运行, "+
  13. (Thread.currentThread().isDaemon() ? "守护线程":"用户线程"));
  14. while(true){
  15. System.out.println("123");
  16. }
  17. },"t1");
  18. t1.setDaemon(true);
  19. t1.start();
  20. //暂停几秒钟线程
  21. try {
  22. TimeUnit.SECONDS.sleep(3);
  23. }catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. System.out.println(Thread.currentThread().getName()+"\t ----end 主线程");
  27. }
  28. }

小总结

如果用户线程全部结束意味着程序需要完成的业务操作已经结束了,守护线程随着JVM一同结束工作

setDaemon(true)方法必须在start()之前设置,否则报 IllegalThreadStateException 异常

七、CompletableFuture之Future为什么出现

1、Future接口理论知识复习

Future接口(FutureTask实现类)定义了操作 异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。

八、CompletableFuture之引出FutureTask-上集

一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

1、Future接口常用实现类FutureTask异步任务

①、Future接口能干什么

Future是Java5新加的一个接口,它提供了一种异步并行计算的功能

如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。

主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

Runnable接口

Callable接口

Future接口和FutureTask实现类

目的:异步多线程任务执行且返回有结果,三个特点:多线程/有返回/异步任务

(班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

3、(代码)cf目录下:CompletableFutureDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.FutureTask;
  5. /**
  6. * Future接口常用实现类FutureTask异步任务
  7. *
  8. * @author xizheng
  9. * @date 2023-08-07 21:47:44
  10. */
  11. public class CompletableFutureDemo {
  12. public static void main(String[] args) throws ExecutionException, InterruptedException {
  13. FutureTask<String> futureTask = new FutureTask<>(new MyThread2());
  14. Thread t1 = new Thread(futureTask,"t1");
  15. t1.start();
  16. System.out.println(futureTask.get());
  17. }
  18. }
  19. class MyThread implements Runnable{
  20. @Override
  21. public void run() {
  22. }
  23. }
  24. class MyThread2 implements Callable<String>{
  25. @Override
  26. public String call() throws Exception {
  27. System.out.println("----come in call() ");
  28. return "hello Callable";
  29. }
  30. }

九、CompletableFuture之引出FutureTask-中集

十一、CompletableFuture之FutureTask结合线程池提升性能

Future编码实战和优缺点分析

优点:

future+线程池异步多线程任务配合,能显著提高程序的执行效率

4、(代码)cf目录下:FutureThreadPoolDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.*;
  3. /**
  4. * 优点:
  5. * future+线程池异步多线程任务配合,能显著提高程序的执行效率。
  6. *
  7. * @author xizheng
  8. * @date 2023-08-07 21:58:42
  9. */
  10. public class FutureThreadPoolDemo {
  11. public static void main(String[] args) throws ExecutionException, InterruptedException {
  12. //3个任务,目前开启多个异步任务线程来处理,请问耗时多少?532 毫秒
  13. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  14. long startTime = System.currentTimeMillis();
  15. FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
  16. try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
  17. return "task1 over";
  18. });
  19. threadPool.submit(futureTask1);
  20. FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
  21. try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
  22. return "task2 over";
  23. });
  24. threadPool.submit(futureTask2);
  25. System.out.println(futureTask1.get());
  26. System.out.println(futureTask2.get());
  27. try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
  28. long endTime = System.currentTimeMillis();
  29. System.out.println("----costTime: "+(endTime - startTime) + " 毫秒");
  30. System.out.println(Thread.currentThread().getName()+"\t -----end");
  31. threadPool.shutdown();
  32. }
  33. public static void m1(String[] args) {
  34. //3个任务,目前只有一个线程main来处理,请问耗时多少?1125 毫秒
  35. long startTime = System.currentTimeMillis();
  36. //暂停毫秒
  37. try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
  38. try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
  39. try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
  40. long endTime = System.currentTimeMillis();
  41. System.out.println("----costTime: "+(endTime - startTime) + " 毫秒");
  42. System.out.println(Thread.currentThread().getName()+"\t -----end");
  43. }
  44. }

十二、CompletableFuture之get获取容易阻塞

缺点:get()阻塞:一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞

5、(代码)cf目录下:FutureAPIDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.FutureTask;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * Future缺点:
  8. * 1、get()阻塞: 一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞
  9. * 2、isDone()轮询: 轮询的方式会耗费无谓的CPU资源,而且也不见得能及时得到计算结果
  10. *
  11. *
  12. * @author xizheng
  13. * @date 2023-08-07 22:21:23
  14. */
  15. public class FutureAPIDemo {
  16. public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
  17. FutureTask<String> futureTask = new FutureTask<String>(() -> {
  18. System.out.println(Thread.currentThread().getName()+"\t -----come in");
  19. //暂停几秒钟线程
  20. try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
  21. return "task over";
  22. });
  23. Thread t1 = new Thread(futureTask, "t1");
  24. t1.start();
  25. // System.out.println(futureTask.get());//会阻塞主线程,一般建议放在程序后面
  26. System.out.println(Thread.currentThread().getName()+"\t ----忙其它任务了");
  27. // System.out.println(futureTask.get());
  28. // System.out.println(futureTask.get(3,TimeUnit.SECONDS));
  29. while(true){
  30. if(futureTask.isDone()){
  31. System.out.println(futureTask.get());
  32. break;
  33. }else{
  34. //暂停毫秒
  35. try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
  36. System.out.println("正在处理中,不要再催了,越催越慢 ,再催熄火");
  37. }
  38. }
  39. }
  40. }
  41. /**
  42. * 1.get容易导致阻塞,一般建议放在程序后面,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,容易程序堵塞。
  43. * 2.假如我不愿意等待很长时间,我希望过时不候,可以自动离开
  44. */

十三、CompletableFuture之轮询耗费CPU

isDone()轮询

①、轮询的方式会耗费无谓的CPU资源,而且也不见得能及时得到计算结果

②、如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞

代码如上

结论:Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果

十四、CompletableFuture之Future异步优化思路

想完成一些复杂的任务

1、对于简单的业务场景使用Future完全OK

2、回调通知

①、应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知

②、通过轮询的方式去判断任务是否完成这样非常占cPU并且代码也不优雅

3、创建异步任务

Future+线程池配合

4、多个任务前后依赖可以组合处理(水煮鱼)

①、想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值

②、将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果。

5、对计算速度选最快

当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果

6、。。。。。。

①、使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture 以声明的方式优雅的处理这些需求

②、从i到i++,哈哈哈

③、Future能干的,CompletableFuture都能干

十五、CompletableFuture之CompletionStage源码分析

CompletableFuture为什么出现

get()方法在Future计算完成之前会一直处在阻塞状态下,

isDone()方法容易耗费CPU资源

对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。

因此,JDK8设计出CompletableFuture。

CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

类架构说明

接口CompletionStage

①、CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

②、一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun() -> System.out.println())

③、一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。

类CompletableFuture

①、在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数时编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

②、它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。

③、它实现了Future和CompletionStage接口

十六、CompletableFuture之四大静态方法初讲

核心的四个静态方法,来创建一个异步任务

①、runAsync 无返回值

  1. public static CompletableFuture<Void> runAsync(Runnable var0)
  2. public static CompletableFuture<Void> runAsync(Runnable var0, Executor var1)

②、supplyAsync 有返回值

  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> var0)
  2. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> var0, Executor var1)

上述Executor executor参数说明

①、没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码

②、如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

6、(代码)cf目录下:CompletableFutureBuildDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.*;
  3. /**
  4. * CompletableFuture之四大静态方法
  5. *
  6. * @author xizheng
  7. * @date 2023-08-08 09:57:37
  8. */
  9. public class CompletableFutureBuildDemo {
  10. public static void main(String[] args) throws ExecutionException, InterruptedException {
  11. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  12. //runAsync 无 返回值
  13. CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
  14. System.out.println(Thread.currentThread().getName());
  15. //暂停几秒钟线程
  16. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace();}
  17. },threadPool);
  18. System.out.println(completableFuture2.get());
  19. //supplyAsync 有 返回值
  20. CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
  21. System.out.println(Thread.currentThread().getName());
  22. //暂停几秒钟线程
  23. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace();}
  24. return "hello supplyAsync";
  25. },threadPool);
  26. System.out.println(completableFuture.get());
  27. threadPool.shutdown();
  28. }
  29. }

十七、CompletableFuture之通用异步编程-上集

Code之通用演示,减少阻塞和轮询

从Java8开始引入了CompletableFuture,它是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

7、(代码)cf目录下:CompletableFutureUseDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.*;
  3. /**
  4. * CompletableFuture减少阻塞和轮询
  5. *
  6. * @author xizheng
  7. * @date 2023-08-08 10:15:57
  8. */
  9. public class CompletableFutureUseDemo {
  10. public static void main(String[] args) {
  11. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  12. try{
  13. CompletableFuture.supplyAsync(() -> {
  14. System.out.println(Thread.currentThread().getName() + "----come in");
  15. int result = ThreadLocalRandom.current().nextInt(10);
  16. try {
  17. TimeUnit.SECONDS.sleep(1);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. System.out.println("----1秒钟后出结果: " + result);
  22. // if(result > 2){
  23. // int i=10/0;
  24. // }
  25. return result;
  26. },threadPool).whenComplete((v,e) -> {
  27. if (e == null) {
  28. System.out.println("-----计算完成,更新系统UpdateValue: "+v);
  29. }
  30. }).exceptionally(e -> {
  31. e.printStackTrace();
  32. System.out.println("异常情况: "+e.getCause()+"\t"+e.getMessage());
  33. return null;
  34. });
  35. System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
  36. //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒种线程
  37. // try {
  38. // TimeUnit.SECONDS.sleep(3);
  39. // } catch (InterruptedException e) {
  40. // e.printStackTrace();
  41. // }
  42. }catch (Exception e){
  43. e.printStackTrace();
  44. }finally {
  45. threadPool.shutdown();
  46. }
  47. }
  48. public static void future1(String[] args) throws ExecutionException, InterruptedException {
  49. CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
  50. System.out.println(Thread.currentThread().getName() + "----come in");
  51. int result = ThreadLocalRandom.current().nextInt(10);
  52. try {
  53. TimeUnit.SECONDS.sleep(1);
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. System.out.println("----1秒钟后出结果: " + result);
  58. return result;
  59. });
  60. System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
  61. System.out.println(completableFuture.get());
  62. }
  63. }

解释下为什么默认线程池关闭,自定义线程池记得关闭 (守护线程、用户线程)

CompletableFuture的优点

①、异步任务结束时,会自动回调某个对象的方法;

②、主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

③、异步任务出错时,会自动回调某个对象的方法

十九、CompletableFuture之链式语法和join方法介绍

案例精讲-从电商网站的比价需求说开去

①、函数式编程已经主流

大厂面试题

Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程

Runnable(无参数,无返回值)

  1. @FunctionalInterface
  2. public interface Runnable {
  3. void run();
  4. }

Function<T,R>接受一个参数,并且有返回值

  1. @FunctionalInterface
  2. public interface Function<T, R> {
  3. R apply(T var1);
  4. }

Consumer接受一个参数,没有返回值

  1. @FunctionalInterface
  2. public interface Consumer<T> {
  3. void accept(T var1);
  4. }

BiConsumer<T, U>接受两个参数(Bi,英文单词词根,代表两个的意思),没有返回值

  1. @FunctionalInterface
  2. public interface BiConsumer<T, U> {
  3. void accept(T var1, U var2);
  4. }

Supplier没有参数,有一个返回值

  1. @FunctionalInterface
  2. public interface Supplier<T> {
  3. T get();
  4. }

二十、CompletableFuture之电商比价大厂案例需求分析

大厂业务需求说明

切记,功能->性能

二十一、CompletableFuture之电商比价大厂案例编码实战-上集

  1. package com.nanjing.juc.cf;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.Getter;
  5. import lombok.NoArgsConstructor;
  6. import lombok.experimental.Accessors;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. import java.util.concurrent.CompletableFuture;
  10. import java.util.concurrent.ThreadLocalRandom;
  11. import java.util.concurrent.TimeUnit;
  12. import java.util.stream.Collectors;
  13. /**
  14. * get和join的区别是在编译时是否报出检查时异常
  15. *
  16. * 案例说明: 电商比价需求,模拟如下情况:
  17. *
  18. * 1需求说明
  19. * 1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
  20. * 1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
  21. *
  22. * 2 输出返回
  23. * 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>
  24. * 《mysql》 in jd price is 88.05
  25. * 《mysql》 in taobao price is 90.43
  26. *
  27. * 3 技术要求
  28. * 3.1 函数式编程
  29. * 3.2 链式编程
  30. * 3.3 Stream流式计算
  31. *
  32. * @author xizheng
  33. * @date 2023-08-14 09:15:18
  34. */
  35. public class CompletableFutureMallDemo {
  36. static List<NetMall> list = Arrays.asList(
  37. new NetMall("jd"),
  38. new NetMall("dangdang"),
  39. new NetMall("taobao"),
  40. new NetMall("pdd"),
  41. new NetMall("tmall")
  42. );
  43. /**
  44. * step by step 一家家搜查
  45. * List<NetMall> ----->map------> List<String>
  46. *
  47. * @param list 列表
  48. * @param productName 产品名称
  49. * @return {@link List}<{@link String}>
  50. */
  51. public static List<String> getPrice(List<NetMall> list,String productName){
  52. return list
  53. .stream()
  54. .map(netMall ->
  55. String.format(productName + " in %s price is %.2f",
  56. netMall.getNetMallName(),
  57. netMall.calcPrice(productName)))
  58. .collect(Collectors.toList());
  59. }
  60. /**
  61. * List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
  62. *
  63. * @param list 列表
  64. * @param productName 产品名称
  65. * @return {@link List}<{@link String}>
  66. */
  67. public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName){
  68. return list
  69. .stream()
  70. .map(netMall ->
  71. CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
  72. netMall.getNetMallName(),
  73. netMall.calcPrice(productName))))
  74. .collect(Collectors.toList())
  75. .stream()
  76. .map(s -> s.join())
  77. .collect(Collectors.toList());
  78. }
  79. public static void main(String[] args) {
  80. // CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
  81. // return "hello 1234";
  82. // });
  83. //
  84. // //System.out.println(completableFuture.get());
  85. // System.out.println(completableFuture.join());
  86. // System.out.println(ThreadLocalRandom.current().nextDouble() * 2 + "mysql".charAt(0));//110.29467425469919
  87. long startTime = System.currentTimeMillis();
  88. List<String> list1 = getPrice(list, "mysql");
  89. for (String element : list1) {
  90. System.out.println(element);
  91. }
  92. long endTime = System.currentTimeMillis();
  93. System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");
  94. System.out.println("------------------------");
  95. long startTime2 = System.currentTimeMillis();
  96. List<String> list2 = getPriceByCompletableFuture(list, "mysql");
  97. for (String element : list2) {
  98. System.out.println(element);
  99. }
  100. long endTime2 = System.currentTimeMillis();
  101. System.out.println("----costTime: " + (endTime2 - startTime2) + " 毫秒");
  102. }
  103. }
  104. class NetMall{
  105. @Getter
  106. private String netMallName;
  107. public NetMall(String netMallName) {
  108. this.netMallName = netMallName;
  109. }
  110. public double calcPrice(String productName){
  111. try {
  112. TimeUnit.SECONDS.sleep(1);
  113. } catch (InterruptedException e) {
  114. e.printStackTrace();
  115. }
  116. return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
  117. }
  118. }
  119. @AllArgsConstructor
  120. @NoArgsConstructor
  121. @Data
  122. @Accessors(chain = true)
  123. class Student{
  124. private Integer id;
  125. private String studentName;
  126. private String major;
  127. }

二十三、CompletableFuture之获得结果和触发计算

CompletableFuture常用方法

获得结果和触发计算

1、获取结果

public T get()

public T get(long timeout,TimeUnit unit)

public T join()

public T getNow(T valueIfAbsent)

①、没有计算完成的情况下,给我一个替代结果

②、立即获取结果不阻塞

Ⅰ、计算完,返回计算完成后的结果

Ⅱ、没算完,返回设定的valueIfAbsent

2、主动触发计算

public boolean complete(T value)

是否打断get方法立即返回括号值

8、(代码)cf目录下:CompletableFutureAPIDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * 获得结果和触发计算
  8. *
  9. * @author xizheng
  10. * @date 2023-08-14 12:31:19
  11. */
  12. public class CompletableFutureAPIDemo {
  13. public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
  14. CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
  15. //暂停几秒钟线程
  16. try {
  17. TimeUnit.SECONDS.sleep(10);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. return "abc";
  22. });
  23. //System.out.println(completableFuture.get());
  24. //System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
  25. System.out.println(completableFuture.join());
  26. //暂停几秒钟线程
  27. try {
  28. TimeUnit.SECONDS.sleep(2);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. System.out.println(completableFuture.getNow("xxx"));
  33. System.out.println(completableFuture.complete("completeValue")+"\t"+completableFuture.get());
  34. }
  35. }

二十四、CompletableFuture之对计算结果进行处理

1、thenApply

①、计算结果存在依赖关系,这两个线程串行化

②、异常相关

由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。

2、handle

①、计算结果存在依赖关系,这两个线程串行化

②、异常相关

有异常也可以往下一步走,根据带的异常参数可以进一步处理

9、(代码)cf目录下:CompletableFutureAPI2Demo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. * 对计算结果进行处理
  8. * thenApply与handle异常处理的区别
  9. *
  10. * @author xizheng
  11. * @date 2023-08-14 13:04:18
  12. */
  13. public class CompletableFutureAPI2Demo {
  14. public static void main(String[] args) {
  15. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  16. CompletableFuture.supplyAsync(() ->{
  17. //暂停几秒钟线程
  18. try {
  19. TimeUnit.SECONDS.sleep(1);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. System.out.println("111");
  24. return 1;
  25. },threadPool).handle((f,e) -> {
  26. int i=10/0;
  27. System.out.println("222");
  28. return f + 2;
  29. }).handle((f,e) -> {
  30. System.out.println("333");
  31. return f + 3;
  32. }).whenComplete((v,e) -> {
  33. if(e == null){
  34. System.out.println("----计算结果: " + v);
  35. }
  36. }).exceptionally(e -> {
  37. e.printStackTrace();
  38. System.out.println(e.getMessage());
  39. return null;
  40. });
  41. System.out.println(Thread.currentThread().getName()+"----主线程先去忙其它任务");
  42. threadPool.shutdown();
  43. }
  44. }

二十五、CompletableFuture之对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果

thenAccept

  1. CompletableFuture.supplyAsync(() -> {
  2. return 1;
  3. }).thenApply(f -> {
  4. return f + 2;
  5. }).thenApply(f -> {
  6. return f + 3;
  7. }).thenAccept(System.out::println);

对比补充

Code之任务之间的顺序执行

thenRun

①、thenRun(Runnable runnable)

②、任务A执行完B,并且B不需要A的结果

thenAccept

①、thenAccept(Consumer action)

②、任务A执行完执行B,B需要A的结果,但是任务B无返回值

thenApply

①、thenApply(Function fn)

②、任务A执行完执行B,B需要A的结果,同时任务B有返回值

10、(代码)cf目录下:CompletableFutureAPI3Demo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.CompletableFuture;
  3. /**
  4. * 对计算结果进行消费
  5. *
  6. * @author xizheng
  7. * @date 2023-08-14 13:22:53
  8. */
  9. public class CompletableFutureAPI3Demo {
  10. public static void main(String[] args) {
  11. CompletableFuture.supplyAsync(() -> {
  12. return 1;
  13. }).thenApply(f -> {
  14. return f + 2;
  15. }).thenApply(f -> {
  16. return f + 3;
  17. }).thenAccept(System.out::println);
  18. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
  19. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
  20. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
  21. }
  22. }

二十六、CompletableFuture之线程池运行选择

以thenRun和thenRunAsync为例,有什么区别?

11、(代码)cf目录下:CompletableFutureWithThreadPoolDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.sql.Time;
  3. import java.util.concurrent.CompletableFuture;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.TimeUnit;
  7. /**
  8. * 线程池运行选择
  9. *
  10. * @author xizheng
  11. * @date 2023-08-14 13:37:26
  12. */
  13. public class CompletableFutureWithThreadPoolDemo {
  14. public static void main(String[] args) {
  15. ExecutorService threadPool = Executors.newFixedThreadPool(5);
  16. try {
  17. CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
  18. try {
  19. TimeUnit.MILLISECONDS.sleep(20);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
  24. return "abcd";
  25. },threadPool).thenRunAsync(() -> {
  26. try {
  27. TimeUnit.MILLISECONDS.sleep(20);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
  32. }).thenRun(() -> {
  33. try {
  34. TimeUnit.MILLISECONDS.sleep(10);
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
  39. }).thenRun(() -> {
  40. try {
  41. TimeUnit.MILLISECONDS.sleep(10);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
  46. });
  47. System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. } finally {
  51. threadPool.shutdown();
  52. }
  53. }
  54. }

1 没有传入自定义线程池,都用默认线程池ForkJoinPool

2 传入了一个自定义线程池,

如果你执行第一个任务的时候,传入了一个自定义线程池:

调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用一个线程池。

调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

3 备注

   有可能处理太快,系统优化切换原则,直接使用main线程处理

其它如:thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理

二十七、CompletableFuture之对计算速度选用

谁快用谁

12、(代码)cf目录下:CompletableFutureFastDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * 对计算速度选用
  6. *
  7. * @author xizheng
  8. * @date 2023-08-14 14:10:41
  9. */
  10. public class CompletableFutureFastDemo {
  11. public static void main(String[] args) {
  12. CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
  13. System.out.println("A come in");
  14. try {
  15. TimeUnit.SECONDS.sleep(3);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. return "playA";
  20. });
  21. CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
  22. System.out.println("B come in");
  23. try {
  24. TimeUnit.SECONDS.sleep(6);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. return "playB";
  29. });
  30. CompletableFuture<String> result = playA.applyToEither(playB, f -> {
  31. return f + " is winer";
  32. });
  33. System.out.println(Thread.currentThread().getName()+"\t"+"------: "+result.join());
  34. }
  35. }

二十八、CompletableFuture之对计算结果合并

两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理

先完成的先等着,等待其它分支任务

13、(代码)cf目录下:CompletableFutureCombineDemo.java

  1. package com.nanjing.juc.cf;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * 对计算结果合并
  6. *
  7. * @author xizheng
  8. * @date 2023-08-14 14:18:21
  9. */
  10. public class CompletableFutureCombineDemo {
  11. public static void main(String[] args) {
  12. CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
  13. System.out.println(Thread.currentThread().getName() + "\t ---启动");
  14. //暂停几秒钟线程
  15. try {
  16. TimeUnit.SECONDS.sleep(1);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. return 10;
  21. });
  22. CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
  23. System.out.println(Thread.currentThread().getName() + "\t ---启动");
  24. //暂停几秒钟线程
  25. try {
  26. TimeUnit.SECONDS.sleep(2);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. return 20;
  31. });
  32. CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
  33. System.out.println("-----开始两个结果合并");
  34. return x + y;
  35. });
  36. System.out.println(result.join());
  37. }
  38. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/966798
推荐阅读
相关标签
  

闽ICP备14008679号