赞
踩
上一篇:Java并发编程(7) —— 锁的分类概述
在上一篇中我们提到并发包中的ReentrantLock类是一种可重入独占锁,其锁机制是基于AQS实现的。实际上,并发包java.util.concurrent.locks中的锁都是基于AQS类和Lock接口实现的。
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现锁和同步器的基础组件,并发包中锁的底层就是使用AQS实现的。AQS 为构建锁和同步器提供了一些通用功能的实现,因此使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue等等皆是基于 AQS 的。
AQS的类图结构如下:
在这些模板方法中AQS实现了锁资源获取与释放后同步队列的调度维护,具体的资源获取与释放过程则由用户继承AQS时通过钩子方法自行实现(模板方法设计模式)
acquire(int arg):独占式获取资源模板。内部调用了钩子方法tryAcquire()获取资源,若其返回true则直接放行,若其返回false则构造Node节点(waitStatus=0)将其加入同步队列,然后调用acquireQueued将线程阻塞等待前驱节点来唤醒竞争锁。
release(int arg):独占式释放资源模板。内部调用了钩子tryRelease(),若其返回true则唤醒同步队列head的后继节点中下一个需要唤醒的线程来竞争锁。
acquireShared(int arg):共享式获取资源模板。内部调用了tryAcquireShared()
releaseShared(int arg):共享式释放资源模板。内部调用了tryReleaseShared()
其它
在模板方法中会调用这些钩子方法,AQS中这些方法默认直接抛出不支持异常,继承AQS实现同步器时需根据需要选择实现这些方法
继承AQS并重写所需的钩子方法。如下为实现了独占式可重入锁的自定义同步器:
public class MyLock extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { if (getExclusiveOwnerThread() == Thread.currentThread()) { //若尝试获取锁的线程已持有锁,则直接返回true,以实现可重入性 return true; } //定义锁空闲状态state为0,尝试用CAS操作将其修改为1。多个线程同时竞争时只有一个线程能够修改成功,其余进入同步队列阻塞等待,实现独占性 if (compareAndSetState(0, 1)) { //若修改成功则将exclusiveOwnerThread设置为当前线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { if (getExclusiveOwnerThread() != Thread.currentThread()) //若当前线程不持有锁而调用了此方法,则抛出非法监视器状态异常 throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0);//将锁标志置为空闲 return true; } public void lock() {//独占式申请锁资源 acquire(1); } public void unLock() {//独占式释放锁资源 release(1); } }
我们通过如下的一个多线程下num++的案例来测试上述同步器
class MyLockTest { private static int num = 0; public static void main(String[] args) throws InterruptedException { MyLock myLock = new MyLock(); for (int i = 1;i <= 10;i++) { new Thread(() -> { myLock.lock();//临界:只有获取到锁的线程才能继续往下执行 try { for(int j=0;j<1000;j++){ myLock.lock();//验证可重入锁,若不可重入则会在这里陷入死锁,阻塞自己等待自己释放锁 num++; } } catch (Exception ignore) { } finally { myLock.unLock();//在finally中释放锁,防止线程被异常终止时锁没有被释放 } }).start(); } Thread.sleep(3000);//等待上面10个子线程执行完毕后打印num值,预期值10000 System.out.println("num = " + num); } }
上述我们实现的自定义同步器中只能对资源进行加锁解锁,如需实现线程间的通知等待机制,还需要依赖AQS中的内部类ConditionObject。
ConditionObject实现了Condition接口(JUC.locks包下定义的一个接口),提供了一种类似Object类中notify()/wait()的监视器方法,用于与Lock配合实现通知/等待机制。
Obejct的监视器模型中,一个对象关联的监视器中拥有一个同步队列和一个等待队列,而AQS在维护一个同步队列的同时支持创建多个等待队列,一个等待队列对应一个ConditionObject对象
每个ConditionObject对象都维护了一个FIFO等待队列,节点类型与同步队列节点类型相同(静态内部类AQS.Node),实例变量firstWaiter和lastWaiter分别指向了等待队列的头结点和尾结点。
调用condition.await(),将会以当前线程构造节点从尾部加入等待队列并阻塞。新增节点只需将原有尾结点nextWaiter指向新节点,并且更新lastWaiter即可,调用await()方法的线程必定是获取了锁的线程(否则不能正常release),因此这个过程不需要CAS保证。
持有锁的线程,可以通过调用condition.await()方法,内部调用release()方法释放锁,然后进入condition的等待队列。await()源码如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //1.构造Node节点加入等待队列 int savedState = fullyRelease(node);//2.内部调用release()释放锁并唤醒同步队列中的下一个待唤醒线程 int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); //3.阻塞当前线程 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //4.被唤醒后开始尝试获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
假设当前持有锁的线程为T:
此时线程T可以调用CO1.await()方法进入等待状态,同步器的状态发生变化:
通知机制在下一节剖析,但通过分析await()方法的源码可以看出:
持有锁的线程,可以通过调用condition.signal()/signalAll()方法,将指定condition等待队列中的节点移动到同步队列中,signal()方法处理的是等待队列中的首个节点,signalAll()处理的是等待队列中的全部节点。以signal()方法为例:
public final void signal() { if (!isHeldExclusively()) //1.调用isHeldExclusively()确定当前线程是持有锁的线程。因此同步器需重写此钩子方法 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { //2.1首个节点出队,while循环是为了排除已取消的节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //3.1更改node的状态为0(同时跳过已取消的节点) if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); //3.2 将节点添加到同步队列尾部(CAS) int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //3.3如果前驱节点取消了,需要主动唤醒(这种思想在cancleAcquire方法解析的博文中已经做了详细剖析) LockSupport.unpark(node.thread); return true; }
假设上一节中T1线程获取到锁后,调用了CO1.signal()方法:
可以看出,T1线程调用signal()方法之后,T3线程并没有从阻塞态退出(按signal()的字面意思理解只是发出一个信号,和notify()方法类似,当前线程并不会释放锁),仍依赖于同步队列的前驱节点(即T2线程)唤醒
总结一下:调用signal()的前提是当前线程获取了锁,调用signal()后会将等待队列中的首个节点移动到同步队列尾部,但并不会直接唤醒该节点的阻塞态,仍依赖于同步队列中的前驱节点唤醒
此节转载于:详解AQS对Condition接口的具体实现
通过上述分析我们发现如果要使同步器额外实现通知等待功能,只需要开放创建等待队列ConditionObject对象的方法并重写钩子方法isHeldExclusively()。因此我们在前面实现的同步器中加入如下代码即可实现功能拓展:
public class MyLock extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {//开放创建等待队列,实现等待/通知机制
return new ConditionObject();
}
@Override
protected boolean isHeldExclusively() {//判断当前线程是否已独占式持有锁
return getExclusiveOwnerThread() == Thread.currentThread();
}
......
}
在之前的文章(Java并发编程(3) —— 线程的生命周期和状态)中,我们基于Object的监视器方法wait()/notifyAll()实现了一个生产者-消费者队列,但是有个问题就是notifyAll()会唤醒所有生产者和消费者线程(因为一个对象关联的监视器中拥有一个同步队列和一个等待队列,生产者和消费者线程都在同一个等待队列中),这显然是不必要的且会造成性能浪费。而在AQS中可以有多个等待队列,因此在这里我们就可以使用上述基于AQS实现的具有等待通知功能的同步器来实现一个生产者-消费者队列,将生产者等待队列和消费者等待队列分离,这样在需要唤醒生产者/消费者时只需要在生产者/消费者等待队列中去唤醒。实现如下:
/** * 生产者消费者等待队列分离的生产者-消费者队列 */ class ProduceConsumeQueue<T> { private final LinkedList<T> queue = new LinkedList<>(); private final static int MAX_COUNT = 10;//最大库存 MyLock myLock = new MyLock(); Condition producerWaitCond = myLock.newCondition();//生产者等待队列 Condition consumerWaitCond = myLock.newCondition();//消费者等待队列 public void put(T resource) { myLock.lock(); try { while (queue.size() == MAX_COUNT) { //若库存已满,则将当前线程(生产者)加入生产者等待队列阻塞等待 System.out.println("生产者:队列已满,无法插入..."); producerWaitCond.await(); } //否则生产一个资源,并从消费者等待队列随机唤醒一个消费者线程到同步队列参与锁竞争 queue.addFirst(resource); System.out.println("生产者:插入"+resource + "! ! !"); consumerWaitCond.signal(); } catch (Exception e) { e.printStackTrace(); } finally { myLock.unLock(); } } public void take() { myLock.lock(); try { while (queue.size() == 0) { //若库存为空,则将当前线程(消费者)加入消费者等待队列阻塞等待 System.out.println("消费者:队列为空,无法取出..."); consumerWaitCond.await(); } //否则消费一个资源,并从生产者者等待队列随机唤醒一个生产者线程到同步队列参与锁竞争 queue.removeLast(); System.out.println("消费者:取出消息! !!"); producerWaitCond.signal(); } catch (Exception e) { e.printStackTrace(); } finally { myLock.unLock(); } } }
测试:
class PCQueueTest { public static void main(String[] args) { ProduceConsumeQueue<String> produceConsumeQueue = new ProduceConsumeQueue<>(); //生产者线程 可多个 for (int p = 1;p <= 3;p++) { new Thread(() -> { for (int i = 0;i < 50;i++) { produceConsumeQueue.put("消息" + Thread.currentThread().getName() + "-msg" + i);//生产 } }).start(); } //消费者线程 可多个 for (int c = 1;c <= 3;c++) { new Thread(() -> { for (int i = 0; i < 50; i++) { produceConsumeQueue.take();//消费 } }).start(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。