赞
踩
ReentrantLock主要利用CAS+AQS(AbstractQueuedSynchronizer)队列来实现。它支持公平锁和非公平锁,两者的实现类似。
CAS:Compare and Swap,比较并交换。CAS有3个操作数:内存值V、预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。该操作是一个原子操作,被广泛的应用在Java的底层实现中。在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现
公平锁:第一次加锁的时候,他不会去尝试加锁,会去判断队列中是否有线程在排队,如果下一个线程是自己,则尝试加锁。然后还不死心,会再看一下是否有拿锁的资格(前面那个人是否为head),如果有资格(前面那个人刚好是head),则继续拿锁,成功执行代码块。失败则park(排队)。
非公平锁:非公平锁会在lock的时候直接抢锁,如果加锁失败,则去看为什么加锁失败(是否被人持有了),他在判断的时候如果锁没有被人持有,则直接加锁,成功进入代码块。失败则进入队列,进入队列后如果前面那个人是head(可能在进入队列的时候,head刚好忙完了,或者CPU时间片轮询),则会再次尝试加锁,成功则执行同步代码块,失败则park(真正的排队)。
中断
Thread对象的native实现里有一个成员代表线程的中断状态,我们可以认为它是一个bool型的变量。初始为false。
final void lock() {
if (compareAndSetState(0, 1))//直接加锁
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //加锁失败则再次判断是否有线程持有,有则直接加锁,否则进入队列再次tryAcquire,失败则park(排队)
}
public final void acquire(int arg) { if (!tryAcquire(arg) && //判断是否有拿锁资格,加锁成功则返回true,否则返回false acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //进入队列后,再判断自己是否有拿锁资格。 selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //获取锁的状态,锁没有被持有的state=0 if (c == 0) { if (compareAndSetState(0, acquires)) { //cas比较和交换,加锁 setExclusiveOwnerThread(current); //将锁持有线程设置成当前线程 return true; } } else if (current == getExclusiveOwnerThread()) { //可重入锁 int nextc = c + acquires; //state+1 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); //更新state return true; } return false; } //将node加入队列 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //缓存tail节点 if (pred != null) { //如果tail节点不为null node.prev = pred; //将node(当前节点,需要加入队列的节点)节点的上一节点设置成原先的tail if (compareAndSetTail(pred, node)) {//如果原先tail节点为pred,则将tail节点设置成node pred.next = node;//将原先的tail节点的下一节点设置成更新后的tail节点(node) return node; } } //上一步没有将node放入队列,则再次将node节点放入队列,必要时会初始化队列 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize 队列中tail节点为null if (compareAndSetHead(new Node())) //则初始化head节点,cas操作 tail = head; } else { //确保队列初始化过 node.prev = t; //设置node的上一节点为tail if (compareAndSetTail(t, node)) {//如果tail节点为t,则更新tail值为node t.next = node;//设置node的上一节点(t)的下一节点为node,可以理解成双向链表 return t; //返回node的前一节点 } } } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取node的前一节点 final Node p = node.predecessor(); //如果前一节点刚好是head,则尝试加锁 if (p == head && tryAcquire(arg)) { setHead(node); //加锁成功后则设置head为当前node p.next = null; // help GC,解除原先head的引用 failed = false; return interrupted; } //加锁失败则真正的进入队列 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作 cancelAcquire(node); } } /** * 将node节点之前(包括当前node)取消状态的全部剔除 */ private void cancelAcquire(Node node) { if (node == null) return; /*剔除操作需要解绑node和thread关系*/ node.thread = null; /*获取node的前驱节点*/ Node pred = node.prev; /*大于0就是取消状态*/ while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; /*这里直接置为取消状态,是为了方便其他线程进行取消是的操作, * 也是为了方便跳跃该节点 */ node.waitStatus = Node.CANCELLED; /*如果node是队尾的tail,那么将队尾设置成node的前驱结点*/ if (node == tail && compareAndSetTail(node, pred)) { /*将队尾的pred节点的后继节点置空,这是一个队列的标准要求*/ compareAndSetNext(pred, predNext, null); } else { //如果是非队尾节点 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { /*pred节点状态如果是有效节点且不是head,将pred的后继 * 节点指向node的后继节点。这里和C++指针指向是一个道理*/ Node next = node.next; if (next != null && next.waitStatus <= 0) /*node的后继节点是有效节点且不是取消状态,进行替换*/ compareAndSetNext(pred, predNext, next); } else { /* * 这里就是对上面提到的阻塞进行放行。里面 * 实际上是LockSupport.unpark进行放行的。 * 这个时候我们通过上面的if知道,这个时候在以下几种场景出现 * 1、pred==head * 2、pred是取消状态 * 3、pred.thread==null 即不是有效节点 * 以上这些情况都表示pred不是能进行唤醒的节点,我们 * 这里理解为不是标准节点。这个时候为了保证队列的活跃性, * 我们需要唤醒后继节点,实际上就是node的后继节点。 */ unparkSuccessor(node); } node.next = node; // help GC } } /** * 唤醒node节点 */ private void unparkSuccessor(Node node) { /*获取当前节点的状态*/ int ws = node.waitStatus; /*对状态进行判断*/ if (ws < 0) /*若果小于0,则进行强制纠偏为0*/ compareAndSetWaitStatus(node, ws, 0); /*获取当前节点的后继节点*/ Node s = node.next; /*判断*/ if (s == null || s.waitStatus > 0) { /*后继节点为有效节点且状态>0 , 这里即为CANCELLED状态, * 则将该节点在CLH中剔除,并进行断层连接*/ s = null; /*这里和向前去除取消状态的前驱节点一样,只不过这里是向后 *至于为什么是从后向前呢,是为了避免高并发带来的节点不一 * 致性。因为从node开始往后的话,很有可能后面会被其他 * 线程修改了。因为添加节点的往后添加的。所以从后往前的话这样能保证数据一致。但是这样就会导致其他线程添加的节点是无法访问到的。这一点和数据一致性比较还是前者比较重要。此次获取不到没关系,在获取锁的时候jdk使用的是for循环。会不停的检查队列中节点是否可以被唤醒的。这里我们理解是一个定时器。所以一次获取不到节点没关系。总有一次会被唤醒。 */ for (Node t = tail; t != null && t != node; t = t.prev) /*head节点状态应该是0,所以这里最后s就是head.所以后面释放* 的就是head的后继节点。*/ if (t.waitStatus <= 0) s = t; } if (s != null) /*这里对应的是parkAndCheckInterrupt中的 * LockSupport.lock(this)方法。unpark * 之后parkAndCheckInterrupt方法就会执行到Thread.interrupted * 并进行返回,这个时候回返回true*/ LockSupport.unpark(s.thread); }
总结:
第一个线程T1执行lock的时候,不会进入队列
第二个线程T2,假如T1正在执行,T2的上一节点是head,尝试拿锁,拿不到则进入真正的排队(park)
第三个线程T3,此时T2已在排队,T1正在执行,T3的上一节点不是head,直接排队(park)
final void lock() { acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && //判断是否有拿锁的资格 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //加入队列(这里仅仅只是加入队列,并不是真正的排队) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();//获取锁的状态 if (c == 0) { if (!hasQueuedPredecessors() && //判断自己是否有拿锁的资格方式:前面没有人在排队,或则head的下一节点是我,或者head没有下一节点 compareAndSetState(0, acquires)) {//加锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //可重入锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //前面没有人在排队,或则head的下一节点是我,或者head没有下一节点则返回false return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } //这一块跟非公平锁逻辑一致 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } //这一块跟非公平锁逻辑一致 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 开始自旋,要么获取锁,要么中断 for (;;) { final Node p = node.predecessor(); // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点) if (p == head && tryAcquire(arg)) {//有可能在进入队列的时候,排在你前面那个人已经处理好了,所以这里要再次tryAcquire setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //加锁失败,取消xian if (failed) cancelAcquire(node); } } // 靠前驱节点判断当前线程是否应该被阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取头结点的节点状态 int ws = pred.waitStatus; // 说明头结点处于唤醒状态 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; // 通过枚举值我们知道waitStatus>0是取消状态 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { // 循环向前查找取消节点,把取消节点从队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 设置前任节点等待状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) {//尝试释放锁 Node h = head; //释放成功之后需要唤醒head的后续节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; //如果当前线程不是只有锁线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; //锁释放成功标志 setExclusiveOwnerThread(null); } setState(c); return free; }
总结:
同步队列调用tryAcquire可重写方法来判断是否已经获取竞争资源,如果没有获取,就将当前线程包装成节点入队列,然后再自旋获取资源.是否自旋取决于前置节点的waitStatus,如果前置节点的waitStatus(只是用于判断线程是否有效)的状态是signal,则代表,当前节点需要parking等待,parkding等待的目的是为了减少cpu空转,但会增加线程上下文切换,因为parking的原理是将用户态数据转为内核态. 后面unpark的操作则是将线程状态数据由内核态转为用户态. 等到前置节点release释放掉竞争状态后,后面的自旋判断就会竞争获取状态重复以上过程
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); //获取锁状态 int c = getState(); //写线程数量(即获取独占锁的重入数) int w = exclusiveCount(c); //说明已经有其他线程获取了读锁或写锁 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false; // 如果写锁状态不为0且写锁没有被当前线程持有返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //重入,判断同一线程获取写锁是否超过最大次数(65535),支持可重入 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire //更新状态 //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。 setState(c + acquires); return true; } //到这里说明此时c=0,读锁和写锁都没有被获取 //writerShouldBlock表示是否阻塞 if (writerShouldBlock() ||// 此函数对公平非公平进行了封装,返回false代表在当前公平模式判断下,接下来可以尝试获得锁 !compareAndSetState(c, c + acquires))// 如果CAS成功,则不会进入此分支 return false; //执行到这里,说明该函数开始检测到 没有任何锁,然后当前线程还获得到了写锁 setExclusiveOwnerThread(current); return true; }
其中exclusiveCount方法表示占有写锁的线程数量,源码如下:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
说明:直接将状态state和(2^16 - 1)做与运算,其等效于将state模上2^16。写锁数量由state的低十六位表示。
从源代码可以看出,获取写锁的步骤如下:
(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。
(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。
(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。
(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。
方法流程图如下:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { //若锁的持有者不是当前线程,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //写锁的新线程数 int nextc = getState() - releases; //如果独占模式重入数为0了,说明独占模式被释放 boolean free = exclusiveCount(nextc) == 0; if (free) //若写锁的新线程数为0,则将锁的持有者设置为null setExclusiveOwnerThread(null); //设置写锁的新线程数 //不管独占模式是否被释放,更新独占重入数 setState(nextc); return free; }
写锁的释放过程还是相对而言比较简单的:首先查看当前线程是否为写锁的持有者,如果不是抛出异常。然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。
说明:此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异常,否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。其方法流程图如下。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);//开始排队自旋执行共享获取直到成功退出或异常 } private void doAcquireShared(int arg) { // 添加到节点 此处是共享节点 final Node node = addWaiter(Node.SHARED); // 根据是否拿到资源 判断是否需要取消 boolean failed = true; try { boolean interrupted = false; for (;;) { // 返回前一个节点 final Node p = node.predecessor(); if (p == head) { // 如果前驱节点是头结点就进行锁的抢夺 // 返回0和大于0也算成功,但是具体含义不同 int r = tryAcquireShared(arg); // 表示获取成功 if (r >= 0) { // 获取锁成功之后应该陆续唤醒其他节点 setHeadAndPropagate(node, r); // 释放头节点 GC 会回收 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); //已经加过写锁并且不是当前线程加的 不允许获取state 返回-1表示获取读锁失败(当前线程可降级) if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 读锁数量 int r = sharedCount(c); // readerShouldBlock():读锁是否需要等待(公平锁原则) // r < MAX_COUNT:持有线程小于最大数(65535) // compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态 // 读线程是否应该被阻塞(公平锁判断同步队列是否有排队 非公平锁并不是直接返回false,这里主要是为了防止写锁无限等待,假如head的下一节点是一个写锁,此时new Reader应该被阻塞)、并且小于最大值、并且CAS设置读锁加锁次数加1成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中 //执行到这里说明 成功将读锁计数加1了,之后的逻辑都属于是善后操作 if (r == 0) { //设置第一个读线程 firstReader = current; // 读线程占用的资源数为1 firstReaderHoldCount = 1; } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入 firstReaderHoldCount++; } else { //多个线程来申请读锁时会到这一步,默认会从cachedHoldCounter中拿 // cachedHoldCounter 代表的是最后一个获取读锁的线程的计数器。 // 读锁数量不为0并且不为当前线程 HoldCounter rh = cachedHoldCounter; //如果rh.tid == getThreadId(current),说明这个线程连续两次来拿读锁 //如果不等的话,则说明缓存失效了(缓存能提高性能,比在ThreadLocal里检索效率要更高),需要重新从ThreadLocal取出HoldCounter //顺便修改缓存 if (rh == null || rh.tid != getThreadId(current)) // 获取当前线程对应的计数器 cachedHoldCounter = rh = readHolds.get(); //如果cachedHoldCounter的线程id就是当前线程id,且count为0 //上一次拿读锁的是别的线程,这个线程是第一次来拿读锁 else if (rh.count == 0) //加入到readHolds中 readHolds.set(rh); //不管怎样,局部变量rh的count都要加1 rh.count++; } return 1; } //在readerShouldBlock()返回true时,或者CAS修改失败时走到这里 //在这个方法中会用自旋的方式一直获取读锁,中途写锁被其他线程持有会返回-1 return fullTryAcquireShared(current); } //说明:在tryAcquireShared函数中,如果下列三个条件不满足(读线程是否应该被阻塞、小于最大值、比较设置成功)则会进行fullTryAcquireShared函数中,它用来保证相关操作可以成功。其逻辑与tryAcquireShared逻辑类似,不再累赘。 final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ //只要rh不为null,那么它一定指向当前线程的HoldCounter对象 HoldCounter rh = null; for (;;) { /*第一部分*/ int c = getState(); if (exclusiveCount(c) != 0) {//如果写锁被持有 if (getExclusiveOwnerThread() != current)//如果写锁不是当前线程持有 return -1; //说明写锁是当前线程持有。那么直接执行第二部分代码,尝试获得读锁。这说明只要当前线程持有了写锁,那么不管sync queue中有哪些节点,当前线程都可以继续获得读锁。 // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) {//如果写锁没有被持有,且当前线程排在其他线程后面 // Make sure we're not acquiring read lock reentrantly // 虽然readerShouldBlock返回了true,但如果当前线程已经获得了读锁,从语义上来说, // 当前线程是可以继续重入的,这也不属于插队的行为。 // 反之,如果当前线程没有持有着读锁,说明此时再去尝试获得读锁就真的是插队的行为了, // 所以,如果发现是这种情况,则直接返回-1,让当前线程去走阻塞等待的流程。 if (firstReader == current) {// 当前线程为第一个读线程 // assert firstReaderHoldCount > 0; // 进入这种情况,能保证当前线程当前是持有着读锁的,因为current肯定不为null, // 所以firstReader现在也不为null,它不为null,firstReaderHoldCount肯定也不为0 } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) {// 计数器为空或者计数器的tid不为当前正在运行的线程的tid rh = readHolds.get();//当前线程如果没有获得读锁,get到的肯定是0的count if (rh.count == 0)//没有持有读锁时,线程肯定没有ThreadLocal的HoldCounter对象 readHolds.remove();//当前线程没有获得读锁时,本来它的HoldCounter成员本来就应该为null,所以要remove } } //rh局部变量还保留着当前线程的HoldCounter成员的引用 if (rh.count == 0) return -1; } } /*第二部分*/ //执行到这里说明,当前线程接下来可以尝试获得读锁 if (sharedCount(c) == MAX_COUNT)// 读锁数量为最大值,抛出异常 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) {// 比较并且设置成功 if (sharedCount(c) == 0) { firstReader = current; // 设置第一个读线程 firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } 从整个逻辑来看,fullTryAcquireShared是利用自旋来不停尝试获得读锁,直到成功为止,这就是为什么称它为完全的尝试。不过函数退出不止会因为成功获得锁而退出(返回1),也会因为当前线程不符合继续获得锁的条件而退出(返回-1)。 我们把自旋的逻辑分为两个部分: 1.第一部分负责判断当前线程符不符合继续获得锁的条件,如果不符合则返回-1退出自旋;如果符合,则继续执行第二部分。 2.第二部分负责CAS修改同步器的状态,如果修改成功,则继续完成善后操作;如果修改失败,继续下一次循环。
其中sharedCount方法表示占有读锁的线程数量,源码如下:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
说明:直接将state右移16位,就可以得到读锁的线程数量,因为state的高16位表示读锁,对应的第十六位表示写锁数量。
读锁获取锁的过程比写锁稍微复杂些,首先判断写锁是否为0并且当前线程不占有独占锁,直接返回;否则,判断读线程是否需要被阻塞并且读锁数量是否小于最大值并且比较设置状态成功,若当前没有读锁,则设置第一个读线程firstReader和firstReaderHoldCount;若当前线程线程为第一个读线程,则增加firstReaderHoldCount;否则,将设置当前线程对应的HoldCounter对象的值。流程图如下。
注意:更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数(23行至43行代码),这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 进入这个分支,能保证firstReaderHoldCount > 0; if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1)//如果将从1变成0,那么只清空firstReader firstReader = null; else//如果当前大于1,那么减小firstReaderHoldCount firstReaderHoldCount--; } else { //获取当前线程的HoldCounter的老套路 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); //执行到这里,rh局部变量已经是当前线程的HoldCounter了 int count = rh.count; if (count <= 1) {//如果count为0,说明当前线程没有持有读锁中,HoldCounter是get()新生成的 readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } //执行到这里,说明当前线程持有读锁中,那么减小读锁计数1 --rh.count; } //此时读锁计数已成功减1,但同步状态却还没修改 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //只有在读写锁都是干净的情况,才返回true return nextc == 0; } }
性能优化点:
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
来提供只有一个线程获取读锁的性能保障。
少部分情况下,不止一个线程获取读锁。那么就用readHolds来存储各个线程读锁的重入次数,这时候依然可以做一些优化,如果释放读锁的线程是刚刚获取读锁的线程。那么为了获取这个线程的重入次数。为了避免用thredlocal,可以设置一个缓存来存储最后一个获取读锁的线程。cachedHoldCounter就是用于缓存最后一个线程的重入次数。
2处if(rh.count == 0) readHolds.set(rh)。原因在于当最后一个获取读锁的线程进行tryRealseShared()的时候如果发现重入次数从1变成0了。那么会从readHolds移除这个HoldCounter。这时候就会存在cachedHoldCounter.count == 0 而该cachedHoldCounter却不存在readHolds中的情况。因此这时候需要set到readHolds中。
cahcedHoldCounter并不是volatile变量。他的用意就是让线程缓存在本地变量方便自己查询的。
在 JDK 1.8 引入 StampedLock,可以理解为对 ReentrantReadWriteLock 在某些方面的增强,在原先读写锁的基础上新增了一种叫乐观读(Optimistic Reading)的模式。该模式并不会加锁,所以不会阻塞线程,会有更高的吞吐量和更高的性能。
三种访问数据模式:
ReentrantReadWriteLock 当线程获取写锁后可以降级成读锁,但是反过来则不行。
StampedLock提供了读锁和写锁相互转换的功能,使得该类支持更多的应用场景。
注意事项
那为何 StampedLock 性能比 ReentrantReadWriteLock 好?
关键在于StampedLock 提供的乐观读,我们知道ReentrantReadWriteLock 支持多个线程同时获取读锁,但是当多个线程同时读的时候,所有的写线程都是阻塞的。
StampedLock 的乐观读允许一个写线程获取写锁,所以不会导致所有写线程阻塞,也就是当读多写少的时候,写线程有机会获取写锁,减少了线程饥饿的问题,吞吐量大大提高。
这里可能你就会有疑问,竟然同时允许多个乐观读和一个先线程同时进入临界资源操作,那读取的数据可能是错的怎么办?
是的,乐观读不能保证读取到的数据是最新的,所以将数据读取到局部变量的时候需要通过 lock.validate(stamp) 校验是否被写线程修改过,若是修改过则需要上悲观读锁,再重新读取数据到局部变量。
同时由于乐观读并不是锁,所以没有线程唤醒与阻塞导致的上下文切换,性能更好。
使用场景和注意事项
对于读多写少的高并发场景 StampedLock
的性能很好,通过乐观读模式很好的解决了写线程“饥饿”的问题,我们可以使用StampedLock
来代替ReentrantReadWriteLock
,但是需要注意的是 StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。
StampedLock
是不可重入锁,使用过程中一定要注意;Conditon
,当需要这个特性的时候需要注意;Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。
Semaphore用来控制访问某资源的线程数,比如数据库连接.假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有20个,这时就必须控制最多只有20个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。
acquire() 获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。 acquire(int permits) 获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。 acquireUninterruptibly() 获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。 tryAcquire() 尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。 tryAcquire(long timeout, TimeUnit unit) 尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。 release() 释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。 hasQueuedThreads() 等待队列里是否还存在等待线程。 getQueueLength() 获取等待队列里阻塞的线程数。 drainPermits() 清空令牌把可用令牌数置为0,返回清空令牌的数量。 availablePermits() 返回可用的令牌数量。
每个停车场入口都有一个提示牌,上面显示着停车场的剩余车位还有多少,当剩余车位为0时,不允许车辆进入停车场,直到停车场里面有车离开停车场,这时提示牌上会显示新的剩余车位数。
业务场景 :
1、停车场容纳总停车量10。
2、当一辆车进入停车场后,显示牌的剩余车位数响应的减1.
3、每有一辆车驶出停车场后,显示牌的剩余车位数响应的加1。
4、停车场剩余车位不足时,车辆只能在外面等待。
代码:
public class TestCar { //停车场同时容纳的车辆10 private static Semaphore semaphore=new Semaphore(10); public static void main(String[] args) { //模拟100辆车进入停车场 for(int i=0;i<100;i++){ Thread thread=new Thread(new Runnable() { public void run() { try { System.out.println("===="+Thread.currentThread().getName()+"来到停车场"); if(semaphore.availablePermits()==0){ System.out.println("车位不足,请耐心等待"); } semaphore.acquire();//获取令牌尝试进入停车场 System.out.println(Thread.currentThread().getName()+"成功进入停车场"); Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间 System.out.println(Thread.currentThread().getName()+"驶出停车场"); semaphore.release();//释放令牌,腾出停车场车位 } catch (InterruptedException e) { e.printStackTrace(); } } },i+"号车"); thread.start(); } } }
Semaphore semaphore=new Semaphore(2);
1、当调用new Semaphore(2) 方法时,默认会创建一个非公平的锁的同步阻塞队列。
2、把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量。
semaphore.acquire();
1、当前线程会尝试去同步队列获取一个令牌,获取令牌的过程也就是使用原子的操作去修改同步队列的state ,获取一个令牌则修改为state=state-1。
2、 当计算出来的state<0,则代表令牌数量不足,此时会创建一个Node节点加入阻塞队列,挂起当前线程。
3、当计算出来的state>=0,则代表获取令牌成功。
源码:
/**
* 获取1个令牌
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 共享模式下获取令牌,获取成功则返回,失败则加入阻塞队列,挂起线程
* @param arg
* @throws InterruptedException
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取令牌,arg为获取令牌个数,当可用令牌数减当前令牌数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/** * 1、创建节点,加入阻塞队列, * 2、重双向链表的head,tail节点关系,清空无效节点 * 3、挂起当前节点线程 * @param arg * @throws InterruptedException */ //这里可以看上面的AQS代码 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //创建节点加入阻塞队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //获得当前节点pre节点 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//返回锁的state if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //重组双向链表,清空无效节点,挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
semaphore.release();
当调用semaphore.release() 方法时
1、线程会尝试释放一个令牌,释放令牌的过程也就是把同步队列的state修改为state=state+1的过程
2、释放令牌成功之后,同时会唤醒同步队列的所有阻塞节共享节点线程
3、被唤醒的节点会重新尝试去修改state=state-1 的操作,如果state>=0则获取令牌成功,否则重新进入阻塞队列,挂起线程。
源码:
/**
* 释放令牌
*/
public void release() {
sync.releaseShared(1);
}
/**
*释放共享锁,同时唤醒所有阻塞队列共享节点线程
* @param arg
* @return
*/
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//唤醒所有共享节点线程
doReleaseShared();
return true;
}
return false;
}
/** * 唤醒所有共享节点线程 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//是否需要唤醒后继节点 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始0 continue; unparkSuccessor(h);//唤醒h.nex节点线程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)); } if (h == head) // loop if head changed break; } }
// 构造器,必须指定一个大于零的计数 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 线程阻塞,直到计数为0的时候唤醒;可以响应线程中断退出阻塞 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 线程阻塞一段时间,如果计数依然不是0,则返回false;否则返回true public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // 计数-1 public void countDown() { sync.releaseShared(1); } // 获取计数 public long getCount() { return sync.getCount(); }
总结下CountDownLatch与join的区别:调用thread.join() 方法必须等thread 执行完毕,当前线程才能继续往下执行,而CountDownLatch通过计数器提供了更灵活的控制,只要检测到计数器为0当前线程就可以往下执行而不用管相应的thread是否执行完毕
从字面上的意思可以知道,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。
在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。下图演示了这一过程。
CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。
看如下示意图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。
CountDownLatch: 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。
CyclicBrrier: N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
CyclicBarrier 与 CountDownLatch 区别
CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
CyclicBarrier 的源码实现和 CountDownLatch 大相径庭,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现的。因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。