赞
踩
所谓AQS,指的是AbstractQueuedSynchronizer,中文:抽象的队列式的同步器.它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。把它比喻成扫地僧是比较合理的,把线程相关技术比喻成一些武功秘籍,那么AQS就是作者,掌握了秘籍的灵魂.
AQS示意图
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
用AQS自己模拟一个CountDownLatch
- package com.zhang.myjuc.a8.aqs;
-
- import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-
- /**
- * MyCountDownLatch:自己用AQS实现一个简单的线程协作器,就相当于一次性的CountDownLatch
- *
- * @author zhangxiaoxiang
- * @date 2020/08/26
- */
- public class MyCountDownLatch {
- private final Sync sync = new Sync();
-
- /**
- * 以共享模式发布。如果tryReleaseShared返回true,则通过解除一个或多个线程的阻塞来实现。
- */
- public void signal() {
- sync.releaseShared(0);
- }
-
- /**
- * 以共享模式获取,忽略中断。首先至少调用一次tryacquiremred,成功后返回。否则,线程会排队,
- * 可能会反复阻塞和解除阻塞,调用tryacquiremred直到成功。
- */
- public void await() {
- sync.acquireShared(0);
- }
-
- /**
- * 内部类
- */
- private class Sync extends AbstractQueuedSynchronizer {
- /**
- * 尝试在共享模式中获取。此方法应该查询对象的状态是否允许在共享模式下获取它,如果允许,则获取它
- *
- * @param arg
- * @return
- */
- @Override
- protected int tryAcquireShared(int arg) {
- return (getState() == 1) ? 1 : -1;
- }
-
- /**
- * 尝试设置状态以反映共享模式下的发布。 这个方法总是由执行释放的线程调用。
- *
- * @param arg
- * @return
- */
- @Override
- protected boolean tryReleaseShared(int arg) {
- setState(1);
- return true;
- }
- }
- //测试
- public static void main(String[] args) throws InterruptedException {
- MyCountDownLatch myCountDownLatch = new MyCountDownLatch();
- for (int i = 0; i < 10; i++) {
- new Thread(() -> {
- System.out.println(Thread.currentThread().getName() + "尝试获取latch,获取失败就在哪里等待");
- myCountDownLatch.await();
- System.out.println("开闸放行" + Thread.currentThread().getName() + "继续运行");
- }).start();
- }
- Thread.sleep(5000);
- myCountDownLatch.signal();
- new Thread(() -> {
- System.out.println(Thread.currentThread().getName() + "尝试获取latch,获取失败就在哪里等待");
- myCountDownLatch.await();
- System.out.println("开闸放行" + Thread.currentThread().getName() + "继续运行");
- }).start();
-
- }
- }
运行结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。