当前位置:   article > 正文

juc并发编程学习笔记下(尚硅谷)_【juc并发编程】尚硅谷2021年最新juc并发编程

【juc并发编程】尚硅谷2021年最新juc并发编程

5 多线程锁

5.1锁的八个问题演示

class Phone { 
    public static synchronized void sendSMS() throws Exception { 
        //停留4秒 
        TimeUnit.SECONDS.sleep(4); 
        System.out.println("------sendSMS"); 
    } 
    
    public synchronized void sendEmail() throws Exception { 
        System.out.println("------sendEmail"); 
    } 
        public void getHello() { 
        System.out.println("------getHello"); 
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
1 标准访问,先打印短信还是邮件 
------sendSMS 
------sendEmail 
2 停4秒在短信方法内,先打印短信还是邮件 
------sendSMS 
------sendEmail 
3 新增普通的hello方法,是先打短信还是hello 
------getHello 
------sendSMS 
4 现在有两部手机,先打印短信还是邮件 
------sendEmail 
------sendSMS 
5 两个静态同步方法,1部手机,先打印短信还是邮件 
------sendSMS 
------sendEmail
6 两个静态同步方法,2部手机,先打印短信还是邮件 
------sendSMS 
------sendEmail 
7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件 
------sendEmail 
------sendSMS 
8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件 
------sendEmail 
------sendSMS
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。
对于同步方法块,锁是Synchonized括号里配置的对象

5.2公平锁和非公平锁

ReentrantLock构造方法根据传入参数创建公平锁或非公平锁,默认为非公平锁。非公平锁可能会有线程饿死,效率高;公平锁阳光普照,效率相对低。

NonfairSync源码:

	static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

FairSync源码:

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

5.3可重入锁(递归锁)

synchronized和lock都是可重入锁

5.4死锁

查看进程jps、jstack

deadlock测试代码:

public static void main(String[] args) {
        Ticket t1 = new Ticket();
        Ticket t2 = new Ticket();
        new Thread(() -> {
            synchronized (t1){
                System.out.println("拿到锁t1,准备获取锁t2");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (t2){
                    System.out.println("获取锁t2");
                }
            }
        }, "aa").start();

        new Thread(() -> {
            synchronized (t2){
                System.out.println("拿到锁t2,准备获取锁t1");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (t1){
                    System.out.println("获取锁t1");
                }
            }
        }, "bb").start();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

6、callable接口

使用callable接口创建线程

public static void main(String[] args) throws Exception {
    FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {
        System.out.println(Thread.currentThread().getName() + "==callable线程执行。。。");
        return 1111;
    });
    new Thread(futureTask, "AA").start();
    System.out.println(futureTask.get());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7、JUC 三大辅助类

7.1CountDownLatch

CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行减1的操作,使用await方法等待计数器不大于0,然后继续执行await方法之后的语句。

  • CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行

测试代码:

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(6);
    for (int i = 0; i < 6; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "同学离开了教室。");
            countDownLatch.countDown();
        }, String.valueOf(i)).start();
    }
    countDownLatch.await();
    System.out.println(Thread.currentThread().getName() + "班长锁门了。");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

7.2循环栅栏CyclicBarrier

CyclicBarrier看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句。可以将CyclicBarrier理解为加1操作

测试代码:

	private final static int num = 7;

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
            System.out.println("七颗龙珠已收集,可以召唤神龙");
        });
        for (int i = 1; i <= num; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "颗龙珠已收集");
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

7.3信号量Semaphore

	public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到了停车位");
                    TimeUnit.SECONDS.sleep(new Random().nextInt(6));
                    System.out.println(Thread.currentThread().getName() + "离开了停车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

8、读写锁

8.1读写锁介绍

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。

  1. 线程进入读锁的前提条件:
    • 没有其他线程的写锁
    • 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。
  2. 线程进入写锁的前提条件:
    • 没有其他线程的读锁
    • 没有其他线程的写锁
    而读写锁有以下三个重要的特性:
    (1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
    (2)重进入:读锁和写锁都支持线程重进入。
    (3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

8.2读写锁案例实现

资源类

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author lyz
 * @Title: CustomCache
 * @Description:
 * @date 2021/10/8 15:03
 */
public class CustomCache {

    private volatile Map<String, String> cacheMap = new HashMap<>();

    private ReadWriteLock rwLock = new ReentrantReadWriteLock();

    public void put(String key, String value){
        rwLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "正在写入数据。。" + key);
            TimeUnit.SECONDS.sleep(1);
            cacheMap.put(key, value);
            System.out.println(Thread.currentThread().getName() + "写完了" + key);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    public String get(String key){
        rwLock.readLock().lock();
        String value = null;
        try {
            System.out.println(Thread.currentThread().getName() + "正在读数据。。" + key);
            TimeUnit.SECONDS.sleep(1);
            value = cacheMap.get(key);
            System.out.println(Thread.currentThread().getName() + "读完了key=" + key + ",value=" + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwLock.readLock().unlock();
        }
        return value;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

测试代码

public static void main(String[] args) {
        CustomCache cache = new CustomCache();
        for (int i = 0; i < 5; i++) {
            final int num = i;
            new Thread(() -> {
                cache.put(String.valueOf(num), String.valueOf(num));
            }, String.valueOf(i)).start();
        }

        for (int i = 0; i < 5; i++) {
            final int num = i;
            new Thread(() -> {
                cache.get(String.valueOf(num));
            }, String.valueOf(i)).start();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

8.3锁降级演示demo

	public static void main(String[] args) {
        ReadWriteLock rwLock = new ReentrantReadWriteLock();
        rwLock.writeLock().lock();
        System.out.println("写入数据");
        rwLock.readLock().lock();
        System.out.println("读取数据");
        rwLock.readLock().unlock();
        rwLock.writeLock().unlock();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

9、阻塞队列

常用的队列主要有以下两种:
• 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
• 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

9.1BlockingQueue核心方法

在这里插入图片描述
drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

9.2测试代码

public static void main(String[] args) throws Exception {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        //第一组 抛出异常
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
//        System.out.println(blockingQueue.add("d"));
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
        System.out.println("======================================================");

        //第二组 特殊值
        System.out.println(blockingQueue.offer("1"));
        System.out.println(blockingQueue.offer("2"));
        System.out.println(blockingQueue.offer("3"));
        System.out.println(blockingQueue.offer("4"));
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println("======================================================");

        //第三组 阻塞
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
//        blockingQueue.put("d");
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
        System.out.println("======================================================");

        //第四组 超时
        System.out.println(blockingQueue.offer("a", 3l, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("b", 3l, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("c", 3l, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("d", 3l, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

10、线程池

10.1线程池简介

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

特点:

• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
• Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类
  • 1
  • 2
  • 3
  • 4

请添加图片描述

10.2测试代码

public static void main(String[] args) {
        //一池多线程
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        //一池一线程
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        //可扩展线程池
        ExecutorService threadPool = Executors.newCachedThreadPool();

        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "办理业务。");
                });
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

看源码可知底层使用的都是ThreadPoolExecutor

	public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 1
  • 2
  • 3
  • 4
  • 5

10.3常用参数

• corePoolSize线程池的核心线程数
• maximumPoolSize能容纳的最大线程数
• keepAliveTime空闲线程存活时间
• unit 存活的时间单位
• workQueue 存放提交但未执行任务的队列
• threadFactory 创建线程的工厂类
• handler 等待队列满后的拒绝策略

10.4线程池底层工作原理

请添加图片描述

1. 在创建了线程池后,线程池中的线程数为零
2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断: 
   2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务; 
   2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列; 
   2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务; 
   2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断: 
   4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。 
   4.2 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

10.4.1 JDK内置的拒绝策略

CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy: 直接丢弃,其他啥都没有
DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入

10.5 ThreadPoolExecutor手动创建

实际开发中创建线程池推荐使用ThreadPoolExecutor及其7个参数手动创建
请添加图片描述
测试代码

public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 2l, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "办理业务。");
                });
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

11、Fork/Join

Fork/Join它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情: Fork:把一个复杂任务进行分拆,大事化小 Join:把分拆任务的结果进行合并

  1. 任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
  2. 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
    在Java

定义分支合并类

import java.util.concurrent.RecursiveTask;

/**
 * @author lyz
 * @Title: MyTask
 * @Description:
 * @date 2021/10/13 17:18
 */
public class MyTask extends RecursiveTask<Integer> {

    private static final int value = 10;

    private int max;

    private int min;

    private int result;

    public MyTask(int max, int min) {
        this.max = max;
        this.min = min;
    }

    @Override
    protected Integer compute() {
        if (max - min < value){
            for (int i = min; i <= max; i++) {
                result += i;
            }
        } else {
            int mid = (max + min) / 2;
            MyTask task1 = new MyTask(mid, min);
            MyTask task2 = new MyTask(max, mid + 1);
            task1.fork();
            task2.fork();
            result = task1.join() + task2.join();
        }
        return result;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

调用分支合并代码:

	public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask task = new MyTask(100, 1);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
        Integer result = submit.get();
        System.out.println(result);
        forkJoinPool.shutdown();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

12、CompletableFuture

12.1线程回顾

初始化线程的4种方式:

  1. 继承Thread
  2. 实现Runnable接口
  3. 实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
  4. 线程池

方式1和方式2:主进程无法获取线程的运算结果。不适合当前场景

方式3:主进程可以获取线程的运算结果,并设置给itemVO,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。

方式4:通过如下两种方式初始化线程池:

Executors.newFiexedThreadPool(3);
//或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit, workQueue, threadFactory, handler);
  • 1
  • 2
  • 3

通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。

12.2 CompletableFuture介绍

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了addListener等多个扩展方法;Google guava也提供了通用的扩展Future;Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。

作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

12.3创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • 1
  • 2
  • 3
  • 4

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

12.4计算完成时回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
  • 1
  • 2
  • 3
  • 4
  • 5

whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务

whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

代码示例:

public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @Override
            public Object get() {
                System.out.println(Thread.currentThread().getName() + "\t completableFuture");
                int i = 10 / 0;
                return 1024;
            }
        }).whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(Object o, Throwable throwable) {
                System.out.println("-------o=" + o.toString());
                System.out.println("-------throwable=" + throwable);
            }
        }).exceptionally(new Function<Throwable, Object>() {
            @Override
            public Object apply(Throwable throwable) {
                System.out.println("throwable=" + throwable);
                return 6666;
            }
        });
        System.out.println(future.get());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

12.5. 线程串行化方法

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

代码演示:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture");
            //int i = 10 / 0;
            return 1024;
        }
    }).thenApply(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer o) {
            System.out.println("thenApply方法,上次返回结果:" + o);
            return  o * 2;
        }
    }).whenComplete(new BiConsumer<Integer, Throwable>() {
        @Override
        public void accept(Integer o, Throwable throwable) {
            System.out.println("-------o=" + o);
            System.out.println("-------throwable=" + throwable);
        }
    }).exceptionally(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) {
            System.out.println("throwable=" + throwable);
            return 6666;
        }
    });
    System.out.println(future.get());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

12.6. 多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
  • 1
  • 2
  • 3

allOf:等待所有任务完成

anyOf:只要有一个任务完成

public static void main(String[] args) {
    List<CompletableFuture> futures = Arrays.asList(CompletableFuture.completedFuture("hello"),
                                                    CompletableFuture.completedFuture(" world!"),
                                                    CompletableFuture.completedFuture(" hello"),
                                                    CompletableFuture.completedFuture("java!"));
    final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
    allCompleted.thenRun(() -> {
        futures.stream().forEach(future -> {
            try {
                System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

测试结果:

get future at:1568892339473, result:hello
get future at:1568892339473, result: world!
get future at:1568892339473, result: hello
get future at:1568892339473, result:java!
  • 1
  • 2
  • 3
  • 4

几乎同时完成任务!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/966829
推荐阅读
相关标签
  

闽ICP备14008679号