当前位置:   article > 正文

JAVA多线程(四)——AQS_aqs single

aqs single

什么是AQS

AQS全称AbstractQueuedSynchronizer,它是一个静态类,它规定了一组操作流程,它的子类可以根据自己的具体实现来丰富各自得方法。AQS其实更多的是一种思想,由这个思想在JUC包中产生了好几个并发工具类,如:ReentrantLock,CountDownLatch,Semaphore等。
在了解AQS之前我先说几个关键的名词 state(volatile修饰的)、线程节点、队列(双向队列)、CAS,记住这几个词语,这都是AQS的关键点。
AQS是一种模板方法,对于模板方法设计模式可以参考我的一篇文章:模板方法

AQS源码解读

首先我们来看它的入口方法,也就是核心方法acquire:
在这里插入图片描述
我们再来看上面的英文介绍:
在这里插入图片描述
接下来我们来看acquire方法的内容,其中它调用了三个方法 tryAcquire、acquireQueued、addWaiter以及selfInterrupt。注意acquire方法是final修饰。
tryAcquire方法是尝试获得锁的意思;
acquireQueued方法是如果线程tryAcquire没有获得锁,当把线程包装成一个节点后,再尝试获得锁——记住这一步很重要
addWaiter方法是将线程包装成一个节点放入等待队列,放入的过程是CAS操作。
selfInterrupt方法看名字是安全中断线程的意思。
好了,我们知道AQS是一种思想,它是利用模板模式实现的,那么上述这几个方法最主要的实现就在它的子类里面,我们先来看看ReentrantLock这个类,使用这个类来感受AQS的魅力:

ReentrantLock

在这里插入图片描述
我们发现ReentrantLock没有直接继承AbstractQueuedSynchronizer,而是它内部的一个final变量sync类,Sync这个类才是继承了AbstractQueuedSynchronizer,接着往下看,还有两个类FairSync和NonFairSync前面是公平后面是非公平,这两个类都继承了Sync这个类;在往下看就是ReentrantLock的两个构造函数,无参构造默认new一个非公平的Sync,有参构造可以根据boolean类型来指定是否公平——所以这里就可以看出ReentrantLock与Synchronized第一个区别就是ReentrantLock可以指定公平锁和非公平锁,而Synchronized只是非公平锁。
我们在使用ReentrantLock时候,是通过lock()加锁unlock()解锁,所以我们先来看它默认非公平锁的lock方法:
在这里插入图片描述
首先调用compareAndSetState方法,注意了这里开始用到了state这个参数,从方法名可以看出它是通过CAS操作将state这个值由0变成1,如果操作成功则说明获得锁,执行setExclusiveOwnerThread方法将当前线程设置成锁的所有者,如果操作失败,则调用acquire方法,说明这里就走到了我们上面说的AbstractQueuedSynchronizer中的acquire方法。所以我们这个时候跟踪代码到NonFairTryAcquire方法
在这里插入图片描述
参照代码阅读:获得当前线程,获得state,如果state=0,尝试获得锁,成功设置锁的拥有者,返回true退出;如果state!=0,判断用当前线程和锁拥有者线程比较,如果当前线程是锁的拥有者,将原先state值加1,同时将新值赋值给state,返回true 退出;如果state既不是0同时线程又不是锁的拥有者,返回false,说明的锁失败。总结:返回true表示获得锁成功,那就不需要执行后面的方法了;返回false说明获得锁失败,需要将线程作为节点放入到队列。
从上面的代码阅读可以看出几点:1、ReentrantLock是可重入锁;2、state是用来记录锁重入了多少次;3、ReentrantLock是独占锁,
下面自己看公平锁下tryAcquire方法:
在这里插入图片描述
这里自己去阅读hasQueuedPredecessors方法,它的本质就是查看当前阻塞队列有没有线程在等待。
好了接下看addWaiter方法:
在这里插入图片描述
参考代码阅读:将线程包装成一个节点,获得tail(队列的尾节点)节点,将刚刚包装的线程节点里的前驱指针指向tail节点,compareAndSetTail方法将刚刚包装的线程节点设置成tail节点,设置成功后将原先的tail节点后驱指针指向刚刚包装的线程节点,返回这个新的线程节点。如果队列里tail节点为null,说明队列里没有等待的线程节点了,执行enq方法,enq方法:
在这里插入图片描述
代码阅读:首先自旋,如果获得的tail节点为空,new 一个节点设置为head节点;如果不为空,和addWaiter里一样设置好node节点,返回。
总结:这里出现了很多CAS操作,所以AQS一个很重要的组成部分就是CAS。

接下来就是分析acquireQueued方法了:
在这里插入图片描述
代码解读:首先设置了两个布尔型变量,开启自旋,获得当前节点(也就是刚刚包装好的线程节点)的上一个节点;如果上一个节点是head头节点也就是第一个节点,调用tryAcquire尝试获得锁,如果获得锁成功,将当前节点设置成头节点,将上一个头节点的next设置为空,看注释“help GC”说明当一个头节点的next设置为空,那么这个节点对象将会被回收;设置参数,返回false;进入下一个if判断,看名称shouldParkAfterFailedAcquire可以知道,当获得锁是需要干嘛,park——阻塞,所以这个方法返回true,parkAndCheckInterrupt方法表示,线程阻塞(LockSupport的park方法)同时返回了线程中断标识,两个判断都是true,返回true;finally里面,cancelAcquire方法表示取消获得锁。
通过这个方法我们来总结下:线程一开始进来没有获得锁,将线程包装成一个节点,放到队列的时候再自旋尝试获得锁,失败后才进入阻塞状态。
好了,上面就是AQS的实现原理,所以AQS是一种思想,具体的实现是根据它不同的实现模板来确认的,它只是维护了一套逻辑结构即它的acquire方法。记住AQS的重点:volatile,节点队列以及CAS。
这里补充个shouldParkAfterFailedAcquire方法里的内容,也算是重点:waitStatus——节点状态;
1、CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
2、SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
3、CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
4、PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
5、0:新结点入队时的默认状态。

ReentrantLock用法

上面讲完了AQS接下来就开始来看一些基于它实现的锁。
首先来看ReentrantLock的基本用法:

	package com.example.demo.thread;
	import java.util.concurrent.TimeUnit;  
	import java.util.concurrent.locks.ReentrantLock;  
	  
	/** 
	 * @author : sun 
	 * create at:  2020/4/26  20:16 
	 * @description: ReentrantLock 
	 */  
	public class TestReentrantLock {  
	  
	    static ReentrantLock lock = new ReentrantLock();  
	  
  	  void m() {  
	        try {  
	            lock.lock();  
	            System.out.println("获得了M方法的执行权——开始");  
	            TimeUnit.SECONDS.sleep(10);  
	            System.out.println("获得了M方法的执行权——结束");  
	        } catch (InterruptedException e) {  
	            e.printStackTrace();  
	        } finally {  
	            lock.unlock();  
	        }  
	    }  
	  
	    void n() {  
	        boolean locked = false;  
	     	 try {  
            	locked = lock.tryLock(3, TimeUnit.SECONDS);  
            	//只有获得锁后才能执行方法  
	            if(locked){  
	                System.out.println("获得了N方法的执行权");  
           		}  
       	 	} catch (InterruptedException e) {  
           		e.printStackTrace();  
	        } finally {  
            	if (locked){  
	                lock.unlock();  
	            	}  
	        }  
	}  
    public static void main(String[] args) {  
        TestReentrantLock testReentrantLock = new TestReentrantLock();  
	        new Thread(testReentrantLock::m).start();  
        new Thread(testReentrantLock::n).start();  
	    }  
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

lock加锁,unlock解锁,其中unlock需要放在finally里。

上面的代码执行结果是:
获得了M方法的执行权——开始
获得了M方法的执行权——结束

我们接着看这段代码:

package com.example.demo.thread.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author : sun
 * create at:  2020/4/26  20:16
 * @description: ReentrantLock
 */
public class TestReentrantLock {

    static ReentrantLock lock = new ReentrantLock();

    void m() {
        try {
            lock.lock();
            System.out.println("获得了M方法的执行权——开始");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("获得了M方法的执行权——结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    void n() {
        boolean locked = false;
        try {
            locked = lock.tryLock(10, TimeUnit.SECONDS);
            //只有获得锁后才能执行方法
            if(locked){
                System.out.println("获得了N方法的执行权");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (locked){
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        TestReentrantLock testReentrantLock = new TestReentrantLock();
        new Thread(testReentrantLock::m).start();
        new Thread(testReentrantLock::n).start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

代码执行的结果是:
获得了M方法的执行权——开始
获得了M方法的执行权——结束
获得了N方法的执行权
虽然执行的结果如我们所料,但是“获得了N方法的执行权”这句话打印出现,不是10秒后,它是在第一个线程释放锁后就立马出现的,说明tryLock不会阻塞所填参数时间,当其他线程释放锁后会立马尝试获得锁。
同时ReentrantLock可以通过构造函数来指定它是一个公平锁。

ReentrantLock实现生产者与消费者

接下来介绍ReentrantLock一个非常重要的用法,它结合Condition来实现生产和消费者,上代码:

package com.example.demo.thread.lock;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author : sun
 * create at:  2020/5/6  21:21
 * @description: 多线程实现生产和消费者
 */
public class MyCondition01<T> {
    LinkedList<T> list = new LinkedList<>();
    final static int MAX_LENGTH = 10;
    static ReentrantLock lock = new ReentrantLock();
    Condition producter = lock.newCondition();
    Condition consumer = lock.newCondition();

    public void put(T t){
        try {
            lock.lock();
            if(list.size() == MAX_LENGTH){
                producter.await();
            }
            list.add(t);
            System.out.println(Thread.currentThread().getName() + "--生产了!");
            consumer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public T get(){
        T t = null;
        try {
            lock.lock();
            if(list.size() == 0){
                consumer.await();
            }
            t = list.removeFirst();
            System.out.println(Thread.currentThread().getName() + "--消费了!");
            producter.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }

    public static void main(String[] args) {
        MyCondition01<Object> myCondition01 = new MyCondition01<>();

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                for (int j = 0; j < 4; j++) {
                    myCondition01.put(new Object());
                }
            },"生产者 " + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 2; i++) {
            new Thread(()->{
                for (int j = 0; j < 10; j++) {
                    myCondition01.get();
                }
            },"消费者 "+i).start();
        }
    }
}

class A<T>{
    private  static  ReentrantLock lock = new ReentrantLock();
    static Condition condition1 = lock.newCondition();
    static Condition condition2 = lock.newCondition();
    private final static int MAX_LENGTH = 10;
    LinkedList<T> list = new LinkedList<>();

    public void put(T t){
        try {
            lock.lock();
            if (list.size() == 10){
                condition1.await();
            }
            list.add(t);
            System.out.println(Thread.currentThread().getName()+" 放入");
            condition2.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public T get(){
        T t = null;
        try {
            lock.lock();
            if (list.size() ==0){
                condition2.await();
            }
            t = list.removeFirst();
            System.out.println(Thread.currentThread().getName()+" 拿出");
            condition1.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }

    public static void main(String[] args) {
        A<Object> myCondition01 = new A<>();

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                for (int j = 0; j < 4; j++) {
                    myCondition01.put(new Object());
                }
            },"生产者 " + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 2; i++) {
            new Thread(()->{
                for (int j = 0; j < 10; j++) {
                    myCondition01.get();
                }
            },"消费者 "+i).start();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147

上面的代码实现了生产者和消费者,我们用wait和notifyAll也可以实现,但是这种方法好就好在他将生产者和消费者分开了,两个Condition,一个生产队列一个消费队列,当生产队列满了,让生产队列里的线程停止生产,当生产好了唤醒消费队列消费。这样子就比传统的wait和notifyAll更加效率,因为notifyAll是唤醒所有线程,如果生产满了,唤醒所有线程,这个时候又是生产线程获得锁,这样就降低了效率。
这里我用一张图,来展示一下AQS同步队列和Comdition等待队列的模型:
在这里插入图片描述
调用Condition的await方法,会将这个线程构造成一个新的节点放到等待队列(Condition队列)中去;调用Condition的single方法,会唤醒Condition队列中的头节点也就是等待时间最长的节点(singleAll方法就是唤醒所有节点,加入竞争),将它移动到同步队列中去,这个时候它还是需要去竞争锁的,它只是加入到同步队列里去了,这里一定要记住调用single方法线程是没有立马被唤醒执行的,只是把它从等待队列移到同步队列。
有兴趣的同学可以看下源码。

ReentrantLock和Synchronized的区别

  • Synchronized是关键字;ReentrantLock是一个类,它实现了Lock接口;

  • Synchronized是非公平锁;ReentrantLock可以指定其公平性;

  • Synchronized是通过锁升级来实现的;ReentrantLock是通过CAS和队列来实现的。

  • Synchronized用法单一;ReentrantLock使用更加灵活,同时也提供了很多方法比如:tryLock等方法

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/149169?site
推荐阅读
相关标签
  

闽ICP备14008679号