当前位置:   article > 正文

Java并发编程 之线程池ThreadPoolExecutor详解_java threadpoolexecutor

java threadpoolexecutor

线程池经典面试题

  • 面试问题1:Java的线程池说一下,各个参数的作用,如何进行的?
  • 面试问题2:按线程池内部机制,当提交新任务时,有哪些异常要考虑。
  • 面试问题3:线程池都有哪几种工作队列?
  • 面试问题4:使用无界队列的线程池会导致内存飙升吗?
  • 面试问题5:说说几种常见的线程池及使用场景?

面试: 一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;
先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在 70 个 被安排上了。剩下 30 个默认拒绝策略。

为什么使用线程池

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,如果线程超过了最大数量,超出的数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。

他的主要特点为: 线程复用、控制最大并发数、管理线程。

  • 降低资源消耗。通过重复利用自己创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。

线程池的创建

线程池可以通过ThreadPoolExecutor来创建。

private ExecutorService executor;
private int             queueSize      	= 10000; 	// 任务队列数
private int             corePoolSize	= 10;    	// 核心线程数
private int             maxPoolSize		= 10;   	// 最大线程数 
public void initExecutor() {
	if (executor == null) {
		executor = new ThreadPoolExecutor(
								corePoolSize, 
								maxPoolSize, 
								60L, 
								TimeUnit.MILLISECONDS,
                 				new LinkedBlockingQueue<Runnable>(queueSize), 
                 				new ThreadPoolExecutor.CallerRunsPolicy()
       );
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

线程池ThreadPoolExecutor常用方法

  • submit():提交任务,返回Future,可以返回结果,可以传入Callable、Runnable。
  • execute():提交任务,没有返回值,只可以传入Runnable。
  • shutdown():关闭线程池,等待任务都执行完
  • shutdownNow():关闭线程池,不等待任务执行完
  • getTaskCount():线程池已执行和未执行的任务总数
  • getCompletedTaskCount():已完成的任务数量
  • getPoolSize():线程池当前线程数量
  • getActiveCount():当前线程池中正在执行任务的线程数量

线程池ThreadPoolExecutor七个参数详解

在这里插入图片描述

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

构造函数参数说明:

  • corePoolSize: 一直保持的线程的数量,即使线程空闲也不会释放。除非设置了 allowCoreThreadTimeout 为 true;
  • maxPoolSize:允许最大的线程数,队列满时开启新线程直到等于该值;
  • workQueue:用于缓存任务的阻塞队列;

线程池添加新的任务说明三者之间的关系。

1、如果没有空闲的线程执行该任务且当前运行的线程数少于 corePoolSize,则添加新的线程执行该任务。
2、如果没有空闲的线程执行该任务且当前的线程数等于corePoolSize同时阻塞队列未满,则将任务入队列,而不添加新的线程。
3、如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数小于maximumPoolSize,则创建新的线程执行任务。
4、如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize,则根据 handler 指定的策略来拒绝新的任务。

总结:
如果把线程池比作一个单位的话,corePoolSize 表示正式员工,线程可以表示一个员工。当向单位委派一项工作时,如果发现正式员工还没招满,单位就会招个正式员工来完成这项工作。随着向这个单位委派的工作增多,即使正式员工全部满了,工作还是干不完,那么单位只能按照新委派的工作先后顺序将他们找个地方搁置起来,这个地方就是 workQueue,等某个正式员工完成了手上的工作,就到放置任务的地方来领取新的任务。如果有大量的任务向这个单位委派,导致 workQueue 已经没有空位来放置新的任务了,于是单位决定招临时工来完成任务。临时工不是想招多少就是多少,通过 maxPoolSize 规定了单位的人数最大值。

  • keepAliveTime:表示空闲线程的存活时间。
    当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize
    只有当线程池中的线程数大于corePoolSizekeepAliveTime才会起作用,直到线程中的线程数不大于corepoolSIze
  • TimeUnitunit:表示keepAliveTime的单位。

为了解释keepAliveTime的作用,我们在上述情况下做一种假设。假设线程池这个单位已经招了些临时工,但新任务没有继续增加,所以随着每个员工忙完手头的工作,都来workQueue领取新的任务(看看这个单位的员工多自觉啊)。随着各个员工齐心协力,任务越来越少,员工数没变,那么就必定有闲着没事干的员工。这样的话领导不乐意啦,但是又不能轻易fire没事干的员工,因为随时可能有新任务来,于是领导想了个办法,设定了keepAliveTime,当空闲的员工在keepAliveTime这段时间还没有找到事情干,就被辞退啦,毕竟地主家也没有余粮啊!当然辞退到corePoolSize个员工时就不再辞退了,领导也不想当光杆司令啊!

  • workQueue:缓存任务的队列。实现 BlockingQueue
    LinkedBlockingQueue<Runnable>:用链表实现的队列,可以是有界的,也可以是无界的,但在Executors中默认使用无界的。

  • handler:表示当 workQueue 已满,且池中的线程数达到 maxPoolSize 时,线程池拒绝添加新任务时采取的策略。

ThreadPoolExecutor.AbortPolicy():默认值;抛出RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy():由向线程池提交任务的线程来执行该任务
ThreadPoolExecutor.DiscardOldestPolicy():抛弃最旧的任务(最先提交而没有得到执行的任务)
ThreadPoolExecutor.DiscardPolicy():抛弃当前的任务

  • threadFactory:指定创建线程的工厂。
    创建工厂的两种方式:Executors.defaultThreadFactory()new ThreadFactoryBuilder().setNameFormat("task-service-pool-%d").build()

一个额外参数:

  • allowCoreThreadTimeout:是否允许核心线程空闲退出,默认值为false。
    核心线程在 allowCoreThreadTimeout 为 true 时才会超时退出,默认不会退出。

运行流程:
1、线程池创建,预先准备好 core 数量的核心线程,准备接受任务。
2、新的任务进来,用 core 准备好的空闲线程执行任务。
3、core 满了,就将再进来的任务放入到阻塞队列中。空闲的 core 就会自己去阻塞队列获取任务执行。
4、阻塞队列满了,就会直接开启新线程执行,最大只能开到 max指定的数量。
5、max 都执行好了,空闲的线程会在 keepAliveTime 指定的时间后自动释放线程。最总保持到 core 大小。
6、如果线程数开到 max 的数量,还有新任务进来,就会使用 reject指定的拒绝策略进行处理。

线程池的运行原理

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

ExecutorService threadPool = Executors.newFixedThreadPool(10);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Executors.newFixedThreadPool(5)方式创建线程池,并提交任务方式来讲解,刚开始的时候其实线程池里是空的,就是一个线程都没有的。
在这里插入图片描述
接着如果使用线程池提交一个任务进去,希望由线程池里的一个线程来执行,如下代码所示,就是提交一个任务:

threadPool.execute(() -> {
});
  • 1
  • 2

这个时候,线程池会先看一下,现在池子里的线程数量有没有有达到corePoolSize指定的数量。

现在线程池里的线程数量是0,然后corePoolSize是10,那么肯定没达到了,所以直接会在线程池里创建一个线程出来然后执行这个任务。
在这里插入图片描述
接着假如说,这个线程处理完一个任务了,那么此时线程是不会被销毁的,因为线程池内的线程数量小于corePoolSize,他会一直等待下一个提交过来的任务。

接下来,到底是怎么等待的呢?

线程池会搭配一个workQueue,比如这里搭配的就是一个无界的 LinkedBlockingQueue,几乎可以无限量放入任务。

然后那个线程处理完一个任务之后,就会用阻塞的方式尝试从任务队列里获取任务,如果队列是空的,他就会阻塞卡在那儿不动,直到有人放一个任务到队列里,他才会获取到一个任务然后继续执行,循环往复。
在这里插入图片描述
接着再次提交任务,线程池一判断发现,好像线程数量才只有1个,完全比 corePoolSize(10个)要小,那么继续直接在池子里创建一个线程,然后处理这个任务,处理完了继续尝试从workQueue里阻塞式获取任务。

一直重复上面的操作,直到线程池里有10个线程了,达到了corePoolSize指定的数量。
在这里插入图片描述
这个时候你如果再提交任务,他一下子发现,线程池里已经有10个线程了,跟corePoolSize指定的线程数量一样了。

那么现在,我就不需要创建任何一个额外的线程了,现在你只要提交任务,全部直接入队到workQueue里就好。

此时线程池里的线程都阻塞式在workQueue上等待获取任务,有一个任务进来就会唤醒一个线程来处理这个任务,处理完了任务再次阻塞在workQueue上尝试获取下一个任务。
在这里插入图片描述
这里我们看到他用的是一个无界的LinkedBlockingQueue,但是假如说他用的是一个有界的队列呢?

比如说限定好了队列最多只能放10个任务,那么假如说,线程池里的线程来不及处理任务了,然后队列一下子放满了10个任务。

此时就会出现任务入队的失败,因为队列满了,无法入队。

然后就会尝试再次在线程池里创建线程,这个时候就会一直创建线程直到线程池里的线程数量达到maximumPoolSize指定的数量为止。

虽然这里fixed线程池默认corePoolSizemaximumPoolSize的数量都是一致的,但是可以假设此时maximumPoolSize的数量是20呢?

那么就会继续创建线程,直到线程数量达到20个,然后用额外创建的10个线程在队列满的情况下,继续处理任务。

整个过程:
在这里插入图片描述
接着万一队列满了,然后线程池的线程数量达到了maximumPoolSize指定的数量了,你额外创建线程都无法创建了,此时会如何呢?

答案是:会reject掉,不让你继续提交任务了,此时默认的就是抛出一个异常。

那么,在上图中额外创建出来的,超出corePoolSize的那些线程呢?

他们一旦创建出来之后,会发现线程池数量已经超过corePoolSize了,此时他们会尝试等待workQueue里的任务。

一旦超过keepAliveTime指定的时间,还获取不到任务,比如keepAliveTime是60秒,那么假如超过60秒获取不到任务,他就会自动释放掉了,这个线程就销毁了。

整个过程:
在这里插入图片描述

无界队列引发的内存飙升

以最常用的fixed线程池举例,他的线程池数量是固定的,因为他用的是近乎于无界的LinkedBlockingQueue,几乎可以无限制的放入任务到队列里。

所以只要线程池里的线程数量达到了corePoolSize指定的数量之后,接下来就维持这个固定数量的线程了。

然后,所有任务都会入队到workQueue里去,线程从workQueue获取任务来处理。

这个队列几乎永远不会满,当然这是几乎,因为LinkedBlockingQueue默认的最大任务数量是Integer.MAX_VALUE,非常大,近乎于可以理解为无限吧。

只要队列不满,就跟maximumPoolSize、keepAliveTime这些没关系了,因为不会创建超过corePoolSize数量的线程的。
在这里插入图片描述
那么此时万一每个线程获取到一个任务之后,他处理的时间特别特别的长,长到了令人发指的地步。比如处理一个任务要几个小时,此时会如何?

当然会出现workQueue里不断的积压越来越多得任务,不停的增加。

这个过程中会导致机器的内存使用不停的飙升,最后也许极端情况下就导致JVM OOM了,系统就挂掉了。

任务执行逻辑

线程池执行流程,即对应execute()/submit()方法:
在这里插入图片描述
执行逻辑说明:

  • 判断核心线程数是否已满,核心线程数大小和corePoolSize参数有关,未满则创建线程执行任务
  • 若核心线程池已满,判断队列是否满,队列是否满和workQueue参数有关,若未满则加入队列中
  • 若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize参数有关,若未满创建线程执行任务
  • 若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler参数有关

形象描述线程池执行过程:

  • 核心线程比作公司正式员工
  • 非核心线程比作外包员工
  • 阻塞队列比作需求池
  • 提交任务比作提需求
    在这里插入图片描述
  • 当产品提个需求,正式员工(核心线程)先接需求(执行任务)
  • 如果正式员工都有需求在做,即核心线程数已满),产品就把需求先放需求池(阻塞队列)。
  • 如果需求池(阻塞队列)也满了,但是这时候产品继续提需求,怎么办呢?那就请外包(非核心线程)来做。
  • 如果所有员工(最大线程数也满了)都有需求在做了,那就执行拒绝策略。
  • 如果外包员工把需求做完了,它经过一段(keepAliveTime)空闲时间,就离开公司了。

四种拒绝策略

等待队列也已经排满了,再也塞不下新的任务了
同时,
线程池的max也到达了,无法接续为新任务服务。

这时我们需要拒绝策略机制合理的处理这个问题.。

JDK内置的拒绝策略:

  • AbortPolicy(默认):抛出一个异常。
  • CallerRunPolicy:交给线程池调用所在的线程进行处理。
  • DiscardOldestPolicy:丢弃队列里最老的任务,将当前这个任务继续提交给线程池。
  • DiscardPolicy:直接丢弃任务。

线程池的工作队列

ArrayBlockingQueue
LinkedBlockingQueue
DelayQueue
PriorityBlockingQueue
SynchronousQueue
  • 1
  • 2
  • 3
  • 4
  • 5

ArrayBlockingQueue
ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。

LinkedBlockingQueue
LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列

DelayQueue
DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。

PriorityBlockingQueue
PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;

SynchronousQueue
SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。

针对面试题:线程池都有哪几种工作队列?
回答以上几种ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue等,说出它们的特点,并结合使用到对应队列的常用线程池(如newFixedThreadPool线程池使用LinkedBlockingQueue),进行展开阐述。

Executors 创建线程池工具类(不推荐)

Executors 调用了 ThreadPoolExecutor 来创建线程池的,可以帮助用户根据不用的场景,配置不同的参数。
不推荐使用,因为默认保存任务的队列是无界的,使用原生 new ThreadPoolExecutor() 创建线程池根据自己的需求设置参数来创建线程池比较好。
总结:
FixedThreadPoolSingleThreadExecutor => 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常。
CachedThreadPool => 允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而引起OOM异常。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

newFixedThreadPool线程池特点

  • 核心线程数和最大线程数大小一样
  • 没有所谓的非空闲时间,即keepAliveTime为0
  • 阻塞队列为无界队列LinkedBlockingQueue

工作机制
在这里插入图片描述
1、提交任务
2、如果线程数少于核心线程,创建核心线程执行任务
3、如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
4、如果线程执行完任务,去阻塞队列取任务,继续执行

案例代码

ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
    executor.execute(() -> {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            //do nothing
        }
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

IDE指定JVM参数:-Xmx8m -Xms8m
在这里插入图片描述
RUN以上代码,会抛出OOM:
在这里插入图片描述
针对面试题::使用无界队列的线程池会导致内存飙升吗?
答案 :会的,newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长(比如,上面demo设置了10秒),会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终导致OOM。

使用场景
FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

newCachedThreadPool

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

线程池特点

  • 核心线程数为0
  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是SynchronousQueue
  • 非核心线程空闲存活时间为60秒

当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

工作机制
在这里插入图片描述
1、提交任务
2、因为没有核心线程,所以任务直接加到SynchronousQueue队列。
3、判断是否有空闲线程,如果有,就去取出任务执行。
4、如果没有空闲线程,就新建一个线程执行。
5、执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则,被销毁。

实例代码

ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName()+"正在执行");
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

运行结果:
在这里插入图片描述
使用场景
用于并发执行大量短期的小任务。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

线程池特点

  • 核心线程数为1
  • 最大线程数也为1
  • 阻塞队列是LinkedBlockingQueue
  • keepAliveTime为0

工作机制
在这里插入图片描述
1、提交任务
2、线程池是否有一条线程在,如果没有,新建线程执行任务
3、如果有,讲任务加到阻塞队列
4、当前的唯一线程,从队列取任务,执行完一个,再继续取,一个人(一条线程)夜以继日地干活。

实例代码

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName()+"正在执行");
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

运行结果:
在这里插入图片描述
使用场景
适用于串行执行任务的场景,一个任务一个任务地执行。

newScheduledThreadPool

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
  • 1
  • 2
  • 3
  • 4
  • 5

线程池特点

  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是DelayedWorkQueue
  • keepAliveTime为0
  • scheduleAtFixedRate() :按某种速率周期执行
  • scheduleWithFixedDelay():在某个延迟后执行

工作机制
1、添加一个任务
2、线程池中的线程从 DelayQueue 中取任务
3、线程从 DelayQueue 中获取 time 大于等于当前时间的task
4、执行完后修改这个 task 的 time 为下次被执行的时间
5、这个 task 放回DelayQueue队列中

实例代码

 /**
 创建一个给定初始延迟的间隔性的任务,之后的下次执行时间是上一次任务从执行到结束所需要的时间+* 给定的间隔时间
 */
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleWithFixedDelay(()->{
    System.out.println("current Time" + System.currentTimeMillis());
    System.out.println(Thread.currentThread().getName()+"正在执行");
}, 1, 3, TimeUnit.SECONDS);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

运行结果:
在这里插入图片描述
使用场景
周期性执行任务的场景,需要限制线程数量的场景。

线程状态

线程有这几个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED

 // 线程池状态
 private static final int RUNNING    = -1 << COUNT_BITS;
 private static final int SHUTDOWN   =  0 << COUNT_BITS;
 private static final int STOP       =  1 << COUNT_BITS;
 private static final int TIDYING    =  2 << COUNT_BITS;
 private static final int TERMINATED =  3 << COUNT_BITS;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

线程各个状态切换图:
在这里插入图片描述
RUNNING

  • 该状态的线程池会接收新任务,并处理阻塞队列中的任务
  • 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态
    调用线程池的shutdownNow()方法,可以切换到STOP状态

SHUTDOWN

  • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务
  • 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态

STOP

  • 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务
  • 线程池中执行的任务为空,进入TIDYING状态

TIDYING

  • 该状态表明所有的任务已经运行终止,记录的任务数量为0
  • terminated()执行完毕,进入TERMINATED状态

TERMINATED

  • 该状态表示线程池彻底终止

ExecutorService关闭方法

在这里插入图片描述
shutdown():停止接收新任务,原来的任务继续执行。
1、停止接收新的submit的任务;
2、已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;
3、等到第2步完成后,才真正停止;

shutdownNow():停止接收新任务,原来的任务停止执行。
1、跟 shutdown() 一样,先停止接收新submit的任务;
2、忽略队列里等待的任务;
3、尝试将正在执行的任务interrupt中断;
4、返回未执行的任务列表;
说明:它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。所以,shutdownNow() 并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。

awaitTermination(long timeOut, TimeUnit unit):当前线程阻塞。
timeout 和 TimeUnit 两个参数,用于设定超时的时间及单位
当前线程阻塞,直到:
等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
或者 线程被中断,抛出InterruptedException
然后会监测 ExecutorService 是否已经关闭,返回true(shutdown请求后所有任务执行完毕)或false(已超时)

shutdown() 和 shutdownNow() 的区别
shutdown() 只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停。
shutdownNow() 能立即停止线程池,正在跑的和正在等待的任务都停下了。

shutdown() 和 awaitTermination() 的区别
shutdown() 后,不能再提交新的任务进去;但是 awaitTermination() 后,可以继续提交。
awaitTermination()是阻塞的,返回结果是线程池是否已停止(true/false);shutdown() 不阻塞。

总结
1、优雅的关闭,用 shutdown()
2、想立马关闭,并得到未执行任务列表,用shutdownNow()
3、优雅的关闭,并允许关闭声明后新任务能提交,用 awaitTermination()
4、关闭功能 【从强到弱】 依次是:shuntdownNow() > shutdown() > awaitTermination()

OOM异常测试

理论上会出现OOM异常,必须测试一波验证之前的说法:

测试类:TaskTest.java

public class TaskTest {
    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        int i = 0;
        while (true) {
            es.submit(new Task(i++));
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

使用Executors创建的CachedThreadPool,往线程池中无限添加线程

在启动测试类之前先将JVM内存调整小一点,不然很容易将电脑跑出问题,在idea里:Run -> Edit Configurations

JVM参数说明:

-Xms10M => Java Heap内存初始化值
-Xmx10M => Java Heap内存最大值
  • 1
  • 2

运行结果:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
Disconnected from the target VM, address: '127.0.0.1:60416', transport: 'socket'
  • 1
  • 2

创建到3w多个线程的时候开始报OOM错误
另外两个线程池就不做测试了,测试方法一致,只是创建的线程池不一样

SpringBoot 线程池的使用

扩展ThreadPoolTaskExecutor:
每次提交线程的时候都会将当前线程池的运行状况打印出来。

public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                // 线程名称前缀
                this.getThreadNamePrefix(),
                prefix,
                // 任务总数
                threadPoolExecutor.getTaskCount(),
                // 已完成数
                threadPoolExecutor.getCompletedTaskCount(),
                // 活跃线程数
                threadPoolExecutor.getActiveCount(),
                // 队列大小
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

线程池配置:

@Configuration
@EnableAsync
public class ExecutorConfig {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Bean
    public Executor asyncServiceExecutor() {
        logger.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        // 配置核心线程数
        executor.setCorePoolSize(5);
        // 配置最大线程数
        executor.setMaxPoolSize(5);
        // 配置队列大小
        executor.setQueueCapacity(99999);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

业务逻辑代码:
将Service层的服务异步化,使用 @Async(“asyncServiceExecutor”),表明executeAsync方法进入线程池运行。

public interface AsyncService {
    // 执行异步任务
    void executeAsync();
}

@Service
public class AsyncServiceImpl implements AsyncService {

    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
    
    @Async("asyncServiceExecutor")
    @Override
    public void executeAsync() {
        logger.info("start executeAsync");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        logger.info("end executeAsync");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Http服务:

@RestController
public class DemoController {

    private static final Logger logger = LoggerFactory.getLogger(DemoController.class);

    @Autowired
    private AsyncService asyncService;

    @GetMapping("/demo")
    public String demo() {
        logger.info("start submit");

        //调用service层的任务
        asyncService.executeAsync();

        logger.info("end submit");
        return "demo";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

测试:

2020-09-25 17:52:47.400  INFO 6552 --- [async-service-1] com.example.service.AsyncServiceImpl     : start executeAsync
2020-09-25 17:52:47.495  INFO 6552 --- [nio-8080-exec-1] com.example.controller.DemoController    : start submit
2020-09-25 17:52:47.495  INFO 6552 --- [nio-8080-exec-1] c.e.c.VisiableThreadPoolTaskExecutor     : async-service-, 2. do submit,taskCount [34], completedTaskCount [26], activeCount [5], queueSize [3]
2020-09-25 17:52:47.495  INFO 6552 --- [nio-8080-exec-1] com.example.controller.DemoController    : end submit
2020-09-25 17:52:47.559  INFO 6552 --- [async-service-2] com.example.service.AsyncServiceImpl     : end executeAsync
2020-09-25 17:52:47.559  INFO 6552 --- [async-service-2] com.example.service.AsyncServiceImpl     : start executeAsync
2020-09-25 17:52:47.658  INFO 6552 --- [nio-8080-exec-3] com.example.controller.DemoController    : start submit
2020-09-25 17:52:47.658  INFO 6552 --- [nio-8080-exec-3] c.e.c.VisiableThreadPoolTaskExecutor     : async-service-, 2. do submit,taskCount [35], completedTaskCount [27], activeCount [5], queueSize [3]
2020-09-25 17:52:47.658  INFO 6552 --- [nio-8080-exec-3] com.example.controller.DemoController    : end submit
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

注意这一行日志: 2. do submit,taskCount [35], completedTaskCount [27], activeCount [5], queueSize [3]
说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了35个任务,完成了27个,当前有5个线程在处理任务,还剩3个任务在队列中等待,线程池的基本情况一路了然。

你在工作中单一的/固定数的/可变你的三种创建线程池的方法,你用哪个多?超级大坑

答案是一个都不用,我们生产上只能使用自定义的。

Executors中JDK已经给你提供了,为什么不用?
参考阿里巴巴Java开发手册。
在这里插入图片描述
在这里插入图片描述

合理配置线程池你是如何考虑的

从两个方面考虑,你的任务是CPU密集型还是IO密集型。

  • CPU密集型
    System.out.println(Runtime.getRuntime().availableProcessors()); // 查看CPU核数
    lscpu Linux 命令查看CPU信息。
    在这里插入图片描述
    双核双线程:配置 2+1 = 3。

  • IO密集型
    在这里插入图片描述
    在这里插入图片描述
    双核双线程:配置 2/(1-0.9) = 2/0.1 = 20

ThreadPoolTaskExecutor线程池使用

线程池配置:

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 具体问题具体分析,比较理想的方案,仅供参考
 * 计算密集型: 线程数 = CPU核数 + 1,也可以设置成CPU核数*2,一般设置CPU*2
 * IO密集型: 线程数 = CPU核心数/(1-阻塞系数),这个组赛系数一般为0.8~0.9之间,也可以取0.8或者0.9
 */
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {

    // 获取服务器的cpu个数
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    // 核心线程数量
    private static final int COUR_SIZE = CPU_COUNT * 2;
    // 线程最大数量
    private static final int MAX_COUR_SIZE = COUR_SIZE * 4;

    @Bean("threadExecutor")
    public ThreadPoolTaskExecutor threadExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = null;
        try {
            threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            // 获取CPU核心数
            int i = Runtime.getRuntime().availableProcessors();
            //核心线程数目
            threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);
            //指定最大线程数
            threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);
            //队列中最大的数目
            threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 2 * 10);
            //线程空闲后的最大存活时间
            threadPoolTaskExecutor.setKeepAliveSeconds(60);
            //线程名称前缀
            threadPoolTaskExecutor.setThreadNamePrefix("threadExecutor-");
            //拒绝策略
            threadPoolTaskExecutor.setRejectedExecutionHandler(new SelfRejectedExecutionHandler());
            //当调度器shutdown被调用时等待当前被调度的任务完成
            threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            //加载
            threadPoolTaskExecutor.initialize();
            log.info("初始化线程池成功");
        } catch (Exception e) {
            log.error("初始化线程池失败: {}", e.getMessage());
        }
        return threadPoolTaskExecutor;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

线程任务存取工具类:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Slf4j
@Component
public class ThreadReader {

    private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new LinkedBlockingQueue<>();

    @Autowired
    @Qualifier("threadExecutor")
    private ThreadPoolTaskExecutor threadExecutor;

    public static void put(Runnable runnable) {
        try {
            BLOCKING_QUEUE.put(runnable);
        } catch (InterruptedException e) {
            log.error("ThreadReader.put异常:{}", e.getMessage());
        }
    }

    public void take() {
        if (CollectionUtils.isEmpty(BLOCKING_QUEUE)) {
            return;
        }
        try {
            Runnable runnable = BLOCKING_QUEUE.take();
            log.info("取出当前线程池没来得及执行的任务, runnable:{}", runnable);
            threadExecutor.execute(runnable);
        } catch (InterruptedException e) {
            log.error("ThreadReader.take异常:{}", e.getMessage());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

自定义线程池的拒绝策略:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class SelfRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        if (null != runnable) {
            // 线程池没来得及执行的任务先放入队列
            ThreadReader.put(runnable);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

重新执行未执行的任务:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class CustomizeScheduler implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private ThreadReader threadReader;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 线程工厂
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("scheduledExecutor-pool-%d")
                .setUncaughtExceptionHandler((thread, throwable) -> log.error("ThreadPool {} got exception", thread, throwable)).build();
        ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory, new SelfRejectedExecutionHandler());
        // 之后每隔1秒执行队列中没有来得及执行的任务
        scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1000, TimeUnit.MILLISECONDS);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/1008908
推荐阅读
相关标签
  

闽ICP备14008679号