赞
踩
* CyclicBarrier 的字面意思就是循环屏障。在一组线程到达循环屏障时阻塞,直到最后一个线程到达屏障时,屏障才会放开,让所有线程执行。该类可以与 CountDownLatch 进行类比,功能基本一致。不同点在于 CyclicBarrier 的循环概念,CountDownLatch 为一次性屏障,放开之后不会再对后续线程进行拦截,CyclicBarrier 一次放开之后会把屏障值设置为初始状态,循环进行屏障!
* 其实从类图中不能看到任何有用信息,CyclicBarrier 内部通过 ReentrantLock 和 Condition 进行控制,实现循环屏障
- // 初始化重入锁进行线程加锁处理
- private final ReentrantLock lock = new ReentrantLock();
- // 初始化Condition进行线程阻塞和线程唤醒处理
- private final Condition trip = lock.newCondition();
- // CyclicBarrier初始化许可量
- private final int parties;
- // 屏障放开前最后一个线程会执行的线程类(为null不执行)
- private final Runnable barrierCommand;
- // 表示一个循环屏障周期
- private Generation generation = new Generation();
- // 初始化CyclicBarrier,并传递屏障值
- public CyclicBarrier(int parties);
- // 初始化CyclicBarrier,并传递屏障值和barrierAction
- CyclicBarrier(int parties, Runnable barrierAction)
- package com.gupao;
-
- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CyclicBarrier;
-
- /**
- * @author pj_zhang
- * @create 2019-09-15 11:33
- */
- public class CyclicBarrierTest {
-
- public static void main(String[] args) throws InterruptedException {
- // 初始化 CyclicBarrier,并传递count为3
- CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
- // 因为初始屏障为3,但线程数量定位6,所以对于 cyclicBarrier 来说,会有两次阻塞和两次释放
- for (int i = 0; i < 6; i++) {
- new Thread(() -> {
- System.out.println(Thread.currentThread().getName() + "正在执行");
- // 每一个线程内部等待
- try {
- cyclicBarrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "执行完毕; " + System.currentTimeMillis());
- }, "thread_" + i).start();
- Thread.sleep(1000);
- }
- }
- }
* CyclicBarrier(int parties)
- public CyclicBarrier(int parties) {
- // 直接通过方法重载调用
- this(parties, null);
- }
* CyclicBarrier(int parties, Runnable barrierAction)
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- // 初始化屏障前执行线程,没有则为空
- this.barrierCommand = barrierAction;
- }
* await(..):无论是显示等待还是不显示等待,最终都是殊途同归,调用同一个底层类
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- public int await(long timeout, TimeUnit unit)
- throws InterruptedException,
- BrokenBarrierException,
- TimeoutException {
- return dowait(true, unit.toNanos(timeout));
- }
* dowait(boolean timed, long nanos)
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- // lock:重入锁,已在常量定义
- // 定义语句:private final ReentrantLock lock = new ReentrantLock();
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 赋值当前循环屏障标识
- // 可以通过 reset() 方法对标识进行重置
- final Generation g = generation;
- // 循环屏障中断
- if (g.broken)
- throw new BrokenBarrierException();
- // 线程中断处理,
- if (Thread.interrupted()) {
- // 中断处理后打破屏障
- breakBarrier();
- throw new InterruptedException();
- }
- // 此处表示对循环屏障的屏障值递减,当减为0时屏障放开
- int index = --count;
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- // 减为0,表示当前线程是屏障放开前最后一个线程
- // 由屏障放开前最后一个线程,执行初始化传递的屏障线程
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- // 构造新的循环屏障,
- nextGeneration();
- return 0;
- } finally {
- // 循环屏障释放异常,则打破屏障
- if (!ranAction)
- breakBarrier();
- }
- }
-
- // 前一步没有放开,说明屏障还需等待,线程阻塞
- for (;;) {
- try {
- // timed判断是限时阻塞还是直接阻塞
- if (!timed)
- // trip:Condition,已在常亮定义
- // 定义语句:private final Condition trip = lock.newCondition();
- trip.await();
- else if (nanos > 0L)
- // 此处返回剩余时间
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- // 异常后打破屏障
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw ie;
- } else {
- Thread.currentThread().interrupt();
- }
- }
-
- // 循环屏障中断处理
- if (g.broken)
- throw new BrokenBarrierException();
-
- // 标识被重置,不做任何处理
- if (g != generation)
- return index;
-
- // 限时等待超时处理
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
* breakBarrier():打破循环屏障
- private void breakBarrier() {
- // 设置参数true
- generation.broken = true;
- // 重置屏障值为初始值
- count = parties;
- // 通过Condition唤醒全部阻塞线程进行执行
- trip.signalAll();
- }
* nextGeneration():说明当前屏障已经阻塞完成,进行下一次循环阻塞
- private void nextGeneration() {
- // 通过Condition唤醒全部阻塞线程
- trip.signalAll();
- // 重置屏障值为初始值
- count = parties;
- // 构建一个新的Generation,进行下一次循环
- generation = new Generation();
- }
* reset()
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 打破循环屏障,已经被屏障的线程全部唤醒执行
- breakBarrier();
- // 构建下一个循环屏障
- nextGeneration();
- } finally {
- lock.unlock();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。