赞
踩
信号量,也是基于AQS实现的一种类似操作系统PV操作的在Java层面实现的互斥锁。
PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
V操作的主要动作是:
非公平,permits指的是可用资源的数量
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
公平,fair为true则代表公平
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
-
- public void release() {
- sync.releaseShared(1);
- }
- public boolean tryAcquire()
- public int availablePermits()
- public final int getQueueLength()
- public final boolean hasQueuedThreads()
- protected void reducePermits(int reduction)
- protected Collection<Thread> getQueuedThreads()
- /**
- * 实现一个同时只能处理5个请求的限流器
- */
- private static Semaphore semaphore = new Semaphore(5);
-
- /**
- * 定义一个线程池
- */
- private static ThreadPoolExecutor executor = new ThreadPoolExecutor
- (10, 50, 60,
- TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
-
- /**
- * 模拟执行方法
- */
- public static void exec() {
- try {
- //占用1个资源
- semaphore.acquire(1);
- //TODO 模拟业务执行
- System.out.println("执行exec方法");
- Thread.sleep(2000);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- //释放一个资源
- semaphore.release(1);
- }
- }
-
- public static void main(String[] args) throws InterruptedException {
- {
- for (; ; ) {
- Thread.sleep(100);
- // 模拟请求以10个/s的速度
- executor.execute(() -> exec());
- }
- }
- }
结论:并发场景下,同一时间只允许设定资源数量的线程获取到锁。
是一个计数类型的同步协助类的共享锁,它允许一个或多个线程等待,直到操作完成为止。
实现原理是初始化的时候会先设置一个数值,然后通过await进行阻塞,再调用countDown对数值进行-1操作,直到值为0会直接执行所有等待的线程。
- public class CountDownLatch {
- /**
- * Synchronization control For CountDownLatch.
- * Uses AQS state to represent count.
- */
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
-
- Sync(int count) {
- setState(count);
- }
对应的其实使用的还是AQS中的state
- public CountDownLatch(int count) {
- if (count < 0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public boolean await(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
- public void countDown() {
- sync.releaseShared(1);
- }
await():阻塞,无时限等待CountDownLatch中数值变为0,才执行线程中逻辑
await(long timeout, TimeUnit unit):阻塞,有时限等待CountDownLatch中数值变为0,才执行线程中逻辑。
countDown():将count的值-1,直至为0。
- CountDownLatch countDownLatch = new CountDownLatch(1);
- for (int i = 0; i < 5; i++) {
- new Thread(() -> {
- try {
- //准备完毕,阻塞在这,等待号令
- //countDownLatch.await(1000, TimeUnit.SECONDS);
- countDownLatch.await();
- String parter = "【" + Thread.currentThread().getName() + "】";
- System.out.println(parter + "开始执行……");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- }
-
- Thread.sleep(2000);
- countDownLatch.countDown();// 执行发令
- }
- CountDownLatch countDownLatch = new CountDownLatch(5);
- for (int i = 0; i < 5; i++) {
- final int index = i;
- new Thread(() -> {
- try {
- Thread.sleep(1000 +
- ThreadLocalRandom.current().nextInt(1000));
- System.out.println(Thread.currentThread().getName()
- + " finish task" + index);
-
- countDownLatch.countDown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- }
- // 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
- countDownLatch.await();
- System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
-
- }
把count的-1操作放到线程中,意味着前置线程执行结束,此时可执行阻塞的主线程。
线程中的join方法也能让线程等待,它和CountDownLock的区别在于
循环屏障,同countDownLatch一样可以让线程等待,它会让一组线程等待到某个状态之后再执行,但它还可以等所有线程释放后,再重复和之前一组线程的类似的等待,可循环利用。
1. CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
2. CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、 isBroken(用来知道阻塞的线程是否被中断)等方法。
3. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
4. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
5. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
6. CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现
参数parties代表的是初始化让等待的线程数量,barrierAction则是在阻塞时要先执行的任务,属性中的parties是代表的count的计数副本,会记录初始的count值,等待一组线程释放后使用。
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
await方法的作用是当一组线程全部调用了该方法,就开始执行业务逻辑
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
-
- public int await(long timeout, TimeUnit unit)
- throws InterruptedException,
- BrokenBarrierException,
- TimeoutException {
- return dowait(true, unit.toNanos(timeout));
- }
可用于多线程合并计算结果
- private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
-
- private ExecutorService threadPool= Executors.newFixedThreadPool(3);
-
- private CyclicBarrier cb=new CyclicBarrier(3,()->{
- int result=0;
- Set<String> set = map.keySet();
- for(String s:set){
- result+=map.get(s);
- }
- System.out.println("三人平均成绩为:"+(result/3)+"分");
- });
-
-
- public void count(){
- for(int i=0;i<3;i++){
- threadPool.execute(new Runnable(){
-
- @Override
- public void run() {
- //获取学生平均成绩
- int score=(int)(Math.random()*40+60);
- map.put(Thread.currentThread().getName(), score);
- System.out.println(Thread.currentThread().getName()
- +"同学的平均成绩为:"+score);
- try {
- //执行完运行await(),等待所有学生平均成绩都计算完毕
- cb.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
-
- });
- }
- }
-
-
- public static void main(String[] args) {
- CyclicBarrierTest2 cb=new CyclicBarrierTest2();
- cb.count();
- }
可应用于多线程等待的重复事件
- public static void main(String[] args) {
-
- AtomicInteger counter = new AtomicInteger();
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
- 5, 5, 1000, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(100),
- (r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
- new ThreadPoolExecutor.AbortPolicy());
-
- CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
- () -> System.out.println("裁判:比赛开始~~"));
-
- for (int i = 0; i < 10; i++) {
- threadPoolExecutor.submit(new Runner(cyclicBarrier));
- }
-
- }
- static class Runner extends Thread{
- private CyclicBarrier cyclicBarrier;
- public Runner (CyclicBarrier cyclicBarrier) {
- this.cyclicBarrier = cyclicBarrier;
- }
-
- @Override
- public void run() {
- try {
- int sleepMills = ThreadLocalRandom.current().nextInt(1000);
- Thread.sleep(sleepMills);
- System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
- cyclicBarrier.await();
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- }catch(BrokenBarrierException e){
- e.printStackTrace();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。