当前位置:   article > 正文

并发编程(十):AQS之CyclicBarrier_barrier与aqs

barrier与aqs

一,底层AQS源码分析:并发编程(四):AbstractQueuedSynchronizer源码分析

二,CyclicBarrier介绍

    1,线程处理

        * CyclicBarrier 的字面意思就是循环屏障。在一组线程到达循环屏障时阻塞,直到最后一个线程到达屏障时,屏障才会放开,让所有线程执行。该类可以与 CountDownLatch 进行类比,功能基本一致。不同点在于 CyclicBarrier 的循环概念,CountDownLatch 为一次性屏障,放开之后不会再对后续线程进行拦截,CyclicBarrier 一次放开之后会把屏障值设置为初始状态,循环进行屏障!

    2,类图

        * 其实从类图中不能看到任何有用信息,CyclicBarrier 内部通过 ReentrantLock Condition 进行控制,实现循环屏障

    3,常用API

  1. // 初始化重入锁进行线程加锁处理
  2. private final ReentrantLock lock = new ReentrantLock();
  3. // 初始化Condition进行线程阻塞和线程唤醒处理
  4. private final Condition trip = lock.newCondition();
  5. // CyclicBarrier初始化许可量
  6. private final int parties;
  7. // 屏障放开前最后一个线程会执行的线程类(为null不执行)
  8. private final Runnable barrierCommand;
  9. // 表示一个循环屏障周期
  10. private Generation generation = new Generation();
  11. // 初始化CyclicBarrier,并传递屏障值
  12. public CyclicBarrier(int parties);
  13. // 初始化CyclicBarrier,并传递屏障值和barrierAction
  14. CyclicBarrier(int parties, Runnable barrierAction)

    4,功能DEMO

  1. package com.gupao;
  2. import java.util.concurrent.BrokenBarrierException;
  3. import java.util.concurrent.CyclicBarrier;
  4. /**
  5. * @author pj_zhang
  6. * @create 2019-09-15 11:33
  7. */
  8. public class CyclicBarrierTest {
  9. public static void main(String[] args) throws InterruptedException {
  10. // 初始化 CyclicBarrier,并传递count为3
  11. CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  12. // 因为初始屏障为3,但线程数量定位6,所以对于 cyclicBarrier 来说,会有两次阻塞和两次释放
  13. for (int i = 0; i < 6; i++) {
  14. new Thread(() -> {
  15. System.out.println(Thread.currentThread().getName() + "正在执行");
  16. // 每一个线程内部等待
  17. try {
  18. cyclicBarrier.await();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } catch (BrokenBarrierException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println(Thread.currentThread().getName() + "执行完毕; " + System.currentTimeMillis());
  25. }, "thread_" + i).start();
  26. Thread.sleep(1000);
  27. }
  28. }
  29. }

三,源码分析

    1,初始化

        * CyclicBarrier(int parties)

  1. public CyclicBarrier(int parties) {
  2. // 直接通过方法重载调用
  3. this(parties, null);
  4. }

        * CyclicBarrier(int parties, Runnable barrierAction)

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. if (parties <= 0) throw new IllegalArgumentException();
  3. this.parties = parties;
  4. this.count = parties;
  5. // 初始化屏障前执行线程,没有则为空
  6. this.barrierCommand = barrierAction;
  7. }

    2,循环屏障

        * await(..):无论是显示等待还是不显示等待,最终都是殊途同归,调用同一个底层类

  1. public int await() throws InterruptedException, BrokenBarrierException {
  2. try {
  3. return dowait(false, 0L);
  4. } catch (TimeoutException toe) {
  5. throw new Error(toe); // cannot happen
  6. }
  7. }
  1. public int await(long timeout, TimeUnit unit)
  2. throws InterruptedException,
  3. BrokenBarrierException,
  4. TimeoutException {
  5. return dowait(true, unit.toNanos(timeout));
  6. }

        * dowait(boolean timed, long nanos)

  1. private int dowait(boolean timed, long nanos)
  2. throws InterruptedException, BrokenBarrierException,
  3. TimeoutException {
  4. // lock:重入锁,已在常量定义
  5. // 定义语句:private final ReentrantLock lock = new ReentrantLock();
  6. final ReentrantLock lock = this.lock;
  7. lock.lock();
  8. try {
  9. // 赋值当前循环屏障标识
  10. // 可以通过 reset() 方法对标识进行重置
  11. final Generation g = generation;
  12. // 循环屏障中断
  13. if (g.broken)
  14. throw new BrokenBarrierException();
  15. // 线程中断处理,
  16. if (Thread.interrupted()) {
  17. // 中断处理后打破屏障
  18. breakBarrier();
  19. throw new InterruptedException();
  20. }
  21. // 此处表示对循环屏障的屏障值递减,当减为0时屏障放开
  22. int index = --count;
  23. if (index == 0) { // tripped
  24. boolean ranAction = false;
  25. try {
  26. // 减为0,表示当前线程是屏障放开前最后一个线程
  27. // 由屏障放开前最后一个线程,执行初始化传递的屏障线程
  28. final Runnable command = barrierCommand;
  29. if (command != null)
  30. command.run();
  31. ranAction = true;
  32. // 构造新的循环屏障,
  33. nextGeneration();
  34. return 0;
  35. } finally {
  36. // 循环屏障释放异常,则打破屏障
  37. if (!ranAction)
  38. breakBarrier();
  39. }
  40. }
  41. // 前一步没有放开,说明屏障还需等待,线程阻塞
  42. for (;;) {
  43. try {
  44. // timed判断是限时阻塞还是直接阻塞
  45. if (!timed)
  46. // trip:Condition,已在常亮定义
  47. // 定义语句:private final Condition trip = lock.newCondition();
  48. trip.await();
  49. else if (nanos > 0L)
  50. // 此处返回剩余时间
  51. nanos = trip.awaitNanos(nanos);
  52. } catch (InterruptedException ie) {
  53. // 异常后打破屏障
  54. if (g == generation && ! g.broken) {
  55. breakBarrier();
  56. throw ie;
  57. } else {
  58. Thread.currentThread().interrupt();
  59. }
  60. }
  61. // 循环屏障中断处理
  62. if (g.broken)
  63. throw new BrokenBarrierException();
  64. // 标识被重置,不做任何处理
  65. if (g != generation)
  66. return index;
  67. // 限时等待超时处理
  68. if (timed && nanos <= 0L) {
  69. breakBarrier();
  70. throw new TimeoutException();
  71. }
  72. }
  73. } finally {
  74. lock.unlock();
  75. }
  76. }

        * breakBarrier():打破循环屏障

  1. private void breakBarrier() {
  2. // 设置参数true
  3. generation.broken = true;
  4. // 重置屏障值为初始值
  5. count = parties;
  6. // 通过Condition唤醒全部阻塞线程进行执行
  7. trip.signalAll();
  8. }

        * nextGeneration():说明当前屏障已经阻塞完成,进行下一次循环阻塞

  1. private void nextGeneration() {
  2. // 通过Condition唤醒全部阻塞线程
  3. trip.signalAll();
  4. // 重置屏障值为初始值
  5. count = parties;
  6. // 构建一个新的Generation,进行下一次循环
  7. generation = new Generation();
  8. }

    3,屏障重置

        * reset()

  1. public void reset() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. // 打破循环屏障,已经被屏障的线程全部唤醒执行
  6. breakBarrier();
  7. // 构建下一个循环屏障
  8. nextGeneration();
  9. } finally {
  10. lock.unlock();
  11. }
  12. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/148925
推荐阅读
相关标签
  

闽ICP备14008679号