当前位置:   article > 正文

CountDownLatch和CyclicBarrier_cyclicbarrier 和 countdownlatch

cyclicbarrier 和 countdownlatch

上一篇的线程通信方式我们学习的是Condition,它比较适合于一对一等待。今天勾勾和大家一起学习一对多的线程通信方式CountDownLatch和CyclicBarrier。

目录

CountDownLatch减数计数器

CountDownLatch原理分析

CountDownLatch方法总结

CyclicBarrier循环栅栏

CyclicBarrier循环特点

CyclicBarrier原理分析

CyclicBarrier方法总结


CountDownLatch减数计数器

CountDownLatch是一个倒数的计数器阀门,初始化时阀门关闭,指定计数的数量,当数量倒数减到0时阀门打开,被阻塞线程被唤醒。

我们先熟悉下它的用法:我们使用CountDownLatch实现main线程等待所有的线程存入数据结束后才能执行。

  1. public static void main(String[] args) {
  2.     final int capacity = 5;
  3.     final CountDownLatch countDownLatch = new CountDownLatch(capacity);
  4.     List<Integer> list = new ArrayList<>();
  5.     //线程的数量需与CountDownLatch的计数器相等
  6.     IntStream.range(0,capacity).forEach(i ->
  7.        new Thread(()->{
  8.            try {
  9.                list.add(i);
  10.                System.out.println(Thread.currentThread().getName()+"存入数据成功");
  11.            } finally {
  12.                countDownLatch.countDown();
  13.            }
  14.            System.out.println(Thread.currentThread().getName()+"继续做其他事情");
  15.         }, "put-" + i).start()
  16.     );
  17.     try {
  18.         //等待所有线程都计数,计算器减为0
  19.         countDownLatch.await();
  20.     } catch (InterruptedException e) {
  21.         e.printStackTrace();
  22.     }
  23.     System.out.println(Thread.currentThread().getName()+"被唤醒");
  24. }

上述代码中我们使用有参构造函数指定了计数器的数量,main线程调用await()方法进入阻塞状态,等待计数器减为0;

put线程调用countDown()方法对计数器减1,直到计数器为0,main线程被唤醒。

我们查看运行的结果:

图片

从结果看出,main线程一直阻塞直到所有的线程执行结束。

put线程调用countDown()方法时并没有阻塞,而是继续执行后面的逻辑。

看过现象之后,我们接下来透过现象看本质,分析CountDownLatch实现的原理。

 

CountDownLatch原理分析

CountDownLatch是AQS的共享模式的实现,其内部也有一个静态内部类Sync继承了AbstractQueuedSynchronizer。

当我们通过构造函数创建CountDownLatch对象时,其实是指定了AQS的同步状态state的值,所以state在CountDownLatch代表的即使计数器的个数。

  1. public CountDownLatch(int count) {
  2.     if (count < 0) throw new IllegalArgumentException("count < 0");
  3.     this.sync = new Sync(count);
  4. }
  1. private static final class Sync extends AbstractQueuedSynchronizer {
  2.     private static final long serialVersionUID = 4982264981922014374L;
  3.  //有参构造器,指定state的值
  4.     Sync(int count) {
  5.         setState(count);
  6.     }
  7.  //获取state状态值
  8.     int getCount() {
  9.         return getState();
  10.     }
  11.  //实现了AQS的共享模式的加锁方法
  12.     protected int tryAcquireShared(int acquires) {
  13.         //如果state为0,则返回大于0的数值,不等于0则返回小于0的数值
  14.         return (getState() == 0) ? 1 : -1;
  15.     }
  16.  //通过死循环的方式释放锁
  17.     protected boolean tryReleaseShared(int releases) {       
  18.         for (;;) {
  19.             //获取状态值
  20.             int c = getState();
  21.             //如果此时状态值已经为0了,说明计数器已经减到0了不能再减了
  22.             if (c == 0)
  23.                 return false;
  24.             //否则就将计数器减1,并通过CAS的方式赋值给state,因为此时还会有其他线程修改状态
  25.             //这里与ReentrantLock的独占解锁方式不同,独占是直接setState,因为它不会有其他线程竞争
  26.             int nextc = c-1;
  27.             if (compareAndSetState(c, nextc))
  28.                 //如果计数器为0,则返回true,唤醒同步队列中的等待线程
  29.                 //不等于0则返回false
  30.                 return nextc == 0;
  31.         }
  32.     }
  33. }

看过Sync的源码我们可以明白CountDownLatch是共享模式的加锁和解锁方式,await()表示获取操作,countDown()表示释放操作。

state为0则表示可以加锁,不等于0的时候则线程会调用AQS提供的doAcquireSharedInterruptibly加入同步队列。每次解锁都只释放一个同步器状态,如果计数器为0的时候则会唤醒同步队列中的等待线程。

  1. public void await() throws InterruptedException {
  2.     sync.acquireSharedInterruptibly(1);
  3. }
  1. public void countDown() {
  2.     sync.releaseShared(1);
  3. }

 

CountDownLatch方法总结

CountDownLatch(int count):构造函数,需要指定一个不小于0的int数值。

await():当前线程调用该方法会进入阻塞状态,直到同步器状态为0时被其他线程唤醒或者被其他线程中断。也即将计数器减为0返回true的线程负责唤醒阻塞的线程。当计数器为0时,调用await()方法将立即返回。

await(long timeout, TimeUnit unit):该方法与await()作用一样,只是添加了等待的时间,如果超过等待时间还没有被唤醒或者被中断,那么阻塞线程将退出阻塞状态。

countDown():该方法主要是将指定的计数器减1,当计数器已经是0了调用该方法将会被忽略,也就是说计数器的值最小只能是0;

为了保证计数器一定会减1,一般要在finally语句块中执行countDown操作。

CountDownLatch到这里为止都介绍完了,下面我们学习跟CountDownLatch能实现同样效果的工具类CyclicBarrier

 

CyclicBarrier循环栅栏

CyclicBarrier是一个可循环的屏障,它允许多个线程在执行完相应的操作后彼此等待共同到达一个point,等所有线程都到达后再继续执行。

CyclicBarrier也可以像CountDownLatch一样适用于多个子任务并发执行,当所有子任务都执行完后再继续接下来的工作。

我们把上述CountDownLatch的例子改成CyclicBarrier,代码如下:

  1. public static void main(String[] args) {
  2.         final int capacity = 5;
  3.         final CyclicBarrier cyclicBarrier = new CyclicBarrier(capacity);
  4.         List<Integer> list = new ArrayList<>();
  5.         //线程的数量需与cyclicBarrier的parties数量相等
  6.         IntStream.range(0,capacity).forEach(i ->
  7.                 new Thread(()->{
  8.                     try {
  9.                         list.add(i);
  10.                         System.out.println(Thread.currentThread().getName()+"存入数据成功");
  11.                     } finally {
  12.                         try {
  13.                             cyclicBarrier.await();
  14.                         } catch (InterruptedException e) {
  15.                             e.printStackTrace();
  16.                         } catch (BrokenBarrierException e) {
  17.                             e.printStackTrace();
  18.                         }
  19.                     }
  20.                     System.out.println(Thread.currentThread().getName()+"继续做其他事情");
  21.                 }, "put-" + i).start()
  22.         );
  23.     }

上述代码中通过有参构造指定了parties的数量,然后启动同等数量的线程。

线程调用await()方法会在障点阻塞,直到所有线程都到达障点再继续运行。

我们查看运行结果:

图片

通过运行结果我们看到await()方法之后的代码一直没有运行直到所有线程都调用的await()方法之后才开始执行。

如果我们想要所有线程到达障点之后优先执行某个动作,可以在创建CyclicBarrier对象时指定一个Runnable接口。

  1. final CyclicBarrier cyclicBarrier = new CyclicBarrier(capacity, ()->{
  2.     System.out.println("我要优先执行");
  3. });

修改之后的运行结果:

图片

从结果可以看到Runnable接口的逻辑优先执行了。

 

CyclicBarrier循环特点

CyclicBarrier之所以被称为循环栅栏,是因为它还有一个主要的特点,它内部的计数器在为0之后又被重置了还可以再循环利用。

CountDownLatch不能达到这种效果,他的计数器减为0之后就不能再使用了。

我们通过一个例子看下CyclicBarrier的循环利用。

  1. public static void main(String[] args) {
  2.     final int capacity = 5;
  3.     final CyclicBarrier cyclicBarrier = new CyclicBarrier(capacity);
  4.     List<Integer> list = new ArrayList<>();
  5.     Map<Integer, String> map = new HashMap<>(8);
  6.     //线程的数量需与cyclicBarrier的parties相等
  7.     IntStream.range(0,capacity).forEach(i ->
  8.             new Thread(()->{
  9.                 try {
  10.                     list.add(i);
  11.                     System.out.println(Thread.currentThread().getName()+ "存入list成功");
  12.                 } finally {
  13.                     try {
  14.                         cyclicBarrier.await();
  15.                     } catch (InterruptedException | BrokenBarrierException e) {
  16.                         e.printStackTrace();
  17.                     }
  18.                 }
  19.                 try {
  20.                     map.put(i, "value-i");
  21.                     System.out.println(Thread.currentThread().getName()+ "存入map成功");
  22.                 } finally {
  23.                     try {
  24.                         cyclicBarrier.await();
  25.                     } catch (InterruptedException | BrokenBarrierException e) {
  26.                         e.printStackTrace();
  27.                     }
  28.                 }
  29.                 System.out.println(Thread.currentThread().getName()+"继续做其他事情");
  30.             }, "put-" + i).start()
  31.     );
  32. }

上述代码中我们只创建了一个CyclicBarrier对象,但是每个线程调用了两次的await()方法,重复利用了栅栏的计数器。

我们查看运行结果:

图片

熟悉了CyclicBarrier的用法之后,接下来我们分析其实现原理。

 

CyclicBarrier原理分析

CyclicBarrier内部维护了独占锁ReentrantLock,并且关联了一个Condition。

await()方法主要是判断count的数量来决定线程进入阻塞状态还是唤醒所有的阻塞线程。count是初始化时parties的值,parties的值一经赋值不会改变,count会随着线程到达障点而减到0。

  1. public int await() throws InterruptedException, BrokenBarrierException {
  2.     try {
  3.         return dowait(false0L);
  4.     } catch (TimeoutException toe) {
  5.         throw new Error(toe); // cannot happen
  6.     }
  7. }
  8. //阻塞方法
  9.  private int dowait(boolean timed, long nanos)
  10.         throws InterruptedException, BrokenBarrierException,
  11.                TimeoutException {
  12.         //获取独占锁
  13.         final ReentrantLock lock = this.lock;
  14.         //加锁,之后的代码都是属于同步代码
  15.         lock.lock();
  16.         try {
  17.             final Generation g = generation;
  18.             //broken默认false,已经broken的barrier不能再次使用了
  19.             if (g.broken)
  20.                 throw new BrokenBarrierException();
  21.    //如果线程被打断了,那么将唤醒所有的阻塞线程
  22.             if (Thread.interrupted()) {
  23.                 breakBarrier();
  24.                 throw new InterruptedException();
  25.             }
  26.             //count值减1
  27.             int index = --count;
  28.             //如果index值为0,则表示所有的线程都到达了障点
  29.             if (index == 0) {  // tripped
  30.                 boolean ranAction = false;
  31.                 try {
  32.                     //获取Runnable执行单元,如果不为空则执行逻辑
  33.                     //此处就可以明白为什么Runnable逻辑优先执行了吧
  34.                     final Runnable command = barrierCommand;
  35.                     if (command != null)
  36.                         command.run();
  37.                     ranAction = true;
  38.                     //唤醒阻塞的所有线程,重置count
  39.                     //此处可以明白CyclicBarrier为什么可以循环利用了吧
  40.                     nextGeneration();
  41.                     return 0;
  42.                 } finally {
  43.                     if (!ranAction)
  44.                         breakBarrier();
  45.                 }
  46.             }
  47.             // 如果index不为0,则表示还有线程没有达到障点
  48.             //死循环一直等待唤醒
  49.             for (;;) {
  50.                 try {
  51.                     //如果没有设置超时时间,则调用Condition的await()方法
  52.                     //await方法线程释放锁并加入等待队列
  53.                     //是不是又到了AQS了
  54.                     if (!timed)
  55.                         trip.await();
  56.                     else if (nanos > 0L)
  57.                         //如果设置了超时时间,则调用Condition的awaitNanos()方法
  58.                         nanos = trip.awaitNanos(nanos);
  59.                 } catch (InterruptedException ie) {
  60.                     if (g == generation && ! g.broken) {
  61.                         breakBarrier();
  62.                         throw ie;
  63.                     } else {                        
  64.                         Thread.currentThread().interrupt();
  65.                     }
  66.                 }
  67.                 if (g.broken)
  68.                     throw new BrokenBarrierException();
  69.                 if (g != generation)
  70.                     return index;
  71.                 if (timed && nanos <= 0L) {
  72.                     breakBarrier();
  73.                     throw new TimeoutException();
  74.                 }
  75.             }
  76.         } finally {
  77.             //释放锁
  78.             lock.unlock();
  79.         }
  80.     }

await方法主要代码分析都写在注释里了,我们通过流程图再熟悉下整个过程:

 

图片

在await()方法中,很多个分支调用了breakBarrier方法,此方法主要用于异常分支下的线程唤醒和count重置,但是broken被设置为true的CyclicBarrier已经不能再使用了,必须使用reset方法重置它。

  1. private void breakBarrier() {
  2.     //设置broken为true
  3.     generation.broken = true;
  4.     //重置count
  5.     count = parties;
  6.     //唤醒所有阻塞的线程
  7.     trip.signalAll();
  8. }
  9. //重置CyclicBarrier,break现有的generation,重新生成新的generation
  10. public void reset() {
  11.     final ReentrantLock lock = this.lock;
  12.     lock.lock();
  13.     try {
  14.         breakBarrier();   // break the current generation
  15.         nextGeneration(); // start a new generation
  16.     } finally {
  17.         lock.unlock();
  18.     }
  19. }

nextGeneration用于唤醒所有阻塞的线程,并重置count和generation。

  1. private void nextGeneration() {
  2.     // 唤醒所有阻塞的线程
  3.     trip.signalAll();
  4.     // 设置count
  5.     count = parties;
  6.     generation = new Generation();
  7. }

 

CyclicBarrier方法总结

CyclicBarrier(int parties):构造器指定不能小于0的parties,该值不会发生改变。

CyclicBarrier(int parties, Runnable barrierAction):构造器指定parties和一个Runnable 接口,当所有的线程到达障点之后Runnable 接口会被调用。

await():当前线程调用该方法之后会进入阻塞状态直到所有的线程都调用await()方法到达障点才会被唤醒。当CyclicBarrier内部的count为0时,调用await()方法不会进入阻塞状态。

await(long timeout, TimeUnit unit):该方法与await()方法作用一样,只是可以设置阻塞等待的时间,超时没有被唤醒将退出阻塞状态。

isBroken():返回barrier的broken状态,某个线程执行await()方法进入阻塞状态,如果被中断了isBroken()方法将返回true。也即是线程的中断将会导致CyclicBarrier被broken,被broken的CyclicBarrier此时不能再使用必须reset,如果此时线程调用了await()方法将抛出异常BrokenBarrierException。

reset():中断当前barrier,并重新生成Generation。

 

CyclicBarrier和CountDownLatch区别

  • CountDownLatch的await()线程会等待计数器减为0,而执行CyclicBarrier的await()方法会使线程进入阻塞等待其他线程到达障点。

  • CountDownLatch计数器不能重置,CyclicBarrier可以重置循环利用。

  • CountDownLatch是基于AQS的共享模式实现的,CyclicBarrier是基于ReentrantLock和Condition实现的。

  • CountDownLatch不会让子线程进入阻塞,CyclicBarrier会使所有子线程进入阻塞。

我是勾勾,一直在努力的程序媛,感谢您的点赞、转发和关注!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/148959
推荐阅读
  

闽ICP备14008679号