当前位置:   article > 正文

JUC源码阅读之CyclicBarrier-AQS的典型实现(五)_berrier aqs实现

berrier aqs实现

1、类注释摘抄

对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。

2、源码中的主要方法

  1. /**
  2. * Each use of the barrier is represented as a generation instance.
  3. * The generation changes whenever the barrier is tripped, or
  4. * is reset. There can be many generations associated with threads
  5. * using the barrier - due to the non-deterministic way the lock
  6. * may be allocated to waiting threads - but only one of these
  7. * can be active at a time (the one to which {@code count} applies)
  8. * and all the rest are either broken or tripped.
  9. * There need not be an active generation if there has been a break
  10. * but no subsequent reset.
  11. *每个barrier都有一个Generation。当barrier被tripped或者重置时,Generation会变化
  12. *使用barrier的线程会和许多Generation相关,因为不确定的方式,锁可能被分配给等待线程。
  13. *但是每次只有一个在活动,其余的都会被破坏或者tripped。如果有中断但是没有后继重置,不需要激活新的一代
  14. */
  15. private static class Generation {
  16. boolean broken = false;
  17. }
  18. /** The lock for guarding barrier entry */
  19. //barrier的入口锁
  20. private final ReentrantLock lock = new ReentrantLock();
  21. /** Condition to wait on until tripped */
  22. //Condition等待tripped
  23. private final Condition trip = lock.newCondition();
  24. /** The number of parties */
  25. //通过构造器传入的参数,表示总的等待线程的数量。
  26. private final int parties;
  27. /* The command to run when tripped */
  28. //当屏障正常打开后运行的程序,通过最后一个调用await的线程来执行。
  29. private final Runnable barrierCommand;
  30. /** The current generation */
  31. private Generation generation = new Generation();
  32. /**
  33. * Number of parties still waiting. Counts down from parties to 0
  34. * on each generation. It is reset to parties on each new
  35. * generation or when broken.
  36. *还在等待的线程数量。每一代的Conuts都会从总的等待数量下降到0
  37. 每启动新的一代,或者broken时会重置为总等待数量。
  38. */
  39. private int count;
  40. /**
  41. * Updates state on barrier trip and wakes up everyone.
  42. * Called only while holding lock.
  43. *当barrier trip的时候更新state。唤醒每一个
  44. *只有当拥有锁的时候调用。
  45. */
  46. private void nextGeneration() {
  47. // signal completion of last generation
  48. //通知上一代完成,
  49. trip.signalAll();
  50. // set up next generation设置新的一代。
  51. count = parties;
  52. generation = new Generation();
  53. }
  54. /**
  55. * Sets current barrier generation as broken and wakes up everyone.
  56. * Called only while holding lock.
  57. *把当前barrier的一代设置为broken,唤醒每一个。
  58. *拥有锁的时候调用。
  59. */
  60. private void breakBarrier() {
  61. generation.broken = true;
  62. count = parties;
  63. trip.signalAll();
  64. }
  65. /**
  66. * Main barrier code, covering the various policies.
  67. *主要的barrier代码,覆盖了各种策略
  68. */
  69. private int dowait(boolean timed, long nanos)
  70. throws InterruptedException, BrokenBarrierException,
  71. TimeoutException {
  72. final ReentrantLock lock = this.lock;
  73. lock.lock();
  74. try {
  75. final Generation g = generation;
  76. if (g.broken)//如果是broken状态,则抛出异常
  77. throw new BrokenBarrierException();
  78. if (Thread.interrupted()) {//如果线程是中断状态,则唤醒所有,generation的broken为true,抛出异常
  79. breakBarrier();
  80. throw new InterruptedException();
  81. }
  82. int index = --count;//减少当前count
  83. if (index == 0) { // tripped如果count当前为0,则证明已经是trripped状态,运行之前输入的Runnable
  84. boolean ranAction = false;
  85. try {
  86. final Runnable command = barrierCommand;
  87. if (command != null)
  88. command.run();
  89. ranAction = true;
  90. nextGeneration();//通知所有线程,继续下一代的设置。
  91. return 0;
  92. } finally {
  93. if (!ranAction)
  94. breakBarrier();
  95. }
  96. }
  97. // loop until tripped, broken, interrupted, or timed out
  98. //除非tripped,中断或者超时,否则循环
  99. for (;;) {
  100. try {
  101. if (!timed)//如果没设置超时状态,则等待
  102. trip.await();//await()会释放锁,这样下一个线程就可以继续dowait()
  103. else if (nanos > 0L)
  104. nanos = trip.awaitNanos(nanos);
  105. } catch (InterruptedException ie) {
  106. if (g == generation && ! g.broken) {
  107. breakBarrier();
  108. throw ie;
  109. } else {
  110. // We're about to finish waiting even if we had not
  111. // been interrupted, so this interrupt is deemed to
  112. // "belong" to subsequent execution.
  113. Thread.currentThread().interrupt();
  114. }
  115. }
  116. if (g.broken)
  117. throw new BrokenBarrierException();
  118. //因为别的线程是最后一个await的,也就是会调用nextGeneration();生成一个新的generation
  119. //所以正常情况下g会不等于generation,执行返回index
  120. if (g != generation)
  121. return index;
  122. if (timed && nanos <= 0L) {
  123. breakBarrier();
  124. throw new TimeoutException();
  125. }
  126. }
  127. } finally {
  128. lock.unlock();
  129. }
  130. }


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/148975
推荐阅读
  

闽ICP备14008679号