当前位置:   article > 正文

一篇文章彻底搞懂AQS工具类之CyclicBarrier(深度剖析)_aqcirs

aqcirs


前言


一、CyclicBarrier是什么?

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用
在这里插入图片描述

1、构造方法

// 用于在线程到达屏障时,优先执行 barrierAction,
// 方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,
// 然后当前线程被阻塞。   
public CyclicBarrier(int parties) {
        this(parties, null);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、属性

    /** 防止栅栏进入的锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** . 条件等待,直到触发* */
    private final Condition trip = lock.newCondition();
    /** 用于重置的屏障拦截的线程数量 */
    private final int parties;
    /* 触发时要运行的线程 */
    private final Runnable barrierCommand;
    /** 每一个栅栏代表一代 */
    private Generation generation = new Generation();

    /** 线程计数器 */
    private int count;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、使用方法

//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

//循环  通过reset()方法可以进行重置
public void reset()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4、使用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景
每个线程执行完业务逻辑,就执行await方法进行等待

public class CyclicBarrierTest2 {

    //保存每个学生的平均成绩
    private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();

    private ExecutorService threadPool= Executors.newFixedThreadPool(3);

    private CyclicBarrier cb=new CyclicBarrier(3,()->{
        int result=0;
        Set<String> set = map.keySet();
        for(String s:set){
            result+=map.get(s);
        }
        System.out.println("三人平均成绩为:"+(result/3)+"分");
    });


    public void count(){
        for(int i=0;i<3;i++){
            threadPool.execute(new Runnable(){

                @Override
                public void run() {
                    //获取学生平均成绩
                    int score=(int)(Math.random()*40+60);
                    map.put(Thread.currentThread().getName(), score);
                    System.out.println(Thread.currentThread().getName()
                            +"同学的平均成绩为:"+score);
                    try {
                        //执行完运行await(),等待所有学生平均成绩都计算完毕
                        cb.await();
                        System.out.println(Thread.currentThread().getName()
                                +"同学线程执行完毕");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }

            });
        }
    }

    public static void main(String[] args) {
        CyclicBarrierTest2 cb=new CyclicBarrierTest2();
        cb.count();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

执行结果
可以看到,在执行完CyclicBarrier的函数后,才会继续执行线程内等待后的代码

pool-1-thread-1同学的平均成绩为:84
pool-1-thread-3同学的平均成绩为:73
pool-1-thread-2同学的平均成绩为:97
三人平均成绩为:84分
pool-1-thread-2同学线程执行完毕
pool-1-thread-1同学线程执行完毕
pool-1-thread-3同学线程执行完毕
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景

public class CyclicBarrierTest3 {

    public static void main(String[] args) {

        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, 5, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                (r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
                new ThreadPoolExecutor.AbortPolicy());

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
                () -> System.out.println("裁判:比赛开始~~"));

        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new Runner(cyclicBarrier));
        }

    }
    static class Runner extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Runner (CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                Thread.sleep(sleepMills);
                System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
                cyclicBarrier.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

CyclicBarrier 设置的5个位置,所以每次有5个选手了就比赛开始。等下一次又有5个会继续开始。


二、核心源码分析

关注点:
1.一组线程在触发屏障之前互相等待,最后一个线程到达屏障后唤醒逻辑是如何实现的
2.删栏循环使用是如何实现的
3.条件队列到同步队列的转换实现逻辑

上述案例,计算3个人平均成绩代码案例分析

2.1、CyclicBarrier-await()-等待方法

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            // 1、count-1如果为0,表示全部线程已经到达栅栏,触发barrierCommand线程执行
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                    	// 执行线程内逻辑
                        command.run();
                    ranAction = true;
                    // 2、等待队列转同步队列,并且唤醒(唤醒条件队列的线程),重置栅栏
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 3、不为0,即表示一般没有到达栅栏时,都会循环执行。直到触发、中断、中断或超时
            for (;;) {
                try {
                    if (!timed)
                    	// 4、调用AQS的await方法,初始化条件队列,入队
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

2.2、AQS-await等待方法(条件等待队列处理逻辑)

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 1、初始化条件等待队列或者插入尾节点
            Node node = addConditionWaiter();
                        // 2、如果释放锁(如果这里不进行释放,在最初的地方那个lock,就成了死锁,其他线程无法进来)
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 3、不是同步队列则park,是同步队列就结束循环
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

三、模拟多个线程执行CyclicBarrier-await流程图分析

3.1、线程0-await逻辑

(thread0)lock加独占锁,count-1,初始化条件等待队列并且入队,释放独占锁,然后阻塞线程0
在这里插入图片描述

3.2、线程1-await逻辑

(thread1)lock加独占锁,count-1,入队条件队列并且在尾节点,释放独占锁,然后阻塞线程1
在这里插入图片描述

3.3、线程2-await逻辑-条件转同步队列的核心处理逻辑

(thread2)lock加独占锁,这时候count-1=0,代表到达栅栏处,条件队列出队操作,用do/while方式出队,
开始获取独占锁,出队的元素调用transferForSignal,初始化同步队列,插到同步队列尾部,然后阻塞,
然后(thread2)unlock解锁,唤醒(thread0)线程。
在这里插入图片描述

3.4、线程0-重新获取锁逻辑

线程0倍唤醒后,执行acquireQueued方法,去CAS获取锁,同步队列出队,释放锁,然后唤醒线程1,最终线程1出队,至此3个线程全部执行完毕。

在这里插入图片描述


四、为什么需要条件等待队列转换为同步队列?

伪代码进行讲解

 lock.lock();
try {
        // 前半段:进入条件队列,释放独占锁,然后会阻塞线程,(thread0,thread1)
        // 过度阶段:被其他调用signal/signalAll的线程(thread2)唤醒(前提,要在同步队里中)
        // 调用signal/signalAll的线程(thread2),需要把条件队列转同步队列,可以在释放的时候锁的时候唤醒head后续节点所在的线程(thread0)
        // 后半段:(thread0)获取独占锁(所以需要条件等待队列转换为同步队列),
        // 然后(thread0)最终在finally 里释放掉这个独占锁(唤醒同步队里head的后续节点所在的节点所在的线程(thread1))
        // 后半段其实就是独占锁的逻辑
        await();
        } finally {
            // 唤醒的是头同步对队列的节点绑定的线程
            lock.unlock();
        }
        
// 前置步骤,需要条件队列转同步队列,唤醒线程,重新获取独占锁(唤醒和获取锁都是共性逻辑,方便共用)
signAll(); 

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

PS:
前半段:进入条件队列,释放独占锁,然后会阻塞线程,(thread0,thread1)
过度阶段:被其他调用signal/signalAll的线程(thread2)唤醒(前提,要在同步队里中)
调用signal/signalAll的线程(thread2),需要把条件队列转同步队列,可以在释放的时候锁的时候唤醒head后续节点所在的线程(thread0)
后半段:(thread0)获取独占锁(所以需要条件等待队列转换为同步队列),
然后(thread0)最终在finally 里释放掉这个独占锁(唤醒同步队里head的后续节点所在的节点所在的线程(thread1))
后半段其实就是独占锁的逻辑,唤醒的是头同步对队列的节点绑定的线程


五、为什么CyclicBarrier不和CountDownLatch设计 用同一套逻辑处理?

为什么搞这么复杂?
因为CyclicBarrier是按照管城模型进行设计的。
一定要有条件队列和同步队列等相关的。

总结

CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/149041
推荐阅读
相关标签
  

闽ICP备14008679号