当前位置:   article > 正文

java 并发编程之AQS(并发界的扫地僧)_并发编程aqs

并发编程aqs

AQS是什么

所谓AQS,指的是AbstractQueuedSynchronizer,中文:抽象的队列式的同步器.它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。把它比喻成扫地僧是比较合理的,把线程相关技术比喻成一些武功秘籍,那么AQS就是作者,掌握了秘籍的灵魂.

AQS示意图

 

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

  AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

  不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

  以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

  再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

用AQS自己模拟一个CountDownLatch

  1. package com.zhang.myjuc.a8.aqs;
  2. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  3. /**
  4. * MyCountDownLatch:自己用AQS实现一个简单的线程协作器,就相当于一次性的CountDownLatch
  5. *
  6. * @author zhangxiaoxiang
  7. * @date 2020/08/26
  8. */
  9. public class MyCountDownLatch {
  10. private final Sync sync = new Sync();
  11. /**
  12. * 以共享模式发布。如果tryReleaseShared返回true,则通过解除一个或多个线程的阻塞来实现。
  13. */
  14. public void signal() {
  15. sync.releaseShared(0);
  16. }
  17. /**
  18. * 以共享模式获取,忽略中断。首先至少调用一次tryacquiremred,成功后返回。否则,线程会排队,
  19. * 可能会反复阻塞和解除阻塞,调用tryacquiremred直到成功。
  20. */
  21. public void await() {
  22. sync.acquireShared(0);
  23. }
  24. /**
  25. * 内部类
  26. */
  27. private class Sync extends AbstractQueuedSynchronizer {
  28. /**
  29. * 尝试在共享模式中获取。此方法应该查询对象的状态是否允许在共享模式下获取它,如果允许,则获取它
  30. *
  31. * @param arg
  32. * @return
  33. */
  34. @Override
  35. protected int tryAcquireShared(int arg) {
  36. return (getState() == 1) ? 1 : -1;
  37. }
  38. /**
  39. * 尝试设置状态以反映共享模式下的发布。 这个方法总是由执行释放的线程调用。
  40. *
  41. * @param arg
  42. * @return
  43. */
  44. @Override
  45. protected boolean tryReleaseShared(int arg) {
  46. setState(1);
  47. return true;
  48. }
  49. }
  50. //测试
  51. public static void main(String[] args) throws InterruptedException {
  52. MyCountDownLatch myCountDownLatch = new MyCountDownLatch();
  53. for (int i = 0; i < 10; i++) {
  54. new Thread(() -> {
  55. System.out.println(Thread.currentThread().getName() + "尝试获取latch,获取失败就在哪里等待");
  56. myCountDownLatch.await();
  57. System.out.println("开闸放行" + Thread.currentThread().getName() + "继续运行");
  58. }).start();
  59. }
  60. Thread.sleep(5000);
  61. myCountDownLatch.signal();
  62. new Thread(() -> {
  63. System.out.println(Thread.currentThread().getName() + "尝试获取latch,获取失败就在哪里等待");
  64. myCountDownLatch.await();
  65. System.out.println("开闸放行" + Thread.currentThread().getName() + "继续运行");
  66. }).start();
  67. }
  68. }

运行结果

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/149175
推荐阅读
相关标签
  

闽ICP备14008679号