当前位置:   article > 正文

java 控件组序号_java中的JUC组件(Semaphore、CountDownLatch、CyclicBarrier)

java 组件编号

[toc]

1、简介

Semaphore、CountDownLatch、CyclicBarrier 这三个工具类都是用于并发控制的操作,底层都是基于AQS去实现的;

Semaphore(信号量): 提供一个竞争资源处理的工具,当系统内有足够的信号量事,线程可以去获取信号量执行操作,当信号量资源被使用完后,需要等待资源释放后后续线程(资源竞争)才能够执行;

CountDownLatch(闭锁):可以理解为一个开关,闭锁创建的时候,可以指定一个闭锁打开依赖条件个数,只有满足条件的资源就绪时,闭锁打开,对应的线程可以执行后续任务;

CyclicBarrier(栅栏):当所有依赖栅栏的线程执行到关键节点的时候,都需要等待其他线程到达栅栏点,否则无法继续执行,所有线程到达栅栏的关键点,这些线程即可继续执行后续任务。

2、Semaphore

semaphore在代码中使用的执行流程简图:

ce7f23ace83ba0355cccf2a12798df2c.png

其本质就是操作系统的P-V操作,当资源足够的时候线程获得资源并执行,资源不足时线程等待或者退出,当资源被释放时线程又可以获取竞争资源继续执行;

2.1、源码实现:

daa3c7cee2e15049f27b546dd5aa50d9.png

Semaphore类的代码结构如上图,可以看到其内部实现了公平锁和非公平锁,所以我们可以使用这两种不同的模式来构建Semaphore对象实例;

不论我们使用公平与非公平锁,其初始化最终都会调用到sync的构造方法; 下面可以看看内部的获取资源以及释放资源的具体实现:

public class Semaphore implements java.io.Serializable {

abstract static class Sync extends AbstractQueuedSynchronizer{

//初始化

Sync(int permits) {

setState(permits);

}

}

public void acquire(int permits) throws InterruptedException {

if (permits < 0) throw new IllegalArgumentException();

sync.acquireSharedInterruptibly(permits);

}

public void release() {

sync.releaseShared(1);

}

}

sync.acquireSharedInterruptibly(permits) 最终会调用的AQS的实现,对于AQS中该实现简单说明:如果这里能够获取到资源,内部就返回success,这里会扣减permits的值,任务继续执行,否则,将当前线程加入等待队列中(具体参考AbstractQueuedSynchronizer.acquireSharedInterruptibly());

3、CountDownLatch

CountDownLatch在代码中使用的执行流程简图:

29ce3c4daacc5717cc7fca83abb85dc3.png

3.1、源码实现

349256bb9f4cd56aa96f567e8878d50e.png

public class CountDownLatch {

private static final class Sync extends AbstractQueuedSynchronizer {

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

}

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

public void countDown() {

sync.releaseShared(1);

}

}

当某个线程调用await()方法时由于CountDownLatch.Sync中的实现tryAcquireShared会判断state是否等于0,如果不等于,就会进入等待队列,直到countDown调用sync.releaseShared(1)使得sync的状态到0,await的线程才会继续执行;

4、CyclicBarrier

CyclicBarrier在代码中使用的执行流程简图:

96555c15a3e2f411c029d1722b8de848.png

4.1、源码实现

0a45896c1f11bb57084ea881ab19c1b0.png

public class CyclicBarrier {

public int await() throws InterruptedException, BrokenBarrierException {

try {

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen

}

}

private static class Generation {

boolean broken = false;

}

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

//保存当前generation 副本

final Generation g = generation;

//如果当前副本已推出,抛出异常

if (g.broken)

throw new BrokenBarrierException();

//如果当前线程已中断抛出异常

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

//计算当前是否所有线程到达“栅栏”

int index = --count;

if (index == 0) { // tripped

//所有线程到达栅栏

boolean ranAction = false;

try {

//如果初始化了自定方法,触发

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

//唤醒所有到达“栅栏”的线程,并跳到下一个新的generation

nextGeneration();

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// loop until tripped, broken, interrupted, or timed out

//如果还有线程没有到达栅栏

for (;;) {

try {

//根据设置的等待时间进行等待

//里面会调用LockSupport.park(this);

//将当前线程放入等待队列中

if (!timed)

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

//异常处理流程

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

// "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

//当前线程geration 不等于实际 geration 正常返回

if (g != generation)

return index;

//超过等待时间所有线程还未到达“栅栏”,抛出异常

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

//跳到下一个geration操作

private void nextGeneration() {

// signal completion of last generation

trip.signalAll();

// set up next generation

count = parties;

generation = new Generation();

}

}

CyclicBarrier 维护了一个计数器,和一个 generation 每次调用await都会有将计数器减一,并且产生一个新的 generation ,只要计数器不为零,所有前置线程都会触发 ((Condition)trip).await(); 内部会调用 LockSupport.park(this); 方法将线程加入等待队列,知道所有线程就绪,会调用 trip.signalAll(); 唤醒所有线程,同时执行一个用户自定义的 Runnable 策略

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/148994
推荐阅读
相关标签
  

闽ICP备14008679号