赞
踩
面试: 一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;
先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在 70 个 被安排上了。剩下 30 个默认拒绝策略。
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,如果线程超过了最大数量,超出的数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。
他的主要特点为: 线程复用、控制最大并发数、管理线程。
线程池可以通过ThreadPoolExecutor
来创建。
private ExecutorService executor; private int queueSize = 10000; // 任务队列数 private int corePoolSize = 10; // 核心线程数 private int maxPoolSize = 10; // 最大线程数 public void initExecutor() { if (executor == null) { executor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize), new ThreadPoolExecutor.CallerRunsPolicy() ); } }
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
构造函数参数说明:
corePoolSize
: 一直保持的线程的数量,即使线程空闲也不会释放。除非设置了 allowCoreThreadTimeout 为 true;maxPoolSize
:允许最大的线程数,队列满时开启新线程直到等于该值;workQueue
:用于缓存任务的阻塞队列;线程池添加新的任务说明三者之间的关系。
1、如果没有空闲的线程执行该任务且当前运行的线程数少于 corePoolSize,则添加新的线程执行该任务。
2、如果没有空闲的线程执行该任务且当前的线程数等于corePoolSize同时阻塞队列未满,则将任务入队列,而不添加新的线程。
3、如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数小于maximumPoolSize,则创建新的线程执行任务。
4、如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize,则根据 handler 指定的策略来拒绝新的任务。总结:
如果把线程池比作一个单位的话,corePoolSize 表示正式员工,线程可以表示一个员工。当向单位委派一项工作时,如果发现正式员工还没招满,单位就会招个正式员工来完成这项工作。随着向这个单位委派的工作增多,即使正式员工全部满了,工作还是干不完,那么单位只能按照新委派的工作先后顺序将他们找个地方搁置起来,这个地方就是 workQueue,等某个正式员工完成了手上的工作,就到放置任务的地方来领取新的任务。如果有大量的任务向这个单位委派,导致 workQueue 已经没有空位来放置新的任务了,于是单位决定招临时工来完成任务。临时工不是想招多少就是多少,通过 maxPoolSize 规定了单位的人数最大值。
keepAliveTime
:表示空闲线程的存活时间。keepAliveTime
,该线程会退出,直到线程数量等于corePoolSize
。corePoolSize
时keepAliveTime
才会起作用,直到线程中的线程数不大于corepoolSIze
。TimeUnitunit
:表示keepAliveTime的单位。为了解释keepAliveTime的作用,我们在上述情况下做一种假设。假设线程池这个单位已经招了些临时工,但新任务没有继续增加,所以随着每个员工忙完手头的工作,都来workQueue领取新的任务(看看这个单位的员工多自觉啊)。随着各个员工齐心协力,任务越来越少,员工数没变,那么就必定有闲着没事干的员工。这样的话领导不乐意啦,但是又不能轻易fire没事干的员工,因为随时可能有新任务来,于是领导想了个办法,设定了keepAliveTime,当空闲的员工在keepAliveTime这段时间还没有找到事情干,就被辞退啦,毕竟地主家也没有余粮啊!当然辞退到corePoolSize个员工时就不再辞退了,领导也不想当光杆司令啊!
workQueue
:缓存任务的队列。实现 BlockingQueue
。
LinkedBlockingQueue<Runnable>:
用链表实现的队列,可以是有界的,也可以是无界的,但在Executors
中默认使用无界的。
handler
:表示当 workQueue
已满,且池中的线程数达到 maxPoolSize
时,线程池拒绝添加新任务时采取的策略。
ThreadPoolExecutor.AbortPolicy():默认值;抛出RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy():由向线程池提交任务的线程来执行该任务
ThreadPoolExecutor.DiscardOldestPolicy():抛弃最旧的任务(最先提交而没有得到执行的任务)
ThreadPoolExecutor.DiscardPolicy():抛弃当前的任务
threadFactory
:指定创建线程的工厂。Executors.defaultThreadFactory()
、new ThreadFactoryBuilder().setNameFormat("task-service-pool-%d").build()
一个额外参数:
allowCoreThreadTimeout
:是否允许核心线程空闲退出,默认值为false。allowCoreThreadTimeout
为 true 时才会超时退出,默认不会退出。运行流程:
1、线程池创建,预先准备好 core 数量的核心线程,准备接受任务。
2、新的任务进来,用 core 准备好的空闲线程执行任务。
3、core 满了,就将再进来的任务放入到阻塞队列中。空闲的 core 就会自己去阻塞队列获取任务执行。
4、阻塞队列满了,就会直接开启新线程执行,最大只能开到 max指定的数量。
5、max 都执行好了,空闲的线程会在 keepAliveTime 指定的时间后自动释放线程。最总保持到 core 大小。
6、如果线程数开到 max 的数量,还有新任务进来,就会使用 reject指定的拒绝策略进行处理。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ExecutorService threadPool = Executors.newFixedThreadPool(10);
用Executors.newFixedThreadPool(5)
方式创建线程池,并提交任务方式来讲解,刚开始的时候其实线程池里是空的,就是一个线程都没有的。
接着如果使用线程池提交一个任务进去,希望由线程池里的一个线程来执行,如下代码所示,就是提交一个任务:
threadPool.execute(() -> {
});
这个时候,线程池会先看一下,现在池子里的线程数量有没有有达到corePoolSize
指定的数量。
现在线程池里的线程数量是0,然后corePoolSize是10,那么肯定没达到了,所以直接会在线程池里创建一个线程出来然后执行这个任务。
接着假如说,这个线程处理完一个任务了,那么此时线程是不会被销毁的,因为线程池内的线程数量小于corePoolSize
,他会一直等待下一个提交过来的任务。
接下来,到底是怎么等待的呢?
线程池会搭配一个workQueue,比如这里搭配的就是一个无界的 LinkedBlockingQueue
,几乎可以无限量放入任务。
然后那个线程处理完一个任务之后,就会用阻塞的方式尝试从任务队列里获取任务,如果队列是空的,他就会阻塞卡在那儿不动,直到有人放一个任务到队列里,他才会获取到一个任务然后继续执行,循环往复。
接着再次提交任务,线程池一判断发现,好像线程数量才只有1个,完全比 corePoolSize(10个)要小,那么继续直接在池子里创建一个线程,然后处理这个任务,处理完了继续尝试从workQueue里阻塞式获取任务。
一直重复上面的操作,直到线程池里有10个线程了,达到了corePoolSize指定的数量。
这个时候你如果再提交任务,他一下子发现,线程池里已经有10个线程了,跟corePoolSize指定的线程数量一样了。
那么现在,我就不需要创建任何一个额外的线程了,现在你只要提交任务,全部直接入队到workQueue里就好。
此时线程池里的线程都阻塞式在workQueue上等待获取任务,有一个任务进来就会唤醒一个线程来处理这个任务,处理完了任务再次阻塞在workQueue上尝试获取下一个任务。
这里我们看到他用的是一个无界的LinkedBlockingQueue,但是假如说他用的是一个有界的队列呢?
比如说限定好了队列最多只能放10个任务,那么假如说,线程池里的线程来不及处理任务了,然后队列一下子放满了10个任务。
此时就会出现任务入队的失败,因为队列满了,无法入队。
然后就会尝试再次在线程池里创建线程,这个时候就会一直创建线程直到线程池里的线程数量达到maximumPoolSize
指定的数量为止。
虽然这里fixed线程池默认corePoolSize
和maximumPoolSize
的数量都是一致的,但是可以假设此时maximumPoolSize
的数量是20呢?
那么就会继续创建线程,直到线程数量达到20个,然后用额外创建的10个线程在队列满的情况下,继续处理任务。
整个过程:
接着万一队列满了,然后线程池的线程数量达到了maximumPoolSize
指定的数量了,你额外创建线程都无法创建了,此时会如何呢?
答案是:会reject掉,不让你继续提交任务了,此时默认的就是抛出一个异常。
那么,在上图中额外创建出来的,超出corePoolSize的那些线程呢?
他们一旦创建出来之后,会发现线程池数量已经超过corePoolSize了,此时他们会尝试等待workQueue里的任务。
一旦超过keepAliveTime指定的时间,还获取不到任务,比如keepAliveTime是60秒,那么假如超过60秒获取不到任务,他就会自动释放掉了,这个线程就销毁了。
整个过程:
以最常用的fixed线程池举例,他的线程池数量是固定的,因为他用的是近乎于无界的LinkedBlockingQueue,几乎可以无限制的放入任务到队列里。
所以只要线程池里的线程数量达到了corePoolSize指定的数量之后,接下来就维持这个固定数量的线程了。
然后,所有任务都会入队到workQueue里去,线程从workQueue获取任务来处理。
这个队列几乎永远不会满,当然这是几乎,因为LinkedBlockingQueue默认的最大任务数量是Integer.MAX_VALUE,非常大,近乎于可以理解为无限吧。
只要队列不满,就跟maximumPoolSize、keepAliveTime这些没关系了,因为不会创建超过corePoolSize数量的线程的。
那么此时万一每个线程获取到一个任务之后,他处理的时间特别特别的长,长到了令人发指的地步。比如处理一个任务要几个小时,此时会如何?
当然会出现workQueue里不断的积压越来越多得任务,不停的增加。
这个过程中会导致机器的内存使用不停的飙升,最后也许极端情况下就导致JVM OOM了,系统就挂掉了。
线程池执行流程,即对应execute()
/submit()
方法:
执行逻辑说明:
corePoolSize
参数有关,未满则创建线程执行任务workQueue
参数有关,若未满则加入队列中maximumPoolSize
参数有关,若未满创建线程执行任务handler
参数有关形象描述线程池执行过程:
等待队列也已经排满了,再也塞不下新的任务了
同时,
线程池的max也到达了,无法接续为新任务服务。
这时我们需要拒绝策略机制合理的处理这个问题.。
JDK内置的拒绝策略:
AbortPolicy
(默认):抛出一个异常。CallerRunPolicy
:交给线程池调用所在的线程进行处理。DiscardOldestPolicy
:丢弃队列里最老的任务,将当前这个任务继续提交给线程池。DiscardPolicy
:直接丢弃任务。ArrayBlockingQueue
LinkedBlockingQueue
DelayQueue
PriorityBlockingQueue
SynchronousQueue
ArrayBlockingQueue
ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。
LinkedBlockingQueue
LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE
,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列
DelayQueue
DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。
PriorityBlockingQueue
PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;
SynchronousQueue
SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。
针对面试题:线程池都有哪几种工作队列?
回答以上几种ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue等,说出它们的特点,并结合使用到对应队列的常用线程池(如newFixedThreadPool线程池使用LinkedBlockingQueue),进行展开阐述。
Executors
调用了 ThreadPoolExecutor
来创建线程池的,可以帮助用户根据不用的场景,配置不同的参数。
不推荐使用,因为默认保存任务的队列是无界的,使用原生 new ThreadPoolExecutor()
创建线程池根据自己的需求设置参数来创建线程池比较好。
总结:
FixedThreadPool
和SingleThreadExecutor
=> 允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,从而引起OOM
异常。
CachedThreadPool
=> 允许创建的线程数为Integer.MAX_VALUE
,可能会创建大量的线程,从而引起OOM
异常。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newFixedThreadPool线程池特点
工作机制
1、提交任务
2、如果线程数少于核心线程,创建核心线程执行任务
3、如果线程数等于核心线程,把任务添加到LinkedBlockingQueue
阻塞队列
4、如果线程执行完任务,去阻塞队列取任务,继续执行
案例代码
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
}
IDE指定JVM参数:-Xmx8m -Xms8m
RUN以上代码,会抛出OOM:
针对面试题::使用无界队列的线程池会导致内存飙升吗?
答案 :会的,newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长(比如,上面demo设置了10秒),会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终导致OOM。
使用场景
FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
线程池特点
当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
工作机制
1、提交任务
2、因为没有核心线程,所以任务直接加到SynchronousQueue队列。
3、判断是否有空闲线程,如果有,就去取出任务执行。
4、如果没有空闲线程,就新建一个线程执行。
5、执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则,被销毁。
实例代码
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在执行");
});
}
运行结果:
使用场景
用于并发执行大量短期的小任务。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
线程池特点
工作机制
1、提交任务
2、线程池是否有一条线程在,如果没有,新建线程执行任务
3、如果有,讲任务加到阻塞队列
4、当前的唯一线程,从队列取任务,执行完一个,再继续取,一个人(一条线程)夜以继日地干活。
实例代码
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在执行");
});
}
运行结果:
使用场景
适用于串行执行任务的场景,一个任务一个任务地执行。
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
线程池特点
工作机制
1、添加一个任务
2、线程池中的线程从 DelayQueue 中取任务
3、线程从 DelayQueue 中获取 time 大于等于当前时间的task
4、执行完后修改这个 task 的 time 为下次被执行的时间
5、这个 task 放回DelayQueue队列中
实例代码
/**
创建一个给定初始延迟的间隔性的任务,之后的下次执行时间是上一次任务从执行到结束所需要的时间+* 给定的间隔时间
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleWithFixedDelay(()->{
System.out.println("current Time" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"正在执行");
}, 1, 3, TimeUnit.SECONDS);
运行结果:
使用场景
周期性执行任务的场景,需要限制线程数量的场景。
线程有这几个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED
。
// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程各个状态切换图:
RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED
shutdown():停止接收新任务,原来的任务继续执行。
1、停止接收新的submit的任务;
2、已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;
3、等到第2步完成后,才真正停止;
shutdownNow():停止接收新任务,原来的任务停止执行。
1、跟 shutdown() 一样,先停止接收新submit的任务;
2、忽略队列里等待的任务;
3、尝试将正在执行的任务interrupt中断;
4、返回未执行的任务列表;
说明:它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。所以,shutdownNow() 并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。
awaitTermination(long timeOut, TimeUnit unit):当前线程阻塞。
timeout 和 TimeUnit 两个参数,用于设定超时的时间及单位
当前线程阻塞,直到:
等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
或者 线程被中断,抛出InterruptedException
然后会监测 ExecutorService 是否已经关闭,返回true(shutdown请求后所有任务执行完毕)或false(已超时)
shutdown() 和 shutdownNow() 的区别
shutdown() 只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停。
shutdownNow() 能立即停止线程池,正在跑的和正在等待的任务都停下了。
shutdown() 和 awaitTermination() 的区别
shutdown() 后,不能再提交新的任务进去;但是 awaitTermination() 后,可以继续提交。
awaitTermination()是阻塞的,返回结果是线程池是否已停止(true/false);shutdown() 不阻塞。
总结
1、优雅的关闭,用 shutdown()
2、想立马关闭,并得到未执行任务列表,用shutdownNow()
3、优雅的关闭,并允许关闭声明后新任务能提交,用 awaitTermination()
4、关闭功能 【从强到弱】 依次是:shuntdownNow() > shutdown() > awaitTermination()
理论上会出现OOM异常,必须测试一波验证之前的说法:
测试类:TaskTest.java
public class TaskTest {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
int i = 0;
while (true) {
es.submit(new Task(i++));
}
}
}
使用Executors创建的CachedThreadPool,往线程池中无限添加线程
在启动测试类之前先将JVM内存调整小一点,不然很容易将电脑跑出问题,在idea里:Run -> Edit Configurations
JVM参数说明:
-Xms10M => Java Heap内存初始化值
-Xmx10M => Java Heap内存最大值
运行结果:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
Disconnected from the target VM, address: '127.0.0.1:60416', transport: 'socket'
创建到3w多个线程的时候开始报OOM错误
另外两个线程池就不做测试了,测试方法一致,只是创建的线程池不一样
扩展ThreadPoolTaskExecutor:
每次提交线程的时候都会将当前线程池的运行状况打印出来。
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class); private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", // 线程名称前缀 this.getThreadNamePrefix(), prefix, // 任务总数 threadPoolExecutor.getTaskCount(), // 已完成数 threadPoolExecutor.getCompletedTaskCount(), // 活跃线程数 threadPoolExecutor.getActiveCount(), // 队列大小 threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }
线程池配置:
@Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Bean public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); // 配置核心线程数 executor.setCorePoolSize(5); // 配置最大线程数 executor.setMaxPoolSize(5); // 配置队列大小 executor.setQueueCapacity(99999); // 配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-service-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } }
业务逻辑代码:
将Service层的服务异步化,使用 @Async(“asyncServiceExecutor”)
,表明executeAsync方法进入线程池运行。
public interface AsyncService { // 执行异步任务 void executeAsync(); } @Service public class AsyncServiceImpl implements AsyncService { private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); @Async("asyncServiceExecutor") @Override public void executeAsync() { logger.info("start executeAsync"); try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } logger.info("end executeAsync"); } }
Http服务:
@RestController public class DemoController { private static final Logger logger = LoggerFactory.getLogger(DemoController.class); @Autowired private AsyncService asyncService; @GetMapping("/demo") public String demo() { logger.info("start submit"); //调用service层的任务 asyncService.executeAsync(); logger.info("end submit"); return "demo"; } }
测试:
2020-09-25 17:52:47.400 INFO 6552 --- [async-service-1] com.example.service.AsyncServiceImpl : start executeAsync
2020-09-25 17:52:47.495 INFO 6552 --- [nio-8080-exec-1] com.example.controller.DemoController : start submit
2020-09-25 17:52:47.495 INFO 6552 --- [nio-8080-exec-1] c.e.c.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [34], completedTaskCount [26], activeCount [5], queueSize [3]
2020-09-25 17:52:47.495 INFO 6552 --- [nio-8080-exec-1] com.example.controller.DemoController : end submit
2020-09-25 17:52:47.559 INFO 6552 --- [async-service-2] com.example.service.AsyncServiceImpl : end executeAsync
2020-09-25 17:52:47.559 INFO 6552 --- [async-service-2] com.example.service.AsyncServiceImpl : start executeAsync
2020-09-25 17:52:47.658 INFO 6552 --- [nio-8080-exec-3] com.example.controller.DemoController : start submit
2020-09-25 17:52:47.658 INFO 6552 --- [nio-8080-exec-3] c.e.c.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [35], completedTaskCount [27], activeCount [5], queueSize [3]
2020-09-25 17:52:47.658 INFO 6552 --- [nio-8080-exec-3] com.example.controller.DemoController : end submit
注意这一行日志: 2. do submit,taskCount [35], completedTaskCount [27], activeCount [5], queueSize [3]
说明提交任务到线程池的时候,调用的是submit(Callable task)
这个方法,当前已经提交了35
个任务,完成了27
个,当前有5
个线程在处理任务,还剩3
个任务在队列中等待,线程池的基本情况一路了然。
答案是一个都不用,我们生产上只能使用自定义的。
Executors
中JDK已经给你提供了,为什么不用?
参考阿里巴巴Java开发手册。
从两个方面考虑,你的任务是CPU密集型还是IO密集型。
CPU密集型
System.out.println(Runtime.getRuntime().availableProcessors());
// 查看CPU核数
lscpu
Linux 命令查看CPU信息。
双核双线程:配置 2+1 = 3。
IO密集型
双核双线程:配置 2/(1-0.9) = 2/0.1 = 20
线程池配置:
import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * 具体问题具体分析,比较理想的方案,仅供参考 * 计算密集型: 线程数 = CPU核数 + 1,也可以设置成CPU核数*2,一般设置CPU*2 * IO密集型: 线程数 = CPU核心数/(1-阻塞系数),这个组赛系数一般为0.8~0.9之间,也可以取0.8或者0.9 */ @Slf4j @EnableAsync @Configuration public class ThreadPoolConfig { // 获取服务器的cpu个数 private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); // 核心线程数量 private static final int COUR_SIZE = CPU_COUNT * 2; // 线程最大数量 private static final int MAX_COUR_SIZE = COUR_SIZE * 4; @Bean("threadExecutor") public ThreadPoolTaskExecutor threadExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = null; try { threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); // 获取CPU核心数 int i = Runtime.getRuntime().availableProcessors(); //核心线程数目 threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE); //指定最大线程数 threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE); //队列中最大的数目 threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 2 * 10); //线程空闲后的最大存活时间 threadPoolTaskExecutor.setKeepAliveSeconds(60); //线程名称前缀 threadPoolTaskExecutor.setThreadNamePrefix("threadExecutor-"); //拒绝策略 threadPoolTaskExecutor.setRejectedExecutionHandler(new SelfRejectedExecutionHandler()); //当调度器shutdown被调用时等待当前被调度的任务完成 threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); //加载 threadPoolTaskExecutor.initialize(); log.info("初始化线程池成功"); } catch (Exception e) { log.error("初始化线程池失败: {}", e.getMessage()); } return threadPoolTaskExecutor; } }
线程任务存取工具类:
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @Slf4j @Component public class ThreadReader { private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new LinkedBlockingQueue<>(); @Autowired @Qualifier("threadExecutor") private ThreadPoolTaskExecutor threadExecutor; public static void put(Runnable runnable) { try { BLOCKING_QUEUE.put(runnable); } catch (InterruptedException e) { log.error("ThreadReader.put异常:{}", e.getMessage()); } } public void take() { if (CollectionUtils.isEmpty(BLOCKING_QUEUE)) { return; } try { Runnable runnable = BLOCKING_QUEUE.take(); log.info("取出当前线程池没来得及执行的任务, runnable:{}", runnable); threadExecutor.execute(runnable); } catch (InterruptedException e) { log.error("ThreadReader.take异常:{}", e.getMessage()); } } }
自定义线程池的拒绝策略:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class SelfRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (null != runnable) {
// 线程池没来得及执行的任务先放入队列
ThreadReader.put(runnable);
}
}
}
重新执行未执行的任务:
import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @Slf4j @Component public class CustomizeScheduler implements ApplicationListener<ContextRefreshedEvent> { @Autowired private ThreadReader threadReader; @Override public void onApplicationEvent(ContextRefreshedEvent event) { // 线程工厂 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("scheduledExecutor-pool-%d") .setUncaughtExceptionHandler((thread, throwable) -> log.error("ThreadPool {} got exception", thread, throwable)).build(); ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory, new SelfRejectedExecutionHandler()); // 之后每隔1秒执行队列中没有来得及执行的任务 scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1000, TimeUnit.MILLISECONDS); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。