赞
踩
这篇文章是主文章【Java多线程】JUC之深入队列同步器(AQS)(一)实现细节解析的分支之一,建议先从主文章开始阅读
Condition接口定义了ConditionObject需要实现的线程等待、通知
的所有方法
Object的wait()方法
,signal()和signalAll() 方法相当于Object的notify()和notifyAll()
方法。java.utils.concurrent
包下各种阻塞队列
和线程池
等就是使用Condition接口实现阻塞以await()的调用链为例说明,如下:
源码
public interface Condition {
/**
* 暂停此线程直至一下四种情况发生
* 1.此Condition被signal()
* 2.此Condition被signalAll()
* 3.Thread.interrupt()
* 4.伪wakeup
* 以上情况.在能恢复方法执行时,当前线程必须要能获得锁
*/
//响应中断的等待:调用此方法的线程将加入条件等待队列,阻塞直到被唤醒或者线程发生中断
void await() throws InterruptedException;
//不响应中断的等待:跟上面类似,调用此方法的线程将加入条件等待队列,阻塞直到被唤醒(不响应中断)
void awaitUninterruptibly();
//带超时时间的等待:调用此方法的线程将加入条件等待队列,阻塞直到被唤醒或者线程发生中断或者超出等待的时间
long awaitNanos(long nanosTimeout) throws InterruptedException;
//调用此方法的线程将加入条件等待队列,阻塞直到被唤醒或者线程发生中断或者超出等待的时间(可设置执行时间单位)
//等效于:awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
//带截止时间的等待:调用此方法的线程将加入条件等待队列,阻塞直到被唤醒或者线程发生中断或者超出指定截止时间
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒某个等待在此condition的线程,在从 await()返回之前,该线程必须重新获取锁。
void signal();
//唤醒所有等待在此condition的所有线程
void signalAll();
}
AQS除了同步队列
之外,还存在Condition队列(条件等待队列,也可称为条件队列)
,也是一个FIFO的
单向队列 。该队列是Condition对象实现线程等待/通知功能
的核心。但条件队列不是必须的,只有当使用Condition时,才会存在此队列。并且可能会有多个
条件队列。
所有成员变量和方法
。内部变量firstWaiter、lastWaiter
分别表示等待条件的第一个节点
(线程)和最后一个节点
(线程)条件等待队列是一个FIFO的单向队列
,在队列中的每个节点都包含了一个线程引用
,存储的是在Condition对象上等待的线程
。
Condition的awaitXXX()
方法,会将当前线程构造成一个新的Node
添加的条件队列尾部
。这里不再使用Node.next属性
(它对于ASQ的实例变量head、 tail很有用),而只使用node,nextWaiter属性
,而且现在的Node的prev属性也不重要了,我们关注的是下一个等待条件唤醒的节点(线程)
。
要将原有的尾节点nextWaiter指向它,并且更新尾节点即可
。这里没有用CAS
的方式添加尾节点,因为在调用awaitXXX()方法时,已经获取到锁了,锁可以保证此更新过程是线程安全
的。条件等待队列 Condition 内部自己维护的一个队列,不同于 AQS 的 同步等待队列。它具有以下特点:
不能在同步等待队列
。移除
的节点,会进入同步等待队列
。一个
同步等待队列,但可以有多个
条件等待队列。并发包中的Lock的同步队列和条件等待队列
与AQS对象绑定的Condition条件对象
。一个AQS同步器(锁)对象拥有一个同步队列和多个条件队列
。> - 同步队列,表示`可以竞争锁的队列`,这个跟AQS一致,`waitStatus=0;`
> - 条件队列,表示等待的队列,其waitStatus=`Node.Condition`,由`firstWaiter和lastWaiter`2个属性操控.
- **调用await()** 就是把当前线程封装一个Node加`入Condition队列队尾`,接着就`一直循环判断其在不在Sync队列`,如果当前节点`在Sync队列`里了,就可以`竞争锁,恢复运行`了.
- **调用signal()** 就是把某个节点的`nextWaiter设为null,`再把其`从Condition队列转到Sync队列.`, 即能够唤醒`firstWaiter节点`,**将其添加到 "条件队列末尾",**
只有2个
属性,firstWaiter lastWaiter
分别表示等待条件的第一个节点
(线程)和最后一个节点
(线程)。当调用Condition.await()
方法时,线程会被构造成为Node节点,然后插入条件等待队列队尾
。 //在某个Condition实例上等待的条件队列的队头,即调用了某个condition实例的await()的等待线程链表
private transient Node firstWaiter;
//在某个Condition实例上等待的的条件队列的队尾
private transient Node lastWaiter;
ConditionObject还定义了2个常量,描述调用await()退出
时的处理方式,如下:
/** 表示将当前线程的中断标识重置为true,因为之前获取线程是否中断时,会将线程的中断标识给清除掉重置为false,此处是为了还原,但不会抛出异常 */
private static final int REINTERRUPT = 1;
/** 表示当前线程被唤醒是因为被中断,会抛出一个InterruptedException异常 */
private static final int THROW_IE = -1;
ConditionObject的awaitXXX()
方法相当于Object的wait()方法,signal()和signalAll()
方法分别对应于Object的notify()和notifyAll()方法
- 和Object一样,执行这些方法前都需要
检查当前线程是否持有锁
,如果没有持有则会抛出IllegalMonitorStateException
异常,
ConditionObject的方法和Object的对应方法的实现逻辑也大体一致,参考如下测试用例:
@Test
public void waitAndNotifyTest() throws Exception {
//声明锁对象
Object lock = new Object();
new Thread(() -> {
try {
log.info("run start,time->" + System.currentTimeMillis());
Thread.sleep(6000);//休眠6s
} catch (Exception e) {
e.printStackTrace();
}
//休眠6s,获取到锁后,输出到控制台然后唤醒等待在当前锁上的对象
synchronized (lock) {
log.info("run end");
lock.notify();//获取锁后唤醒一个在lock上等待的线程
}
}).start();
//主线程,输出控制台后,进入等待状态,等待其他线程唤醒
synchronized (lock) {
log.info("main start,time->" + System.currentTimeMillis());
long start = System.currentTimeMillis();
lock.wait();//获取锁后主线程进入等待状态(阻塞,释放锁)
log.info("wait time->" + (System.currentTimeMillis() - start));
}
log.info("main end");
}
@Test
public void awaitAndSignalTest() throws Exception {
//声明显示锁
ReentrantLock lock = new ReentrantLock();
//声明等待条件
Condition condition = lock.newCondition();
//休眠6s,获取到锁后,输出到控制台然后唤醒等待在当前锁上的对象
new Thread(() -> {
lock.lock();//获取锁
try {
log.info("run start,time->" + System.currentTimeMillis());
Thread.sleep(6000);//休眠6s
condition.signal();//获取锁后唤醒一个在condition上等待的线程
log.info("run end");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();//释放锁
}
}).start();
lock.lock();//获取锁
try {
log.info("main start,time->" + System.currentTimeMillis());
long start = System.currentTimeMillis();
condition.await();//获取锁后主线程进入等待状态(阻塞,释放锁)
log.info("wait time->" + (System.currentTimeMillis() - start));
} finally {
lock.unlock();//释放锁
}
log.info("main end");
}
2个方法的执行结果一样,如下
wait()明明最多等待1s,为啥实际等待了6s了,难道是1s后没有被唤醒?
释放占有的锁
,然后阻塞当前线程
。6s后Thread-0
线程释放锁
并唤醒阻塞的主线程
,主线程重新获取锁成功, 才从await()或者wait()退出,执行下面的逻辑。阻塞
,直到被唤醒或者被打断
,如果被打断则抛出异常InterruptedException
一直等待直到被唤醒,被打断时不会抛出异常
,可以通过线程的中断标识判断是否因为中断被唤醒的再次获取锁
才可以退出await方法
。tips:调用await()释放锁并将线程添加到条件等待队列中并没有采用
CAS自旋
来保证线程安全(对比AQS.enq()),因为Condition对象只能用于独占模式
,而且在调用await()之前会显示的获取独占锁,否则会抛出非法监视器状态异常
。
使用方法:
@Test
public void responseInterruptTest() throws Exception {
//声明锁
ReentrantLock lock=new ReentrantLock();
//声明等待条件
Condition condition=lock.newCondition();
Thread a = new Thread(() -> {
lock.lock();
try {
condition.await();//a线程获取锁后进入等待状态(阻塞,释放锁)
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
},"a");
//运行线程a,休眠100ms
a.start();
Thread.sleep(100);
Thread b=new Thread(() -> {
lock.lock();
try {
Thread.sleep(5000);
condition.signal();//b线程获取锁后,休眠5s,唤醒一个在condition上等待的线程
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
},"b");
//运行线程b,休眠100ms
b.start();
Thread.sleep(100);
//记录中断a线程次数
int interruptCount=0;
long start=System.currentTimeMillis();
while (a.isAlive()){
a.interrupt();
interruptCount++;
Thread.sleep(10);
}
System.out.println("main thread end,interruptCount->"+interruptCount+",time->"+(System.currentTimeMillis()-start));
}
执行结果
a线程在await()退出前一共调用interrupt()491次中断线程
,从开始调用到await()退出总耗时4912ms
,
第一次调用interrupt()会让当前线程从while循环
中退出,然后进入到acquireQueued()
尝试获取锁,
线程b
在休眠过程中未释放锁
,所以进入到acquireQueued()
后会被阻塞
,后面每次调用interrupt()
都会唤醒线程
然后重新尝试获取锁
,直到获取成功为止,从acquireQueued()退出。退出后因为interruptMode是THROW_IE
,然后抛出InterruptedException异常
。耗时4912ms
是因为执行到start()时,线程b
实际已经运行了100ms左右
,再过4900ms左右,就会执行signal()
唤醒在Condition
上等待的线程,线程a
获取锁从await()()退出
,main线程的while循环终止并退出。
部分源码,详见源码解析:
将上述用例换成synchronized关键字,效果是一样的,如下:
@Test
public void syncResponseInterruptTest() throws Exception {
Object lock=new Object();
Thread a=new Thread(() -> {
try {
synchronized (lock) {
lock.wait();
}
} catch (Exception e) {
e.printStackTrace();
}
});
a.start();
Thread.sleep(100);
Thread b=new Thread(() -> {
try {
synchronized (lock) {
Thread.sleep(5000);
lock.notify();//唤醒一个在lock对象上等待的线程
}
} catch (Exception e) {
e.printStackTrace();
}
});
b.start();
Thread.sleep(100);
int i=0;
long start=System.currentTimeMillis();
while (a.isAlive()){
a.interrupt();
i++;
Thread.sleep(10);
}
System.out.println("main thread end,i->"+i+",time->"+(System.currentTimeMillis()-start));
}
执行结果
如果换成awaitUninterruptibly(),则不会抛出异常
,此时可以通过线程的中断标识来判断是否是因为被中断唤醒的
,如下:
@Test
public void responseNotInterruptTest() throws Exception {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread a = new Thread(() -> {
lock.lock();
try {
condition.awaitUninterruptibly();//阻塞,释放锁
//可以通过线程的中断标识来判断是否是因为被中断唤醒的,然后进行一系列中断处理
boolean isInterrupted = Thread.currentThread().isInterrupted();
System.out.println("interrupted->" + isInterrupted);
if (isInterrupted) {
throw new InterruptedException("线程被中断");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
a.start();
Thread.sleep(100);
Thread b = new Thread(() -> {
lock.lock();
try {
Thread.sleep(5000);
condition.signal();//获取锁后,休眠5s,唤醒一个在condition上等待的线程
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
b.start();
Thread.sleep(100);
int i = 0;
long start = System.currentTimeMillis();
while (a.isAlive()) {
a.interrupt();
i++;
Thread.sleep(10);
}
System.out.println("main thread end,i->" + i + ",time->" + (System.currentTimeMillis() - start));
}
执行结果
await / awaitUninterruptibly详细代码详见源码解析:
上面的awai()是没有时间参数的,带时间参数
的await()有3个版本
被中断唤醒后会尝试获取锁并抛出异常
,如果是被signal()唤醒的,则返回等待时间是否超时了,返回false表示等待超时了,true没有超时返回值不同
,awaitNanos()返回剩余的等待时间
,如果为负值,表示等待超时了,否则未超时。入参不同
,await()传递的是相对时间,从线程开始休眠算起
,awaitUntil()传递的是绝对时间,即具体的某一个时间点
。使用方法:
@Test
public void waitWithTimeTest() throws Exception {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
CountDownLatch countDownLatch = new CountDownLatch(1);//初始化闭锁个数
Thread a = new Thread(() -> {
lock.lock();
try {
//1. 当前线程在被唤醒、被中断或 到达指定等待时间之前一直处于等待状态.等效于:awaitNanos(unit.toNanos(time)) > 0
//6s内没有获取到锁,返回false
//boolean result=condition.await(6, TimeUnit.SECONDS);
//2.功能同上,返回值不同
//不阻塞,4s没有获取到锁,返回剩余的等待时间,如果为负值,表示等待超时了,否则未超时。
//long result = condition.awaitNanos(TimeUnit.SECONDS.toNanos(4));
//3.当前线程在被唤醒、被中断或 到达到了指定的某个时间点一直处于等待状态
//不阻塞,5秒后没有获取到锁,不阻塞,返回false(释放锁)
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, 5);
boolean result = condition.awaitUntil(calendar.getTime());
System.out.println("await result->" + result);;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
a.start();
Thread.sleep(100);
//线程b获取锁后休眠5s,唤醒一个在condition上等待的线程
Thread b = new Thread(() -> {
lock.lock();
try {
Thread.sleep(5000);
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
b.start();
Thread.sleep(100);
long start = System.currentTimeMillis();
countDownLatch.await();//主线程等待子线程释放
System.out.println("main thread end,time->" + (System.currentTimeMillis() - start));
}
await()执行结果:不阻塞,6s内获取到锁,返回true,否则false
awaitNanos ()执行结果:不阻塞,4s没有获取到锁,返回剩余的等待时间,如果为负值,表示等待超时了,否则未超时。
awaitUntil()执行结果:不阻塞,5秒后没有获取到锁,不阻塞,返回false(释放锁)
await / awaitNanos / awaitUntil详细代码详见源码解析:
transferForSignal()方法
,区别在于前者只处理firstWaiter一个节点,而signalAll会处理所有的节点。不会直接唤醒等待中的线程
,而是将其加入到等待获取锁的同步队列的队尾
,然后将该节点的前一个节点的状态改成SIGNAL,表示需要唤醒该节点。等同步队列前面的节点都获取锁并释放锁后
,才会让该节点获取锁。使用方法:
执行结果
#从上到下,5个线程依次启动,并依次加入到condition队列中
Thread-0,start time->290663615066700
Thread-2,start time->290663615144100
Thread-4,start time->290663615194700
Thread-1,start time->290663615065500
Thread-3,start time->290663615090500
all wait thread start
#线程a起来了,休眠5s后唤醒所有在condition上等待的线程
Thread-a,start time->290665616223000
#线程b和c都起来了,进入等待锁的同步队列中
Thread-b,start time->290665717086500
Thread-c,start time->290665816697000
#线程a休眠5结束,调用signalAll唤醒在condition上等待的线程
Thread-a,signalAll time->290670616085600
#线程b和c依次获取锁
Thread-b,lock time->290670616223300
Thread-c,lock time->290671616309500
#5个线程按照加入到condition队列中的顺序依次被唤醒获取锁
Thread-0,await time->290672716498500
Thread-2,await time->290672816533300
Thread-4,await time->290672916530900
Thread-1,await time->290673016570100
Thread-3,await time->290673116578900
main thread end
signal/ signalAll详细代码详见源码解析:
非公平锁
下,最先进入等待获取锁的同步队列中的线程优先获取锁
,当在某个condition实例
上等待的线程被唤醒后,是加入到同步队列
的队尾等待获取锁
; @Test
public void syncNotifyAllTest() throws Exception {
Object lock = new Object();
CountDownLatch countDownLatch = new CountDownLatch(8);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + ",start time->" + System.nanoTime());
synchronized (lock) {
lock.wait();
Thread.sleep(100);
}
System.out.println(Thread.currentThread().getName() + ",await time->" + System.nanoTime());
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
//休眠2s,后上面5个线程都被挂起
Thread.sleep(2000);
System.out.println("all wait thread start");
//a线程获取到锁,休眠5s后唤醒所有在lock上等待的线程
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + ",start time->" + System.nanoTime());
synchronized (lock) {
Thread.sleep(5000);
lock.notifyAll();
System.out.println(Thread.currentThread().getName() + ",notifyAll time->" + System.nanoTime());
}
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-a").start();
Thread.sleep(100);
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + ",start time->" + System.nanoTime());
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + ",lock time->" + System.nanoTime());
Thread.sleep(1000);
}
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-b").start();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + ",start time->" + System.nanoTime());
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + ",lock time->" + System.nanoTime());
Thread.sleep(1000);
}
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-c").start();
countDownLatch.await();
System.out.println("main thread end");
}
执行结果
# 5个线程依次启动,进入条件等待队列中
Thread-0,start time->291313531362200
Thread-1,start time->291313531465000
Thread-2,start time->291313531501200
Thread-3,start time->291313531509400
Thread-4,start time->291313531595700
all wait thread start
#线程a起来了,休眠5s后唤醒所有在lock上等待的线程
Thread-a,start time->291315532697000
#线程b和c都起来了,进入等待锁的队列中
Thread-b,start time->291315632593200
Thread-c,start time->291315632833100
#线程a休眠5结束,调用notifyAll唤醒在lock上等待的线程
Thread-a,notifyAll time->291320532059200
#线程a开始释放锁,notifyAll将所有条件等待队列中的节点按照相反的顺序加入到同步同步的前面
#按照同步队列的顺序依次唤醒线程,最近加入的优先被唤醒
Thread-4,await time->291320632035500
Thread-3,await time->291320732048800
Thread-2,await time->291320832064400
Thread-1,await time->291320932095600
Thread-0,await time->291321032135700
Thread-c,lock time->291321032170700
Thread-b,lock time->291322032343900
main thread end
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// condition队列的头节点(第一个)
private transient Node firstWaiter;
// condition队列的尾节点(最后一个)
private transient Node lastWaiter;
//构造函数
public ConditionObject() { }
//添加CONDITION节点到条件等待队列中尾节点
private Node addConditionWaiter() {
Node t = lastWaiter;//获取尾节点
//如果尾节点的状态为CANCELLED则将其移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); //移除条件等待队列中所有非CONDITION的节点
t = lastWaiter;// 将尾节点重新赋值给t
}
// 新建CONDITION等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果尾节点为null,则此条件等待队列为空,则初始化一个首尾相等的队列
if (t == null)
firstWaiter = node; //初始化首节点
//否则插入队尾
else
t.nextWaiter = node; //插入到t节点的的后面
lastWaiter = node;// 更新尾节点
return node;// 返回新插入的节点
}
/**
* 唤醒一个等待线程
* 从首节点向后遍历直到遇见一个非CANCELLED或者为NULL的节点,并将其移除条件等待队列,并添加到AQS的FIFO同步队列尾部
* 唤醒线程去竞争锁
* @param first 非null的条件等待队列的首节点
*/
private void doSignal(Node first) {
// 循环,将待唤醒节点的后继节点作为新的首节点,即更新首节点
do {
if ( (firstWaiter = first.nextWaiter) == null)// 该节点的nextWaiter为空
lastWaiter = null;//if条件成立,说明新的首节点为null,表明条件队列中任何节点了,此时还需要更新尾节点为null
first.nextWaiter = null;// 设置first节点的nextWaiter域为空,因为首节点要出队了
//循环做唤醒操作,唤醒失败,则说明节点被取消了,那么就把firstWaiter赋值成首节点,继续循环唤醒下一个节点
// 即:将节点从condition队列转移到sync队列失败并且condition队列中的头节点不为空,一直循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* 唤醒所有等待线程
* 将条件等待队列所有节点转移到AQS的FIFO同步队列中
* @param first非null的条件等待队列的首节点
*/
private void doSignalAll(Node first) {
//first本身就是firstWaiter,所以此处可以先将lastWaiter和firstWaiter置为null
// condition队列的头节点尾节点都置为空
lastWaiter = firstWaiter = null;
// 循环获取first节点的下一个Waiter
do {
// 从firs的nextWaiter节点
Node next = first.nextWaiter;
// 设置当前first节点的nextWaiter域为空
first.nextWaiter = null;
//将目标节点first节点加入到同步队列中(从condition队列转移到sync队列)
transferForSignal(first);
// 重新设置first,用于开始下一次循环
first = next;
} while (first != null);
}
//从condition队列中清除状态为CANCEL的节点(清理掉所有不是CONDITION的节点)
private void unlinkCancelledWaiters() {
//记录遍历的1每一个节点
Node t = firstWaiter;
//用来保存当前遍历节点的前驱节点的引用(因为条件等待队列是单向队列)
Node trail = null; //trail表示上一个非CONDITION节点
//遍历整个条件等待队列
while (t != null) {
Node next = t.nextWaiter; // 后继节点
//如果当前遍历的节点的状态不为CONDITION则别除,并关联剔除后的单向队列
if (t.waitStatus != Node.CONDITION) {// t节点的状态不为CONDTION状态
t.nextWaiter = null; // 将t从条件等待队列中移除
if (trail == null)//未找到非CONDITION节点,即之前遍历的节点都被移除了
firstWaiter = next;// 重新设置condition队列的头节点
else// trail不为空
trail.nextWaiter = next; // 设置trail节点的nextWaiter域为next节点
if (next == null) //遍历到末尾了,将tail置为lastWaiter
lastWaiter = trail;// 设置condition队列的尾结点
}
else
trail = t;
t = next;
}
}
//公共方法
/**
* 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。
* 即:通知条件等待队列首节点转移到同步队列去竞争锁,用于独占模式
*/
public final void signal() {
if (!isHeldExclusively())//如果不是当前线程占有,抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;//condition队列头结点
if (first != null)//头结点不为空,说明完成了初始化,同时队列中有节点存在
doSignal(first); // 唤醒一个等待线程
}
/**
* 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程
* 即: 即:通知条件等待队列首节点转移到同步队列去竞争锁,用于独占模式
*/
public final void signalAll() {
if (!isHeldExclusively())// 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;//condition队列头结点
if (first != null)//头结点不为空
doSignalAll(first);// 唤醒所有等待线程
}
//一直等待直到被唤醒,被打断时不会抛出异常,可以通过线程的中断标识判断是否因为中断被唤醒的
public final void awaitUninterruptibly() {
Node node = addConditionWaiter(); // 添加等待节点到条件等待队列中
//进入等待状态释放锁,并返回释放前的同步状态值(正好和接下来需要获取同步状态一致)
int savedState = fullyRelease(node);
boolean interrupted = false;
//没有被通知,也就没有成功的转移到同步队列中则一直阻塞,当被通知时将此节点转移到同步队退出while循环并由前继节点释放锁后唤醒此节点
// isOnSyncQueue()判断node是否在等待获取锁的同步队列中,对于刚创建的ConditionWaiter节点,没有将其加入到等待获取锁的同步队列中
// 而是将其加入到通过nextWaiter属性维护的条件等待队列中了,该方法返回false
// 等该节点被signal方法唤醒后就会将其加入到同步队列中了,该方法返回true
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())//如果线程在等待过程中被中断
interrupted = true;//记录中断状态
}
//节点被通知并入队成功,则立马自旋获取锁,获取成功后返回前设置中断状态
// acquireQueued方法会尝试获取锁,如果失败则阻塞当前线程直到获取成功,返回值为true,表示最后一次唤醒是因为线程中断
// interruptMode为0或者REINTERRUPT
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();//获取资源成功后会用selfInterrupt补上中断
}
//标志:表示当从等待状态中退出时重新中断状态,但是不会抛出异常
private static final int REINTERRUPT = 1;
//标志:表示当从等待状态中退出时抛出异常,会抛出一个InterruptedException
private static final int THROW_IE = -1;
/**
* 检查线程中断标志
* 1,如果在通知之前发生中断则返回抛出中断异常的标志
* 2,如果异常发生在通知之后则返回重新中断的标志
* 3,如果没有发生中断则返回
*/
private int checkInterruptWhileWaiting(Node node) {
// 当发生中断,则确保中断的线程加入同步队列中,并根据transferAfterCancelledWait()的返回值来设置中断模式。
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
//根据中断标志进行相应处理,如果中断模式为THROW_IE则抛出InterruptedException异常
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)//标志为THROW_IE,抛出InterruptedException
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)//标志为REINTERRUPT,重置中断标记位
selfInterrupt();
}
//让当前线程阻塞,直到被唤醒或者被打断,如果被打断则抛出异常InterruptedException(响应中断)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //如果线程已中断抛出异常(响应中断)
Node node = addConditionWaiter(); //在条件等待队列队尾插入一个CONDITION节点
//进入等待状态释放锁,并返回释放前的同步状态值(正好和接下来需要获取同步状态一致)
int savedState = fullyRelease(node);
int interruptMode = 0;
//1.循环判断被唤醒的Node是否已经转移到AOS同步队列中,转移成功则退出循环
//2.线程程阻基时当发生中断则唤醒线程并确保节点转移成功后退出循环
//isOnSyncQueue()判断node是否在等待获取锁的同步队列中,对于刚创建的ConditionWaiter节点,
//没有将其加入到等待获取锁的同步队列中,只是将其加入到通过nextWaiter属性维护的条件等待队列中了,该方法返回false
//等该节点被signal()唤醒后就会将其加入到同步队列中了,该方法返回true
while (!isOnSyncQueue(node)) {//即如果当前节点不是在同步队列,也就是还在条件等待队列等待着被唤醒
//当前线程(节点)不在同步队列中,就阻塞当前线程
LockSupport.park(this);
//线程阻塞时,当发生中断则唤醒线程并确保节点转移到同步队列成功后退出while循环
// 线程被唤醒了,如果线程被中断的话,checkInterruptWhileWaiting的返回值就不是0,此时会退出循环,
// checkInterruptWhileWaiting在执行时会将当前节点加入到同步队列中
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;//发生了中断就停止当前线程是否在同步队列的检测
}
/**
* 执行到此,说明退出了上面的while循环,即从休眠状态中被唤醒了(从LockSupport.park()方法返回了),且当前线程(节点)在同步队列中。
* 在同步队列中了,当前线程又调用acquireQueued()尝试获取锁,如果失败则阻塞当前线程直到获取成功
* 返回值为true,表示最后一次唤醒是因为线程中断,设置中断模式为REINTERRUPT
*/
//1.被通知或者被中断的线程,继续获取竞争锁 2.获取成功或者被取消则再设置中断模式
//acquireQueued方法会尝试获取锁,如果失败则阻塞当前线程直到获取成功,返回值为true,表示最后一次唤醒是因为线程中断
//interruptMode为0或者REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;//设置中断模式
if (node.nextWaiter != null)
//将被从休眠中唤醒的的线程对应的节点从条件队列中移除(移除所有非CONDITION的节点)
unlinkCancelledWaiters();
if (interruptMode != 0)
/**
*根据中断模式决定最终是否抛出中断异常
* 如果interruptMode的值是THROW_IE,直接抛出中断异常
* 如果interruptMode的值是REINTERRUPT,则调用Thread.interrupt()中断当前线程(实际上只是置中断标志位,可能根本不会真正中断当前线程)
*/
reportInterruptAfterWait(interruptMode);
}
/**
* 超时中断条件等待,实现跟await基本一致,加了超时逻辑超时逻辑
* 和AQS的doAcquireNanos逻辑一样,返回剩余的等待时间,如果为负值,表示等待超时了,否则未超时。
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //如果线程中断,则抛出异常
Node node = addConditionWaiter(); //添加一个ConditionWaiter节点
//进入等待状态释放锁,并返回释放前的同步状态值(正好和接下来需要获取同步状态一致)
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout; //计算等待的最迟期限
int interruptMode = 0;
//判断node是否在等待获取锁的同步队列中
while (!isOnSyncQueue(node)) {
//如果发生超时,则确保节点加入同步队列中,并跳出循环
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//等待的时间较长,将线程park
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//当发生中断,则确保中断的线程加入同步队列中,并根据transferAfterCancelledWait的返回值
//来设置中断模式,返回1或者-1,终止循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//线程是被signal方法唤醒的,重新计算等待的时间
nanosTimeout = deadline - System.nanoTime();
}
//这部分逻辑跟不带参数的await方法一致
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//返回剩余的等待时间
return deadline - System.nanoTime();
}
/**
*awaitUntil()的实现及返回值都和await一致,但是入参不同,await()传递的是相对时间,从线程开始休眠算起,
* awaitUntil()传递的是绝对时间,即具体的某一个时间点。
* 超时逻辑:将日期时间转换成绝对时间,将当前绝对时间和超时日期的绝对时间进行比较
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
//进入等待状态释放锁,并返回释放前的同步状态值(正好和接下来需要获取同步状态一致)
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
//判断node是否在等待获取锁的同步队列中
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
//abstime是绝对时间
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//这部分逻辑跟不带参数的await方法一致
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* await()被中断唤醒后会尝试获取锁并抛出异常,如果是被signal唤醒的,则返回等待时间是否超时了,
* 返回false表示等待超时了,true表示没有被超时。
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
//转换为纳秒
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException(); //如果线程中断,则抛出异常
Node node = addConditionWaiter(); //添加一个ConditionWaiter节点
//进入等待状态释放锁,并返回释放前的同步状态值(正好和接下来需要获取同步状态一致)
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;//计算等待的最迟期限
boolean timedout = false;
int interruptMode = 0;
//判断node是否在等待获取锁的同步队列中
while (!isOnSyncQueue(node)) {
//等待的时间到了
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
//等待的时间较长,将线程park
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//线程被唤醒,如果线程被中断则checkInterruptWhileWaiting返回1或者-1,终止循环
//如果是被signal方法唤醒则检查node是否在同步队列中,如果在则退出循环,尝试获取锁
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//线程是被signal方法唤醒的,重新计算等待的时间
nanosTimeout = deadline - System.nanoTime();
}
//这部分逻辑跟不带参数的await方法一致
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
//如果这个Concition是由给定的AQS创建的,返回true
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
//判断在某个Condition实例上是否有等待的线程,从firstWaiter开始往后遍历,
//找到一个状态是CONDITION的节点即返回true
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
//即获取在某个Condition实例(条件队列)上等待的大概线程个数,从firstWaiter开始往后遍历
// 此方法是计数累加(因为没有在查询时加锁,并不能保证查询时是否有新的节点加入)
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
//获取在某个Condition实例上等待的线程集合,从firstWaiter开始往后遍历,
//将所有状态是`CONDITION`的节点对应的Thread加入到列表中并返回即可。
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
ConditionObject中调用AQS类的方法
//-----------------------ConditionObject中调用AQS类的方法-----------------------
/**
*以当前同步器的状态为参数调用release(),返回调用前的状态值,失败则抛出异常,并将参数节点的状态设为CANCELLED
* 1.等待的线程,是已经获取到锁的线程,当线程调用await()时会首先释放锁,然后再阻塞自自身。
* 2.当没有显示的获取锁,直接调用await(),会在这个方法抛出非法监视器异常IllegalMonitorStateException的错误
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();// 获取当前的同步器状态
//唤醒同步队列的head节点的后继节点,返回true,表示成功释放锁,返回false表示不能释放锁
if (release(savedState)) {{
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//如果release方法返回false进入此分支,将状态置为CANCELLED
node.waitStatus = Node.CANCELLED;
}
}
/**
* 将条件等待队列节点转移到AQs的FIFO同步队列中
* 尝试更新待唤醒节点的waitStatus,再将待唤醒节点加入同步队列,最后将待唤醒节点代表的线程从休眠中唤醒。
* 即: 每一个被通知的节点状态由CONDITION设置为0,并随后确保被通知的节点在加入到同
* 步队列后能被前继节点通知到(SIGNAL或者直接唤醒)。
* @param node 条件等待队列中的节点
* @return true转移成功
*/
final boolean transferForSignal(Node node) {
//如果CAS失败,说明节点在signal(唤醒)之前被取消了,即不处于CONDITION状态,返回false
//设置入队节点的等待状态CONDITION为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//如果设置成功,把节点插入到同步队列中队尾,
//重要提醒,返回的p对象是原来队列的尾节点,也就是现在新加入节点的前驱节点
Node p = enq(node);
//得到入队的前驱节点的状态
int ws = p.waitStatus;
//如果该新节点的前驱节点已经被取消,或者设置等待状态失败,那么就唤醒新节点,重新去做同步,保证能拼接到有效的尾节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);// 唤醒节点对应线程
return true;
}
/**
*如果一个节点之前在条件等待队列中,后来被转移到到同步队列中则返回true,表示需要重新获取同步状态
*
* 1.第1个if语句:
* 不管是因为中断还是被通知(详见transferAfterCancelledWait()和transferForSignal()方法)转移到AQS同步队列的节点状态为都会设置为初始状态(值为0),
* 所以当发现node.waitStatus == Node.CONDITION为真时,说明还没有转移到同步队列中,返回false,在下一次while循环中判断是否转移成功。
*
* 2.第2个if语句进行判断,当节点是AQS同步队列的中间节点时(在同步队列中含有next节点)则返回true;
*
* 3.当节点为尾节点时,在return语句里 ,从后到前遍历,如果存在则返回true,否则返回false。
*/
final boolean isOnSyncQueue(Node node) {
// 1。进入AQS同步队列的节点状态为CONDITION只是暂态,会在下一次自旋中变成SIGNAL
// 2。非head节点,队列其他节点前继节点都不为null (所以为null是被移除的节点)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//判断是否有后继节点,如果后继节点不为null则一定入队了
if (node.next != null)
return true;
//状态不为CONDITION,前继节点不为null,后继节点为null,则可能是尾节点
return findNodeFromTail(node);
}
//从后向前遍历,查找参数节点是否存在在AOS的FIFO同步队列中,如果存在返回true
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
/**
* 确保取消的节点加入同步队列中,如果中断或者超时发生在通知之前则将状态设置为0并返回true,否则返回false。
*/
final boolean transferAfterCancelledWait(Node node) {
//当节点状态为CONDITION,说明在通知前取消,则需要设置waitStatus状态为0,并将取消的线程加入同步队列中
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//如果节点状态不为CONDITION,则说明已经被通知,
//可能在入队的过程中通过自旋来确保入队成功,并返回false,代表线程在通知之后被取消。
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
每创建一个ConditionObject都会维护一个自己单向的等待队列
,但是每个ConditionObject都共享一个AQS的FIFO同步队列,当调用await()
时释放锁并进入阻塞状态,调用signal()
将条件等待队列
中的首节点线程
移动到AQS同步队列
中并将其前驱节点
设置为SIGNAL
或者直接唤醒线程
使得被通知的线程能去获取锁。
调用await()
释放锁并将线程添加到条件等待队列中并没有采用死循环CAS设置(参考AQS.enq()),因为Condition对象只能用于独占模式
,而且在调用await90之前会显示的获取独占锁
,否则会抛出非法监视器状态异常IllegalMonitorStateException
。
调用signal()
将转移等待节点,也不需要CAS
来保证,因为signal
内部会通过isHeldExclusively()
来判断调用者是获取独占锁的线程,如果为false
会抛出非法监视器状态的异常IllegalMonitorStateException
。
定义了2个Condition对象,notFull非满等待对象,notEmpty非空等待对象。
put ()
生产者线程
添加元素时,若容器已满
,释放notFull上的锁,当前线程进入阻塞
状态,等待
被消费者线程通知take()
消费者线程
读取元素时,若容器已空
,释放notEmpty上的锁,当前线程进入阻塞
状态,等待被生产者线程
通知java.concurrent.locks下面的锁 以及 并发工具都使用了Condition用于线程的等待通知,我就不一一举例说明了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。