赞
踩
CyclicBarrier ,回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
构造方法
//parties表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
public CyclicBarrier(int parties) {
this(parties, null);
}
//用于在线程到达屏障时,优先执行 barrierAction,
//方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
重要方法
//指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环重置
public void reset() {}
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。
例子一 ,多个线程调用await之后阻塞,等到达到屏障拦截的线程数量之后,再一起执行
public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3); for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "开始等待其他线程"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "开始执行"); //TODO 模拟业务处理 Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + "执行完毕"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
Thread-0开始等待其他线程
Thread-2开始等待其他线程
Thread-1开始等待其他线程
Thread-4开始等待其他线程
Thread-3开始等待其他线程
Thread-2开始执行
Thread-1开始执行
Thread-0开始执行
例子二 用于多线程计算数据,最后合并计算结果的场景
//保存每个学生的平均成绩 private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>(); private ExecutorService threadPool= Executors.newFixedThreadPool(3); private CyclicBarrier cb=new CyclicBarrier(3,()->{ int result=0; Set<String> set = map.keySet(); for(String s:set){ result+=map.get(s); } System.out.println("三人平均成绩为:"+(result/3)+"分"); }); public void count(){ for(int i=0;i<3;i++){ threadPool.execute(new Runnable(){ @Override public void run() { //获取学生平均成绩 int score=(int)(Math.random()*40+60); map.put(Thread.currentThread().getName(), score); System.out.println(Thread.currentThread().getName() +"同学的平均成绩为:"+score); try { //执行完运行await(),等待所有学生平均成绩都计算完毕 cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } public static void main(String[] args) { CyclicBarrierTest2 cb=new CyclicBarrierTest2(); cb.count(); }
pool-1-thread-3同学的平均成绩为:62
pool-1-thread-1同学的平均成绩为:74
pool-1-thread-2同学的平均成绩为:85
三人平均成绩为:73分
例子三 利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性
public static void main(String[] args) { AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 5, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy()); CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~")); for (int i = 0; i < 10; i++) { threadPoolExecutor.submit(new Runner(cyclicBarrier)); } } static class Runner extends Thread{ private CyclicBarrier cyclicBarrier; public Runner (CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } }
3 号 选手已就位, 准备共用时: 223ms0
2 号 选手已就位, 准备共用时: 315ms1
5 号 选手已就位, 准备共用时: 471ms2
1 号 选手已就位, 准备共用时: 556ms3
4 号 选手已就位, 准备共用时: 923ms4
裁判:比赛开始~~
3 号 选手已就位, 准备共用时: 285ms0
2 号 选手已就位, 准备共用时: 413ms1
1 号 选手已就位, 准备共用时: 533ms2
5 号 选手已就位, 准备共用时: 661ms3
4 号 选手已就位, 准备共用时: 810ms4
裁判:比赛开始~~
以例子以为例,thread0 从await方法开始,await会调用dowait
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
那么dowait里又做了什么
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //如果状态为0就将其修改为1,并设置当前线程,调用await时外面肯定是lock lock.lock(); try { //每一个栈栏算是一代 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //每个线程执行一次,count自减一 int index = --count; // count减到0,说明几个线程都到达屏障,就会重置 进入下一个屏障 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //trip就是一个条件队列condition,入条件等待队列,单向链表结构 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
入队阻塞的方法在await()中,下面看一下
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //入队 创建节点 Node node = addConditionWaiter(); // 释放锁,这样其他线程才能获取锁,执行 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //不在当前队列中就阻塞 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter中创建节点,对于thread0,头节点是自己,lastWaiter也是自己
/** * 添加条件等待节点 */ private Node addConditionWaiter() { Node t = lastWaiter; //节点取消 则移除. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //创建一个节点,thread=Thread.currentThread(),waitstatus= Node.CONDITION =-2 Node node = new Node(Thread.currentThread(), Node.CONDITION); //如果上一个节点为null,则将该节点设置为头节点 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
释放锁
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //释放锁时state变为0了 if (c == 0) { free = true; //将当前线程设置为null setExclusiveOwnerThread(null); } //修改state状态值 setState(c); return free; }
thread0的大致流程
首先会调用lock.lock进行加锁,加锁之后调用trip.await方法进行入队阻塞,入队是通过addConditionWaiter添加进条件等待队列,然后通过fullyRelease释放锁,设置当前线程为null,然后修改state状态值,最后调用LockSupport.park(this);进行阻塞。
threa1的流程大致和thread0一样,还没有达到屏障数量,入队的地方和thread0不一样
private Node addConditionWaiter() { Node t = lastWaiter; //节点取消 则移除. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //创建一个节点,thread=Thread.currentThread(),waitstatus= Node.CONDITION =-2 Node node = new Node(Thread.currentThread(), Node.CONDITION); //如果上一个节点为null,则将该节点设置为头节点 if (t == null) firstWaiter = node; else //thread1执行时,t已经不为null了 t.nextWaiter = node; //lastWaiter指向当前 lastWaiter = node; return node; }
thread1流程
thread2执行的时候count已经减到0,会执行nextGeneration方法
//每个线程执行一次,count自减一 int index = --count; // count减到0,说明几个线程都到达屏障,就会重置 进入下一个屏障 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //开启下一代屏障 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } private void nextGeneration() { // 唤醒所有线程,唤醒操作是在同步等待队列中,所以要将条件等待队列转换为同步等待队列 trip.signalAll(); // 重置count count = parties; //创建下一代屏障 generation = new Generation(); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { //将firstWaiter和lastWaiter都置为null,就没有首尾节点了 lastWaiter = firstWaiter = null; //条件队列的出队 do { //循环将条件队列转同步队列 Node next = first.nextWaiter; //first的nextWaiter置为null first.nextWaiter = null; //条件队列转同步队列,因为唤醒是在同步队列中 transferForSignal(first); first = next; } while (first != null); }
条件队列的出队
/** * 条件队列转同步队列 */ final boolean transferForSignal(Node node) { //将同步状态改为0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; //将ws设置为-1 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
同步队列入队
thread2开始解锁
private int dowait(boolean timed, long nanos){ //.... } finally { lock.unlock(); } } public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒线程0 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //将ws设置为0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒线程0 LockSupport.unpark(s.thread); }
Thread0唤醒之后就会获取锁,执行业务逻辑然后再释放锁
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 在这里获取锁,再释放锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //CAS尝试获取锁 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //cas获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
同步队列就会出队
Thread0唤醒之后,执行业务逻辑之后会unlock,又会唤醒Thread1,Thread1唤醒之后,又会唤醒Thread2。
总结:
await方法,
前半段 释放锁 进入条件队列,阻塞线程(Thread0 Thread1),
过渡阶段 其他线程调用singnal/signalAll唤醒(Thread2),条件队列转同步队列,可以在释放锁的时候唤醒head的后续节点所在的线程
后半段 (Thread0)被唤醒的线程获取锁(如果有竞争,CAS获取锁失败,还会阻塞),Thread0释放锁,唤醒同步队列中head的后续节点所在的线程(独占锁的逻辑)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。