当前位置:   article > 正文

Java并发编程学习笔记

Java并发编程学习笔记

什么是并发

多个线程一起运行,实际上某个时刻只有一个线程在执行的线程就叫做并发

并发是程序上的逻辑概念,并行是物理上的概念。

如果程序是采用多线程的技术编写的,那么运行在单核单线程的机器上就会并发执行。

如果运行在多核或多线程的机器上(例如:CPU支持超线程技术,也就是一个核心可以同时运行多个线程;或者CPU是多核心的)就会并行执行。

线程的六种状态

Java中线程的状态分为6种:

1.初始(NEW) :新创建了一个线程对象,但还没有调用start()方法。

2.运行(RUNNABLE) Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。

3.阻塞(BLOCKED) :表示线程阻塞于锁。

4.等待(WAITING) :进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。

5.超时等待(TIMED_WAITING) :该状态不同于WAITING,它可以在指定的时间后自行返回。

6.终止(TERMINATED) :表示该线程已经执行完毕。

这6种状态定义在Thread类的State枚举中,可查看源码进行一一对应。

如何将任务交给线程

方式一:重写Thread的run方法

  1. public class example {
  2. public static void main(String[] args){
  3. Thread thread = new Thread(){
  4. @Override
  5. public void run(){
  6. System.out.println("子线程。。。")
  7. }
  8. };
  9. thread.start();
  10. System.out.println("main 结束")
  11. }
  12. }

方式二:实现java.lang.Runnable接口,在创建Thread对象的时候传进去

  1. public class example {
  2. public static void main(String[] args){
  3. Thread thread = new Thread()(
  4. () -> System.out.println("子线程。。。")
  5. );
  6. thread.start();
  7. System.out.println("main 结束")
  8. }
  9. }

方式三:使用FutureTask,在创建Thread对象的时候传进去

  1. public class example {
  2. public static void main(String[] args){
  3. Callable<String> callable = () -> {
  4. System.out.println("我是子任务!")
  5. return "sub task done";
  6. }
  7. FutureTask<String> task = new FutureTask(callable);
  8. Thread thread = new Thread(task);
  9. thread.start();
  10. try{
  11. //String subResult = task.get();
  12. String subResult = task.get(5,TimeUnit.MINUTES);
  13. System.out.println("子线程返回值:"+subResult)
  14. }catch(InterruptedException e){
  15. e.printStackTrace();
  16. }catch(ExecutionException e){
  17. Throwable cause = e.getCause();
  18. e.printStackTrace();
  19. }catch(TimeoutException e){
  20. e.printStackTrace();
  21. }
  22. System.out.println("main结束")
  23. }
  24. }

这种方法的好处在于可以在主线程中获取子线程的返回值,也可以在主线程中获取子线程发生的异常。【通过get()方法】

Java8中的CompletableFuture

API

准备工具类:

  1. public class SmallTool {
  2. public static void sleepMillis(long millis) {
  3. try {
  4. Thread.sleep(millis);
  5. } catch (InterruptedException e) {
  6. throw new RuntimeException(e);
  7. }
  8. }
  9. public static void printTimeAndThread(String tag) {
  10. String result = new StringJoiner("\t|\t")
  11. .add(String.valueOf(System.currentTimeMillis()))
  12. .add(String.valueOf(Thread.currentThread().getId()))
  13. .add(Thread.currentThread().getName())
  14. .add(tag)
  15. .toString();
  16. System.out.println(result);
  17. }
  18. }

supplyAsync

开启异步任务

  1. public class _01_supplyAsync {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白进入餐厅");
  4. SmallTool.printTimeAndThread("小白点了番茄鸡蛋+大米饭");
  5. CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  6. SmallTool.printTimeAndThread("厨师炒菜");
  7. SmallTool.sleepMillis(200);
  8. SmallTool.printTimeAndThread("厨师打饭");
  9. SmallTool.sleepMillis(100);
  10. return "番茄炒蛋+米饭做好了!";
  11. });
  12. SmallTool.printTimeAndThread("小白在打王者");
  13. //join()方法会等待任务结束再返回任务的结果
  14. SmallTool.printTimeAndThread(String.format("%s ,小白开吃",cf1.join()));
  15. }
  16. }
  1. 1670219363628 | 1 | main | 小白进入餐厅
  2. 1670219363628 | 1 | main | 小白点了番茄鸡蛋+大米饭
  3. 1670219363676 | 1 | main | 小白在打王者
  4. 1670219363677 | 11 | ForkJoinPool.commonPool-worker-9 | 厨师炒菜
  5. 1670219363882 | 11 | ForkJoinPool.commonPool-worker-9 | 厨师打饭
  6. 1670219364001 | 1 | main | 番茄炒蛋+米饭做好了! ,小白开吃

thenCompose

连接两个有依赖的任务

把前面任务的结果交给下一个异步任务

注意:在前一个任务完成有结果后,下一个任务才会触发。

  1. public class _02_theCompose {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白进入餐厅");
  4. SmallTool.printTimeAndThread("小白点了番茄鸡蛋+大米饭");
  5. CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  6. SmallTool.printTimeAndThread("厨师炒菜");
  7. SmallTool.sleepMillis(200);
  8. return "厨师做好了番茄炒蛋";
  9. }).thenCompose(dish->
  10. CompletableFuture.supplyAsync(()->{
  11. SmallTool.printTimeAndThread("服务员盛米饭");
  12. SmallTool.sleepMillis(100);
  13. return dish+"+米饭,准备上菜!";
  14. }));
  15. SmallTool.printTimeAndThread("小白在打王者");
  16. //join()方法会等待任务结束再返回任务的结果
  17. SmallTool.printTimeAndThread(String.format("%s ,小白开吃",cf1.join()));
  18. }
  19. }
  1. 1670219605487 | 1 | main | 小白进入餐厅
  2. 1670219605487 | 1 | main | 小白点了番茄鸡蛋+大米饭
  3. 1670219605535 | 11 | ForkJoinPool.commonPool-worker-9 | 厨师炒菜
  4. 1670219605535 | 1 | main | 小白在打王者
  5. 1670219605739 | 12 | ForkJoinPool.commonPool-worker-2 | 服务员盛米饭
  6. 1670219605858 | 1 | main | 厨师做好了番茄炒蛋+米饭,准备上菜! ,小白开吃

thenCombine

合并两个任务

需要传入两个参数:一个是异步任务,一个是BiFunction(把两个结果转换为一个值)

  1. public class _03_thenCombine {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白进入餐厅");
  4. SmallTool.printTimeAndThread("小白点了番茄鸡蛋+大米饭");
  5. CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  6. SmallTool.printTimeAndThread("厨师正在炒菜");
  7. SmallTool.sleepMillis(200);;
  8. return "厨师做好了番茄炒蛋";
  9. }).thenCombine(CompletableFuture.supplyAsync(() -> {
  10. SmallTool.printTimeAndThread("服务员正在做米饭");
  11. SmallTool.sleepMillis(100);
  12. return "服务员做好米饭";
  13. }),(dish,rice) -> {
  14. SmallTool.printTimeAndThread("服务员在打饭");
  15. SmallTool.sleepMillis(100);
  16. return String.format("%s + %s 好了",dish,rice);
  17. });
  18. SmallTool.printTimeAndThread("小白在打王者");
  19. //join()方法会等待任务结束再返回任务的结果
  20. SmallTool.printTimeAndThread(String.format("%s ,小白开吃",cf1.join()));
  21. }
  22. }
  1. 1670219803053 | 1 | main | 小白进入餐厅
  2. 1670219803053 | 1 | main | 小白点了番茄鸡蛋+大米饭
  3. 1670219803104 | 11 | ForkJoinPool.commonPool-worker-9 | 厨师正在炒菜
  4. 1670219803104 | 12 | ForkJoinPool.commonPool-worker-2 | 服务员正在做米饭
  5. 1670219803104 | 1 | main | 小白在打王者
  6. 1670219803305 | 11 | ForkJoinPool.commonPool-worker-9 | 服务员在打饭
  7. 1670219803420 | 1 | main | 厨师做好了番茄炒蛋 + 服务员做好米饭 好了 ,小白开吃

thenApply

把前面的异步任务结果交给后面的Function,其实相当于java Stream API中的map操作

简单的说,就是对线程处理任务的返回值进一步处理

  1. public class _04_1_thenApply {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白吃好了");
  4. SmallTool.printTimeAndThread("小白结账并且要求开发票");
  5. CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
  6. SmallTool.printTimeAndThread("服务员收款 500元");
  7. SmallTool.sleepMillis(200);;
  8. return "厨师做好了番茄炒蛋";
  9. }).thenApply(money -> {
  10. // 对上一步任务处理的结果进行进一步处理
  11. SmallTool.printTimeAndThread(String.format("服务员开发票 面额 %s元",money));
  12. SmallTool.sleepMillis(200);
  13. return String.format("%s元发票",money);
  14. });
  15. SmallTool.printTimeAndThread("小白,接到朋友的电话,想一起打游戏");
  16. SmallTool.printTimeAndThread(String.format("小白拿到%s,准备回家",invoice.join()));
  17. }
  18. }
  1. 1670220910609 | 1 | main | 小白吃好了
  2. 1670220910609 | 1 | main | 小白结账并且要求开发票
  3. 1670220910654 | 11 | ForkJoinPool.commonPool-worker-9 | 服务员收款 500
  4. 1670220910654 | 1 | main | 小白,接到朋友的电话,想一起打游戏
  5. 1670220910868 | 11 | ForkJoinPool.commonPool-worker-9 | 服务员开发票 面额 厨师做好了番茄炒蛋元
  6. 1670220911072 | 1 | main | 小白拿到厨师做好了番茄炒蛋元发票,准备回家

thenSupplyAsync

将前面线程处理的结果交给另一个线程处理

它会把前后两部分看成是两个独立的任务,只有当第一个任务执行完之后,第二个任务才会执行。

  1. public class _04_2_thenApplyAsync {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白吃好了");
  4. SmallTool.printTimeAndThread("小白结账并且要求开发票");
  5. CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
  6. SmallTool.printTimeAndThread("服务员收款 500元");
  7. SmallTool.sleepMillis(200);;
  8. return "厨师做好了番茄炒蛋";
  9. }).thenApplyAsync(money -> {
  10. SmallTool.printTimeAndThread(String.format("服务员开发票 面额 %s元",money));
  11. SmallTool.sleepMillis(200);
  12. return String.format("%s元发票",money);
  13. });
  14. SmallTool.printTimeAndThread("小白,接到朋友的电话,想一起打游戏");
  15. SmallTool.printTimeAndThread(String.format("小白拿到%s,准备回家",invoice.join()));
  16. }
  17. }
  1. 1670221125485 | 1 | main | 小白吃好了
  2. 1670221125486 | 1 | main | 小白结账并且要求开发票
  3. 1670221125530 | 11 | ForkJoinPool.commonPool-worker-9 | 服务员收款 500
  4. 1670221125530 | 1 | main | 小白,接到朋友的电话,想一起打游戏
  5. 1670221125745 | 11 | ForkJoinPool.commonPool-worker-9 | 服务员开发票 面额 厨师做好了番茄炒蛋元
  6. 1670221125946 | 1 | main | 小白拿到厨师做好了番茄炒蛋元发票,准备回家

applyToEither

上一个任务和这一个任务一起运行,哪个先运行完成就把哪个任务结果交给Function

  1. public class _05_applyToEither {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("张三走出餐厅,来到公交站");
  4. SmallTool.printTimeAndThread("等待700路或者800路公交到来");
  5. CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
  6. SmallTool.printTimeAndThread("700路公交正在赶来");
  7. SmallTool.sleepMillis(200);;
  8. return "700路公交到了";
  9. }).applyToEither(CompletableFuture.supplyAsync(()->{
  10. SmallTool.printTimeAndThread("800路公交正在赶来");
  11. SmallTool.sleepMillis(200);
  12. return "800路公交到了";
  13. }),firstComeBus -> firstComeBus);
  14. SmallTool.printTimeAndThread(String.format("%s,小白坐车回家",bus.join()));
  15. }
  16. }

exceptionally

异步任务异常处理的最优雅的方式

exceptionally 可以捕获上述任意操作出现异常都可以捕获

  1. public class _06_exceptionally {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("张三走出餐厅,来到公交站");
  4. SmallTool.printTimeAndThread("等待700路或者800路公交到来");
  5. CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
  6. SmallTool.printTimeAndThread("700路公交正在赶来");
  7. SmallTool.sleepMillis(200);;
  8. return "700路公交到了";
  9. }).applyToEither(CompletableFuture.supplyAsync(()->{
  10. SmallTool.printTimeAndThread("800路公交正在赶来");
  11. SmallTool.sleepMillis(200);
  12. return "800路公交到了";
  13. }),firstComeBus -> {
  14. SmallTool.printTimeAndThread(firstComeBus);
  15. if(firstComeBus.startsWith("700")){
  16. throw new RuntimeException("撞树了。。。");
  17. }
  18. return firstComeBus;
  19. }).exceptionally(e->{
  20. SmallTool.printTimeAndThread(e.getMessage());
  21. SmallTool.printTimeAndThread("小白叫出租车");
  22. return "出租车到了";
  23. });
  24. SmallTool.printTimeAndThread(String.format("%s,小白坐车回家",bus.join()));
  25. }
  26. }

性能

我们会遇到需要批量执行多项任务。这时想使用多线程。

准备

  1. public class Dish {
  2. private String name;
  3. private Integer productionTime;
  4. public Dish(String name, Integer productionTime) {
  5. this.name = name;
  6. this.productionTime = productionTime;
  7. }
  8. //做菜
  9. public void make(){
  10. SmallTool.sleepMillis(TimeUnit.SECONDS.toMillis(this.productionTime));
  11. SmallTool.printTimeAndThread(this.name + "制作完毕,来吃我吧!");
  12. }
  13. }
  1. public class _01_terribleCode {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白和小伙伴们进餐厅点菜!");
  4. long startTime = System.currentTimeMillis();
  5. List<Dish> dishes = new ArrayList<>();
  6. //点菜
  7. for(int i=0;i<=10;i++){
  8. Dish dish = new Dish("菜"+i,1);
  9. dishes.add(dish);
  10. }
  11. //做菜
  12. for(Dish dish: dishes) {
  13. //问题所在:这样做岂不是把多个线程串行执行了么?
  14. CompletableFuture.runAsync(()->{
  15. dish.make();
  16. }).join();
  17. }
  18. SmallTool.printTimeAndThread("菜都做好了,上桌"+(System.currentTimeMillis()-startTime));
  19. }
  20. }

这种做法把多个线程串行执行

正确做法:

  1. public class _02_perfectCode {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白和小伙伴们进餐厅点菜!");
  4. long startTime = System.currentTimeMillis();
  5. //将keepalive设置为0,就代表线程工作完后会立刻销毁
  6. //用这种方式我们可以更容易观察到任务调度的情况
  7. //这种方式不要用于生产中
  8. ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
  9. List<Dish> dishes = new ArrayList<>();
  10. //点菜
  11. for(int i=0;i<=10;i++){
  12. Dish dish = new Dish("菜"+i,1);
  13. dishes.add(dish);
  14. }
  15. //做菜
  16. List<CompletableFuture> cfList = new ArrayList<>();
  17. for(Dish dish: dishes){
  18. CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> dish.make());
  19. cfList.add(cf);
  20. }
  21. // 等待所有任务执行完毕
  22. CompletableFuture.allOf(cfList.toArray(new CompletableFuture[cfList.size()])).join();
  23. }

这样做才能同时执行。

上面的代码还可以使用lamada表达式进行简化:

  1. public class _02_perfectCode {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白和小伙伴们进餐厅点菜!");
  4. long startTime = System.currentTimeMillis();
  5. //将keepalive设置为0,就代表线程工作完后会立刻销毁
  6. //用这种方式我们可以更容易观察到任务调度的情况
  7. //这种方式不要用于生产中
  8. ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
  9. //替代上面注释的代码
  10. CompletableFuture[] dishes = IntStream.rangeClosed(1, 10)
  11. .mapToObj(i -> new Dish("菜" + i, 1))
  12. .map(dish -> CompletableFuture.runAsync(dish::make,executor))
  13. .toArray(size -> new CompletableFuture[size]);
  14. CompletableFuture.allOf(dishes).join();
  15. threadPool.shutdown();
  16. SmallTool.printTimeAndThread("菜都做好了,上桌"+(System.currentTimeMillis()-startTime));
  17. //当前电脑有多少个核心
  18. System.out.println(Runtime.getRuntime().availableProcessors());
  19. //查看当前线程数
  20. System.out.println(ForkJoinPool.commonPool().getPoolSize());
  21. //查看最大线程数
  22. System.out.println(ForkJoinPool.getCommonPoolParallelism());
  23. }
  24. }

如何查看运行环境的性能指标呢?

先来了解个概念:

若电脑的CPU是6核心12线程,则表示这颗CPU可以同时运行12个线程,可以同时处理12件事。

而支持的最大线程数则为12-1=11

JVM查看运行环境有多少个核心

  1. //当前电脑有多少个核心
  2. System.out.println(Runtime.getRuntime().availableProcessors());

查看当前线程池里当前线程数

System.out.println(ForkJoinPool.commonPool().getPoolSize());

查看最大线程数

  1. //查看最大线程数
  2. System.out.println(ForkJoinPool.getCommonPoolParallelism());

所谓提升性能,只需要将线程数设置为合适的数值即可。这个值多少合适呢?需要经过多次测试才行。

注意:不要修改commonPool中的最大线程数,因为commonPool并不是只为completableFuture服务的。

如果需要通过任务的数量动态决定线程数,需要自定义线程池。当任务来的时候判断一下数量,创建线程池,在执行外后将线程池销毁掉。

与线程池结合使用

什么是线程池

ThreadPoolExecutor 源于java.util.concurrent包

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {...}

corePoolSize:核心线程数,即时刻在线的线程数量

maximumPoolSize:最大线程数,即核心的线程数的最大值,超过这个值会报错。

keepAliveTime:存活时间

TimeUnit unit:存活时间单位

BlockingQueue<Runnable> workQueue:工作队列

ThreadFactory threadFactory:线程工厂(可用来定制线程优先级,名字或扩展线程功能)

RejectedExecutionHandler handler:当线程数已经达到最大线程数且工作队列已经放满了的时候该如何处理?即拒绝策略。JDK提供的策略有4种 :

(1)直接丢弃

(2)替换工作队列的最后一个

(3)抛出异常

(4)谁提交的这个任务?让他自己执行去。

线程池与CompletableFuture联合使用

线程池使用完记得关闭

  1. public class _02_perfectCode {
  2. public static void main(String[] args) {
  3. SmallTool.printTimeAndThread("小白和小伙伴们进餐厅点菜!");
  4. long startTime = System.currentTimeMillis();
  5. //将keepalive设置为0,就代表线程工作完后会立刻销毁
  6. //用这种方式我们可以更容易观察到任务调度的情况
  7. //这种方式不要用于生产中
  8. ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
  9. //替代上面注释的代码
  10. CompletableFuture[] dishes = IntStream.rangeClosed(1, 10)
  11. .mapToObj(i -> new Dish("菜" + i, 1))
  12. .map(dish -> CompletableFuture.runAsync(dish::make,executor))
  13. .toArray(size -> new CompletableFuture[size]);
  14. CompletableFuture.allOf(dishes).join();
  15. executor.shutdown();
  16. SmallTool.printTimeAndThread("菜都做好了,上桌"+(System.currentTimeMillis()-startTime));
  17. //当前电脑有多少个核心
  18. System.out.println(Runtime.getRuntime().availableProcessors());
  19. //查看当前线程数
  20. System.out.println(ForkJoinPool.commonPool().getPoolSize());
  21. //查看最大线程数
  22. System.out.println(ForkJoinPool.getCommonPoolParallelism());
  23. }
  24. }

线程中断

线程在sleep的时候是什么状态?

TIME_WAITING

在哪些情况下会抛出InterruptException?

当线程正在等待、休眠或以其他方式被占用,并且线程在活动之前或期间【即不处于RUNNABLE状态,即BLOCKED、WAITING、TIMED_WAITING】被中断时抛出。

sleep和wait方法肯定会抛出。

如果线程没有睡眠,调用它的interrupt会怎样?

触发InterruptedException只是中断在线程非活跃状态下的表现形式。中断本身跟线程是否在活跃状态没有关系。

之所以在不活跃状态下中断会抛出异常,是因为线程在不活跃状态下发生中断的时候,抛异常是让你知道线程为什么回到运行状态。

如果线程的状态本来就是活跃的,此时触发中断,线程会假装看不到。如果你不想处理中断,那中断就会真的被无视掉。但是如果你想要处理中断的话,可以通过下面两个方法得知自己是否被中断:

  1. public boolean isInterrupted() //thread会记住中断信号,如果不清除掉会一直存在【只是查看灯有没有开着】
  2. public static boolean interrupted() //该方法判断是否中断的同时可以清除中断【不仅仅查看有没有开灯,还会把灯关掉】
  1. Thread.currentThread().isInterrupted()
  2. Thread.interrupted()

阻塞队列

特点

(1)队列需要有一个固定的容量

(2)如果队列是空的,需要等待,等到有元素为止

(3)如果队列是满的,需要等待,等到有空位为止

实现

接口 BlockingQueue

可以实现在多个线程之间安全地传递数据。

六个方法

  • take()

从队列中取出元素,如果当前队列已经空了,则会出现阻塞的现象。

  • poll(long timeout,TimeUnit unit)

从队列中取出元素,可以设置超时时间,超过指定时间没有取到元素就会得到null

  • poll()

从队列中不等待直接取出元素,如果没有元素,马上就撤,即立即返回null

适用于非常想获取元素,但不一定会获取到情况

  • put()

向队列中放入元素,如果当前队列已经满了,则会出现阻塞的现象。

  • offer(long timeout,TimeUnit unit)

从队列中放入元素,可以设置超时时间,超过指定时间没有放入元素就会得到Boolean

  • offer()

从队列中不等待直接放入元素,如果满了,马上就撤,即立即返回Boolean

适用于非常想放入元素,但不一定会放入的情况。

  1. package org.example.blockQueue;
  2. import org.example.SmallTool;
  3. import java.util.LinkedList;
  4. import java.util.List;
  5. import java.util.concurrent.BlockingQueue;
  6. import java.util.concurrent.LinkedBlockingQueue;
  7. import java.util.stream.Collectors;
  8. public class _02_OneProducer_OneConsumer_BlockQueue {
  9. public static void main(String[] args) {
  10. BlockingQueue<String> shaobingQueue = new LinkedBlockingQueue<>(3);
  11. List<String> xiaoBaiMsg = new LinkedList<>();
  12. List<String> chefAMsg = new LinkedList<>();
  13. List<String> roadPeopleAMsg = new LinkedList<>();
  14. List<String> roadPeopleBMsg = new LinkedList<>();
  15. Thread xiaoBai = new Thread(()->{
  16. for(int i=0;i<5;i++){
  17. String shaobing = String.format("第%d个烧饼",i+1);
  18. try {
  19. shaobingQueue.put(shaobing);
  20. } catch (InterruptedException e) {
  21. SmallTool.printTimeAndThread("小白被中断"+e.getMessage());
  22. }
  23. xiaoBaiMsg.add(String.format("%d 小白制作了 [%s]",System.currentTimeMillis(),shaobing));
  24. }
  25. });
  26. Thread chushiA = new Thread(()->{
  27. for(int i=0;i<5;i++){
  28. String shaobing = String.format("厨师A的第%d个烧饼",i+1);
  29. try {
  30. shaobingQueue.put(shaobing);
  31. } catch (InterruptedException e) {
  32. SmallTool.printTimeAndThread("厨师A被中断"+e.getMessage());
  33. }
  34. chefAMsg.add(String.format("%d 厨师A制作了[%s]",System.currentTimeMillis(),shaobing));
  35. }
  36. });
  37. Thread roadPeopleA = new Thread(()->{
  38. for(int i=0;i<5;i++){
  39. String shaobing = null;
  40. try {
  41. shaobing = shaobingQueue.take();
  42. } catch (InterruptedException e) {
  43. SmallTool.printTimeAndThread("路人甲被中断"+e.getMessage());
  44. }
  45. roadPeopleAMsg.add(String.format("%d 路人甲 买到了 [%s]",System.currentTimeMillis(),shaobing));
  46. }
  47. });
  48. Thread roadPeopleB = new Thread(()->{
  49. for(int i=0;i<5;i++){
  50. String shaobing = null;
  51. try {
  52. shaobing = shaobingQueue.take();
  53. } catch (InterruptedException e) {
  54. SmallTool.printTimeAndThread("路人甲被中断"+e.getMessage());
  55. }
  56. roadPeopleAMsg.add(String.format("%d 路人甲 买到了 [%s]",System.currentTimeMillis(),shaobing));
  57. }
  58. });
  59. xiaoBai.start();
  60. chushiA.start();
  61. roadPeopleA.start();
  62. roadPeopleB.start();
  63. try {
  64. xiaoBai.join();
  65. chushiA.join();
  66. roadPeopleA.join();
  67. roadPeopleB.join();
  68. } catch (InterruptedException e) {
  69. SmallTool.printTimeAndThread("join 产生中断"+e.getMessage());
  70. }
  71. System.out.println(xiaoBaiMsg.stream().collect(Collectors.joining("\n")));
  72. System.out.println(chefAMsg.stream().collect(Collectors.joining("\n")));
  73. System.out.println("-------------------------------------------------------");
  74. System.out.println(roadPeopleAMsg.stream().collect(Collectors.joining("\n")));
  75. System.out.println(roadPeopleBMsg.stream().collect(Collectors.joining("\n")));
  76. }
  77. }

阻塞队列的种类

  • LinkedBlockingQueue

  • ArrayBlockingQueue(int capacity,boolean fair):fair=true 为有序队列


  • SynchronousQueue

这个队列没有容量,支持公平模式。【公平模式(TransferQueue):队尾匹配(判断匹配),队头出队,先进先出。非公平模式(默认策略:TransferStack):栈顶匹配,栈顶出栈,后进先出。】

生产者线程对其的插入操作put必须等待消费者的移除操作take。(没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。)

SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
 

使用场景

  • 适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
  • 在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。

  • PriorityBlockingQueue(int initialCapacity, 比较器)

可以传入比较器对队内元素进行排序。

BlockingQueue<Pancake> blockingQueue = new PriorityBlockingQueue<>(3,Comparator.comparing(Pancake::flavor))
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/968402
推荐阅读
相关标签
  

闽ICP备14008679号