当前位置:   article > 正文

玩转高并发系列----JUC并发工具类(一)_高并发工具包

高并发工具包

在这一章节中,我将详细讲解JUC中提供的一些常用的并发工具类的使用,如CountDownLatch, SemaphoreThreadLocal,CyclicBarrier等进行逐一讲解,并通过一行一行源码的方式来逐步探索多线程的奥秘。

Semaphore
  1. Semaphore就是信号量,提供了资源数量的并发访问控制,相当于可以指定n个锁,允许n个线程同时访问。利用Semaphore可以很好的做到限流。
  2. Semaphore主要提供了两个方法acquire()获取资源和release()释放资源。
public static void main(String[] args) {
		// 定义一个拥有十个资源的信号池
        Semaphore semaphore = new Semaphore(10);
        try {
        // 请求获取一个资源:相当于获取一个锁
            semaphore.acquire();
            doSomething();
        } catch (InterruptedException e) {
            System.out.println("Thread is interrupted...");
        } finally {
         // 执行结束后,释放锁。
            semaphore.release();
        }
    }
    
    private static void doSomething() throws InterruptedException {
        TimeUnit.SECONDS.sleep(10);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  1. Semaphore的实现原理和互斥锁基本相同,通过AQS实现,AQS中的state变量作为资源总数,即Semaphore构造方法new Semaphore(n)中的n设定的为state的值。在acquire方法中对state变量做CAS减操作,减到0后,后续线程执行acquire时阻塞;在release方法中对state变量做CAS加操作。
  2. Semaphore同互斥锁一样,内部也实现了AQS的两个子类(公平与非公平),Semaphore相当于代理类,代理执行Sync的具体逻辑。由于与互斥锁原理一致,这里不做过多的分析。
CountDownLatch
  1. 首先我们来想象一个场景:10个运动员参加100米赛跑,裁判需要等10个运动员都准备好之后才能鸣枪,开始赛跑。这个时候要怎么解决呢?

分析:想象一个10运动员是互不相干的,相当于Java中的10个线程,当他们达到起跑线并准备好后,仍然需要等其他线程到达,裁判才能鸣枪开跑。
这个时候就可以通过CountDownLatch来实现(不过更好的方式时CyclicBarrier,这里留一个彩蛋。)

  1. CountDownLatch与Semaphore类似,也是基于AQS实现的,不过没有公平与非公平之分。
/**
*Thread-1 is ready!
*Thread-2 is ready!
*Thread-3 is ready!
*Thread-0 is ready!
*you can running
*/
public static void main(String[] args) throws InterruptedException {
// 定义一个有4个运动员的CountDownLatch
        CountDownLatch latch = new CountDownLatch(4);
        String prefix = "runner-";
        // 创建4个线程,相当于4个运动员
        IntStream.rangeClosed(1, 4).forEach(i-> new Task(prefix + i, latch).start());

		// 裁判阻塞等待,知道所有运动员ready
        latch.await();
        System.out.println("you can running");
    }

    private static class Task extends Thread{

        private String name;

        private CountDownLatch latch;

        public Task(String name, CountDownLatch latch){
            this.name = name;
            this.latch=  latch;
        }

        @Override
        public void run() {
        // 随机休眠一段时间,模拟运动员准备工作
            sleepRandom();
		// 准备好了,向裁判汇报,即调用countdown方法            System.out.println(Thread.currentThread().getName() + " is ready!");
            latch.countDown();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  1. 前面说到,CountDownLatch也是通过AQS实现的,所以其内部也一定有一个实现的AQS的内部类Sync。先看类结构图:
    CountDownLatch

看上图CountDownLatch的结构,有没有很简单。就是很简单。这里需要注意的是,引用的Sync类不再向互斥锁一样是抽象类,而是一个实体类。CountDownLatch没有公平与非公平之分。

  1. 首先分析其构造方法:直接看源码:
    public CountDownLatch(int count) {
    // 校验count值必须大于0
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 将count值传递给Sync对象,前面也说到,CountDownLatch相当于一个代理类,实际执行加锁和释放锁的操作都是在Sync中完成的。
        this.sync = new Sync(count);
    }
    // Sync类的构造方法
    Sync(int count) {
    // 将count值作为AQS中的state的初始值,state的大小,就是CountDownLatch中需要等待到达的线程数的大小
       setState(count);
    }

// 父类AQS中的方法,直接进行赋值操作。
    protected final void setState(int newState) {
        state = newState;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. 接下来分析一下await()方法,分析await()的阻塞策略:
    public void await() throws InterruptedException {
    // 二话不说,直接交由Sync对象进行处理
        sync.acquireSharedInterruptibly(1);
    }
	// Sync继承的父类AQS中的方法
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 由子类Sync实现,实际就是判断当前state变量是否等于0
        // 等于0,表示所有线程都已经到达,则返回1,不阻塞,往下执行
        // 不等于0,表示还有线程没有到达,则返回-1,进入if语句,阻塞当前线程
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    // Sync中实现的方法:判断当前state变量是否等于0
    protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
    }
	// AQS中实现的方法
	private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 将当前线程封装成一个Node节点放到阻塞队列的末尾
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            // 取出node节点的前驱节点,没有这抛出NPE
                final Node p = node.predecessor();
                // 若当前节点的前驱节点即为head节点,即当前节点是阻塞队列中的第一个节点。
                // 前文讲过,head节点不属于阻塞队列中的节点
                if (p == head) {
                // 重新获取state的值:前面介绍过,有子类Sync实现。
                // 能够走到这里,r一定是<0是,所以不会进入if语句
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 判断该节点是否应该阻塞,AQS中实现的方法,即判断前驱节点的waitStatus是不是等于SIGNAL。前面的博客有讲,不再赘述。
                if (shouldParkAfterFailedAcquire(p, node) &&
                // 阻塞并检查线程是否被中断
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  1. 接下来分析countdown方法
    public void countDown() {
    // 一样,直接交给Sync对象处理
        sync.releaseShared(1);
    }
    
	//AQS中的方法,用于释放共享锁
    public final boolean releaseShared(int arg) {
    // tryReleaseShared有子类Sync实现,通过CAS方法更新state变量,使其减去arg的值。
        if (tryReleaseShared(arg)) {
        // AQS中的方法,释放所有等待的线程。此处不再赘述
            doReleaseShared();
            return true;
        }
        return false;
    }
// Sync实现的方法:更新state变量
   protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
     for (;;) {
     // 获取state变量的当前值
        int c = getState();
        // 判断c==0时,则返回false。为什么返回的false呢,state为0不是应该唤醒所有等待的线程的了嘛?
        // 你是不是有这样的疑问?
        // 此处获取到c==0,说明此时已经由其他线程将state值更新为0,而唤醒等待的线程的操作只能由最后一个执行countdown的线程完成。因此这里返回false。请看下面
        if (c == 0)
            return false;
        int nextc = c-1;
        // 通过CAS更新state的值,使其减一,若成功则返回true,该线程唤醒所有处于等待状态的线程。
        // 锁失败,则说明有其他线程更新了state值,则自旋,等待再次更新
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/828800
推荐阅读
相关标签
  

闽ICP备14008679号