当前位置:   article > 正文

AQS 组件_aqs组件

aqs组件


AQS 是一个构造同步器的框架,用来构造同步器,如 ReentrantLock、倒计时器、以及自定义同步器

使用方法(模板模式)

继承 AbstractQueuedSynchronizer 并重写指定的方法,定义获取与释放state的流程

有关等待队列的步骤不能也不用重写(因为被final定义,而且进队列出队列的方式以及被AQS写好了)

源码分析以及原理

使用volatile关键字来修饰int类型的状态state,这个值实现了可重入锁与信号量等功能

    private volatile int state;
  • 1

如果某个线程可以修改state,标记该线程为可用线程,如果不可获取,则加入等待队列,使用CAS实现对state的修改(导入了Unsafe类),获取资源的流程则是由用户继承后写入的

	protected final int getState() {
	        return state;
	}
	protected final void setState(int newState) {
	        state = newState;
	}
    protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSetInt(this, STATE, expect, update);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

有一个FIFO的队列CLH来完成获取资源线程的排队工作,这个队列中装着内部类Node,AQS有一套完整的线程等待与唤醒机制

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        ...
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

同样,还有一些这个节点进行操作的对象

    private transient volatile Node head;
    private transient volatile Node tail;
    ...
  • 1
  • 2
  • 3

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中

公平锁和非公平锁

ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入true则使用公平锁)

对公平锁而言,首先判断state是否为0,如果为0,直接判断CLH队列中有没有在等待的线程,如果有,它会在后面排队;如果没有则CAS拿锁;如果state不为0,后面排队

	// 这是公平锁的acquire方法
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 公平锁特有的tryAcquire方法
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 和非公平锁相比,这里多了一个判断:是否有线程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 这里和非公平锁一样,都是去排队
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

对非公平锁而言,在调用lock函数的时候它会直接CAS试试能不能拿锁,然后进入和公平锁差不多的acquire方法,如果发现锁这个时候被释放了(state==0),非公平锁会直接CAS抢锁,其他的步骤与公平锁相似

static final class NonfairSync extends Sync {
    final void lock() {
        // 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 非公平锁的tryAcquire,主要执行nonfairTryAcquire方法
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 这里没有对阻塞队列进行判断,直接尝试去抢锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果抢不到,那就算了
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

对资源共享的方式

独占

只有一个线程可以获取状态,如 ReentrantLock

在独占状态下可以实现两种模式,公平锁(排队获取锁)与非公平锁(抢锁)

ReentrantLock

锁的使用非常简单,但是需要注意一点,lock 方法下必须使用 try 环绕

// 方式一:Oracle 官方推荐的写法
private val look = ReentrantLock()
fun printNumber() {
    look.lock()
    try {
        // TODO
    } finally {
        look.unlock()
    }
}

// 方式二:错误的写法
private val look = ReentrantLock()
fun printNumber() {
    try {
        look.lock()
    } finally {
        look.unlock()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

方法一是 Oracle 推荐的方式, 并且在 「阿里巴巴JAVA开发手册」 明确规定了不建议使用 方式二, 即不建议将 lock.lock() 写在 try…finally 代码块内部。这么做是为了避免线程还未加锁就抛出异常,解锁时对没有没有被上锁的对象解锁,此时会 unlock 方法会抛出异常,覆盖之前的异常信息

有 tryLock 尝试加锁操作,如果失败了,则会立即返回 false。lockInterruptibly 打断其他线程操作

private val look = ReentrantLock()
fun printNumber() {
    val isLocked = look.tryLock()
    if (isLocked) {
        try {
            // TODO
        } finally {
            look.unlock()
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在构造方法中加入 true 定义此锁为公平锁

ReadWriteLock

读写锁

和数据库的读写锁定义一样

        ReadWriteLock lock = new ReentrantReadWriteLock();
        Lock lock1 = lock.readLock();
        Lock lock2 = lock.writeLock();
  • 1
  • 2
  • 3

共享

多个资源都可以获得状态,如信号量、倒计时器

CountDownLatch(减少计数)

计算减少锁,中文翻译为倒计时器,基于AQS

作用是调用 await 方法让一个或者多个线程阻塞,直至一些线程调用 countDown 方法将减少计数内部的 state 减少为0,被阻塞的方法才会继续执行

以下是一些常用方法

CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。await();//阻塞当前线程,将当前线程加入阻塞队列。await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

比如如果多个线程执行完毕之后,才可以打印日志,此时可以使用这个类,注意,这个倒计时器只能使用一次。以下是一些使用示例

        CountDownLatch latch = new CountDownLatch(priceKeyList.size());
        List<String> hotel = priceKeyList.stream().map(t -> {
        			try {
                        return intToList.apply(hotelPriceQueryParam);
                    } finally {
                        latch.countDown();
                    }
                })
        ).collect(Collectors.toList());

        try {
            latch.await(timeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        	e.sout;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

CyclicBarrier(循环栅栏)

它的作用是调用await方法让线程等待并且将栅栏中的state加1,直到屏障满了才会让它们继续执行并且调用栅栏中的方法,和人满发车一个道理,以下是它的使用方法:

//循环屏障的定义
CyclicBarrier cyclicBarrier = new CyclicBarrier(20, () -> {System.out.println("ready");});
//在线程中使用这个方法让线程等待
cyclicBarrier.await();
  • 1
  • 2
  • 3
  • 4

它与CountDownLatch的最大区别在与它是让多个线程同时等待,而CountDownLatch是让一个线程等待多个线程完成

Semphore(信号量)

信号量在Linux中也是一个比较重要的进程间通信方式

它定义最多有几个线程同时执行,只有抢到state的线程才会运行,抢到state的线程可能有多个,其他的线程会在CLH队列中等待,如果其中某一个线程运行完毕调用release方法,信号量会自动唤醒等待队列中的线程

信号量最大的作用就是限制一定数目的线程同时访问某个资源,因此,在某些业务资源需要被更改的情况需要特别注意

//定义最多有两个线程同时运行
Semaphore semaphore = new Semaphore(2);
//得到运行许可
semaphore.acquire();
//释放运行许可
semaphore.release();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

除了以上这些之外,还有BlockingQueue族(用来解决生产者消费者问题)等一些其他API,都在JUC包中

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号