赞
踩
AQS(AbstractQueuedSynchronizer)队列同步器,是JUC中非常重要的一个组件,基于它可以简单高效地构建一些通用的锁和同步器,如ReentrantLock、Semaphore等(本文学习内容基于JDK1.8),本文主要关注AQS的源码实现及基于AQS实现的一些常用的同步组件
网络中对AQS的学习已经不乏有非常优秀的总结和笔记,比如摘自以上一段摘自AQS原理学习笔记;我看过《Java并发编程的艺术》,也有做架构的朋友告诉过我AQS写的非常好;作者Doug Lea是JSR-166专家组成员之一,也是公认的并发编程大师,写出来的都值得学习;
总之就是非常牛批的意思,但整个JUC毕竟是这么大一个框架,从零开始的话,小白可能也和我一样不知道从何入手,其实总之想深入学都是要看的,我就挑着开始做自己的笔记。由于工作关系也不确定能定期更新,但也要尽自己努力吧
开头就从AbstractQueuedSynchronizer类文档的翻译开始,本人英语水平能力有限,有任何问题都恳请指正
旨在提供一个实现阻塞锁和相关依赖于FIFO队列的同步器(semaphores信号量, events事件 等等)的框架。这个类是作为大多数同步器的基础而设计的,方式是用一个原子的int值标识状态,状态就是指能够表示实现AQS的对象是正在被获取或者释放,其子类必须实现这些改变状态的protected方法;相对的,AQS类中的其他方法都是在实现队列或者阻塞的机制。 子类可以维护自己的属性,但如果想原子地修改同步状态,可以使用同步器提供的 getState
方法、setState
方法以及compareAndSetState
方法
AQS的子类应该被定义为非公开的内部类,作为在封闭类中实现同步属性。 AbstractQueuedSynchronizer
类中并没有实现任何的同步接口,反而定义了像acquireInterruptibly
这样的方法能被适当的用在Lock的实现或者相关的同步器里的public方法里。
该类支持一种默认的独占模式或是共享模式,或者两者同时支持。 当在独占模式里获取状态时,仅有一次尝试能获取而其他线程不会成功;共享模式里如果多线程尝试获取状态值会有可能成功(但不应该成功)。该类本身并不“理解”,除了字面本身的差异,当在共享模式里一个线程获取成功时,下一个等待线程(如果存在)还必须确定它是不是也能获取成功。 线程即便在不同的模式里等待,但也会共享使用相同的FIFO队列。 通常,子类的实现一般只使用一种模式,但是确实也能够两种模式一同使用,例如在ReadWriteLock
中就是这样。子类只使用一种模式时,就不需要去定义另一种模式的方法了。
该类定义了一个嵌套的成员内部类ConditionObject
,该类可以被用作Condition
接口的实现类来支持独占模式,独占模式中有一个isHeldExclusively
方法,可以得知当前线程是否排他得持有同步状态,release
方法配合着getState
得知当前同步值,可以完全使对象释放同步状态;acquire
方法在已得知已写入的状态值的时候,最终将使用AQS的对象还原到先前获取的状态。 AbstractQueuedSynchronizer
没有其他方法创建这样的condition
对象,如果真有了,约束将不会其作用,所以不要这么做。 ConditionObject
的行为当然完全取决于同步器本身实现的语义(即锁的语义)。
该类提供了对内部FIFO队列和condition对象的检查、监听、维护方法,这些方法可以根据需要被导出使用,来维护AbstractQueuedSynchronizer
同步器同步机制,(相同于同步器对同步状态的管理实现依赖于底层内部的队列和condition对象)
这个类的序列化仅存储原子Integer类来维护同步状态,所以在反序列化的时候,得到的对象内部的线程同步队列是空的。 典型的子类如果需要有序列化的能力,需要定义readObject
方法,目的是使反序列化的对象恢复到初始化的状态。
为了作为同步器的基础而去使用AQS这个类,需要重新定义下列方法;如果需要检查或修改同步状态值可使用的方法为 getState
方法、setState
方法以及compareAndSetState
方法。
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
每一个方法默认都会抛出UnsupportedOperationException
异常,每一个方法实现都必须是内在线程安全的,一般来说也应该是比较短而且不阻塞的。定义这些方法也仅在使用这个类的类中使用,其他的所有的方法都被声明成final
。
你也可能会发现一些从AbstractOwnableSynchronizer
类里继承过来的方法用来跟踪拥有排他同步器的线程是挺方便的,我也鼓励你去使用它们,这些方法可以用来当做监听或诊断工具去协助开发者判断是哪些线程持有锁。
即便这个类是基于内部的FIFO队列构建,但它不会自动执行FIFO获取策略。 独占同步器的核心采取以下这样的形式
Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread;
(共享模式相似但会涉及级联信号)
因为在调用获取的过程中,检查校验是先于入列的,所以一个新的获取中的线程可能会插队到队列中已经入列或者阻塞的线程的前面。 如果你需要的话,可以定义tryAcquire
或者tryAcquireShared
通过内部调用检查的方法来排除冲突,因此提供一个公平的FIFO队列获取顺序。特别地,大多数公平的同步器可以定义tryAcquire
,当hasQueuedPredecessors
方法(一个专门设计来为公平同步器使用的方法,用来检查是否还有线程比当前线程等待的更久) 返回true
的时候返回false
。 其他的变化是可能发生的。
使用默认的冲突策略,吞吐量和可扩展性都是最高的,该策略也称为greedy策略,renouncement策略或者convoy-avoidance策略。 但此策略并不能保证公平或starvation-free,更早时候入列的线程被允许可以和新加入的线程重竞争,每一次和入列的新线程重竞争时都有同样的机会胜出。此外,虽然获取同步值不是通常意义上’自旋’的,但获取过程中可能会在阻塞之前多次调用tryAcquire
方法,这些方法都散布在其他的计算中。 当发生短暂的持有独占同步时,自旋带来的收益会最大化,当长时间时自旋也不是主要的责任承担者。 所以有需要的话,可以通过之前调用带有“快速路径”检查的获取同步值的方法来扩展,可能的方法是预先检查hasContended
方法或者是hasQueuedThreads
方法,以仅当同步器不可能被竞争时再自旋获取同步值
此类为实现同步提供了一个高效且易扩展的基础,部分是因为专门界定了同步器的使用范围,该同步器依赖于一个int
值state
,可以获取和释放;部分是因为内部维护了一个FIFO等待队列;如果这些还不足以满足需求,你能从更底层使用java.util.concurrent.atomic
原子包下的原子类、 自定义的队列java.util.Queue
以及LockSupport
类提供阻塞支持来构建自定义的同步器。
这里有一个不可重入的、相互的排它锁的类,使用数字0来表示未加锁状态,数字1表示加锁状态。 虽然一个不可重入锁不会严格需要记录当前持有锁的线程,这个类这么做了只是因为让它更容易去监控。 它还支持Condition,也暴露了一些instrumentation方法
import java.io.IOException; import java.io.ObjectInputStream; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class Mutex implements Lock, java.io.Serializable { // Our internal helper class private static class Sync extends AbstractQueuedSynchronizer { // Reports whether in locked state protected boolean isHeldExclusively() { return getState() == 1; } // Acquires the lock if state is zero public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // Provides a Condition Condition newCondition() { return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // The sync object does all the hard work. We just forward to it. private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
(笔者: 这个类虽然简单但也是基础,更是一次Lock和AQS的理解的敲门砖,建议多写几次,以后还会在后续的学习中用到)
这里是一个Latch类(门闩),除了该类仅需要单个signal
都类似于java.util.concurrent.CountDownLatch
CountDownLatch类。 因为一个latch类是非独占的,所以它使用了共享的acquire和release方法
import java.util.concurrent.locks.AbstractQueuedSynchronizer; class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } }
@since 1.5
@author Doug Lea
下一篇:AQS学习笔记(二)- AbstractQueuedSynchronizer概览
(后续还会陆续更新内部类和重要方法的文档)
该内部类就是用来定义等待队列的,非常重要
/** * Provides a framework for implementing blocking locks and related * synchronizers (semaphores, events, etc) that rely on * first-in-first-out (FIFO) wait queues. This class is designed to * be a useful basis for most kinds of synchronizers that rely on a * single atomic {@code int} value to represent state. Subclasses * must define the protected methods that change this state, and which * define what that state means in terms of this object being acquired * or released. Given these, the other methods in this class carry * out all queuing and blocking mechanics. Subclasses can maintain * other state fields, but only the atomically updated {@code int} * value manipulated using methods {@link #getState}, {@link * #setState} and {@link #compareAndSetState} is tracked with respect * to synchronization. * * <p>Subclasses should be defined as non-public internal helper * classes that are used to implement the synchronization properties * of their enclosing class. Class * {@code AbstractQueuedSynchronizer} does not implement any * synchronization interface. Instead it defines methods such as * {@link #acquireInterruptibly} that can be invoked as * appropriate by concrete locks and related synchronizers to * implement their public methods. * * <p>This class supports either or both a default <em>exclusive</em> * mode and a <em>shared</em> mode. When acquired in exclusive mode, * attempted acquires by other threads cannot succeed. Shared mode * acquires by multiple threads may (but need not) succeed. This class * does not "understand" these differences except in the * mechanical sense that when a shared mode acquire succeeds, the next * waiting thread (if one exists) must also determine whether it can * acquire as well. Threads waiting in the different modes share the * same FIFO queue. Usually, implementation subclasses support only * one of these modes, but both can come into play for example in a * {@link ReadWriteLock}. Subclasses that support only exclusive or * only shared modes need not define the methods supporting the unused mode. * * <p>This class defines a nested {@link ConditionObject} class that * can be used as a {@link Condition} implementation by subclasses * supporting exclusive mode for which method {@link * #isHeldExclusively} reports whether synchronization is exclusively * held with respect to the current thread, method {@link #release} * invoked with the current {@link #getState} value fully releases * this object, and {@link #acquire}, given this saved state value, * eventually restores this object to its previous acquired state. No * {@code AbstractQueuedSynchronizer} method otherwise creates such a * condition, so if this constraint cannot be met, do not use it. The * behavior of {@link ConditionObject} depends of course on the * semantics of its synchronizer implementation. * * <p>This class provides inspection, instrumentation, and monitoring * methods for the internal queue, as well as similar methods for * condition objects. These can be exported as desired into classes * using an {@code AbstractQueuedSynchronizer} for their * synchronization mechanics. * * <p>Serialization of this class stores only the underlying atomic * integer maintaining state, so deserialized objects have empty * thread queues. Typical subclasses requiring serializability will * define a {@code readObject} method that restores this to a known * initial state upon deserialization. * * <h3>Usage</h3> * * <p>To use this class as the basis of a synchronizer, redefine the * following methods, as applicable, by inspecting and/or modifying * the synchronization state using {@link #getState}, {@link * #setState} and/or {@link #compareAndSetState}: * * <ul> * <li> {@link #tryAcquire} * <li> {@link #tryRelease} * <li> {@link #tryAcquireShared} * <li> {@link #tryReleaseShared} * <li> {@link #isHeldExclusively} * </ul> * * Each of these methods by default throws {@link * UnsupportedOperationException}. Implementations of these methods * must be internally thread-safe, and should in general be short and * not block. Defining these methods is the <em>only</em> supported * means of using this class. All other methods are declared * {@code final} because they cannot be independently varied. * * <p>You may also find the inherited methods from {@link * AbstractOwnableSynchronizer} useful to keep track of the thread * owning an exclusive synchronizer. You are encouraged to use them * -- this enables monitoring and diagnostic tools to assist users in * determining which threads hold locks. * * <p>Even though this class is based on an internal FIFO queue, it * does not automatically enforce FIFO acquisition policies. The core * of exclusive synchronization takes the form: * * <pre> * Acquire: * while (!tryAcquire(arg)) { * <em>enqueue thread if it is not already queued</em>; * <em>possibly block current thread</em>; * } * * Release: * if (tryRelease(arg)) * <em>unblock the first queued thread</em>; * </pre> * * (Shared mode is similar but may involve cascading signals.) * * <p id="barging">Because checks in acquire are invoked before * enqueuing, a newly acquiring thread may <em>barge</em> ahead of * others that are blocked and queued. However, you can, if desired, * define {@code tryAcquire} and/or {@code tryAcquireShared} to * disable barging by internally invoking one or more of the inspection * methods, thereby providing a <em>fair</em> FIFO acquisition order. * In particular, most fair synchronizers can define {@code tryAcquire} * to return {@code false} if {@link #hasQueuedPredecessors} (a method * specifically designed to be used by fair synchronizers) returns * {@code true}. Other variations are possible. * * <p>Throughput and scalability are generally highest for the * default barging (also known as <em>greedy</em>, * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy. * While this is not guaranteed to be fair or starvation-free, earlier * queued threads are allowed to recontend before later queued * threads, and each recontention has an unbiased chance to succeed * against incoming threads. Also, while acquires do not * "spin" in the usual sense, they may perform multiple * invocations of {@code tryAcquire} interspersed with other * computations before blocking. This gives most of the benefits of * spins when exclusive synchronization is only briefly held, without * most of the liabilities when it isn't. If so desired, you can * augment this by preceding calls to acquire methods with * "fast-path" checks, possibly prechecking {@link #hasContended} * and/or {@link #hasQueuedThreads} to only do so if the synchronizer * is likely not to be contended. * * <p>This class provides an efficient and scalable basis for * synchronization in part by specializing its range of use to * synchronizers that can rely on {@code int} state, acquire, and * release parameters, and an internal FIFO wait queue. When this does * not suffice, you can build synchronizers from a lower level using * {@link java.util.concurrent.atomic atomic} classes, your own custom * {@link java.util.Queue} classes, and {@link LockSupport} blocking * support. * * <h3>Usage Examples</h3> * * <p>Here is a non-reentrant mutual exclusion lock class that uses * the value zero to represent the unlocked state, and one to * represent the locked state. While a non-reentrant lock * does not strictly require recording of the current owner * thread, this class does so anyway to make usage easier to monitor. * It also supports conditions and exposes * one of the instrumentation methods: * * <pre> {@code * class Mutex implements Lock, java.io.Serializable { * * // Our internal helper class * private static class Sync extends AbstractQueuedSynchronizer { * // Reports whether in locked state * protected boolean isHeldExclusively() { * return getState() == 1; * } * * // Acquires the lock if state is zero * public boolean tryAcquire(int acquires) { * assert acquires == 1; // Otherwise unused * if (compareAndSetState(0, 1)) { * setExclusiveOwnerThread(Thread.currentThread()); * return true; * } * return false; * } * * // Releases the lock by setting state to zero * protected boolean tryRelease(int releases) { * assert releases == 1; // Otherwise unused * if (getState() == 0) throw new IllegalMonitorStateException(); * setExclusiveOwnerThread(null); * setState(0); * return true; * } * * // Provides a Condition * Condition newCondition() { return new ConditionObject(); } * * // Deserializes properly * private void readObject(ObjectInputStream s) * throws IOException, ClassNotFoundException { * s.defaultReadObject(); * setState(0); // reset to unlocked state * } * } * * // The sync object does all the hard work. We just forward to it. * private final Sync sync = new Sync(); * * public void lock() { sync.acquire(1); } * public boolean tryLock() { return sync.tryAcquire(1); } * public void unlock() { sync.release(1); } * public Condition newCondition() { return sync.newCondition(); } * public boolean isLocked() { return sync.isHeldExclusively(); } * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } * public void lockInterruptibly() throws InterruptedException { * sync.acquireInterruptibly(1); * } * public boolean tryLock(long timeout, TimeUnit unit) * throws InterruptedException { * return sync.tryAcquireNanos(1, unit.toNanos(timeout)); * } * }}</pre> * * <p>Here is a latch class that is like a * {@link java.util.concurrent.CountDownLatch CountDownLatch} * except that it only requires a single {@code signal} to * fire. Because a latch is non-exclusive, it uses the {@code shared} * acquire and release methods. * * <pre> {@code * class BooleanLatch { * * private static class Sync extends AbstractQueuedSynchronizer { * boolean isSignalled() { return getState() != 0; } * * protected int tryAcquireShared(int ignore) { * return isSignalled() ? 1 : -1; * } * * protected boolean tryReleaseShared(int ignore) { * setState(1); * return true; * } * } * * private final Sync sync = new Sync(); * public boolean isSignalled() { return sync.isSignalled(); } * public void signal() { sync.releaseShared(1); } * public void await() throws InterruptedException { * sync.acquireSharedInterruptibly(1); * } * }}</pre> * * @since 1.5 * @author Doug Lea */
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。