赞
踩
AQS的核心思想为:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是使用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。它是一个抽象类,主要是通过继承的方式来使用,只是定义了一套多线程访问共享资源的框架
CLH队列是一个虚拟的双向队列(不存在队列的实例,只存在节点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH队列的一个节点来实现锁得分配
AQS使用一个int的变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值得修改。可以通过三种方式来获取state变量getState() ,setState() ,compareAndSetState()
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
Exclusive(独占):只有一个线程能执行,如ReentrantLock[又可分为公平锁与非公平锁两种方式]
Share(共享):多个线程可以同步执行,如Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock等等
AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法。
- isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException
。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例:state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例:任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS(Compare and Swap)减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
其继承关系为(Sync为其的静态内部类):
AQS的实现依赖内部的同步队列,也就是FIFO的双向队列(相当于一个同步队列),如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构造成一个Node加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
ConditionObject是AQS中定义的内部类,实现了Condition接口,ConditionObject是基于Lock实现的,在其内部通过对列来维护等待队列(条件队列,而非同步队列)。Contidion必须在lock的同步控制块中使用,调用Condition的signal方法并不代表线程可以马上执行,signal方法的作用是将线程所在的节点从等待队列中移除,然后加入到同步队列中,线程的执行始终都需要根据同步状态(即线程是否占有锁)。
当成功调用了Condition的await方法,此时线程拿到了锁,接下来需要将当前线程的节点从同步队列中移除,在await方法内部的addConditionWaiter执行后,节点加入到了等待队列中;之后await方法会通过isOnSyncQueue方法判断,节点是否在同步队列中;signal和signalAll方法就是将条件队列中的节点按顺序移除,并重新添加到同步队列。就是通过Condition来实现ReenTrantLock的多条件锁。
Node结点的结构:
- static final class Node {
- static final Node SHARED = new Node();
- static final Node EXCLUSIVE = null;
- static final int CANCELLED = 1;
- static final int SIGNAL = -1;
- static final int CONDITION = -2;
- static final int PROPAGATE = -3;
- volatile int waitStatus;
- volatile Node prev; //前驱节点
- volatile Node next; //后继节点
- volatile Thread thread;//当前线程
- Node nextWaiter; //存储在condition队列中的后继节点
- //是否为共享锁
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
-
- final Node predecessor() throws NullPointerException {
- Node p = prev;
- if (p == null)
- throw new NullPointerException();
- else
- return p;
- }
-
- Node() { // Used to establish initial head or SHARED marker
- }
- //将线程构造成一个Node,添加到等待队列
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
- //这个方法会在Condition队列使用,后续单独写一篇文章分析condition
- Node(Thread thread, int waitStatus) { // Used by Condition
- this.waitStatus = waitStatus;
- this.thread = thread;
- }
- }
synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源.Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信 号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore(信号量)可以指定多个线程同时访问某个资源。主要方法为:acquire,tryAcquire,release等
Semaphore构造方法:
- public Semaphore(int permits) { //非公平锁
- sync = new NonfairSync(permits);
- }
-
- public Semaphore(int permits, boolean fair) { //公平锁
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
-
实现:
- public class SemaphoreExample1 {
- // 请求的数量
- private static final int threadCount = 550;
-
- public static void main(String[] args) throws InterruptedException {
- // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
- ExecutorService threadPool = Executors.newFixedThreadPool(300);
- // 一次只能允许执行的线程数量。
- final Semaphore semaphore = new Semaphore(20);
-
- for (int i = 0; i < threadCount; i++) {
- final int threadnum = i;
- threadPool.execute(() -> {// Lambda 表达式的运用
- try {
- semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
- test(threadnum);
- semaphore.release();// 释放一个许可
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- });
- }
- threadPool.shutdown();
- System.out.println("finish");
- }
-
- public static void test(int threadnum) throws InterruptedException {
- Thread.sleep(1000);// 模拟请求的耗时操作
- System.out.println("threadnum:" + threadnum);
- Thread.sleep(1000);// 模拟请求的耗时操作
- }
- }
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
CountDownLatch 的两种典型用法:
①某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n)
,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown()
,当计数器的值变为0时,在CountDownLatch上 await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
②实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch
对象,将其计数器初始化为 1 :new CountDownLatch(1)
,多个线程在开始执行任务前首先 coundownlatch.await()
,当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程调用await
方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
另外,CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction)
,用于在线程到达屏障时,优先执行barrierAction.
CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,可以多次使用。
对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
于2020/09/28完成
参考链接:
https://www.imooc.com/article/258571
https://segmentfault.com/a/1190000017372067
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。