赞
踩
上一篇的线程通信方式我们学习的是Condition,它比较适合于一对一等待。今天勾勾和大家一起学习一对多的线程通信方式CountDownLatch和CyclicBarrier。
目录
CountDownLatch是一个倒数的计数器阀门,初始化时阀门关闭,指定计数的数量,当数量倒数减到0时阀门打开,被阻塞线程被唤醒。
我们先熟悉下它的用法:我们使用CountDownLatch实现main线程等待所有的线程存入数据结束后才能执行。
- public static void main(String[] args) {
- final int capacity = 5;
- final CountDownLatch countDownLatch = new CountDownLatch(capacity);
- List<Integer> list = new ArrayList<>();
- //线程的数量需与CountDownLatch的计数器相等
- IntStream.range(0,capacity).forEach(i ->
- new Thread(()->{
- try {
- list.add(i);
- System.out.println(Thread.currentThread().getName()+"存入数据成功");
- } finally {
- countDownLatch.countDown();
- }
- System.out.println(Thread.currentThread().getName()+"继续做其他事情");
-
- }, "put-" + i).start()
- );
- try {
- //等待所有线程都计数,计算器减为0
- countDownLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName()+"被唤醒");
- }
上述代码中我们使用有参构造函数指定了计数器的数量,main线程调用await()方法进入阻塞状态,等待计数器减为0;
put线程调用countDown()方法对计数器减1,直到计数器为0,main线程被唤醒。
我们查看运行的结果:
从结果看出,main线程一直阻塞直到所有的线程执行结束。
put线程调用countDown()方法时并没有阻塞,而是继续执行后面的逻辑。
看过现象之后,我们接下来透过现象看本质,分析CountDownLatch实现的原理。
CountDownLatch是AQS的共享模式的实现,其内部也有一个静态内部类Sync继承了AbstractQueuedSynchronizer。
当我们通过构造函数创建CountDownLatch对象时,其实是指定了AQS的同步状态state的值,所以state在CountDownLatch代表的即使计数器的个数。
- public CountDownLatch(int count) {
- if (count < 0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
- //有参构造器,指定state的值
- Sync(int count) {
- setState(count);
- }
- //获取state状态值
- int getCount() {
- return getState();
- }
- //实现了AQS的共享模式的加锁方法
- protected int tryAcquireShared(int acquires) {
- //如果state为0,则返回大于0的数值,不等于0则返回小于0的数值
- return (getState() == 0) ? 1 : -1;
- }
- //通过死循环的方式释放锁
- protected boolean tryReleaseShared(int releases) {
- for (;;) {
- //获取状态值
- int c = getState();
- //如果此时状态值已经为0了,说明计数器已经减到0了不能再减了
- if (c == 0)
- return false;
- //否则就将计数器减1,并通过CAS的方式赋值给state,因为此时还会有其他线程修改状态
- //这里与ReentrantLock的独占解锁方式不同,独占是直接setState,因为它不会有其他线程竞争
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- //如果计数器为0,则返回true,唤醒同步队列中的等待线程
- //不等于0则返回false
- return nextc == 0;
- }
- }
- }
看过Sync的源码我们可以明白CountDownLatch是共享模式的加锁和解锁方式,await()表示获取操作,countDown()表示释放操作。
state为0则表示可以加锁,不等于0的时候则线程会调用AQS提供的doAcquireSharedInterruptibly加入同步队列。每次解锁都只释放一个同步器状态,如果计数器为0的时候则会唤醒同步队列中的等待线程。
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public void countDown() {
- sync.releaseShared(1);
- }
CountDownLatch(int count):构造函数,需要指定一个不小于0的int数值。
await():当前线程调用该方法会进入阻塞状态,直到同步器状态为0时被其他线程唤醒或者被其他线程中断。也即将计数器减为0返回true的线程负责唤醒阻塞的线程。当计数器为0时,调用await()方法将立即返回。
await(long timeout, TimeUnit unit):该方法与await()作用一样,只是添加了等待的时间,如果超过等待时间还没有被唤醒或者被中断,那么阻塞线程将退出阻塞状态。
countDown():该方法主要是将指定的计数器减1,当计数器已经是0了调用该方法将会被忽略,也就是说计数器的值最小只能是0;
为了保证计数器一定会减1,一般要在finally语句块中执行countDown操作。
CountDownLatch到这里为止都介绍完了,下面我们学习跟CountDownLatch能实现同样效果的工具类CyclicBarrier。
CyclicBarrier是一个可循环的屏障,它允许多个线程在执行完相应的操作后彼此等待共同到达一个point,等所有线程都到达后再继续执行。
CyclicBarrier也可以像CountDownLatch一样适用于多个子任务并发执行,当所有子任务都执行完后再继续接下来的工作。
我们把上述CountDownLatch的例子改成CyclicBarrier,代码如下:
- public static void main(String[] args) {
- final int capacity = 5;
- final CyclicBarrier cyclicBarrier = new CyclicBarrier(capacity);
- List<Integer> list = new ArrayList<>();
- //线程的数量需与cyclicBarrier的parties数量相等
- IntStream.range(0,capacity).forEach(i ->
- new Thread(()->{
- try {
- list.add(i);
- System.out.println(Thread.currentThread().getName()+"存入数据成功");
- } finally {
- try {
- cyclicBarrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- System.out.println(Thread.currentThread().getName()+"继续做其他事情");
-
- }, "put-" + i).start()
- );
-
- }
上述代码中通过有参构造指定了parties的数量,然后启动同等数量的线程。
线程调用await()方法会在障点阻塞,直到所有线程都到达障点再继续运行。
我们查看运行结果:
通过运行结果我们看到await()方法之后的代码一直没有运行直到所有线程都调用的await()方法之后才开始执行。
如果我们想要所有线程到达障点之后优先执行某个动作,可以在创建CyclicBarrier对象时指定一个Runnable接口。
- final CyclicBarrier cyclicBarrier = new CyclicBarrier(capacity, ()->{
- System.out.println("我要优先执行");
- });
修改之后的运行结果:
从结果可以看到Runnable接口的逻辑优先执行了。
CyclicBarrier之所以被称为循环栅栏,是因为它还有一个主要的特点,它内部的计数器在为0之后又被重置了还可以再循环利用。
CountDownLatch不能达到这种效果,他的计数器减为0之后就不能再使用了。
我们通过一个例子看下CyclicBarrier的循环利用。
- public static void main(String[] args) {
- final int capacity = 5;
- final CyclicBarrier cyclicBarrier = new CyclicBarrier(capacity);
- List<Integer> list = new ArrayList<>();
- Map<Integer, String> map = new HashMap<>(8);
- //线程的数量需与cyclicBarrier的parties相等
- IntStream.range(0,capacity).forEach(i ->
- new Thread(()->{
- try {
- list.add(i);
- System.out.println(Thread.currentThread().getName()+ "存入list成功");
- } finally {
- try {
- cyclicBarrier.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- try {
- map.put(i, "value-i");
- System.out.println(Thread.currentThread().getName()+ "存入map成功");
- } finally {
- try {
- cyclicBarrier.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- System.out.println(Thread.currentThread().getName()+"继续做其他事情");
-
- }, "put-" + i).start()
- );
-
- }
上述代码中我们只创建了一个CyclicBarrier对象,但是每个线程调用了两次的await()方法,重复利用了栅栏的计数器。
我们查看运行结果:
熟悉了CyclicBarrier的用法之后,接下来我们分析其实现原理。
CyclicBarrier内部维护了独占锁ReentrantLock,并且关联了一个Condition。
await()方法主要是判断count的数量来决定线程进入阻塞状态还是唤醒所有的阻塞线程。count是初始化时parties的值,parties的值一经赋值不会改变,count会随着线程到达障点而减到0。
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- //阻塞方法
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- //获取独占锁
- final ReentrantLock lock = this.lock;
- //加锁,之后的代码都是属于同步代码
- lock.lock();
- try {
- final Generation g = generation;
- //broken默认false,已经broken的barrier不能再次使用了
- if (g.broken)
- throw new BrokenBarrierException();
- //如果线程被打断了,那么将唤醒所有的阻塞线程
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- //count值减1
- int index = --count;
- //如果index值为0,则表示所有的线程都到达了障点
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- //获取Runnable执行单元,如果不为空则执行逻辑
- //此处就可以明白为什么Runnable逻辑优先执行了吧
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- //唤醒阻塞的所有线程,重置count
- //此处可以明白CyclicBarrier为什么可以循环利用了吧
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // 如果index不为0,则表示还有线程没有达到障点
- //死循环一直等待唤醒
- for (;;) {
- try {
- //如果没有设置超时时间,则调用Condition的await()方法
- //await方法线程释放锁并加入等待队列
- //是不是又到了AQS了
- if (!timed)
- trip.await();
- else if (nanos > 0L)
- //如果设置了超时时间,则调用Condition的awaitNanos()方法
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw ie;
- } else {
- Thread.currentThread().interrupt();
- }
- }
-
- if (g.broken)
- throw new BrokenBarrierException();
-
- if (g != generation)
- return index;
-
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- //释放锁
- lock.unlock();
- }
- }
-
await方法主要代码分析都写在注释里了,我们通过流程图再熟悉下整个过程:
在await()方法中,很多个分支调用了breakBarrier方法,此方法主要用于异常分支下的线程唤醒和count重置,但是broken被设置为true的CyclicBarrier已经不能再使用了,必须使用reset方法重置它。
- private void breakBarrier() {
- //设置broken为true
- generation.broken = true;
- //重置count
- count = parties;
- //唤醒所有阻塞的线程
- trip.signalAll();
- }
- //重置CyclicBarrier,break现有的generation,重新生成新的generation
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- breakBarrier(); // break the current generation
- nextGeneration(); // start a new generation
- } finally {
- lock.unlock();
- }
- }
nextGeneration用于唤醒所有阻塞的线程,并重置count和generation。
- private void nextGeneration() {
- // 唤醒所有阻塞的线程
- trip.signalAll();
- // 设置count
- count = parties;
- generation = new Generation();
- }
CyclicBarrier(int parties):构造器指定不能小于0的parties,该值不会发生改变。
CyclicBarrier(int parties, Runnable barrierAction):构造器指定parties和一个Runnable 接口,当所有的线程到达障点之后Runnable 接口会被调用。
await():当前线程调用该方法之后会进入阻塞状态直到所有的线程都调用await()方法到达障点才会被唤醒。当CyclicBarrier内部的count为0时,调用await()方法不会进入阻塞状态。
await(long timeout, TimeUnit unit):该方法与await()方法作用一样,只是可以设置阻塞等待的时间,超时没有被唤醒将退出阻塞状态。
isBroken():返回barrier的broken状态,某个线程执行await()方法进入阻塞状态,如果被中断了isBroken()方法将返回true。也即是线程的中断将会导致CyclicBarrier被broken,被broken的CyclicBarrier此时不能再使用必须reset,如果此时线程调用了await()方法将抛出异常BrokenBarrierException。
reset():中断当前barrier,并重新生成Generation。
CountDownLatch的await()线程会等待计数器减为0,而执行CyclicBarrier的await()方法会使线程进入阻塞等待其他线程到达障点。
CountDownLatch计数器不能重置,CyclicBarrier可以重置循环利用。
CountDownLatch是基于AQS的共享模式实现的,CyclicBarrier是基于ReentrantLock和Condition实现的。
CountDownLatch不会让子线程进入阻塞,CyclicBarrier会使所有子线程进入阻塞。
我是勾勾,一直在努力的程序媛,感谢您的点赞、转发和关注!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。