赞
踩
在这一章节中,我将详细讲解JUC中提供的一些常用的并发工具类的使用,如CountDownLatch
, Semaphore
,ThreadLocal
,CyclicBarrier
等进行逐一讲解,并通过一行一行品源码的方式来逐步探索多线程的奥秘。
Semaphore
就是信号量,提供了资源数量的并发访问控制,相当于可以指定n个锁,允许n个线程同时访问。利用Semaphore可以很好的做到限流。Semaphore
主要提供了两个方法acquire()
获取资源和release()
释放资源。public static void main(String[] args) { // 定义一个拥有十个资源的信号池 Semaphore semaphore = new Semaphore(10); try { // 请求获取一个资源:相当于获取一个锁 semaphore.acquire(); doSomething(); } catch (InterruptedException e) { System.out.println("Thread is interrupted..."); } finally { // 执行结束后,释放锁。 semaphore.release(); } } private static void doSomething() throws InterruptedException { TimeUnit.SECONDS.sleep(10); }
Semaphore
构造方法new Semaphore(n)
中的n设定的为state的值。在acquire
方法中对state变量做CAS减操作,减到0后,后续线程执行acquire时阻塞;在release
方法中对state变量做CAS加操作。Semaphore
同互斥锁一样,内部也实现了AQS的两个子类(公平与非公平),Semaphore
相当于代理类,代理执行Sync
的具体逻辑。由于与互斥锁原理一致,这里不做过多的分析。分析:想象一个10运动员是互不相干的,相当于Java中的10个线程,当他们达到起跑线并准备好后,仍然需要等其他线程到达,裁判才能鸣枪开跑。
这个时候就可以通过CountDownLatch来实现(不过更好的方式时CyclicBarrier,这里留一个彩蛋。)
/** *Thread-1 is ready! *Thread-2 is ready! *Thread-3 is ready! *Thread-0 is ready! *you can running */ public static void main(String[] args) throws InterruptedException { // 定义一个有4个运动员的CountDownLatch CountDownLatch latch = new CountDownLatch(4); String prefix = "runner-"; // 创建4个线程,相当于4个运动员 IntStream.rangeClosed(1, 4).forEach(i-> new Task(prefix + i, latch).start()); // 裁判阻塞等待,知道所有运动员ready latch.await(); System.out.println("you can running"); } private static class Task extends Thread{ private String name; private CountDownLatch latch; public Task(String name, CountDownLatch latch){ this.name = name; this.latch= latch; } @Override public void run() { // 随机休眠一段时间,模拟运动员准备工作 sleepRandom(); // 准备好了,向裁判汇报,即调用countdown方法 System.out.println(Thread.currentThread().getName() + " is ready!"); latch.countDown(); } }
看上图CountDownLatch的结构,有没有很简单。就是很简单。这里需要注意的是,引用的Sync类不再向互斥锁一样是抽象类,而是一个实体类。CountDownLatch没有公平与非公平之分。
public CountDownLatch(int count) { // 校验count值必须大于0 if (count < 0) throw new IllegalArgumentException("count < 0"); // 将count值传递给Sync对象,前面也说到,CountDownLatch相当于一个代理类,实际执行加锁和释放锁的操作都是在Sync中完成的。 this.sync = new Sync(count); } // Sync类的构造方法 Sync(int count) { // 将count值作为AQS中的state的初始值,state的大小,就是CountDownLatch中需要等待到达的线程数的大小 setState(count); } // 父类AQS中的方法,直接进行赋值操作。 protected final void setState(int newState) { state = newState; }
await()
方法,分析await()
的阻塞策略:public void await() throws InterruptedException { // 二话不说,直接交由Sync对象进行处理 sync.acquireSharedInterruptibly(1); } // Sync继承的父类AQS中的方法 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 由子类Sync实现,实际就是判断当前state变量是否等于0 // 等于0,表示所有线程都已经到达,则返回1,不阻塞,往下执行 // 不等于0,表示还有线程没有到达,则返回-1,进入if语句,阻塞当前线程 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // Sync中实现的方法:判断当前state变量是否等于0 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // AQS中实现的方法 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 将当前线程封装成一个Node节点放到阻塞队列的末尾 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 取出node节点的前驱节点,没有这抛出NPE final Node p = node.predecessor(); // 若当前节点的前驱节点即为head节点,即当前节点是阻塞队列中的第一个节点。 // 前文讲过,head节点不属于阻塞队列中的节点 if (p == head) { // 重新获取state的值:前面介绍过,有子类Sync实现。 // 能够走到这里,r一定是<0是,所以不会进入if语句 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 判断该节点是否应该阻塞,AQS中实现的方法,即判断前驱节点的waitStatus是不是等于SIGNAL。前面的博客有讲,不再赘述。 if (shouldParkAfterFailedAcquire(p, node) && // 阻塞并检查线程是否被中断 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
countdown
方法public void countDown() { // 一样,直接交给Sync对象处理 sync.releaseShared(1); } //AQS中的方法,用于释放共享锁 public final boolean releaseShared(int arg) { // tryReleaseShared有子类Sync实现,通过CAS方法更新state变量,使其减去arg的值。 if (tryReleaseShared(arg)) { // AQS中的方法,释放所有等待的线程。此处不再赘述 doReleaseShared(); return true; } return false; } // Sync实现的方法:更新state变量 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 获取state变量的当前值 int c = getState(); // 判断c==0时,则返回false。为什么返回的false呢,state为0不是应该唤醒所有等待的线程的了嘛? // 你是不是有这样的疑问? // 此处获取到c==0,说明此时已经由其他线程将state值更新为0,而唤醒等待的线程的操作只能由最后一个执行countdown的线程完成。因此这里返回false。请看下面 if (c == 0) return false; int nextc = c-1; // 通过CAS更新state的值,使其减一,若成功则返回true,该线程唤醒所有处于等待状态的线程。 // 锁失败,则说明有其他线程更新了state值,则自旋,等待再次更新 if (compareAndSetState(c, nextc)) return nextc == 0; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。