赞
踩
1、【java线程及线程池系列】java线程及线程池概念详解
2、【java线程及线程池系列】synchronized、ReentrantLock和ReentrantReadWriteLock介绍及示例
3、【java线程及线程池系列】线程池ThreadPoolExecutor的类结构、使用方式示例、线程池数量配置原则和线程池使用注意事项
本文介绍了ThreadPoolExecutor类结构、正确的使用示例和线程池线程数量配置的计算方式、线程使用过程中需要注意事项(技巧)。
本文分为四个部分,即ThreadPoolExecutor类结构、线程池创建示例、线程数量推荐配置。
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程
表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
PriorityBlockingQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
线程工厂,主要用来创建线程;
表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy//丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy//也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy//丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy//由调用线程处理该任务
ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService的实现如下
public abstract class AbstractExecutorService implements ExecutorService {
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
......
}
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
......
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
......
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
......
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
......
}
/**
* the main mechanics of invokeAny.
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
......
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
......
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
......
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
......
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
......
}
}
AbstractExecutorService是一个抽象类,它实现了ExecutorService接口
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {
void execute(Runnable command);
}
在ThreadPoolExecutor类中有几个非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}
//从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。
//如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。
//当前线程池大小是5,最大是10,缓冲队列是5,即线程池最大的数量时15,包含可以一起执行的10个和队列中的5个,超出这个数字则会抛出拒绝异常了。
不提倡直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池
Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池
下面是这三个静态方法的具体实现;
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置
如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写
具体的参数需要自己调整
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
需要引入第三方包
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.1-jre</version>
</dependency>
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);
Executors.newSingleThreadExecutor() 创建一个单线程化(容量为1)的线程池,它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
/**
*
*/
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
//运行结果:
//ThreadName:pool-1-thread-1,0
//ThreadName:pool-1-thread-1,1
//ThreadName:pool-1-thread-1,2
//ThreadName:pool-1-thread-1,3
//ThreadName:pool-1-thread-1,4
Executors.newFixedThreadPool(int) 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
//运行结果:
//ThreadName:pool-1-thread-1,0
//ThreadName:pool-1-thread-2,1
//ThreadName:pool-1-thread-3,2
//ThreadName:pool-1-thread-3,3
//ThreadName:pool-1-thread-2,4
//ThreadName:pool-1-thread-1,5
Executors.newCachedThreadPool(),创建一个可缓存线程池,缓冲池容量大小为Integer.MAX_VALUE,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep( 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//任务提交
cachedThreadPool.execute(new Runnable() {
public void run() {
//线程名一样
System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
}
});
}
/**
* ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
* shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
* shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
*/
cachedThreadPool.shutdownNow();
}
//运行结果:
//ThreadName:pool-1-thread-1,0
//ThreadName:pool-1-thread-1,1
//ThreadName:pool-1-thread-1,2
//ThreadName:pool-1-thread-1,3
//ThreadName:pool-1-thread-1,4
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
public static void main(String[] args) {
//延迟执行示例代码如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS); //延迟3秒后执行run方法。
// 定期执行示例代码如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS); //延迟1秒后每3秒执行一次。
}
设置并行级别为2,即默认每时每刻只有2个线程同时执行
public static void main(String[] args) throws Exception {
ExecutorService m = Executors.newWorkStealingPool(2);
for (int i = 1; i <= 10; i++) {
final int count=i;
m.submit(new Runnable() {
@Override
public void run() {
Date now=new Date();
System.out.println("线程" + Thread.currentThread() + "完成任务:"
+ count+" 时间为:"+ now.getSeconds());
try {
Thread.sleep(1000);//此任务耗时1s
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
while(true){
//主线程陷入死循环来观察结果,否则是看不到结果的
}
}
ForkJoinPool是一种支持任务分解的线程池,当提交给他的任务“过大”,他就会按照预先定义的规则将大任务分解成小任务,多线程并发执行。
public class ForkJoinTaskTest {
/**
* 定义一个可分解的的任务类,继承了RecursiveAction抽象类 必须实现它的compute方法
*/
public static class myTask extends RecursiveAction {
private static final long serialVersionUID = 1L;
// 定义一个分解任务的阈值——50,即一个任务最多承担50个工作量
int THRESHOLD = 50;
// 任务量
int task_Num = 0;
myTask(int Num) {
this.task_Num = Num;
}
@Override
protected void compute() {
if (task_Num <= THRESHOLD) {
System.out.println(Thread.currentThread().getName() + "承担了" + task_Num + "份工作");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
// 随机解成两个任务
Random m = new Random();
int x = m.nextInt(50);
myTask left = new myTask(x);
myTask right = new myTask(task_Num - x);
left.fork();
right.fork();
}
}
}
public static void main(String[] args) throws Exception {
// 创建一个支持分解任务的线程池ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
myTask task = new myTask(178);
pool.submit(task);
pool.awaitTermination(20, TimeUnit.SECONDS);// 等待20s,观察结果
pool.shutdown();
}
}
一般需要根据任务的类型来配置线程池大小,一般原则如下
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
计算密集型,就是应用需要非常多的CPU计算资源,在多核CPU时代,要让每一个CPU核心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置。对于计算密集型的应用,完全是靠CPU的核数来工作,避免过多的线程上下文切换,比较理想方案是:
线程数 = CPU核数+1
也可以设置成CPU核数2,这还是要看JDK的使用版本,以及CPU配置(服务器的CPU有超线程)。对于JDK1.8+来说,里面增加了一个并行计算,计算密集型的较理想线程数 = CPU内核线程数2
对于IO密集型的应用,现在做的开发大部分都是WEB应用,涉及到大量的网络传输,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好后,线程才会继续执行。因此从这里可以发现,对于IO密集型的应用,可以多设置一些线程池中线程的数量,这样就能让在等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。
线程上下文切换是有代价的。对于IO密集型应用一般推荐:
线程数 = CPU核心数/(1-阻塞系数)
这个阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。对于双核CPU来说,它比较理想的线程数就是20,当然这都不是绝对的,需要根据实际情况以及实际业务来调整。
final int poolSize = (int)(cpuCore/(1-0.9))
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
示例
平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32
以上,仅仅是估算,具体以实际为准。
在调试程序将JVM中线程导出Dump时,会出现pool-N-thread-M这样的提示,这是缺省的线程池名称,其中N代表池的序列号,每次你创建一个新的线程池,这个N数字就增加1;而M是这个线程池中的线程顺序。
举例, pool-2-thread-3 表示在第二个线程池中的第三个线程,JDK将这种命名策略封装在ThreadFactory。
使用Guava可以方便命名:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Orders-%d")
.setDaemon(true)
.build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
一旦我们记住线程的名称,我们就可以在运行时改变它们,因为线程dump值显示类和方法名称,没有参数和本地变量。
通过调整线程名称能实现事务标识,以便方便跟踪具体的消息记录或查询,找出哪个是引起死锁的原因。如下:
private void process(String messageId) {
executorService.submit(() -> {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Processing-" + messageId);
try {
//real logic here...
} finally {
currentThread.setName(oldName);
}
});
}
在try-finally中线程被取名为Processing-消息ID,这样就能跟踪是哪个消息流经系统。
客户端线程和线程池之间有一个任务队列。 当应用程序关闭时,必须注意两件事,即在排队任务发生了什么以及已经运行的任务具体行为如何。
有两种方法,要么让所有队列任务执行( shutdown() )或使用(shutdownNow()立即关闭 。
private void sendAllEmails(List<String> emails) throws InterruptedException {
emails.forEach(email ->
executorService.submit(() ->
sendEmail(email)));
executorService.shutdown();
final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
log.debug("All e-mails were sent so far? {}", done);
}
这种情况下发送大量电子邮件,每个作为在线程池中一个单独的任务。 这些任务提交后,关闭池不再接受任何新的任务。 然后等待最多一分钟,直到完成所有这些任务。 然而,如果某些任务仍悬而未决,awaitTermination() 只会返回false,等待任务将继续进行处理。
final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());
这次所有队列中任务将被取消返回,而已经运行的任务将允许继续。
当线程堵塞在某个带有InterruptedException的方法时,可以在这个线程中调用Thread.interrupt(),大多数堵塞的方法将立即抛出InterruptedException。
如果将任务提交给线程池(ExecutorService.submit()),当认为已经在执行时,能调用Future.cancel(true),在这种情况下线程池将试图中断线程运行的任务,能很有效率的中断任务。
不正确大小的线程池可能会导致缓慢、不稳定和内存泄漏。 如果配置线程太少,将建立队列消耗大量的内存。 另一方面,太多的线程由于过度上下文切换会减慢整个系统,导致相同的症状。 查看队列深度很重要,保持它有界,以便超负荷的线程池可以暂时拒绝新任务。
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
上面代码等同于Executors.newFixedThreadPool(n),但是我们使用有固定容量100的ArrayBlockQueue替代了缺省是无限的LinkedBlockingQueue,这意味着如果有100个任务在队列中,n代表被执行,新的任务将被拒绝加入抛出RejectedExecutionException,因为队列现在置于外部,可以定期地调用其size()放入logs/JMX等监控机制中。
下面这段代码会是什么结果?
executorService.submit(() -> {
System.out.println(1 / 0);
});
它不会打印任何东西。也没有抛出除0的错误,线程池自己吞进了这个exception,好像从来没有发生一样,这是线程池与普通线程的区别,如果你提交一个Runnable,必须自己使用try-catch环抱方法体,至少记录日志,如果提交Callable确保总是能解除引用,使用堵塞get()到re-throw exception:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
//下面将抛出由除零ArithmeticException引起的ExecutionException
division.get();
监控工作队列深度是一方面。 然而,当单一事务/任务发生故障排除时,能看到提交任务和实际执行之间有多少时间是值得。 这个时间最好是应该接近0(当有空闲线程池),然而队列中有任务必须排队时。 而且如果没有固定数量的线程池,运行新任务可能需要生成线程,这也会消耗些短的时间。 为了清晰地监控这一指标,包装原始 ExecutorService 类似这样。
public class WaitTimeMonitoringExecutorService implements ExecutorService {
private final ExecutorService target;
public WaitTimeMonitoringExecutorService(ExecutorService target) {
this.target = target;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
final long startTime = System.currentTimeMillis();
return target.submit(() -> {
final long queueDuration = System.currentTimeMillis() - startTime;
log.debug("Task {} spent {}ms in queue", task, queueDuration);
return task.call();
}
);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return submit(() -> {
task.run();
return result;
});
}
@Override
public Future<?> submit(Runnable task) {
return submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
task.run();
return null;
}
});
}
//...
}
这不是一个完整的实现,但可以体现基本的概念。 当提交一个任务给线程池,立即开始测量时间。
比如一个例外发生在任务提交给线程池:
java.lang.NullPointerException: null
at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
可以很容易发现 mytask 在第76行抛出NPE错误。 但是不知道谁提交了这个任务,因为堆栈跟踪只显示Thread 和ThreadPoolExecutor 。 在技术上我们可以浏览源代码的希望能找到 mytask 被创建的地方。 但是不能立即看到线程全貌。 如果可以保留客户端代码(提交的任务)的堆栈跟踪并显示它,例如在失败时显示,那就是一个好办法。
在Java 8引入了更强大 CompletableFuture , 请尽可能使用它。 ExecutorService 没有扩展到支持这个增强的抽象,所以必须自己使用。
下面是没有使用:
final Future<BigDecimal> future = executorService.submit(this::calculate);
使用后效果:
final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);
SynchronousQueue 是一个BlockingQueue,但不是真正的队列, 是一个容量0的队列。
引用JavaDoc:
每一个 插入操作必须等待由另一个线程删除,反之亦然。 同步队列没有任何内部容量,甚至一个也没有。你不能插入任何一个元素(使用任何方法),除非有另一个线程正在试图删除它,你不能没有迭代遍历。
同步队列实现的线程池:
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
当创建带有两个线程的线程池,因为SynchronousQueue 是一个容量为0的队列,因此线程池只有在有空闲线程的情况下接受新任务,如果所有线程很忙,将拒绝新任务,不会有任何等待,这种模式适合后台启动任务需要要么立即执行要么取消不能执行。
以上,介绍了线程池ThreadPoolExecutor的类结构、使用方式示例、线程池数量配置原则和线程池使用注意事项。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。