当前位置:   article > 正文

ReentrantLock源码解析

ReentrantLock源码解析

定义

可重入锁,对于同一个线程可以重复获得此锁。分为FailLock和NonfairLock。
加锁就是将exclusiveOwnerThread设置为当前线程,且将status加一,解锁就status-1,且exclusiveOwnerThread设置为null。
公平锁:根据先来后到的顺序获得锁,可以避免饥饿现象,所有线程都有同等的机会获取锁。
非公平锁:一进入临界区就开始竞争锁,竞争不到再进入阻塞队列等待,会出现饥饿现象,但是可以减少线程阻塞和唤醒及队列维护的开销,有效提高吞吐量

ReentrantLock和Synchronized的区别

  1. 前者是在Java Api层面进行加锁的,而Synchronized是在JVM层面进行加锁的。
  2. 前者是底层使用cas操作和volatile实现的加锁,而后者是利用操作系统的互斥机制实现的。
  3. 前者需要手动解锁操作,后者自动解锁。
  4. 前者提供了公平锁和非公平锁,而后者只有非公平锁
  5. 前者如果在临界区抛出异常,不会释放锁,需要自己在finally中释放,而后者自动释放。
  6. 前者允许线程在等待锁的过程中响应中断阻塞lockInterruptibly,后者不可以。
  7. 前者可以非阻塞地获取锁,使用tryLock方法,如果没获得锁就返回false,后者不行。
  8. 前者可以通过condition实现分组唤醒阻塞的线程,而后者只能通过notify和notifyall唤醒阻塞的线程。
  9. 前者有很多api,更灵活地实现加锁,也更加复杂了。

在这里插入图片描述

主要方法:

  • lock:加锁
  • lockInterruptibly:等待获取锁的过程中,如果当前线程被中断(即其他线程调用了当前线程的 interrupt 方法),则会立即响应中断,抛出 InterruptedException 异常,并且在异常抛出之前会释放之前已经获得的锁
    示例代码如下:
 public static void main(String[] args) {
        Lock lock = new ReentrantLock();

        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 1 acquired the lock");
                Thread.sleep(3000); // 模拟线程1持有锁的情况
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });
        thread1.start();

        Thread thread = new Thread(() -> {
            try {
                // 尝试获取锁,但是可以响应中断信号
                lock.lockInterruptibly();
                System.out.println("Lock acquired by thread: " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                System.out.println("Thread interrupted while waiting for lock");
            } finally {
                if (lock.tryLock()) {
                    lock.unlock();
                }
            }
        });
        thread.start();

        // 让主线程休眠一段时间
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 在主线程中尝试中断等待锁的线程
        thread.interrupt();
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • tryLock:非阻塞获取锁,如果获取到了返回true,获取不到返回false。
  • unlock:解锁。

公平锁lock方法

FairSync.lock();

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Sync继承了AbstractQueuedSynchronized,AQS的acquire方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&//尝试获取锁,如果没有获取到
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果没有获取到锁,则封装成node,追加到阻塞队列中,然后将线程挂起
            selfInterrupt();//当前线程中断
}
  • 1
  • 2
  • 3
  • 4
  • 5

FairLock.tryAcquire(arg)

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();//获取AQS同步器状态,volatile修饰,
            //作用:1,保证共享变量在多线程之间的可见性2,防止指令重排序
            if (c == 0) {//无锁状态
                if (!hasQueuedPredecessors() &&//如果阻塞队列中已经没有其他线程了
                    compareAndSetState(0, acquires)) {//cas将c从0更新为acquires
                    setExclusiveOwnerThread(current);//将锁的独占线程设置为当前线程
                    return true;
                }
            }
            //下面这块代码就是可重入锁的重入。
            else if (current == getExclusiveOwnerThread()) {//或者当前线程已经获得了这个锁,增加status
                int nextc = c + acquires;
                if (nextc < 0)//当达到integer最大值,对于有符号二进制而言,再+1,就会变成负数
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

AQS.acquireQueued

final boolean acquireQueued(final Node node, int arg) {//返回true,表示需要中断线程
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//不间断地获取锁,如果获取到就返回,没获取到就将自己挂起,等到前一个节点执行完,就能将当前节点唤醒
                final Node p = node.predecessor();//上一个节点
                if (p == head && tryAcquire(arg)) {//上一个节点是头节点,重新获取锁,获取到锁返回true
                    setHead(node);//获取到锁之后,将自己设置为头节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;//lockInterruptibly使用的
                }
                //没轮到自己,或者轮到自己但是被非公平锁抢了,则
                if (shouldParkAfterFailedAcquire(p, node) &&//获取锁失败是否将线程挂起,可以进入这个方法查看
                    parkAndCheckInterrupt())//进入到这里说明要将线程挂起,线程挂起后,前一个节点的线程释放锁前会将当前线程唤醒,唤醒之后,当前线程继续执行这里的代码,然后重新获取锁。 然后检查是否中断线程(和lockInterruptibly有关)
                    interrupted = true;//和lockInterruptibly有关
            }
        } finally {
            if (failed)
                cancelAcquire(node);//取消当前线程的获取锁操作。
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

AQS.shouldParkAfterFailedAcquire

       waitStatus的取值
       /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;//表示当前节点线程已取消
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;//表示可以成功唤醒下一个节点
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;//表示线程正在等待条件被唤醒
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;//先不管
        
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//前一个节点线程的等待状态
        if (ws == Node.SIGNAL)//此标志表示当前节点可以被前一个节点唤醒,所以当前节点可以安心地挂起了,之后这里可以返回true,如果没有进入这里,那么就需要第二次进入此方法。
            return true;
        if (ws > 0) {//表示上一个节点线程取消,那么就一直往上找到可以成功唤醒自己的线程。(reentrantlock不会出现,其他继承的AQS的类可能会出现)
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {//等于0或者小于-1,则通过cas将上一个线程的waitstatus改为signal状态。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

NonfairSync

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))//一来就竞争锁,看是否有刚好释放的锁。
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

非公平锁实现 nonfairTryAcquire

 final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {//不用排队,直接竞争,
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;//没有获取到锁,则和公平锁一样,将线程加入阻塞队列。
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

总结:

1,不管是公平锁还是非公平锁,获得锁的成功的标志是:status加一,然后AQS的独占线程是自己(exclusiveOwnerThread = thread), 如果获取锁的时候发现是自己的锁,那么可以再次获取,举措就是状态加一。
在这里插入图片描述

2,非公平锁一开始就会cas竞争一次锁,然后在获取锁的时候不管等待队列,再cas竞争一次锁。而公平锁获取锁的时候,首先会看等待队列里有没有线程,有的话就加入等待队列,没有就cas竞争锁。
在这里插入图片描述

3,两者如果都没有获取到锁,则会被封装成节点加入等待队列(addWaiter方法)。加入队列操作也是采用cas自旋操作。入队的操作,可以保证多个线程进来不会混乱。
在这里插入图片描述

阻塞队列为空或cas加入节点失败的情况。
![在这里插入图片描述](https://img-blog.csdnimg.cn/355d406e28994c2b923501beb72fcd4f.png

4,加入到等待队列后,看是否自己前一个节点是否头节点,如果是头节点,那么可以尝试获取锁。不是头节点看后一步,。
5,确保当前节点的上一个节点可以成功将自己唤醒,也就是上一个节点的waitstatus为-1。
在这里插入图片描述

6,接下来当前线程就可以挂起了。等前一个节点释放锁将自己唤醒,就可以继续重新cas自旋竞争锁。
在这里插入图片描述
制作不易,点赞收藏,有不对的地方,一定要指正,大家一起探讨。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号