赞
踩
Java并发编程之美——第五章 Java并发包中并发List(CopyOnWriteArrayList)源码剖析
Java并发编程之美——第四章 Java并发包中原子操作类原理剖析
Java并发编程之美——第三章 Java并发包中ThreadLocalRandom类原理剖析
JDK 中的 rt.jar 包里面的是个 LockSupport 是个工具类,它的主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础。
LockSupport 类与每个使用它的线程都会关联一个许可证,在默认情况下调用 LockSupport 类的方法的线程是不持有许可证的。LockSupport 是使用 Unsafe 类实现的,下面介绍 LockSupport 中的几个主要函数。
如果调用park方法的线程已经拿到了与LockSupport关联的许可证,则调用LockSupport.park()时会马上返回,否则调用线程会被禁止参与线程的调度,也就是会被阻塞挂起。
如下代码直接在main函数里面调用park方法,最终只会输出begin park!,然后当前线程被挂起,这是因为在默认情况下调用线程是不持有许可证的。
public static void main(String[] args) {
System.out.println("begin park! ");
LockSupport.park();
System.out.println("end park! ");
}
在其他线程调用unpark(Thread thread)方法并且将当前线程作为参数时,调用park方法而被阻塞的线程会返回。另外,如果其他线程调用了阻塞线程的interrupt()方法,设置了中断标志或者被虚假唤醒,则阻塞线程也会返回。所以调用park方法时最好也使用循环条件判断方式。
需要注意的是,因调用park()方法而被阻塞的线程被其他线程中断而返回时并不会抛出InterruptedException异常。
当一个线程调用unpark时,如果参数thread线程没有持有thread与LockSupport类关联的许可证,则让thread线程持有。如果thread之前因调用park()而被挂起,则调用unpark后,该线程会被唤醒。如果thread之前没有调用park,则调用unpark方法后,再调用park方法,则会立即返回。修改代码如下。
public static void main(String[] args) {
System.out.println("begin park! ");
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
System.out.println("end park! ");
/*
输出结果:
begin park!
end park!
*/
}
下面再来看一个例子以加深对 park 和 unpark 的理解
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("child thread begin park!");
LockSupport.park();
System.out.println("child thread unpark!");
}
});
thread.start();
Thread.sleep(1000);
System.out.println("main thread begin unpark!");
LockSupport.unpark(thread);
/*
输出结果
child thread begin park!
main thread begin unpark!
child thread unpark!
*/
}
上边代码执行过程如下:
park 方法返回时不会告诉你因何种原因返回,所以调用者需要根据之前调用 park 方法的原因,再次检查条件是否满足,如果不满足还需再次调用 park 方法。
例如,根据调用前后中断状态的对比就可以判断是不是因为被中断才返回的。
为了说明调用 park 方法后的线程被中断后会返回,我们修改上面的例子代码,删除 LockSupport.unpark(thread),然后添加 thread.interrupt(),具体代码如下。
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("child thread begin park!");
while (!Thread.currentThread().isInterrupted()){
LockSupport.park();
}
System.out.println("child thread unpark!");
}
});
thread.start();
Thread.sleep(1000);
System.out.println("main thread begin unpark!");
thread.interrupt();
/*
输出结果
child thread begin park!
main thread begin unpark!
child thread unpark!
*/
}
在如上代码中,只有中断子线程,子线程才会运行结束,如果子线程不被中断,即使调用 unpark(thread) 方法子线程也不会结束。
和 park 方法类似,如果调用 park 方法的线程已经拿到了与 LockSupport 关联的许可证,则调用 LockSupport.parkNanos(Long nanos) 方法后会马上返回。该方法的不同在于,如果没有拿到许可证,则调用线程会被挂起 nanos 时间后修改为自动返回。
park 方法还支持带有 blocker 参数的方法 void park(Object blocker) 方法,当线程在没有持有许可证的情况下调用 park 方法而被阻塞挂起时,这个 blocker 对象会被记录到该线程内部。
使用诊断工具可以观察线程被阻塞的原因,诊断工具是通过调用 getBlocker(Thread) 方法来获取 blocker 对象的,所以 JDK 推荐我们使用带有 blocker 参数的 park 方法,并且 blocker 被设置为 this,这样当在打印线程堆栈排查问题时就能知道是哪个类被阻塞了。
例如下面的代码。
/**
* 描述
*
* @author Hireek
* @date 2021/12/31 07:51
*/
public class TestPark {
public void testPark() {
LockSupport.park();
}
public static void main(String[] args) {
TestPark testPark = new TestPark();
testPark.testPark();
}
}
运行代码后,使用 jps 查看运行进程号,然后通过 jstack 命令查看线程堆栈时可以啊看到如下输出结果。
修改代码 (1) 为 LockSupport.park(this) 后运行代码,则使用 jstack 命令输出结果为:
使用带 blocker 参数的 park 方法,线程堆栈可以提供更多有关阻塞对象的信息。
接下来看看内部实现
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
Thread 类里面有个变量 volatile Object parkBlocker,用来存放 park 方法传递的 blocker 对象,也就是把 blocker 变量存放到了调用 park 方法的线程的成员变量里面。
相比于 park(Object blocker) 方法多了个超时时间。
它的代码如下:
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}
其中参数 deadline 的时间单位为 ms,改时间是从 1970 年到现在某一时间点的毫秒值。这个方法和 park(Object blocker, long nanos) 方法的区别是,后则会是从当前算等待 nanos 秒时间,而前者是指定一个时间点,比如需要等到 2019.11.11 日 11:11:11, 则把这个时间点转化为从 1970 年到这个时间点的总毫秒数。
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
/**
* 描述
*
* @author Hireek
* @date 2021/12/31 07:57
*/
public class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean((false));
private final Queue<Thread> waiters = new ConcurrentLinkedDeque<>();
public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);
while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) {
wasInterrupted = true;
}
}
waiters.remove();
if (wasInterrupted) {
current.interrupt();
}
}
public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
这是一个先进先出的锁,也就是只有队列的首元素可以获取锁。在代码 (1) 处,如果当前线程不是队首或者当前锁已经被其他线程获取,则调用 park 方法挂起自己。
然后在代码 (2) 处判断,如果 park 方法是因为被中断而返回,则忽略中断,并且重置中断标志,做个标记,然后再次判断当前线程是不是队首元素或者当前锁是否已经被其他线程获取,如果是则继续调用 park 方法挂起自己。
然后再代码 (3) 中,判断标记,如果标记为true则中断该线程,这个怎么理解呢?其实就是其他线程中断了该线程,虽然我对中断信号不感兴趣,忽略它,但是不代表其他线程对该标志不感兴趣,所以要恢复下。
上述都是原文,为了加深下使用和理解。下面我们探讨下cpp源码
jdk12->os_posix.cpp文件
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
EventThreadPark event;
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
if (event.should_commit()) {
const oop obj = thread->current_park_blocker();
if (time == 0) {
post_thread_park_event(&event, obj, min_jlong, min_jlong);
} else {
if (isAbsolute != 0) {
post_thread_park_event(&event, obj, min_jlong, time);
} else {
post_thread_park_event(&event, obj, time, min_jlong);
}
}
}
HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END
// 每个线程都有一个Parker类型的_parker变量。最终park()在Parker实现
void Parker::park(bool isAbsolute, jlong time) {
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's
// an interrupt pending.
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
struct timespec absTime;
if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
return;
}
if (time > 0) {
to_abstime(&absTime, time, isAbsolute);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unparking. Also re-check interrupt before trying wait.
if (Thread::is_interrupted(thread, false) ||
pthread_mutex_trylock(_mutex) != 0) { // 锁住信号量
return;
}
int status;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait(&_cond[_cur_index], _mutex); //释放信号量,并在条件变量上等待
assert_status(status == 0, status, "cond_timedwait");
}
else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
assert_status(status == 0 || status == ETIMEDOUT,
status, "cond_timedwait");
}
_cur_index = -1;
_counter = 0;
status = pthread_mutex_unlock(_mutex); // 释放信号量
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
Parker* p = NULL;
if (jthread != NULL) {
ThreadsListHandle tlh;
JavaThread* thr = NULL;
oop java_thread = NULL;
(void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
if (java_thread != NULL) {
// This is a valid oop.
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// This cast is OK even though the jlong might have been read
// non-atomically on 32bit systems, since there, one word will
// always be zero anyway and the value set is always the same
p = (Parker*)addr_from_java(lp);
} else {
// Not cached in the java.lang.Thread oop yet (could be an
// older version of library).
if (thr != NULL) {
// The JavaThread is alive.
p = thr->parker();
if (p != NULL) {
// Cache the Parker in the java.lang.Thread oop for next time.
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
} // ThreadsListHandle is destroyed here.
if (p != NULL) {
HOTSPOT_THREAD_UNPARK((uintptr_t) p);
p->unpark();
}
} UNSAFE_END
解析可以参考:https://blog.csdn.net/weixin_43767015/article/details/107207643
/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state.
* This class provides an efficient and scalable basis for synchronization in part by specializing its range of * use to synchronizers that can rely on int state, acquire, and release parameters, and an internal FIFO wait * queue.
/
为了实现一个同步器的框架(模板,基石)。
一个同步器框架。一个抽象同步器模板。依赖CLH队列和一个原子变量(volatile修饰)。
如何实现呢?其实借鉴的是jvm底层加锁的一套机制。
有同步队列(双向链表->内部类Node实现),ConditionObject(维护一个单向链表->等待队列),一个原子变量,LockSupport,CAS…
具体变量参照下面类图
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
/** 注意是下一个线程需要阻塞 */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程。
类似wait()和notify()配合synchronized实现线程同步(通信)。区别是ConditionObject(条件变量)可以有多个,而synchronized只有一个共享变量。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
// Internal methods
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// ...
}
/**
* The synchronization state.
*/
private volatile int state;
在AQS中维持了一个单一的状态信息state,可以通过getState、setState、compareAndSetState函数修改其值。对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;对于semaphore来说,state用来表示当前可用信号的个数:对于CountDownlatch来说,state用来表示计数器当前的值。
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState and/or compareAndSetState:
下面讲讲独占和共享两种模式(不对中断响应)
当一个线程调用 acquire(int arg)方法获取独占资源时,会首先使用 tryAcquire方 法尝试获取资源, 具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线程 封装为类型为 Node.EXCLUSIVE 的 Node 节点后插入到 AQS 阻塞 队列的尾部,并调用 LockSupport.park(this) 方法挂起自己 。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
@ReservedStackAccess
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
当一个线程调用 release(int arg)方法时会尝试使用 tryRelease操作释放资源,这里 是设置状态变 量 state 的值,然后调用 LockSupport.unpark(thread)方法激活 AQS 队列 里 面 被阻塞的一个线程(thread)。 被激活的线程则使用 tryAcquire尝试,看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续 向下运行,否则还是会被放 入 AQS 队列并被挂起。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
@ReservedStackAccess
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
当线程调用 acquireShared(intarg)获取共享资源时,会首先使用 trγAcq山reShared 尝试获取资源 , 具体是设置状态变量 state 的 值,成功则 直接返 回,失败则将当前线 程 封 装为类型为 Node.SHARED 的 Node 节 点后插入 到 AQS 阻 塞 队列的尾部,并使用 LockSupport.park(this) 方法挂起自己。
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
当一个线程调用 releaseShared(int a电)时会尝试使用 tryReleaseShared 操作释放资 源,这里是设置状态变量 state 的值,然后使用 LockSupport.unpark (thread)激活 AQS 队 列里面被阻塞的一个线程 (thread)。被激活的线程则使用 tryReleaseShared查看当前状态变 量 state 的值是否能 满足自 己的 需要,满足 则 该线程被撤活,然后继续向下运行,否则还 是会被放入 AQS 队列并被挂起 。
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
@ReservedStackAccess
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
下面就来谈谈基于AQS具体的锁实现ReentrantLock
如果对AQS十分了解的话,就能快速了解ReentrantLock的思想。AQS独占模式一种具体实现。
在看文档时,Serialization of this class behaves in the same way as built-in locks: a deserialized lock is in the unlocked state, regardless of its state when serialized.锁和序列化的关系,为什么ReentrantLock要实现序列化接口?还有AQS仅仅在https://stackoverflow.com/questions/17979009/why-locks-are-serializable-in-java找到了一点答案。之后在研究…
/** Synchronizer providing all implementation mechanics */
private final Sync sync; //
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
// 默认nonfair
}
默认非公平锁。抢夺策略,新的线程可以直接抢占,可能不需要阻塞、排队、唤醒,也就减少了上下文的切换。吞吐量更高。
/**
* nonfair
* Performs lock. Try immediate barge, backing up to normal 立即进行cas抢占锁,否则正常流程
* acquire on failure.
*/
@ReservedStackAccess // This lock supports a maximum of 2147483647 recursive locks by the same thread. 增加额外堆栈空间,但也有可能延迟抛出StackOverflowError
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/**
* fair 正常流程
*/
final void lock() {
acquire(1);
}
/**
* nonfair
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 直接尝试cas
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 当前线程和当前已获得锁的线程是同一个 可重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // no waiters or is first才会尝试cas
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
解决线程安全问题使用ReentrantLock就可以,但是ReentrantLock是独占锁,某时只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然ReentrantLock满足不了这个需求,所以ReentrantReadWriteLock应运而生。ReentrantReadWriteLock采用读写分离的策略,允许多个线程可以同时获取读锁**。AQS的独占和共享模式结合使用**。支持锁降级(写锁降级到读锁,不支持升级)。
之前讲AQS也提过。用state的高16位表示读状态,也就是获取到读锁的次数;使用低16位表示获取到写锁的线程的可重入次数。
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
在这两个方法的区分是否公平
/**
* Returns true if the current thread, when trying to acquire
* the read lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean readerShouldBlock();
/**
* Returns true if the current thread, when trying to acquire
* the write lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock();
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 获取写锁可重入次数
if (c != 0) { // 有锁
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 读锁或当前线程是不是该写锁的持有者
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 判断边界
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() || // 非公平 writerShouldBlock false,公平 判断当前线程节点是否有前驱节点
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 重入次数记录
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 重试
return fullTryAcquireShared(current);
}
未完待遇…
2022年愿你成为光!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。