当前位置:   article > 正文

JUC 高并发编程(3):阻塞队列,线程池_juc为啥不提交到最大线程池而是到阻塞队列

juc为啥不提交到最大线程池而是到阻塞队列

阻塞队列

阻塞队列概述

定义:

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
  • 当阻塞队列是满时,往队列中添加元素的操作将会被阻塞
    在这里插入图片描述

用处

  • 是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为 BlockingQueue 都一手给你包办好了
  • concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度.

架构介绍
在这里插入图片描述

阻塞队列种类

  • ①. ArrayBlockingQueue: 由数组结构组成的有界阻塞队列

  • ②.LinkedBlockingQueue: 由链表结构组成的有界(但大小默认值 Integer>MAX_VAL UE) 阻塞队列.

  • ③. SynchronousQueue: 不存储元素的阻塞队列, 也就是单个元素的队列.

    • SynchronousQueue是无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加
    • 可以认为 SynchronousQueue 是一个缓存值为1的阻塞队列,但是 isEmpty() 方法永远返回是true,remainingCapacity() 方法永远返回是0,remove() 和removeAll() 方法永远返回是false,iterator()方法永远返回空,peek()方法永远返回null。
    • 例子
      public class SynchronousQueueDemo {
          public static void main(String[] args) {
              BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
              new Thread(() -> {
                  try {
                      System.out.println(Thread.currentThread().getName() + "\t put 1");
                      blockingQueue.put("1");
                      System.out.println(Thread.currentThread().getName() + "\t put 2");
                      blockingQueue.put("2");
                      System.out.println(Thread.currentThread().getName() + "\t put 3");
                      blockingQueue.put("3");
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }, "AAA").start();
      
              new Thread(() -> {
                  try {
                      try {
                          TimeUnit.SECONDS.sleep(5);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
                      try {
                          TimeUnit.SECONDS.sleep(5);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
                      try {
                          TimeUnit.SECONDS.sleep(5);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }, "BBB").start();
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      结果显示:
      在这里插入图片描述
  • ④. PriorityBlockingQueue: 支持优先级排序的无界阻塞队列.

  • ⑤. LinkedTransferQueue: 由链表结构组成的无界阻塞队列.

  • ⑥. LinkedBlockingDeque: 由了解结构组成的双向阻塞队列.

BlockingQueue的核心方法

在这里插入图片描述
在这里插入图片描述
例子: 演示阻塞队列满了后的情况

public class BlockingQueueExceptionDemo {
    public static void main(String[] args) {
        //List list=new ArrayList();
        //注意:这里要给一个初始值
        BlockingQueue<String>blockingQueue=new ArrayBlockingQueue<>(3);
        //add() 方法是给ArrayBlockingQueue添加元素,如果超过会抛出异常!
        System.out.println(blockingQueue.add("a"));//true
        System.out.println(blockingQueue.add("b"));//true
        System.out.println(blockingQueue.add("c"));//true
        //element是检查元素有没有? 检查的是出栈的元素
        blockingQueue.element();

        //remove()
        System.out.println(blockingQueue.remove());//a
        System.out.println(blockingQueue.remove());//b
        System.out.println(blockingQueue.remove());//c
    }
}
    @Test
    public void offerAndPoll()throws Exception{
        BlockingQueue blockingQueue=new ArrayBlockingQueue(3);
        //前三个直接成功
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
        //下面这个会阻塞2s
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

结果:
前三个直接输出,最后一个会阻塞两秒,再输出。

线程池

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,

  • 如果显示超过了最大数量,超出的数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行
  • 它的主要特点为:线程复用 | 控制最大并发数 | 管理线程

ThreadPoolExecutor

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类,如下图:
在这里插入图片描述

线程池的创建
Executor 的接口方法中存在三种创建线程池的方法如下:

  • Executors.newFixedThreadPool(int) : 创建定长线程池(一池定线程)
    在这里插入图片描述

  • Executors.newSingleThreadExecutor( ): 一池一线程,保证任务的顺序执行
    在这里插入图片描述

  • Executors.newCachedThreadPool( ) : 创建可灵活改变大小的线程,一池N线程
    在这里插入图片描述
    在这里插入图片描述

例子如下:

/*
//看cpu的核数
//System.out.println(Runtime.getRuntime().availableProcessors());
* 第四种获取/使用java多线程的方式,线程池
* */
public class ExecutorTest {
    public static void main(String[] args) {

        //ExecutorService threadPool= Executors.newFixedThreadPool(5);//一池5个处理线程
        //ExecutorService threadPool=Executors.newSingleThreadExecutor();//一池一线程
        ExecutorService threadPool=Executors.newCachedThreadPool();//一池N线程

        try {
            for (int i = 1; i <= 10; i++) {
                //使用
                threadPool.execute(() -> {
                    //模拟10个用户来办理业务,每个用户就是一个来自外部的请求线程
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务~!");
                });
                //try { TimeUnit.SECONDS.sleep(3);  } catch (InterruptedException e) {e.printStackTrace();}
            }

        }catch (Exception e){

        }finally {
            //关闭
            threadPool.shutdown();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 一池N线程结果:

    pool-1-thread-2	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-3	 办理业务~!
    pool-1-thread-4	 办理业务~!
    pool-1-thread-5	 办理业务~!
    pool-1-thread-6	 办理业务~!
    pool-1-thread-7	 办理业务~!
    pool-1-thread-9	 办理业务~!
    pool-1-thread-8	 办理业务~
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 一池 5线程结果

    pool-1-thread-1	 办理业务~!
    pool-1-thread-2	 办理业务~!
    pool-1-thread-2	 办理业务~!
    pool-1-thread-2	 办理业务~!
    pool-1-thread-2	 办理业务~!
    pool-1-thread-2	 办理业务~!
    pool-1-thread-3	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-4	 办理业务~!
    pool-1-thread-5	 办理业务~
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 一池一线程结果:

    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~!
    pool-1-thread-1	 办理业务~
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

线程池的七大参数

①. corePoolSize: 线程池中的常驻核心线程数。

  • 在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程
  • 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中.

②. maximumPoolSize: 线程池能够容纳同时执行的最大线程数,此值大于等于1

③. keepAliveTime: 多余的空闲线程存活时间

  • 当线程超过 corePoolsize 时,其他
  • 当空间时间达到keepAliveTime值时,多余的线程会被销毁直到只剩下corePoolSize个线程为止(非核心线程)
  • unit: keepAliveTime的单位

⑤. workQueue: 任务队列,被提交但尚未被执行的任务(候客区)

⑥.threadFactory: 表示生成线程池中工作线程的线程工厂,

  • 用户创建新线程,一般用默认即可(银行网站的logo | 工作人员的制服 | 胸卡等)

⑦. handler:拒绝策略

  • 表示当线程队列满了并且工作线程大于等于线程池的最大显示 数(maxnumPoolSize)时如何来拒绝

形象化的理解:
在这里插入图片描述

在这里插入图片描述

线程池的底层工作原理

在这里插入图片描述
在这里插入图片描述

线程池的拒绝策略请你谈谈

等待队列也已经排满了,再也塞不下新的任务了。同时,线程池的maximumPoolSize也到达了,无法接续为新任务服务,这时我们需要拒绝策略机制合理的处理这个问题。

  • AbortPolicy(默认):直接抛出RejectedException异常阻止系统正常运行
  • CallerRunsPolicy: "调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是返回给调用者进行处理
  • DiscardOldestPolicy: 将最早进入队列的任务删除,之后再尝试加入队列
  • DiscardPolicy: 直接丢弃任务,不予任何处理也不抛出异常.如果允许任务丢失,这是最好的拒绝策略

以上内置策略均实现了RejectExecutionHandler接口

单一的/固定数的/可变你的三种创建线程池的方法,你用哪个多?

答案 :一个都不用,我们生产上只能使用自定义的。

参考阿里巴巴java开发手册
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
【强制】==线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,==这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors返回的线程池对象的弊端如下:

  • (1). FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
  • (2). CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

自定义创建线程池的方法

利用 常见线程池,

    public ThreadPoolExecutor(int corePoolSize,//允许正在服务线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//最大空闲时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//阻塞队列,等待任务队列
                              RejectedExecutionHandler handler) {// 拒绝策略
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

例子:

public class MyThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(3),
                Executors.defaultThreadFactory(),
                //默认抛出异常
                //new ThreadPoolExecutor.AbortPolicy()
                //回退调用者
                //new ThreadPoolExecutor.CallerRunsPolicy()
                //处理不来的不处理,丢弃时间最长的
                //new ThreadPoolExecutor.DiscardOldestPolicy()
                //直接丢弃任务,不予任何处理也不抛出异常
                new ThreadPoolExecutor.DiscardPolicy()
        );
        //模拟10个用户来办理业务 没有用户就是来自外部的请求线程.
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
        //threadPoolInit();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

上面代码由于拒绝策略不同,得到的结果也是不同的:

  • ①. AbortPolicy: 最大不会抛出异常的值= maximumPoolSize + new LinkedBlockin gDeque<Runnable>(3) =8个。如果超过8个,默认的拒绝策略会抛出异常
  • ②. CallerRunPolicy: 如果超过8个,不会抛出异常,会返回给调用者去
  • ③. DiscardOldestPolicy:如果超过8个,将最早进入队列的任务删除,之后再尝试加入队列 注意是删除队列,而非已在执行的任务
  • ④. DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常.如果允许任务丢失,这是最好的拒绝策略

合理配置线程池你是如何考虑的?

①. CPU密集型
在这里插入图片描述
②. IO密集型
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/968473
推荐阅读
相关标签
  

闽ICP备14008679号