当前位置:   article > 正文

AQS常用组件_aqs 组件

aqs 组件

Semaphore介绍

基本概念

  • 信号量,它的作用是控制访问资源的线程数目,底层依赖AQS的状态State,生产当中比较常用的一个工具类
  • 举个例子:上游服务1000QPS,下游支持300QPS,可以通过Semaphore来进行限制,只有获得信号量的线程才可以进行访问,没有获得信号量的只能阻塞

怎么使用Semaphore

构造方法

Semaphore(int permits)
Semaphore(int permits, boolean fair)
  • 1
  • 2
  • permits:表示许可线程的数量
  • fair:表示公平性,如果这个设为true的话,下次执行的线程会是等待最久的线程

重要方法

//一次获取一个信号量        		//释放一个信号量
acquire()               	  release()
//一次获取permits个信号量        //释放permits个信号量
acquire(int permits)          release(int permits)
//尝试获取permits个信号量,等待单位为unit的timeout,如果能拿到就走拿到的逻辑,否则就可以走降级的逻辑或者相隔一定的时间后再去获取   
tryAcquire(int permits, long timeout, TimeUnit unit)
//不支持中断特性,线程执行过程中发现执行错误,不想让其继续执行,就可以发送中断信号,如果支持中断特性,就会抛出一个异常
acquireUninterruptibly(int permits)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

基本使用

需求场景
  • 资源访问,服务限流(Hystrix里面就有基于信号量的方式)
代码实现
Semaphore:信号量 类似于操作系统中的信号量;用来做限流操作
两个作用:多个共享资源的互斥使用;并发线程数的控制
原理:
Semaphore semaphore = new Semaphore(4);//初始化信号量,当数量为1时可用作互斥锁.
//AQS中的State设为4,所允许的最大的并行度为4
semaphore.acquire();  //得到一个信号量  p操作  否则阻塞
//state-1
semaphore.release(); //释放一个信号量 v操作     否则阻塞
//state+1
作用:多个共享资源的互斥使用;并发限流,控制最大的线程数
Demo:
public class SemaphoreTest01 {
    public static void main(String[] args) throws InterruptedException {
        Semaphore mutex = new Semaphore(1); //互斥锁
        Semaphore notFull = new Semaphore(4);//不满的数量
        Semaphore notEmpty = new Semaphore(0);//不空的数量
        Consumen consumen = new Consumen();
        Produce produce = new Produce();
        //生产者
        while (true){
            new Thread( () ->{
                try {
                    notFull.acquire();
                    mutex.acquire();
                    consumen.cons();
                    System.out.println("已经生产了一个产品");                 System.out.println("-----------------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {                 System.out.println(notEmpty.availablePermits());                System.out.println(notFull.availablePermits());
                    mutex.release();
                    notEmpty.release();
                }
            }).start();
            new Thread( () ->{
                try {
                    notEmpty.acquire();
                    mutex.acquire();
                    produce.prod();
                    System.out.println("已经消耗了一个产品");                System.out.println("-----------------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    mutex.release();
                    notFull.release();
                }
            }).start();
        }
    }
}
class Consumen{
    public void cons(){
        System.out.println("生产者生产了一个产品");
    }
}
class Produce{
    public void prod(){
        System.out.println("消费者消费了一个产品");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

源码分析

acquire()

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
    	//Semaphore内部定义了两个类Sync和NonfairSync,tryAcquireShared在两个类内部都有实现
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

protected int tryAcquireShared(int acquires) {
            for (;;) {
                //公平锁:如果有人排队的话,返回-1,需要进行排队,不会直接获取 	
                if (hasQueuedPredecessors())
                    return -1;
                //获得当前的State
                int available = getState();
                //减去自己要获得的信号量的个数
                int remaining = available - acquires;
                //remarting<0也就是初始化的信号量已经用完,需要排队
                if (remaining < 0 ||
                    //通过CAS写回最新的State
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }


AbstractQueuedSynchronizer
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    	//构建一个节点,插入到CLH队列中
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                //获得插入节点的前一个节点,判断是不是头部节点
                final Node p = node.predecessor();
                if (p == head) {
                    //再次获取信号量
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //获取到一个信号量,会进行广播,因为是一种共享的模式,可能在当前节点tryAcquireShared的时候,已经有其他的节点添加到了这个节点后面,所以需要广播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                //当线程唤醒后就会走这个方法,因为之前的线程在这里被阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }


private void setHeadAndPropagate(Node node, int propagate) {
    	//h指向之前的head节点
        Node h = head; 
    	//设置头节点为当前head节点
        setHead(node);
    	//propagate大于0
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            //得到node的下一个节点
            Node s = node.next;
            //s为shard
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }



private void doReleaseShared() {
         */
        for (;;) {
            //h指向最新的head
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //waitStatuw为-1
                if (ws == Node.SIGNAL) {
                    //改为0
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            
                    // 唤醒后续节点
                    unparkSuccessor(h);
                }
                //将最新头节点的waitStatue设为-3
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //如果头节点没有发生变化,则退出循环
            //此时可能有线程t1进来,t2线程将head修改,在这里判断
            if (h == head)                   
                break;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102

release()

public final boolean releaseShared(int arg) {
    	//返回true则已经写成功
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }


protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //将当前的信号量加回去
                if (compareAndSetState(current, next))
                    return true;
            }
        }


private void doReleaseShared() {
         */
        for (;;) {
            //h指向最新的head
            Node h = head;
            //判断队列里面有没有正在排队的
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //头节点的waitStatus为-1
                if (ws == Node.SIGNAL) {
                    //将waitStatue设为0
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒后面的节点
                    unparkSuccessor(h);
                }
                //将最新头节点的waitStatue设为-3
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //如果头节点没有发生变化,则退出循环
            //此时可能有线程t1进来,t2线程将head修改,在这里判断
            if (h == head)                   
                break;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

Countdownlatch

基本概念

​ 允许一个线程或者多个线程等待直到在其他线程中执行的一组操作完成的同步操作.里面Await方法等待计数器归零然后再向下执行(先让必须要完成的任务先执行)

  • countDownLatch.await(); //等待计数器归零,然后再向下执行
  • countDownLatch.countDown():每次有线程调用countDown数量减一,假如计数器变为0,await()就会被唤醒
  • 一个CountDownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次

如何工作

  • 一般是每个线程countDown,然后await
  • 其实也可以反过来使用:每个线程await,然后在countDown:保证每个线程

CylicBarrier

  • CylicBarrier(加法计数器):允许一组线程全部等待彼此到达共同屏障点的同步辅助
    只有当new CyclicBarrier(7,A(线程))只有当调用cyclicBarrier.await()的线程数目等于7时A线程才会执行(集齐七颗龙珠才能召唤神龙)
  • 用法和CountdownLatch反过来使用效果相同,但是CylicBarrier可以反复使用同一个CyclicBarrier

LockSupport

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语.LockSupport中的park(()和unpark()的作用分别是阻塞线程和解除阻塞线程

3种让线程等待和唤醒的方法

使用object中的wait()方法让线程等待,使用object中的notify方法唤醒线程

  • wait和notify方法必须要在同步块或者方法里面且成对出现使用
  • 先wait后notify才ok,如果将notify放在wait方法前面,程序无法执行,无法唤醒

使用JUC中Condition的await()方法让线程等待,使用signal()方法唤醒线程

  • 线程先要获得并持有锁,必须在锁块(synchronized或lock中)
  • 必须要先等待后唤醒,线程才能够被唤醒
  • 用于创建锁和其他同步类的基本线程阻塞原语(线程等待唤醒机制(wait和notify)的改良加强版)

LockSuport类可以阻塞当前线程以及唤醒指定被阻塞的线程

  • LockSupport是用来创建锁和其他同步类的基本线程阻塞原语
  • LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可.
  • permit只有两个值1和0默认是0,可以把许可看成是一种(0,1)信号量,但与semaphore不同的是,许可的累加上限是1
  • park()/park(object blocker)
    • 阻塞当前线程/阻塞传入的具体的线程
  • unpark(Thread thread)
    • 唤醒处于阻塞等待状态的执行线程,需要传一个线程,所以需要有一个地方存放Thread
//调用LockSupport()时,permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设为1,park方法会被唤醒
public static void park() {
   U.park(false, 0L);
}

//调用unpark(thread)方法后,将thread线程的许可permit设置为1(注意多次调用unpark方法,不会累加,permit值还是1)会自动唤醒thread线程,即之前阻塞中的LockSupport.park()方法会立即返回
public static void unpark(Thread thread) {
        if (thread != null)
            U.unpark(thread);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

纯粹的是只想阻塞线程,并不想加锁

在这里插入图片描述

支持先唤醒后等待
在这里插入图片描述

重点说明

  • LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
  • LockSupport是一个线程阻寨工具类,所有的方法都是静态方法,可以让线程在任意位置阻赛,阻塞之后也有对应的唤醒方法。归根结底,LockSupport调用的Unsafe中的native代码。
  • LockSupport提供park()和lunpark()方法实现阻塞线程和解除线程阻塞的过程
  • LockSupport和每个使用它的线程都有一个许可(permit)关联。permit相当于1,0的开关,默认是0,
  • 调用一次unpark就加1变成1,
  • 调用一次park会消费permit,也就是将1变成o,同时park立即返回。
  • 如再次调用park会变成阻塞(因为permit为零了会阻塞在这里,一直到permit变为1),这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累凭证。

面试题

  • 为什么可以先唤醒线程后阻塞线程?
    • 因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。
  • 为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?
    • 因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样, 只会增加-一个凭证;
      而调用两次park却需要消费两个凭证,证不够,不能放行。

变成阻塞(因为permit为零了会阻塞在这里,一直到permit变为1),这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累凭证。

面试题

  • 为什么可以先唤醒线程后阻塞线程?
    • 因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。
  • 为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?
    • 因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样, 只会增加-一个凭证;
      而调用两次park却需要消费两个凭证,证不够,不能放行。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/149023
推荐阅读
相关标签
  

闽ICP备14008679号