当前位置:   article > 正文

Java并发成神系列(8)—AQS工具用法和对应源码分析_aqsscf-

aqsscf-

目录

1.AQS原理

2.ReentrantLock

2.1 非公平锁加锁流程

1)加锁流程

2)解锁流程

2.2 可重入原理

2.3 可打断原理

2.4 公平锁实现

2.5 条件变量实现原理

3.ReentranReadWriteLock(读写锁)

4.Semaphore(信号量)

4.1 Semaphore基本使用

4.2 Semaphore基本原理

5.CountdownLatch

1)基本用法

2)应用

6.CycliBarrier


1.AQS原理

1)AbstractQueuedSynchronizer(全称):是阻塞式锁和相关的同步器工具的框架(基础)

2)AQS特点:

①用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁:

  • getState - 获取 state 状态

  • setState - 设置 state 状态

  • compareAndSetState - 利用cas 机制设置 state 状态

  • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

②提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList

条件变量来实现等待、唤醒机制,支持多个条件变量,其中每个条件变量都类似于 Monitor 的 WaitSet

3)子类主要实现下面一些方法(如果子类没有覆盖,默认是会抛异常的)

  • tryAcquire

  • tryRelease

  • tryAcquireShared

  • tryReleaseShared

  • isHeldExclusively:是否持有锁

  1. // 获取锁的姿势
  2. // 如果获取锁失败
  3. if (!tryAcquire(arg)) {
  4. // 入队, 可以选择阻塞当前线程 park unpark
  5. }
  6. // 释放锁的姿势
  7. // 如果释放锁成功
  8. if (tryRelease(arg)) {
  9. // 让阻塞线程恢复运行
  10. }

2.ReentrantLock

可以看到RentranLock也继承了Lock这个接口,然后定义了一个同步器Sync(同样继承自AQS类),但同步器的实现有两个,一个是非公平锁,一个是公平锁的!

 

2.1 非公平锁加锁流程

1)加锁流程

主要思路:

①没有竞争时,直接加锁成功;

②有竞争时就要去重试加锁(阻塞前会重试多次),

③没有成功的话,会创造一个Node队列(有一个head)

RentranLock的构造器的默认方法就是非公平锁的。

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }

详细流程分析:

①首先,没有竞争时,直接加锁成功,设置Owner为当前线程:

 ②如果有竞争(有别的线程),CAS加锁失败,会调acquire方法:

进入tryAcquire逻辑:

会再判断一次能否加锁成功;失败则会进入addWaiter逻辑,构造Node队列:

其中,黄色三角的0表示Node的waitStatus 状态, 0 为默认正常状态;-1则表示其有责任唤醒后面的Node。

然后,当前线程进入 acquire方法的 acquireQueued 逻辑:(acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞)

  • 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败

  • 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

  • shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

  • 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true

  • 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)

 

2)解锁流程

主要思路:

①当前线程释放锁;

②唤醒离head最近的节点

③这个节点的线程重新去竞争锁(有可能被新来的线程竞争拿走)

④失败的话,重新去阻塞

详细流程:

首先,尝试去释放锁,释放锁成功的话,会进入unparkSuccessor逻辑:

unparkSuccessor中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行;当然这时又有两种情况,它加锁成功或者不成功。

  • 如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)

  1. exclusiveOwnerThread 为 Thread-1,state = 1

  2. head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread

  3. 原本的 head 因为从链表断开,而可被垃圾回收

  • 如果这时候有其它线程来竞争(非公平的体现),而且被Thread-4抢走了:

  1. Thread-4 被设置为 exclusiveOwnerThread,state = 1

  2. Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

2.2 可重入原理

思路:加锁时让状态自增(加几次锁,就自增几次);解锁时让状态state自减;

  1. static final class NonfairSync extends Sync {
  2. // ...
  3. // Sync 继承过来的方法, 方便阅读, 放在此处
  4. final boolean nonfairTryAcquire(int acquires) {
  5. final Thread current = Thread.currentThread();
  6. int c = getState();
  7. if (c == 0) {
  8. if (compareAndSetState(0, acquires)) {
  9. setExclusiveOwnerThread(current);
  10. return true;
  11. }
  12. }
  13. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
  14. else if (current == getExclusiveOwnerThread()) {
  15. // state++,加一次自增一次
  16. int nextc = c + acquires;
  17. if (nextc < 0) // overflow
  18. throw new Error("Maximum lock count exceeded");
  19. setState(nextc);
  20. return true;
  21. }
  22. return false;
  23. }
  24. // Sync 继承过来的方法, 方便阅读, 放在此处
  25. protected final boolean tryRelease(int releases) {
  26. // state--,解锁时自减
  27. int c = getState() - releases;
  28. if (Thread.currentThread() != getExclusiveOwnerThread())
  29. throw new IllegalMonitorStateException();
  30. boolean free = false;
  31. // 支持锁重入, 只有 state 减为 0, 才释放成功
  32. if (c == 0) {
  33. free = true;
  34. setExclusiveOwnerThread(null);
  35. }
  36. setState(c);
  37. return free;
  38. }
  39. }

2.3 可打断原理

不可打断模式:

在这个模式下,即使被其它线程打断了,它任然会驻留在AQS队列中,一直等它获得锁之后才知道自己原来被打断过,然后继续运行(仅是设置一下打断标记);

可打断模式:

在park的线程在被打断后,会以抛出异常的形式停止;

2.4 公平锁实现

线程在尝试获取锁时会先检查在队列中是否有更靠前的节点线程存在!

  1. // 与非公平锁主要区别在于 tryAcquire 方法的实现
  2. protected final boolean tryAcquire(int acquires) {
  3. final Thread current = Thread.currentThread();
  4. int c = getState();
  5. if (c == 0) {
  6. // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
  7. if (!hasQueuedPredecessors() &&
  8. compareAndSetState(0, acquires)) {
  9. setExclusiveOwnerThread(current);
  10. return true;
  11. }
  12. }
  13. else if (current == getExclusiveOwnerThread()) {
  14. int nextc = c + acquires;
  15. if (nextc < 0)
  16. throw new Error("Maximum lock count exceeded");
  17. setState(nextc);
  18. return true;
  19. }
  20. return false;
  21. }

2.5 条件变量实现原理

每个条件变量其实就对应着一个等待队列,它的是实现类就是ConditionObject

1)await流程

①持有锁的T0线程调用await()之后,会进入ConditionObject 的addConditionWaiter流程,创建新的Node状态为-2(Node.CONDITION),将T0加入队列尾部。

 ②AQS的释放锁步骤:fullyRelease 流程(把重入的state都减掉)

 ③竞争锁,并阻塞T0:Park

2)signal流程

①首先,进入ConditionObject 的 doSignal 流程:取得等待队列中第一个 Node,也就是T0

②执行 transferForSignal 流程:

将该 Node 加入 AQS 队列尾部,将 T0 的 waitStatus 改为 0,T3 的waitStatus 改为 -1

3.ReentranReadWriteLock(读写锁)

1.使用场景:

读操作远远高于写操作时,这时候可以采用读写锁(可以让读-读操作并发)提高性能。而读-写和写-写则是互斥的。

应用:读锁可以保护read方法,写锁可以保护write方法;

  1. class DataContainer {
  2. private Object data;
  3. private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
  4. private ReentrantReadWriteLock.ReadLock r = rw.readLock();
  5. private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
  6. public Object read() {
  7. log.debug("获取读锁...");
  8. r.lock();
  9. try {
  10. log.debug("读取");
  11. sleep(1);
  12. return data;
  13. } finally {
  14. log.debug("释放读锁...");
  15. r.unlock();
  16. }
  17. }
  18. public void write() {
  19. log.debug("获取写锁...");
  20. w.lock();
  21. try {
  22. log.debug("写入");
  23. sleep(1);
  24. } finally {
  25. log.debug("释放写锁...");
  26. w.unlock();
  27. }
  28. }
  29. }

使用注意事项:

1)读锁不支持条件变量;

2)不支持重入时升级锁:也就是持有读锁的情况下去获取写锁(不支持),会导致写锁永久等待

  1. r.lock();
  2. try {
  3. // ...
  4. w.lock();
  5. try {
  6. // ...
  7. } finally{
  8. w.unlock();
  9. }
  10. } finally{
  11. r.unlock();
  12. }

3)支持锁重入时降级:也就是持有写锁的情况下去获取读锁

4.Semaphore(信号量

信号量(Semaphore): 用来限制能同时访问共享资源的线程上限;

4.1 Semaphore基本使用

  1. public static void main(String[] args) {
  2. // 1. 创建 semaphore 对象,在这也就是有3个许可!
  3. Semaphore semaphore = new Semaphore(3);
  4. // 2. 10个线程同时运行
  5. for (int i = 0; i < 10; i++) {
  6. new Thread(() -> {
  7. // 3. 获取许可
  8. try {
  9. semaphore.acquire();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. try {
  14. log.debug("running...");
  15. sleep(1);
  16. log.debug("end...");
  17. } finally {
  18. // 4. 释放许可
  19. semaphore.release();
  20. }
  21. }).start();
  22. }
  23. }

应用场景:

1)使用 Semaphore 限流:在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,但是它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数,例如连接数,请对比 Tomcat LimitLatch 的实现)

2)用 Semaphore 实现简单连接池: 性能和可读性更好;数据库连接池中的线程数和数据库连接数是相等的;

4.2 Semaphore基本原理

比喻:Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位

竞争许可: acquire

acquire->acquireSharedInterruptibly(1)->tryAcquireShared(1)->nonfairTryAcquireShared(1),如果资源用完了,返回负数,tryAcquireShared返回负数,表示失败。否则返回正数,tryAcquireShared返回正数,表示成功。

  • 如果成功,获取信号量成功。

  • 如果失败,调用doAcquireSharedInterruptibly,进入for循环:

    • 如果当前驱节点为头节点,调用tryAcquireShared尝试获取锁

      • 如果结果大于等于0,表明获取锁成功,调用setHeadAndPropagate,将当前节点设为头节点,之后又调用doReleaseShared,唤醒后继节点。

    • 调用shoudParkAfterFailure,第一次调用返回false,并将前驱节点改为-1,第二次循环如果再进入此方法,会进入阻塞并检查打断的方法。

1)这里的permits(state)的state就是AQS的state!

 2)上述5个线程会来竞争这三个3个许可,当有3个线程竞争成功之后,剩下的线程就会进入AQS队列park阻塞。

释放许可:Release

3)如果此时有线程释放了许可(state+1),接下来靠近head节点的线程去竞争许可;如果竞争成功,又将state-1,然后将原本的头节点移除,将T0的这个节点变为头节点;

 

5.CountdownLatch

1)基本用法

一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;

CountdownLatch允许多个线程阻塞在一个地方,直至所有线程的任务都执行完毕;CountdownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count;

用途:

用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

相比于t.join(),CountdownLatch是更高层的API;t.join只能等待某个线程结束才行,但有时候线程池中的线程是不会结束的,所以CountdownLatch是更好的选择,可以和线程池结合使用;

  1. public static void main(String[] args) throws InterruptedException {
  2. CountDownLatch latch = new CountDownLatch(3);
  3. ExecutorService service = Executors.newFixedThreadPool(4);
  4. service.submit(() -> {
  5. log.debug("begin...");
  6. sleep(1);
  7. latch.countDown();
  8. log.debug("end...{}", latch.getCount());
  9. });
  10. service.submit(() -> {
  11. log.debug("begin...");
  12. sleep(1.5);
  13. latch.countDown();
  14. log.debug("end...{}", latch.getCount());
  15. });
  16. service.submit(() -> {
  17. log.debug("begin...");
  18. sleep(2);
  19. latch.countDown();
  20. log.debug("end...{}", latch.getCount());
  21. });
  22. service.submit(()->{
  23. try {
  24. log.debug("waiting...");
  25. latch.await();//计数减完前,在这阻塞;
  26. log.debug("wait end...");
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. });
  31. }

2)应用

①可以等待多个线程准备完毕:

比如王者5个玩家,必须等待5个玩家都准备完毕,主线程才能继续运行;

②可以等待多个远程调用结束

future更合适线程之间交换结果;

6.CycliBarrier

多个线程互相等待,直到达到同一个同步点,再继续一起执行。

CycliBarrier(循环栅栏):也是用来进行线程合作,等待线程满足某个计数

构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行(调用await的这些方法都阻塞)等待,当等待的线程数满足『计数个数』时,(这么多等待的线程)才继续执行。

它的特点是它可以重用!(而CountdownLatech重复进行同步的话,需要创建多个CountdownLatech对象)

  1. CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
  2. new Thread(()->{
  3. System.out.println("线程1开始.."+new Date());
  4. try {
  5. cb.await(); // 当个数不足时,等待
  6. } catch (InterruptedException | BrokenBarrierException e) {
  7. e.printStackTrace();
  8. }
  9. System.out.println("线程1继续向下运行..."+new Date());
  10. }).start();
  11. new Thread(()->{
  12. System.out.println("线程2开始.."+new Date());
  13. try { Thread.sleep(2000); } catch (InterruptedException e) { }
  14. try {
  15. cb.await(); // 2 秒后,线程个数够2,继续运行
  16. } catch (InterruptedException | BrokenBarrierException e) {
  17. e.printStackTrace();
  18. }
  19. System.out.println("线程2继续向下运行..."+new Date());
  20. }).start();

注意:

1)CountDownLatch的计数和阻塞方法是分开的两个方法,而CyclicBarrier是一个方法。

2)CyclicBarrier中最好把线程数和任务数设为一致!

和CountDownLatch的区别:

对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而

CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/149103
推荐阅读
相关标签
  

闽ICP备14008679号