赞
踩
重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。
对于独占锁(Mutex),考虑如下场景:当一个线程调用Mutex的lock()方法获取锁之后,如果再次调用lock()方法,则该线程将会被自己所阻塞,原因是Mutex在实现tryAcquire(int acquires)方法时没有考虑占有锁的线程再次获取锁的场景,而在调用tryAcquire(int acquires)方法时返回了false,导致该线程被阻塞。简单地说,Mutex是一个不支持重进入的锁。
synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁,而不像Mutex由于获取了锁,而在下一次获取锁时出现阻塞自己的情况。
ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。
1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
ReentrantLock是通过组合自定义同步器来实现锁的获取与释放。我们以非公平锁为例:
- public class ReentrantLock implements Lock, java.io.Serializable {
- private final Sync sync;
-
- ......
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = -5179523762034025860L;
-
- abstract void lock();//抽象方法
-
- final boolean nonfairTryAcquire(int acquires) {//非公平的获取锁
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {//首次获取同步状态
- if (compareAndSetState(0, acquires)) {//只要设置成功就获取到锁
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {//再次获取同步状态(可重入的关键)
- //如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
-
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {//当同步状态为0时,将占有线程设置为null
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);//更新同步状态
- return free;
- }
- ......
- }
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = 7316153563782823691L;
-
- /**Performs lock. Try immediate barge, backing up to normal acquire on failure. */
- final void lock() {
- if (compareAndSetState(0, 1))//首次获取锁成功
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);//申请加锁
- }
-
- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);//非公平获取锁
- }
- }
- ......
- public void lock() {
- sync.lock();
- }
- public void unlock() {
- sync.release(1);
- }
- ......
- }

- import java.util.Random;
- import java.util.concurrent.locks.ReentrantLock;
-
- public class ReentrantLockTest {
- private static Random r=new Random(47);
- private static int threadCount=10;
- private static ReentrantLock mut=new ReentrantLock();
- private static class Weight implements Runnable{//给苹果称重的任务
- String name;
- public Weight(String name){
- this.name=name;
- }
- @Override
- public void run() {
- mut.lock();
- System.out.println(name+"放苹果!");
- System.out.println(name+"重量:"+(r.nextInt(10)+3));
- System.out.println(name+"取苹果!");
- if(r.nextInt()%2==0){run();}//递归调用
- mut.unlock();
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread[] threads=new Thread[threadCount];
- for(int i=0;i<threadCount;i++){
- threads[i]=new Thread(new Weight("Weight-"+i),"Thread-"+i);
- }
- for(int i=0;i<threadCount;i++){
- threads[i].start();
- Thread.sleep(10);
- }
- }
- }

输出:
Weight-0放苹果!
Weight-0重量:11
Weight-0取苹果!
Weight-0放苹果!
Weight-0重量:6
Weight-0取苹果!
Weight-0放苹果!
Weight-0重量:4
Weight-0取苹果!
Weight-1放苹果!
Weight-1重量:11
Weight-1取苹果!
Weight-2放苹果!
Weight-2重量:5
Weight-2取苹果!
Weight-2放苹果!
Weight-2重量:11
Weight-2取苹果!
Weight-2放苹果!
Weight-2重量:4
Weight-2取苹果!
Weight-3放苹果!
Weight-3重量:12
Weight-3取苹果!
Weight-3放苹果!
Weight-3重量:11
Weight-3取苹果!
Weight-3放苹果!
Weight-3重量:3
Weight-3取苹果!
Weight-3放苹果!
Weight-3重量:9
Weight-3取苹果!
Weight-5放苹果!
Weight-5重量:4
Weight-5取苹果!
Weight-7放苹果!
Weight-7重量:7
Weight-7取苹果!
Weight-8放苹果!
Weight-8重量:9
Weight-8取苹果!
Weight-6放苹果!
Weight-6重量:3
Weight-6取苹果!
Weight-4放苹果!
Weight-4重量:7
Weight-4取苹果!
Weight-4放苹果!
Weight-4重量:3
Weight-4取苹果!
Weight-9放苹果!
Weight-9重量:5
Weight-9取苹果!
Weight-9放苹果!
Weight-9重量:6
Weight-9取苹果!
Weight-9放苹果!
Weight-9重量:7
Weight-9取苹果!
从输出中,可以看出可重入特性。如果,我们将可重入锁换成独占锁Mutex程序将会阻塞,不具有可重入性。
此外,我们还发现,线程的执行是乱序的(从线程名称的角度看),即与start()方法调用顺序不一致。这是为什么呢?
原来重入锁ReentrantLock默认采用非公平实现?那好,我们将可重入锁设置为公平锁:
private static ReentrantLock mut=new ReentrantLock(true);//设置为公平锁
输出:
Weight-0放苹果!
Weight-0重量:11
Weight-0取苹果!
Weight-0放苹果!
Weight-0重量:6
Weight-0取苹果!
Weight-0放苹果!
Weight-0重量:4
Weight-0取苹果!
Weight-1放苹果!
Weight-1重量:11
Weight-1取苹果!
Weight-2放苹果!
Weight-2重量:5
Weight-2取苹果!
Weight-2放苹果!
Weight-2重量:11
Weight-2取苹果!
Weight-2放苹果!
Weight-2重量:4
Weight-2取苹果!
Weight-3放苹果!
Weight-3重量:12
Weight-3取苹果!
Weight-3放苹果!
Weight-3重量:11
Weight-3取苹果!
Weight-3放苹果!
Weight-3重量:3
Weight-3取苹果!
Weight-3放苹果!
Weight-3重量:9
Weight-3取苹果!
Weight-7放苹果!
Weight-7重量:4
Weight-7取苹果!
Weight-6放苹果!
Weight-6重量:7
Weight-6取苹果!
Weight-4放苹果!
Weight-4重量:9
Weight-4取苹果!
Weight-5放苹果!
Weight-5重量:3
Weight-5取苹果!
Weight-8放苹果!
Weight-8重量:7
Weight-8取苹果!
Weight-8放苹果!
Weight-8重量:3
Weight-8取苹果!
Weight-9放苹果!
Weight-9重量:5
Weight-9取苹果!
Weight-9放苹果!
Weight-9重量:6
Weight-9取苹果!
Weight-9放苹果!
Weight-9重量:7
Weight-9取苹果!
从输出中我们看到,线程的执行顺序与对应start()方法被调用的顺序依然不一样,说好的公平锁呢?
原因分析:start()语句调用的顺序与线程进入Runnable状态的顺序不一定一致,也就是说先调用start()语句所对应的线程不一定先进入Runnable状态,即使先进入Runnable状态也不一定先分得处理器开始执行。
读写锁是一种特殊的自旋锁,它把对共享资源对访问者划分成了读者和写者,读者只对共享资源进行访问,写者则是对共享资源进行写操作。读写锁在ReentrantLock上进行了拓展使得该锁更适合读操作远远大于写操作对场景。一个读写锁同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁。
如果读写锁当前没有读者,也没有写者,那么写者可以立刻获的读写锁,否则必须自旋,直到没有任何的写锁或者读锁存在。如果读写锁没有写锁,那么读锁可以立马获取,否则必须等待写锁释放。(但是有一个例外,就是读写锁中的锁降级操作,当同一个线程获取写锁后,在写锁没有释放的情况下可以获取读锁再释放读锁这就是锁降级的一个过程)
- /**
- * 读写锁Demo
- */
- public class ReentrantReadWriteLockDemo {
-
- class MyObject {
- private Object object;
-
- private ReadWriteLock lock = new java.util.concurrent.locks.ReentrantReadWriteLock();
-
- public void get() throws InterruptedException {
- lock.readLock().lock(); //上读锁
- try {
- System.out.println(Thread.currentThread().getName() + "准备读取数据");
- Thread.sleep(new Random().nextInt(1000));
- System.out.println(Thread.currentThread().getName() + "读数据为:" + this.object);
- } finally {
- lock.readLock().unlock(); //释放读锁
- }
- }
-
- public void put(Object object) throws InterruptedException {
- lock.writeLock().lock(); //上写锁
- try {
- System.out.println(Thread.currentThread().getName() + "准备写数据");
- Thread.sleep(new Random().nextInt(1000));
- this.object = object;
- System.out.println(Thread.currentThread().getName() + "写数据为" + this.object);
- } finally {
- lock.writeLock().unlock(); //释放写锁
- }
- }
- }
-
- public static void main(String[] args) throws InterruptedException {
- final MyObject myObject = new ReentrantReadWriteLockDemo().new MyObject();
-
- class WorkerRead implements Runnable
- {
- private CountDownLatch latch;
-
- public WorkerRead(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public void run()
- {
- try {
- myObject.get();//读操作
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- latch.countDown();
- }
- }
-
- class WorkerWrite implements Runnable
- {
- private CountDownLatch latch;
-
- public WorkerWrite(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public void run()
- {
- try {
- myObject.put(new Random().nextInt(1000));//写操作
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- latch.countDown();
- }
- }
-
- CountDownLatch latch = new CountDownLatch(10);
-
- //启动5个写线程
- for(int i =0; i < 5; i++)
- {
- new Thread(new WorkerWrite(latch)).start();
- }
-
- //启动5个读线程
- for(int i =0; i < 5; i++)
- {
- new Thread(new WorkerRead(latch)).start();
- }
-
- latch.await();
- }
- }

下面是代码运行结果的一种:
- Thread-1准备写数据
- Thread-1写数据为521
- Thread-2准备写数据
- Thread-2写数据为223
- Thread-0准备写数据
- Thread-0写数据为10
- Thread-4准备写数据
- Thread-4写数据为422
- Thread-6准备读取数据
- Thread-8准备读取数据
- Thread-9准备读取数据
- Thread-8读数据为:422
- Thread-9读数据为:422
- Thread-6读数据为:422
- Thread-3准备写数据
- Thread-3写数据为172
- Thread-5准备读取数据
- Thread-7准备读取数据
- Thread-5读数据为:172
- Thread-7读数据为:172

从数据中也可以发现写数据具有原子性,只有一个线程可以写操作,读线程可以并发执行而且读取的数据都是一样的,这就是共享读的特性。
ReentrantReadWriteLock的基本原理和ReentrantLock没有很大的区别,只不过在ReentantLock的基础上拓展了两个不同类型的锁,读锁和写锁。首先可以看一下ReentrantReadWriteLock的内部结构:
内部维护了一个ReadLock和一个WriteLock,整个类的附加功能也就是通过这两个内部类实现的。
那么内部又是怎么实现这个读锁和写锁的呢。由于一个类既要维护读锁又要维护写锁,那么这两个锁的状态又是如何区分的。在ReentrantReadWriteLock对象内部维护了一个读写状态:
- abstract static class Sync extends AbstractQueuedSynchronizer {
-
- static final int SHARED_SHIFT = 16;
- // 由于读锁用高位部分,所以读锁个数加1,其实是状态值加 2^16
- static final int SHARED_UNIT = (1 << SHARED_SHIFT);
- // 写锁的可重入的最大次数、读锁允许的最大数量
- static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
- // 写锁的掩码,用于状态的低16位有效值
- static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
- // 读锁计数,当前持有读锁的线程数
- static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
- // 写锁的计数,也就是它的重入次数
- static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
- }
读写锁依赖自定义同步器实现同步功能,读写状态也就是同步器的同步状态。读写锁将整形变量切分成两部分,高16位表示读,低16位表示写:
读写锁通过位运算计算各自的同步状态。假设当前同步状态的值为c,写状态就为c&0x0000FFFF,读状态为c >>> 16(无符号位补0右移16位)。当写状态增加1状态变为c+1,当读状态增加1时,状态编码就是c+(1 <<< 16)。
怎么维护读写状态的已经了解了,那么就可以开始了解具体怎么样实现的多个线程可以读,一个线程写的情况
首先介绍的是ReadLock获取锁的过程
lock():获取读锁方法
acquireShared(int arg):共享式获取读锁
- public void lock() {
- sync.acquireShared(1);
- }
-
- public final void acquireShared(int arg) {
- //自定义实现的获取锁方式
- if (tryAcquireShared(arg) < 0)
- doAcquireShared(arg);
- }
内部运用到了ThreadLocal线程本地对象,将每个线程获取锁的次数保存到每个线程内部,这样释放锁的时候就不会影响到其它的线程。
- protected final int tryAcquireShared(int unused) {
- Thread current = Thread.currentThread();//获取当前线程
- int c = getState();//获取锁状态
- //如果写锁已经被获取,并且获取锁的不是当前线程,则当前线程获取读锁失败
- if (exclusiveCount(c) != 0 &&
- getExclusiveOwnerThread() != current)
- return -1;
- int r = sharedCount(c);//获取当前共享资源的数量
- if (!readerShouldBlock() &&
- r < MAX_COUNT &&
- compareAndSetState(c, c + SHARED_UNIT)) {//代表可以获取读锁
- if (r == 0) {//如果当前没有线程获取读锁
- firstReader = current;//当前线程是第一个读锁获取者
- firstReaderHoldCount = 1;//在计数器上加1
- } else if (firstReader == current) {
- firstReaderHoldCount++;//代表重入锁计数器累加
- } else { //内部定义的线程记录缓存
- //HoldCounter主要是一个类用来记录线程已经线程获取锁的数量
- HoldCounter rh = cachedHoldCounter;
- if (rh == null || rh.tid != current.getId())//如果不是当前线程
- //从每个线程的本地变量ThreadLocal中获取
- cachedHoldCounter = rh = readHolds.get();
- else if (rh.count == 0)//如果记录为0初始值设置
- readHolds.set(rh);//设置记录
- rh.count++;//自增
- }
- return 1;//返回1代表获取到了同步状态
- }
- //用来处理CAS设置状态失败的和tryAcquireShared非阻塞获取读锁失败的
- return fullTryAcquireShared(current);
- }

fullTryAcquireShared(Thread current):此方法用于处理在获取读锁过程中CAS设置状态失败的和非阻塞获取读锁失败的线程
- final int fullTryAcquireShared(Thread current) {
- //内部线程记录器
- HoldCounter rh = null;
- for (; ; ) {
- int c = getState();//同步状态
- if (exclusiveCount(c) != 0) {//代表存在独占锁
- if (getExclusiveOwnerThread() != current)//获取独占锁的线程不是当前线程返回失败
- return -1;
- } else if (readerShouldBlock()) {//判断读锁是否应该被阻塞
- if (firstReader == current) {
- } else {
- if (rh == null) {//为null
- rh = cachedHoldCounter;//从缓存中进行获取
- if (rh == null || rh.tid != current.getId()) {
- rh = readHolds.get();//获取线程内部计数状态
- if (rh.count == 0)
- readHolds.remove();//移除
- }
- }
- if (rh.count == 0)//如果内部计数为0代表获取失败
- return -1;
- }
- }
- if (sharedCount(c) == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- if (compareAndSetState(c, c + SHARED_UNIT)) {//CAS设置成功
- if (sharedCount(c) == 0) {
- firstReader = current;//代表为第一个获取读锁
- firstReaderHoldCount = 1;
- } else if (firstReader == current) {
- firstReaderHoldCount++;//重入锁
- } else {
- if (rh == null)
- rh = cachedHoldCounter;
- if (rh == null || rh.tid != current.getId())
- rh = readHolds.get();
- else if (rh.count == 0)
- readHolds.set(rh);
- rh.count++;
- cachedHoldCounter = rh; //将当前多少读锁记录下来
- }
- return 1;//返回获取同步状态成功
- }
- }
- }

分析完上面的方法可以总结一下获取读锁的过程:首先读写锁中读状态为所有线程获取读锁的次数,由于是可重入锁,又因为每个锁获取的读锁的次数由每个锁的本地变量ThreadLocal对象去保存因此增加了读取获取的流程难度,在每次获取读锁之前都会进行一次判断是否存在独占式写锁,如果存在独占式写锁就直接返回获取失败,进入同步队列中。如果当前没有写锁被获取,则线程可以获取读锁,由于共享锁的存在,每次获取都会判断线程的类型,以便每个线程获取同步状态的时候都在其对应的本地变量上进行自增操作。
lock(int arg):写锁的获取
- public void lock() {
- sync.acquire(1);//AQS独占式获取锁
- }
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
tryAcquire(int arg):独占式的获取写锁
- protected final boolean tryAcquire(int acquires) {
- Thread current = Thread.currentThread();//获取当前线程
- int c = getState();//获取同步状态值
- int w = exclusiveCount(c);//获取独占式资源值
- if (c != 0) {//已经有线程获取了
- //代表已经存在读锁,或者当前线程不是获取到写锁的线程
- if (w == 0 || current != getExclusiveOwnerThread())
- return false;//获取失败
- if (w + exclusiveCount(acquires) > MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- //设置同步状态
- setState(c + acquires);
- return true;
- }
- if (writerShouldBlock() ||
- !compareAndSetState(c, c + acquires))//判断当前写锁线程是否应该阻塞,这里会有公平锁和非公平锁之间的区分
- return false;
- setExclusiveOwnerThread(current);//设置为当前线程
- return true;
- }

获取写锁相比获取读锁就简单了很多,在获取读锁之前只需要判断当前是否存在读锁,如果存在读锁那么获取失败,进而再判断获取写锁的线程是否为当前线程如果不是也就是失败否则就是重入锁在已有的状态值上进行自增
unlock():读锁释放
- public void unlock() {
- sync.releaseShared(1);//AQS释放共享锁操作
- }
tryReleaseShared(int arg):释放共享锁
- protected final boolean tryReleaseShared(int unused) {
- Thread current = Thread.currentThread();//获取当前线程
- if (firstReader == current) {//如果当前线程就是获取读锁的线程
- if (firstReaderHoldCount == 1)//如果此时获取资源为1
- firstReader = null;//直接赋值null
- else
- firstReaderHoldCount--;//否则计数器自减
- } else {//其他线程
- HoldCounter rh = cachedHoldCounter;//获取本地计数器
- if (rh == null || rh.tid != current.getId())
- rh = readHolds.get();
- int count = rh.count;
- if (count <= 1) {//代表只获取了一次
- readHolds.remove();
- if (count <= 0)
- throw unmatchedUnlockException();
- }
- --rh.count;
- }
- for (; ; ) {
- int c = getState();
- int nextc = c - SHARED_UNIT;
- if (compareAndSetState(c, nextc))
- return nextc == 0;//代表已经全部释放
- }
- }

释放锁的过程不难,但是有一个注意点,并不是释放一次就已经代表可以获取独占式写锁了,只有当同步状态的值为0的时候也就是代表既没有读锁存在也没有写锁存在才代表完全释放了读锁。
unlock():释放写锁
- public void unlock() {
- sync.release(1);//释放独占式同步状态
- }
tryRelease(int arg):释放独占式写锁
- protected final boolean tryRelease(int releases) {
- if (!isHeldExclusively())//判断是否
- throw new IllegalMonitorStateException();
- int nextc = getState() - releases;//同步状态值自减
- boolean free = exclusiveCount(nextc) == 0;//如果状态值为0代表全部释放
- if (free)
- setExclusiveOwnerThread(null);
- setState(nextc);
- return free;
- }
写锁的释放相比读锁的释放简单很多,只需要判断当前的写锁是否全部释放完毕即可
什么是锁降级,锁降级就是从写锁降级成为读锁。在当前线程拥有写锁的情况下,再次获取到读锁,随后释放写锁的过程就是锁降级。这里可以举个例子:
- public class CacheDemo {
- private Map<String, Object> cache = new HashMap<String, Object>();
-
- private ReadWriteLock rwl = new ReentrantReadWriteLock();
- public ReadLock rdl = rwl.readLock();
- public WriteLock wl = rwl.writeLock();
-
- public volatile boolean update = false;
-
- public void processData() {
- rdl.lock();//获取读锁
- if (!update) {
- rdl.unlock();//释放读锁
- wl.lock();//获取写锁
- try {
- if (!update) {
- update = true;
- }
- rdl.lock();//获取读锁
- } finally {
- wl.unlock();//释放写锁
- }
- }
- try {
- } finally {
- rdl.unlock();//释放读锁
- }
- }
- }

读写锁是在重入锁ReentrantLock基础上的一大改造,其通过在重入锁上维护一个读锁一个写锁实现的。对于ReentrantLock和ReentrantreadWriteLock的使用需要在开发者自己根据实际项目的情况而定。对于读写锁当读的操作远远大于写操作的时候会增加程序很高的并发量和吞吐量。虽说在高并发的情况下,读写锁的效率很高,但是同时又会存在一些问题,比如当读并发很高时读操作长时间占有锁,导致写锁长时间无法被获取而导致的线程饥饿问题,因此在JDK1.8中又在ReentrantReadWriteLock的基础上新增了一个读写并发锁StampLock。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。