当前位置:   article > 正文

线程池详解_fixpool

fixpool

前言:

在实际使用中,线程是很占用系统资源的,如果对线程管理不善很容易导致系统问题。因此,在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有如下好处:

  1. 降低资源消耗。通过复用已存在的线程和降低线程关闭的次数来尽可能降低系统性能损耗;
  2. 提升系统响应速度。通过复用线程,省去创建线程的过程,因此整体上提升了系统的响应速度;
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此,需要使用线程池来管理线程。

  在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

当一个并发任务提交给线程池,线程池分配线程去执行任务的过程如下图所示:

execute方法执行逻辑有这样几种情况:

  1. 如果当前运行的线程少于corePoolSize,则会创建新的线程来执行新的任务;
  2. 如果运行的线程个数等于或者大于corePoolSize,则会将提交的任务存放到阻塞队列workQueue中;
  3. 如果当前workQueue队列已满的话,则会创建新的线程来执行任务;
  4. 如果线程个数已经超过了maximumPoolSize,则会使用饱和策略RejectedExecutionHandler来进行处理。

       需要注意的是,线程池的设计思想就是使用了核心线程池corePoolSize,阻塞队列workQueue和线程池maximumPoolSize,这样的缓存策略来处理任务,实际上这样的设计思想在需要框架中都会使用。

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. //如果线程池的线程个数少于corePoolSize则创建新线程执行当前任务
  6. if (workerCountOf(c) < corePoolSize) {
  7. if (addWorker(command, true))
  8. return;
  9. c = ctl.get();
  10. }
  11. //如果线程个数大于corePoolSize或者创建线程失败,则将任务存放在阻塞队列workQueue中
  12. if (isRunning(c) && workQueue.offer(command)) {
  13. int recheck = ctl.get();
  14. if (! isRunning(recheck) && remove(command))
  15. reject(command);
  16. else if (workerCountOf(recheck) == 0)
  17. addWorker(null, false);
  18. }
  19. //如果当前任务无法放进阻塞队列中,则创建新的线程来执行任务
  20. else if (!addWorker(command, false))
  21. reject(command);
  22. }

ThreadPoolExecutor的类关系图

ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

  1. public abstract class AbstractExecutorService implements ExecutorService {
  2. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
  3. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
  4. public Future<?> submit(Runnable task) {};
  5. public <T> Future<T> submit(Runnable task, T result) { };
  6. public <T> Future<T> submit(Callable<T> task) { };
  7. private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
  8. boolean timed, long nanos)
  9. throws InterruptedException, ExecutionException, TimeoutException {
  10. };
  11. public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  12. throws InterruptedException, ExecutionException {
  13. };
  14. public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  15. long timeout, TimeUnit unit)
  16. throws InterruptedException, ExecutionException, TimeoutException {
  17. };
  18. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  19. throws InterruptedException {
  20. };
  21. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  22. long timeout, TimeUnit unit)
  23. throws InterruptedException {
  24. };
  25. }

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

  我们接着看ExecutorService接口的实现:

  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. boolean isShutdown();
  4. boolean isTerminated();
  5. boolean awaitTermination(long timeout, TimeUnit unit)
  6. throws InterruptedException;
  7. <T> Future<T> submit(Callable<T> task);
  8. <T> Future<T> submit(Runnable task, T result);
  9. Future<?> submit(Runnable task);
  10. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  11. throws InterruptedException;
  12. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  13. long timeout, TimeUnit unit)
  14. throws InterruptedException;
  15. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  16. throws InterruptedException, ExecutionException;
  17. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  18. long timeout, TimeUnit unit)
  19. throws InterruptedException, ExecutionException, TimeoutException;
  20. }

 而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

  1. Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
  2. 然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
  3. 抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
  4. 然后ThreadPoolExecutor继承了类AbstractExecutorService。

在ThreadPoolExecutor类中有几个非常重要的方法:

  • execute(Runnable command):实际上是Executor中声明的方法,在ThreadPoolExecutor进行具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。无返回值
  • Future<T> submit(Callable<T> task):是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
  • shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程。

  • shutdown():设置线程池的状态,只会中断所有没有执行任务的线程。

还有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

在ThreadPoolExecutor类中提供了四个构造方法:

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. .....
  3. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  4. BlockingQueue<Runnable> workQueue);
  5. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
  7. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  8. BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
  9. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  10. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
  11. ...
  12. }

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,即使有其他空闲的线程能够执行任务也会创建线程。当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值。
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue;
  • threadFactory:线程工厂,主要用来创建线程;
  • handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况。采用的策略有这几种:

如何合理配置线程池参数?

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

  1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
  2. 任务的优先级:高,中和低。
  3. 任务的执行时间:长,中和短。
  4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

       任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2xNcpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

      优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

      执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

      依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

      并且,阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。

常见的四种线程池

1、newFixedThreadPool

        固定大小的线程池,可以指定线程池的大小,该线程池corePoolSize和maximumPoolSize相等,阻塞队列使用的是LinkedBlockingQueue,大小为整数最大值。

        该线程池中的线程数量始终不变,当有新任务提交时,线程池中有空闲线程则会立即执行,如果没有,则会暂存到阻塞队列。对于固定大小的线程池,不存在线程数量的变化。同时使用几乎无界的LinkedBlockingQueue来存放执行的任务。当任务提交十分频繁的时候,LinkedBlockingQueue迅速增大,存在着耗尽系统资源的问题。而且在线程池空闲时,即线程池中没有可运行任务时,它也不会释放工作线程,还会占用一定的系统资源,需要shutdown。

  1. public static ExecutorService newFixedThreadPool(int var0) {
  2. return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
  3. }
  4. public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
  5. return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
  6. }

newFixedThreadPool实例:

  1. public class FixPoolDemo {
  2. private static Runnable getThread(final int i) {
  3. return new Runnable() {
  4. @Override
  5. public void run() {
  6. try {
  7. Thread.sleep(500);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. System.out.println(i);
  12. }
  13. };
  14. }
  15. public static void main(String args[]) {
  16. ExecutorService fixPool = Executors.newFixedThreadPool(5);
  17. for (int i = 0; i < 10; i++) {
  18. fixPool.execute(getThread(i));
  19. }
  20. fixPool.shutdown();
  21. }
  22. }

2、newSingleThreadExecutor

        单个线程线程池,只有一个线程的线程池,阻塞队列使用的是LinkedBlockingQueue,若有多余的任务提交到线程池中,则会被暂存到阻塞队列,待空闲时再去执行。按照先入先出的顺序执行任务

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
  3. }
  4. public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
  5. return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
  6. }

 newSingleThreadExecutor实例

  1. public class SingPoolDemo {
  2. private static Runnable getThread(final int i){
  3. return new Runnable() {
  4. @Override
  5. public void run() {
  6. try {
  7. Thread.sleep(500);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. System.out.println(i);
  12. }
  13. };
  14. }
  15. public static void main(String args[]) throws InterruptedException {
  16. ExecutorService singPool = Executors.newSingleThreadExecutor();
  17. for (int i=0;i<10;i++){
  18. singPool.execute(getThread(i));
  19. }
  20. singPool.shutdown();
  21. }

        这里需要注意一点,newSingleThreadExecutor和newFixedThreadPool一样,在线程池中没有任务时可执行,也不会释放系统资源的,所以需要shudown。 

3、newCachedThreadPool

        缓存线程池,缓存线程默认存活60秒。线程的核心池corePoolSize大小为0,核心池最大为Integer.MAX_VALUE,阻塞队列使用的是SynchronousQueue。是一个直接提交的阻塞队列, 他总会迫使线程池增加新的线程去执行新的任务。在没有任务执行时,当线程的空闲时间超过keepAliveTime(60秒),则工作线程将会终止被回收,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。如果同时又大量任务被提交,而且任务执行的时间不是特别快,那么线程池便会新增出等量的线程池处理任务,这很可能会很快耗尽系统的资源。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
  3. }
  4. public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
  5. return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
  6. }

 newCachedThreadPool实例:

  1. public class CachePool {
  2. private static Runnable getThread(final int i){
  3. return new Runnable() {
  4. @Override
  5. public void run() {
  6. try {
  7. Thread.sleep(1000);
  8. }catch (Exception e){
  9. }
  10. System.out.println(i);
  11. }
  12. };
  13. }
  14. public static void main(String args[]){
  15. ExecutorService cachePool = Executors.newCachedThreadPool();
  16. for (int i=1;i<=10;i++){
  17. cachePool.execute(getThread(i));
  18. }
  19. }
  20. }

4、newScheduledThreadPool

定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。

scheduleAtFixedRate: 是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。

schedultWithFixedDelay: 是以固定的延时去执行任务,延时是指上一次执行成功之后和下一次开始执行的之前的时间。

  1. public static ScheduledExecutorService newScheduledThreadPool(int var0) {
  2. return new ScheduledThreadPoolExecutor(var0);
  3. }
  4. public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
  5. return new ScheduledThreadPoolExecutor(var0, var1);
  6. }

 创建一个ScheduledExecutorService线程池的方法,以下为创建一个大小为2的线程池:

ScheduledExecutorService service = Executors.newScheduledThreadPool(2);

 实例:

  1. public class TestNewScheduledThreadPool {
  2. public static void main(String[] args) {
  3. ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
  4. scheduleAtFixedRate(service,1000);
  5. scheduleAtFixedRate(service,6000);
  6. scheduleWithFixedDelay(service,1000);
  7. scheduleWithFixedDelay(service,6000);
  8. }
  9. private static void scheduleAtFixedRate(ScheduledExecutorService service, final int sleepTime){
  10. service.scheduleAtFixedRate(new Runnable() {
  11. @Override
  12. public void run() {
  13. long start = new Date().getTime();
  14. System.out.println("scheduleAtFixedRate 开始执行时间:" +
  15. DateFormat.getTimeInstance().format(new Date()));
  16. try {
  17. Thread.sleep(sleepTime);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. long end = new Date().getTime();
  22. System.out.println("scheduleAtFixedRate 执行花费时间=" + (end -start)/1000 + "m");
  23. System.out.println("scheduleAtFixedRate 执行完成时间:"
  24. + DateFormat.getTimeInstance().format(new Date()));
  25. System.out.println("======================================");
  26. }
  27. },1000,5000,TimeUnit.MILLISECONDS);
  28. }
  29. private static void scheduleWithFixedDelay(ScheduledExecutorService service,final int sleepTime){
  30. service.scheduleWithFixedDelay(new Runnable() {
  31. @Override
  32. public void run() {
  33. long start = new Date().getTime();
  34. System.out.println("scheduleWithFixedDelay 开始执行时间:" +
  35. DateFormat.getTimeInstance().format(new Date()));
  36. try {
  37. Thread.sleep(sleepTime);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. long end = new Date().getTime();
  42. System.out.println("scheduleWithFixedDelay执行花费时间=" + (end -start)/1000 + "m");
  43. System.out.println("scheduleWithFixedDelay执行完成时间:"
  44. + DateFormat.getTimeInstance().format(new Date()));
  45. System.out.println("======================================");
  46. }
  47. },1000,5000,TimeUnit.MILLISECONDS);
  48. }
  49. }

scheduleAtFixedRate

实验一:

scheduleAtFixedRate(service,1000);

 输出结果为:

scheduleAtFixedRate 开始执行时间:15:03:15
scheduleAtFixedRate 执行花费时间=1m
scheduleAtFixedRate 执行完成时间:15:03:16
======================================
scheduleAtFixedRate 开始执行时间:15:03:20
scheduleAtFixedRate 执行花费时间=1m
scheduleAtFixedRate 执行完成时间:15:03:21
======================================

 分析得出:在任务执行时间小于间隔时间的情况下,程序以起始时间为准则,每隔指定时间执行一次,不受任务执行时间影响。

实验二:

scheduleAtFixedRate(service,6000);

输出结果为:

scheduleAtFixedRate 开始执行时间:15:06:12
scheduleAtFixedRate 执行花费时间=6m
scheduleAtFixedRate 执行完成时间:15:06:18
======================================
scheduleAtFixedRate 开始执行时间:15:06:18
scheduleAtFixedRate 执行花费时间=6m
scheduleAtFixedRate 执行完成时间:15:06:24
======================================
scheduleAtFixedRate 开始执行时间:15:06:24
scheduleAtFixedRate 执行花费时间=6m
scheduleAtFixedRate 执行完成时间:15:06:30

分析得出:当执行任务时间大于间隔时间,此方法不会重新开启一个新的任务进行执行,而是等待原有任务执行完成,马上开启下一个任务进行执行。此时,执行间隔时间已经被打乱。

scheduleWithFixedDelay

实验一

scheduleWithFixedDelay(service,1000);

输出结果为:

scheduleWithFixedDelay 开始执行时间:15:11:03
scheduleWithFixedDelay执行花费时间=1m
scheduleWithFixedDelay执行完成时间:15:11:04
======================================
scheduleWithFixedDelay 开始执行时间:15:11:09
scheduleWithFixedDelay执行花费时间=1m
scheduleWithFixedDelay执行完成时间:15:11:10
======================================

分析得出:当执行任务小于延迟时间时,第一个任务执行之后,延迟指定时间,然后开始执行第二个任务。

实验二

scheduleWithFixedDelay(service,6000);

输出结果为:
 

scheduleWithFixedDelay 开始执行时间:15:12:53
scheduleWithFixedDelay执行花费时间=6m
scheduleWithFixedDelay执行完成时间:15:12:59
======================================
scheduleWithFixedDelay 开始执行时间:15:13:04
scheduleWithFixedDelay执行花费时间=6m
scheduleWithFixedDelay执行完成时间:15:13:10
======================================

得出结论:当执行任务大于延迟时间时,第一个任务执行之后,延迟指定时间,然后开始执行第二个任务。

总之:此方法无论任务执行时间长短,都是当第一个任务执行完成之后,延迟指定时间再开始执行第二个任务。

 

参考文章:

《Java并发编程的艺术》

https://www.cnblogs.com/superfj/p/7544971.html

https://www.cnblogs.com/dolphin0520/p/3932921.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/246480
推荐阅读
相关标签
  

闽ICP备14008679号