赞
踩
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
// 用于在线程到达屏障时,优先执行 barrierAction,
// 方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,
// 然后当前线程被阻塞。
public CyclicBarrier(int parties) {
this(parties, null);
}
/** 防止栅栏进入的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** . 条件等待,直到触发* */
private final Condition trip = lock.newCondition();
/** 用于重置的屏障拦截的线程数量 */
private final int parties;
/* 触发时要运行的线程 */
private final Runnable barrierCommand;
/** 每一个栅栏代表一代 */
private Generation generation = new Generation();
/** 线程计数器 */
private int count;
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环 通过reset()方法可以进行重置
public void reset()
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。
每个线程执行完业务逻辑,就执行await方法进行等待
public class CyclicBarrierTest2 {
//保存每个学生的平均成绩
private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
private ExecutorService threadPool= Executors.newFixedThreadPool(3);
private CyclicBarrier cb=new CyclicBarrier(3,()->{
int result=0;
Set<String> set = map.keySet();
for(String s:set){
result+=map.get(s);
}
System.out.println("三人平均成绩为:"+(result/3)+"分");
});
public void count(){
for(int i=0;i<3;i++){
threadPool.execute(new Runnable(){
@Override
public void run() {
//获取学生平均成绩
int score=(int)(Math.random()*40+60);
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName()
+"同学的平均成绩为:"+score);
try {
//执行完运行await(),等待所有学生平均成绩都计算完毕
cb.await();
System.out.println(Thread.currentThread().getName()
+"同学线程执行完毕");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
CyclicBarrierTest2 cb=new CyclicBarrierTest2();
cb.count();
}
执行结果
可以看到,在执行完CyclicBarrier的函数后,才会继续执行线程内等待后的代码
pool-1-thread-1同学的平均成绩为:84
pool-1-thread-3同学的平均成绩为:73
pool-1-thread-2同学的平均成绩为:97
三人平均成绩为:84分
pool-1-thread-2同学线程执行完毕
pool-1-thread-1同学线程执行完毕
pool-1-thread-3同学线程执行完毕
利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景
public class CyclicBarrierTest3 {
public static void main(String[] args) {
AtomicInteger counter = new AtomicInteger();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, 5, 1000, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
(r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
new ThreadPoolExecutor.AbortPolicy());
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
() -> System.out.println("裁判:比赛开始~~"));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(new Runner(cyclicBarrier));
}
}
static class Runner extends Thread{
private CyclicBarrier cyclicBarrier;
public Runner (CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
int sleepMills = ThreadLocalRandom.current().nextInt(1000);
Thread.sleep(sleepMills);
System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
}
}
CyclicBarrier 设置的5个位置,所以每次有5个选手了就比赛开始。等下一次又有5个会继续开始。
关注点:
1.一组线程在触发屏障之前互相等待,最后一个线程到达屏障后唤醒逻辑是如何实现的
2.删栏循环使用是如何实现的
3.条件队列到同步队列的转换实现逻辑
上述案例,计算3个人平均成绩代码案例分析
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 1、count-1如果为0,表示全部线程已经到达栅栏,触发barrierCommand线程执行
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 执行线程内逻辑
command.run();
ranAction = true;
// 2、等待队列转同步队列,并且唤醒(唤醒条件队列的线程),重置栅栏
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 3、不为0,即表示一般没有到达栅栏时,都会循环执行。直到触发、中断、中断或超时
for (;;) {
try {
if (!timed)
// 4、调用AQS的await方法,初始化条件队列,入队
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1、初始化条件等待队列或者插入尾节点
Node node = addConditionWaiter();
// 2、如果释放锁(如果这里不进行释放,在最初的地方那个lock,就成了死锁,其他线程无法进来)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3、不是同步队列则park,是同步队列就结束循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
(thread0)lock加独占锁,count-1,初始化条件等待队列并且入队,释放独占锁,然后阻塞线程0
(thread1)lock加独占锁,count-1,入队条件队列并且在尾节点,释放独占锁,然后阻塞线程1
(thread2)lock加独占锁,这时候count-1=0,代表到达栅栏处,条件队列出队操作,用do/while方式出队,
开始获取独占锁,出队的元素调用transferForSignal,初始化同步队列,插到同步队列尾部,然后阻塞,
然后(thread2)unlock解锁,唤醒(thread0)线程。
线程0倍唤醒后,执行acquireQueued方法,去CAS获取锁,同步队列出队,释放锁,然后唤醒线程1,最终线程1出队,至此3个线程全部执行完毕。
伪代码进行讲解
lock.lock();
try {
// 前半段:进入条件队列,释放独占锁,然后会阻塞线程,(thread0,thread1)
// 过度阶段:被其他调用signal/signalAll的线程(thread2)唤醒(前提,要在同步队里中)
// 调用signal/signalAll的线程(thread2),需要把条件队列转同步队列,可以在释放的时候锁的时候唤醒head后续节点所在的线程(thread0)
// 后半段:(thread0)获取独占锁(所以需要条件等待队列转换为同步队列),
// 然后(thread0)最终在finally 里释放掉这个独占锁(唤醒同步队里head的后续节点所在的节点所在的线程(thread1))
// 后半段其实就是独占锁的逻辑
await();
} finally {
// 唤醒的是头同步对队列的节点绑定的线程
lock.unlock();
}
// 前置步骤,需要条件队列转同步队列,唤醒线程,重新获取独占锁(唤醒和获取锁都是共性逻辑,方便共用)
signAll();
PS:
前半段:进入条件队列,释放独占锁,然后会阻塞线程,(thread0,thread1)
过度阶段:被其他调用signal/signalAll的线程(thread2)唤醒(前提,要在同步队里中)
调用signal/signalAll的线程(thread2),需要把条件队列转同步队列,可以在释放的时候锁的时候唤醒head后续节点所在的线程(thread0)
后半段:(thread0)获取独占锁(所以需要条件等待队列转换为同步队列),
然后(thread0)最终在finally 里释放掉这个独占锁(唤醒同步队里head的后续节点所在的节点所在的线程(thread1))
后半段其实就是独占锁的逻辑,唤醒的是头同步对队列的节点绑定的线程
为什么搞这么复杂?
因为CyclicBarrier是按照管城模型进行设计的。
一定要有条件队列和同步队列等相关的。
CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。