当前位置:   article > 正文

【JAVA多线程】JDK中的各种锁,看这一篇就够了

【JAVA多线程】JDK中的各种锁,看这一篇就够了

目录

1.概论

1.1.实现锁的要素

1.2.阻塞队列

1.3.Lock接口和Sync类

2.各种锁

2.1.互斥锁

2.1.1.概论

2.1.2.源码

1.lock()

2.unlock()

2.2.读写锁

2.3.Condition

2.3.1.概论

2.3.2.底层实现


1.概论

1.1.实现锁的要素

JAVA中的锁都是可重入的锁,因为不可重入的试用的时候很容易造成死锁。这个道理很好想明白:

当一个线程已经持有一个锁,并在持有该锁的过程中再次尝试获取同一把锁时,如果没有重入机制,第二次请求会被阻塞,因为锁已经被自己持有。这会导致线程自我死锁,因为它在等待自己释放的锁。

可重入是指获取锁的线程可以继续重复的获得此锁。其实我们想都能想到要实现一把锁需要些什么,首先肯定是:

  • 标志位,也叫信号量,标记锁的状态和重入次数,这样才能完成持有锁和释放锁。

接下来要考虑的是拒接策略,当前锁被持有期间,后续的请求线程该怎么处理,当然可以直接拒绝,JAVA的选择委婉点,选择了允许这些线程躺在锁上阻塞等待锁被释放。要实现让线程躺在锁上等待,我们想想无非要:

  • 需要支持对一个线程的阻塞、唤醒

  • 需要记录当前哪个线程持有锁

  • 需要一个队列维护所有阻塞在当前锁上的线程

OK,以上四点就是JAVA锁的核心,总结起来就是信号量+队列,分别用来记录持有者和等待者。

1.2.阻塞、唤醒操作

首先我们来看看阻塞和唤醒的操作,在JDK中提供了一个Unsafe类,该类中提供了阻塞或唤醒线程的一对操作 原语——park/unpark:

  1. public native void unpark(Object var1);
  2. public native void park(boolean var1, long var2);

这对原语最终会调用操作系统的程序接口执行线程操作。

1.2.阻塞队列

拿来维护所有阻塞在当前锁上的线程的队列能是个普通队列吗?很显然不是,它的操作必须是线程安全的是吧,所以这个队列用阻塞队列实现才合适。什么是阻塞队列:

阻塞队列提供了线程安全的元素插入和移除操作,并且在特定条件下会阻塞线程,直到满足操作条件。

说到JDK中的阻塞队列,其核心就是AbstractQueuedSynchronizer,简称AQS,由双向链表实现的一个元素操作绝对安全的队列,用来在锁的实现中维护阻塞在锁上的线程上的队列的这个角色。

来看看AQS的源码:

它有指向前后节点的指针、有一个标志位state、还有一个提供线程操作原原语(阻塞、唤醒)的unsafe类。

所以其实AQS就长这样:

点进源码可以看到其随便一个方法都是线程安全的:

由于本文不是专门聊AQS这里就不扩展了,反正知道AQS是一个线程安全的阻塞队列就对了。

1.3.Lock接口和Sync类

JAVA中所有锁的顶级父接口,用来规范定义一把锁应该有那些行为职责:

  1. public interface Lock {
  2.   void lock();
  3.   void lockInterruptibly() throws InterruptedException;
  4.   boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  5.   void unlock();
  6.   Condition newCondition();
  7. }

JAVA中所有锁的实现都是依托AQS去作为阻塞队列,每个锁内部都会实现一个Sync内部类,在自身Sync内部以不同的策略去操作AQS实现不同种类的锁。

abstract static class Sync extends AbstractQueuedSynchronizer {......}

2.各种锁

2.1.互斥锁

2.1.1.概论

ReentrantLock,互斥锁,ReentrantLock本身没有任何代码逻辑,依靠内部类Sync干活儿:

  1. public class ReentrantLock implements Lock, Serializable {
  2. private final ReentrantLock.Sync sync;
  3. public void lock() {
  4.        this.sync.lock();
  5.   }
  6.    public void unlock() {
  7.        this.sync.release(1);
  8.   }
  9.   ......
  10. }

ReentrantLock的Sync继承了AQS

abstract static class Sync extends AbstractQueuedSynchronizer {......}

Sync是抽象类,有两个实现:

  • NonfairSync,公平锁

  • FairSync,非公平锁

实例化ReentrantLock的实例时,根据传入的标志位可以创建公平和公平的实现

  1. public class ReentrantLock implements Lock, java.io.Serializable{
  2. public ReentrantLock() {
  3.        sync = new NonfairSync();
  4.   }
  5.    public ReentrantLock(boolean fair) {
  6.        sync = fair ? new FairSync() : new NonfairSync();
  7.   }
  8.   ......
  9. }
  10. }

2.1.2.源码

1.lock()

公平锁的lock():

  1. static final class FairSync extends Sync {
  2.        final void lock() {
  3.            acquire(1);//进来直接排队
  4.       }

非公平锁的lock():

  1. static final class NonfairSync extends Sync {
  2.        final void lock() {
  3.            if (compareAndSetState(0, 1))//进来直接抢锁
  4.                setExclusiveOwnerThread(Thread.currentThread());//将锁的持有者设置为当前线程
  5.            else
  6.                acquire(1);//没抢过再去排队
  7.       }
  8.   }

acquire()是AQS的模板方法:

tryAcquire,尝试再去获取一次锁,公平锁依然是排队抢,去看看阻塞队列是否为空;非公平锁依然是直接抢。

acquireQueued,将线程放入阻塞队列。

  1. public final void acquire(int arg) {
  2.        if (!tryAcquire(arg) &&
  3.            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.            selfInterrupt();
  5.   }

acquireQueued(..)是lock()最关键的一部分,addWaiter(..)把Thread对象加入阻塞队列,acquireQueued(..)完成对线程的阻塞。

  1. final boolean acquireQueued(final Node node, int arg) {
  2.        boolean failed = true;
  3.        try {
  4.            boolean interrupted = false;
  5.            for (;;) {
  6.                final Node p = node.predecessor();
  7.                if (p == head && tryAcquire(arg)) {//如果发现自己在队头就去拿锁
  8.                    setHead(node);
  9.                    p.next = null; // help GC
  10.                    failed = false;
  11.                    return interrupted;
  12.               }
  13.                if (shouldParkAfterFailedAcquire(p, node) &&
  14.                    parkAndCheckInterrupt())//调用原语,阻塞自己
  15.                    interrupted = true;
  16.           }
  17.       } finally {
  18.            if (failed)
  19.                cancelAcquire(node);
  20.       }
  21.   }

acquireQueued(..)函数有一个返回值,表示什么意思 呢?虽然该函数不会中断响应,但它会记录被阻塞期间有没有其他线 程向它发送过中断信号。如果有,则该函数会返回true;否则,返回false。所以才有了以下逻辑:

  1. public final void acquire(int arg) {
  2.        if (!tryAcquire(arg) &&
  3.            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.            selfInterrupt();
  5.   }public final void acquire(int arg) {
  6.        if (!tryAcquire(arg) &&
  7.            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  8.            selfInterrupt();
  9.   }

当 acquireQueued(..) 返回 true 时,会调用 selfInterrupt (),自己给自己发送中断信号,也就是自己把自己的中断标志位设 为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中 断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在loc k代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响 应该中断信号。

2.unlock()

unlock的逻辑很简单,每次unlock,state-1,直到state=0时,将锁的拥有者置null,释放锁。由于只有锁的持有线程才能操作lock,所以unlock()不需要用CAS,操作时直接判断一下是不是锁的持有线程在操作即可。

  1. public void unlock() {
  2.        sync.release(1);
  3.   }
  4. public final boolean release(int arg) {
  5.        if (tryRelease(arg)) {//释放锁
  6.            Node h = head;
  7.            if (h != null && h.waitStatus != 0)
  8.                unparkSuccessor(h);//唤醒阻塞队列中的后继者
  9.            return true;
  10.       }
  11.        return false;
  12.   }

释放锁:

  1. protected final boolean tryRelease(int releases) {
  2.            int c = getState() - releases;//每次unlock,state减1
  3.            if (Thread.currentThread() != getExclusiveOwnerThread())//判断是不是锁的持有线程
  4.                throw new IllegalMonitorStateException();
  5.            boolean free = false;
  6.            if (c == 0) {//state为0表示该锁没有被持有
  7.                free = true;
  8.                setExclusiveOwnerThread(null);//将锁的持有者置null
  9.           }
  10.            setState(c);
  11.            return free;
  12.       }

唤醒后继者:

  1. private void unparkSuccessor(Node node) {
  2.        int ws = node.waitStatus;
  3.        if (ws < 0)
  4.            compareAndSetWaitStatus(node, ws, 0);
  5.        Node s = node.next;
  6.        if (s == null || s.waitStatus > 0) {
  7.            s = null;
  8.            for (Node t = tail; t != null && t != node; t = t.prev)
  9.                if (t.waitStatus <= 0)
  10.                    s = t;
  11.       }
  12.        if (s != null)
  13.            LockSupport.unpark(s.thread);
  14.   }

2.2.读写锁

读写锁是一个实现读写互斥的锁,读写锁包含一个读锁、一个写锁:

  1. public interface ReadWriteLock{
  2. Lock readLock();
  3. Lock writeLock();
  4. }

读写锁的使用就是直接调用对应锁进行锁定和解锁:

  1. ReadWriteLock rwLock=new ReetrantReadWriteLock();
  2. Lock rLock=rwLock.readLock();
  3. rLock.lock();
  4. rLock.unLock();
  5. Lock wLock=rwLock.writeLock();
  6. wLock.lock();
  7. wLock.unLock();

读写锁的Sync内部类对读锁和写锁采用同一个int型的信号量的高16位和低16位分别表示读写锁的状态和重入次数,这样一次CAS就能统一处理进行读写互斥操作:

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2.        static final int SHARED_SHIFT   = 16;
  3.        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
  4.        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
  5.        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  6. static int sharedCount(int c)   { return c >>> SHARED_SHIFT; }
  7.        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
  8. }

2.3.Condition

2.3.1.概论

condition用于更加细粒度的控制锁上面的线程阻塞、唤醒。

以下以一个经典的生产、消费者问题为例:

队列空的时候进来的消费者线程阻塞,有数据放进来后唤醒阻塞的消费者线程。

队列满的时候进来的生产者线程阻塞,有空位后唤醒阻塞的生产者线程。

锁粒度的实现:

  1. public void enqueue(){
  2. synchronized(queue){
  3. while(queue.full()){
  4. queue.wait();
  5. }
  6. //入队列
  7. ......
  8. //通知消费者,队列中有数据了
  9. queue.notify();
  10. }
  11. }
  12. public void dequeue(){
  13. synchronized(queue){
  14. while(queue.empty()){
  15. queue.wait();
  16. }
  17. //出队列
  18. ......
  19. //通知生产者,队列中有空位了,可以继续放数据
  20. queue.notify();
  21. }
  22. }

可以发现,唤醒的时候把阻塞的生产消费线程一起唤醒了。

条件粒度的实现:

  1. private final Lock lock = new ReentrantLock();
  2. private final Condition notFull  = lock.newCondition(); // 用于等待队列不满
  3. private final Condition notEmpty = lock.newCondition(); // 用于等待队列非空
  4. public void enqueue(Object item) {
  5.     try {
  6.         while (queue.isFull()) {
  7.             notFull.await(); // 等待队列不满
  8.         }
  9.         // 入队列操作
  10.         // ...
  11.         
  12.         // 入队后,通知等待的消费者
  13.         notEmpty.signal();
  14.     } catch (InterruptedException e) {
  15.         Thread.currentThread().interrupt(); // 保持中断状态
  16.         // 处理中断逻辑
  17.     } finally {
  18.         queue.unlock();
  19.     }
  20. }
  21. public void dequeue() {
  22.     try {
  23.         while (queue.isEmpty()) {
  24.             notEmpty.await(); // 等待队列非空
  25.         }
  26.         // 出队列操作
  27.         // ...
  28.         
  29.         // 出队后,通知等待的生产者
  30.         notFull.signal();
  31.     } catch (InterruptedException e) {
  32.         Thread.currentThread().interrupt(); // 保持中断状态
  33.         // 处理中断逻辑
  34.     } finally {
  35.         queue.unlock();
  36.     }
  37. }

2.3.2.底层实现

Condition由Lock产生,因此Lock中持有Condition:

  1. public interface Lock {
  2. ......
  3.    Condition newCondition();
  4. }

承担功能的其实就是Syn中的ConditionObject,也就是AQS中的ConditionObject:

  1. final ConditionObject newCondition() {
  2.            return new ConditionObject(this);
  3.       }

一个Condition上面阻塞着多个线程,所以每个Condition内部都有一个队列,用来记录阻塞在这个condition上面的线程,这个队列其实也是AQS实现的,AQS中除了实现一个以Node为节点的队列,还实现了一个以ConditionObject为节点的队列:

  1. public abstract class AbstractQueuedSynchronizer
  2.    extends AbstractOwnableSynchronizer
  3.    implements java.io.Serializable {
  4.   public class ConditionObject implements Condition, java.io.Serializable {
  5.        private static final long serialVersionUID = 1173984872572414699L;
  6.        private transient Node firstWaiter;
  7.        private transient Node lastWaiter;
  8.       ......
  9.       }
  10.   }

Condition是个接口,定义了一系列条件操作:

  1. public interface Condition {
  2.    void await() throws InterruptedException;
  3.    void awaitUninterruptibly();
  4.    long awaitNanos(long var1) throws InterruptedException;
  5.    boolean await(long var1, TimeUnit var3) throws InterruptedException;
  6.    boolean awaitUntil(Date var1) throws InterruptedException;
  7.    void signal();
  8.    void signalAll();
  9. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/801774
推荐阅读
相关标签
  

闽ICP备14008679号