当前位置:   article > 正文

多线程--AQS_aqs 多线程

aqs 多线程

一、AQS详解

  1. 什么是AQS

    1. AQS:AbstractQuenedSynchronizer抽象的队列式同步器。是除了java自带的synchronized关键字之外的锁机制。AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包
    2. AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中
    3. AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个Node(双向链表结构,封装了上一个Node,下一个Node,当前线程的相关信息),来实现锁的分配
    4. AQS实现图示:
      在这里插入图片描述
  2. 实现AQS锁的简单概括

    1. AQS底层使用了模板方法模式
      1. 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
      2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法,即不同的实现产生不同的机制。这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用
    2. ReentrantLock
      1. state初始化为0,表示未锁定状态,A线程lock()时,会调用tryAcquire()独占锁并将state+1.之后其他线程再想tryAcquire的时候就会失败,直到A线程unlock()到state=0为止,其他线程才有机会获取该锁。A释放锁之前,自己也是可以重复获取此锁(state累加)
      2. 获取多少次锁就要释放多少次锁,保证state是能回到零态的
    3. CountDownLatch
      1. 任务分N个子线程去执行,state就初始化 为N,N个线程并行执行,每个线程执行完之后countDown()一次,state就会CAS减一。当N子线程全部执行完毕 state=0,会unpark()主调用线程,主调用线程就会从await()函数返回,继续之后的动作
    4. CycliBarrier
      1. 栅栏锁,类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程
      2. CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用
      3. 内部使用了ReentrantLock,用Condition队列来存储等待的线程
      4. CyclicBarrier和CountDownLatch的区别
        1. CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景
        2. CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断
        3. CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置
    5. Phaser
      1. 与CycliBarrier有点像,都是线程数到了临界值才能继续往下执行,但不同的是Phaser是分段执行的
    6. Semaphore
      1. Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的

二、源码阅读

  1. ReentrentLock的lock()方法执行流程(无其他线程争抢,成功获取锁)

    1. 如果通过点进代码进行阅读的话,会发现很多方法都阅读不了,因为ReentrantLock是AbstractQueuedSynchronizer的实现类,直接点进去大都是AbstractQueuedSynchronizer中的方法,所以我们需要通过debug的方式来阅读源码
    2. debug定位到lock(),下一步,进入lock
      在这里插入图片描述
    3. 调用了sync.lock(),Sync:继承自AQS类,实现AQS钩子方法,不同的类有不同的Syns来实现自己的逻辑。继续下一步
      在这里插入图片描述
    4. 发现需要调用compareAndSetState(0,1)并获取放回值,显然由方法名称可以推断,这是一个CAS操作,需要修改的值为AQS类中的state,返回true为成功,false为失败!继续下一步
      在这里插入图片描述
    5. 这里由本地静态属性unsafe再次调用了一个CAS方法,unsafe属性是通过UnSafe类来获得的,UnSafe类由C/C++实现,里面封装了直接对内存进行的一些操作,这里不作过多讨论,继续下一步
      在这里插入图片描述
    6. 由于我们的测试程序只有一个线程争抢锁,所以自然可以成功修改,返回true,进入下面的方法,由方法名称我们就可以推断这个方法为设置持有锁的线程为当前的线程,继续下一步
      在这里插入图片描述
    7. 发现该方法其实是将AQS类中的exclusiveOwnerThread属性值设置为传入的当前线程,点进exclusiveOwner看看是什么
      在这里插入图片描述
    8. 由上面的描述我们知道这个属性代表当前持有锁的线程,回到debug,继续下一步
      在这里插入图片描述
    9. 返回void,调用结束,当前线程成功获得锁
      在这里插入图片描述
    10. 总结:通过CAS操作来修改AQS中的state值,如果修改成功(state由0变为1)则代表当前线程获得锁,将AQS中的exclusiveOwnerThread(持有锁的线程)属性设置为当前线程
  2. ReentrentLock的lock()方法执行流程(无法获取锁)

    1. 依旧是刚才的测试案例,加了一个m3()方法,因为m1方法会重复调用1000次,每次停一秒,所以m3不会获取到锁
      在这里插入图片描述

    2. 给m3打断点,开始debug
      在这里插入图片描述

    3. 与上面的前几步一样,进到lock方法中
      在这里插入图片描述

    4. 由于m1一直持有锁,所以m3自然修改不到state值,返回fasle,进入acquire(1)
      在这里插入图片描述

    5. 里面调用了tryAcquire(arg),arg我们由上一步可以知道值为1,进入该方法
      在这里插入图片描述

    6. 里面调用了nonfairTryAcquire(1),由名称我们可以推断ReentrantLock中锁的争抢是不公平的,继续进入
      在这里插入图片描述

    7. 方法比较长,这里大概解释一下跳过几步:首先获得当前线程,然后获得此时state的值,如果state此时的值为0,代表现在没有线程持有锁,用CAS尝试获取锁,获取成功修改对应属性,返回true;若state不为0,判断当前线程是否为持有锁的线程(锁的重入),若是则state+1,返回true;但显然这里我们以上2种情况都不是,所以返回fasle!
      在这里插入图片描述
      在这里插入图片描述

    8. 回到第5步的地方,因为返回是false,所以!false为true,进入下面的addWaiter方法;若返回为true则不会进入!由方法名可以推断这是将当前线程放入等待队列的方法,传入的参数为一个标记,表示该节点正在独占模式下等待
      在这里插入图片描述

    9. 我们知道AQS其实里面有一个等待队列,队列由Node组成,所以此时为创建一个Node,将当前线程给该Node,并将该Node放入等待队列
      在这里插入图片描述

    10. 因为等待队列是双向链表,我们要往后添加需要获得当前最后一个Node,tail即为最后一个Node,如果tail不等于null,用CAS将该Node插入队列即可,但显然我们此时的tail为空,所以进入下面的enq(node)
      在这里插入图片描述

    11. 进入enq发现里面是一个死循环,因为我们的tail为空(即等待队列为空),所以第一次进来,会进行CAS的初始化,发现初始化参数传入的是一个空Node,进入该方法
      在这里插入图片描述

    12. 由注释我们可以知道这就是用CAS初始化等待队列的一个方法
      在这里插入图片描述

    13. 回到之前的方法,由于是死循环且初始化完毕所以第二次我们就进到了else里,也是用CAS操作将Node放入队列尾,最后return,这里用死循环其实就是CAS的自旋,确保添加时的安全性
      在这里插入图片描述

    14. 回到addWaiter,将node返回
      在这里插入图片描述

    15. 下一步就是调用acquireQueued了,我们现在可以知道它的实参实际上是acquireQueued(node(当前线程的Node,处于等待队列尾),1),进入方法
      在这里插入图片描述

    16. 这个方法会先获取当前node的上一个节点(node.predecessor()),如果上一个节点是首节点,则代表node可能会获取到锁,所以node尝试获取锁,成功则将node设置为首节点,返回false,acquire(第8步)结束;若再次获取失败,调用shouldParkAfterFailedAcquire方法,即将该线程阻塞,因为for是死循环,所以会一直进行上面的操作直到获取到锁!
      在这里插入图片描述

    17. 总结:

      1. 通过CAS操作去尝试修改state的值
      2. 修改失败,别的线程已经获得锁,进入acquire(1)
      3. 里面有一个if()方法,首先进入!tryAcquire()再次尝试获取锁。由于前面有个!,所以该方法如果返回true,即成功获取到锁,就不会往下执行,直接返回;若返回false才会继续执行。
      4. tryAcquire()调用了nofairTryAcquire(),主要用于再次尝试获取锁和判断当前线程是否就是获得锁的线程(锁重入),如果是则state+1,返回true,以上2种情况都不是返回false
      5. 接下来就需要将当前线程加入等待队列,执行addWaiter
      6. addWaiter中首先创建包含当前线程信息的Node对象,然后获取到等待队列尾的Node tail,如果tail不为空,则用CAS将Node放入队列尾,如果为空,则需要初始化队列,调用enq(node)
      7. enq中有一个死循环,里面会先获取到tail,如果为空则调用CAS方法初始化队列,不为空也是使用CAS将node插入到tail后,并将node设置为tail
      8. addWaiter执行完后会执行acquireQueue方法
      9. acquireQueue也是死循环:首先会获得传入node的上一个Node pre,如果pre是队列中的head,则当前node尝试获取锁,获取失败则当前node中的线程阻塞,自旋,继续下一次的尝试获取…
    18. 参考的文章:https://blog.csdn.net/mulinsen77/article/details/84583716

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号