赞
踩
在实际使用中,线程是很占用系统资源的,如果对线程管理不善很容易导致系统问题。因此,在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有如下好处:
在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。
当一个并发任务提交给线程池,线程池分配线程去执行任务的过程如下图所示:
execute方法执行逻辑有这样几种情况:
需要注意的是,线程池的设计思想就是使用了核心线程池corePoolSize,阻塞队列workQueue和线程池maximumPoolSize,这样的缓存策略来处理任务,实际上这样的设计思想在需要框架中都会使用。
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- //如果线程池的线程个数少于corePoolSize则创建新线程执行当前任务
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- //如果线程个数大于corePoolSize或者创建线程失败,则将任务存放在阻塞队列workQueue中
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- //如果当前任务无法放进阻塞队列中,则创建新的线程来执行任务
- else if (!addWorker(command, false))
- reject(command);
- }
ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:
- public abstract class AbstractExecutorService implements ExecutorService {
-
-
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
- public Future<?> submit(Runnable task) {};
- public <T> Future<T> submit(Runnable task, T result) { };
- public <T> Future<T> submit(Callable<T> task) { };
- private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
- boolean timed, long nanos)
- throws InterruptedException, ExecutionException, TimeoutException {
- };
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- };
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- };
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- };
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException {
- };
- }
AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
我们接着看ExecutorService接口的实现:
- public interface ExecutorService extends Executor {
-
- void shutdown();
- boolean isShutdown();
- boolean isTerminated();
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- <T> Future<T> submit(Callable<T> task);
- <T> Future<T> submit(Runnable task, T result);
- Future<?> submit(Runnable task);
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException;
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException;
-
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:
- public interface Executor {
- void execute(Runnable command);
- }
到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
在ThreadPoolExecutor类中有几个非常重要的方法:
shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程。
shutdown():设置线程池的状态,只会中断所有没有执行任务的线程。
还有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
在ThreadPoolExecutor类中提供了四个构造方法:
- public class ThreadPoolExecutor extends AbstractExecutorService {
- .....
- public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
- BlockingQueue<Runnable> workQueue);
-
- public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
- BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
-
- public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
- BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
-
- public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
- BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
- ...
- }
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况。采用的策略有这几种:
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2xNcpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()
方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
并且,阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。
固定大小的线程池,可以指定线程池的大小,该线程池corePoolSize和maximumPoolSize相等,阻塞队列使用的是LinkedBlockingQueue,大小为整数最大值。
该线程池中的线程数量始终不变,当有新任务提交时,线程池中有空闲线程则会立即执行,如果没有,则会暂存到阻塞队列。对于固定大小的线程池,不存在线程数量的变化。同时使用几乎无界的LinkedBlockingQueue来存放执行的任务。当任务提交十分频繁的时候,LinkedBlockingQueue迅速增大,存在着耗尽系统资源的问题。而且在线程池空闲时,即线程池中没有可运行任务时,它也不会释放工作线程,还会占用一定的系统资源,需要shutdown。
- public static ExecutorService newFixedThreadPool(int var0) {
- return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
- }
-
- public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
- return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
- }
- public class FixPoolDemo {
-
- private static Runnable getThread(final int i) {
- return new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(i);
- }
- };
- }
-
- public static void main(String args[]) {
- ExecutorService fixPool = Executors.newFixedThreadPool(5);
- for (int i = 0; i < 10; i++) {
- fixPool.execute(getThread(i));
- }
- fixPool.shutdown();
- }
- }
单个线程线程池,只有一个线程的线程池,阻塞队列使用的是LinkedBlockingQueue,若有多余的任务提交到线程池中,则会被暂存到阻塞队列,待空闲时再去执行。按照先入先出的顺序执行任务。
- public static ExecutorService newSingleThreadExecutor() {
- return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
- }
-
- public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
- return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
- }
- public class SingPoolDemo {
- private static Runnable getThread(final int i){
- return new Runnable() {
- @Override
- public void run() {
- try {
-
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(i);
- }
- };
- }
-
- public static void main(String args[]) throws InterruptedException {
- ExecutorService singPool = Executors.newSingleThreadExecutor();
- for (int i=0;i<10;i++){
- singPool.execute(getThread(i));
- }
- singPool.shutdown();
- }
这里需要注意一点,newSingleThreadExecutor和newFixedThreadPool一样,在线程池中没有任务时可执行,也不会释放系统资源的,所以需要shudown。
缓存线程池,缓存线程默认存活60秒。线程的核心池corePoolSize大小为0,核心池最大为Integer.MAX_VALUE,阻塞队列使用的是SynchronousQueue。是一个直接提交的阻塞队列, 他总会迫使线程池增加新的线程去执行新的任务。在没有任务执行时,当线程的空闲时间超过keepAliveTime(60秒),则工作线程将会终止被回收,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。如果同时又大量任务被提交,而且任务执行的时间不是特别快,那么线程池便会新增出等量的线程池处理任务,这很可能会很快耗尽系统的资源。
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
- }
-
- public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
- return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
- }
- public class CachePool {
- private static Runnable getThread(final int i){
- return new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- }catch (Exception e){
-
- }
- System.out.println(i);
- }
- };
- }
-
- public static void main(String args[]){
- ExecutorService cachePool = Executors.newCachedThreadPool();
- for (int i=1;i<=10;i++){
- cachePool.execute(getThread(i));
- }
- }
- }
定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。
scheduleAtFixedRate: 是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。
schedultWithFixedDelay: 是以固定的延时去执行任务,延时是指上一次执行成功之后和下一次开始执行的之前的时间。
- public static ScheduledExecutorService newScheduledThreadPool(int var0) {
- return new ScheduledThreadPoolExecutor(var0);
- }
-
- public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
- return new ScheduledThreadPoolExecutor(var0, var1);
- }
创建一个ScheduledExecutorService线程池的方法,以下为创建一个大小为2的线程池:
ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
实例:
- public class TestNewScheduledThreadPool {
-
- public static void main(String[] args) {
-
- ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
-
- scheduleAtFixedRate(service,1000);
- scheduleAtFixedRate(service,6000);
-
- scheduleWithFixedDelay(service,1000);
- scheduleWithFixedDelay(service,6000);
-
-
- }
-
- private static void scheduleAtFixedRate(ScheduledExecutorService service, final int sleepTime){
- service.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- long start = new Date().getTime();
- System.out.println("scheduleAtFixedRate 开始执行时间:" +
- DateFormat.getTimeInstance().format(new Date()));
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- long end = new Date().getTime();
- System.out.println("scheduleAtFixedRate 执行花费时间=" + (end -start)/1000 + "m");
- System.out.println("scheduleAtFixedRate 执行完成时间:"
- + DateFormat.getTimeInstance().format(new Date()));
- System.out.println("======================================");
- }
- },1000,5000,TimeUnit.MILLISECONDS);
- }
-
- private static void scheduleWithFixedDelay(ScheduledExecutorService service,final int sleepTime){
- service.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- long start = new Date().getTime();
- System.out.println("scheduleWithFixedDelay 开始执行时间:" +
- DateFormat.getTimeInstance().format(new Date()));
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- long end = new Date().getTime();
- System.out.println("scheduleWithFixedDelay执行花费时间=" + (end -start)/1000 + "m");
- System.out.println("scheduleWithFixedDelay执行完成时间:"
- + DateFormat.getTimeInstance().format(new Date()));
- System.out.println("======================================");
- }
- },1000,5000,TimeUnit.MILLISECONDS);
- }
- }
scheduleAtFixedRate
实验一:
scheduleAtFixedRate(service,1000);
输出结果为:
scheduleAtFixedRate 开始执行时间:15:03:15
scheduleAtFixedRate 执行花费时间=1m
scheduleAtFixedRate 执行完成时间:15:03:16
======================================
scheduleAtFixedRate 开始执行时间:15:03:20
scheduleAtFixedRate 执行花费时间=1m
scheduleAtFixedRate 执行完成时间:15:03:21
======================================
分析得出:在任务执行时间小于间隔时间的情况下,程序以起始时间为准则,每隔指定时间执行一次,不受任务执行时间影响。
实验二:
scheduleAtFixedRate(service,6000);
输出结果为:
scheduleAtFixedRate 开始执行时间:15:06:12
scheduleAtFixedRate 执行花费时间=6m
scheduleAtFixedRate 执行完成时间:15:06:18
======================================
scheduleAtFixedRate 开始执行时间:15:06:18
scheduleAtFixedRate 执行花费时间=6m
scheduleAtFixedRate 执行完成时间:15:06:24
======================================
scheduleAtFixedRate 开始执行时间:15:06:24
scheduleAtFixedRate 执行花费时间=6m
scheduleAtFixedRate 执行完成时间:15:06:30
分析得出:当执行任务时间大于间隔时间,此方法不会重新开启一个新的任务进行执行,而是等待原有任务执行完成,马上开启下一个任务进行执行。此时,执行间隔时间已经被打乱。
scheduleWithFixedDelay
实验一
scheduleWithFixedDelay(service,1000);
输出结果为:
scheduleWithFixedDelay 开始执行时间:15:11:03
scheduleWithFixedDelay执行花费时间=1m
scheduleWithFixedDelay执行完成时间:15:11:04
======================================
scheduleWithFixedDelay 开始执行时间:15:11:09
scheduleWithFixedDelay执行花费时间=1m
scheduleWithFixedDelay执行完成时间:15:11:10
======================================
分析得出:当执行任务小于延迟时间时,第一个任务执行之后,延迟指定时间,然后开始执行第二个任务。
实验二
scheduleWithFixedDelay(service,6000);
输出结果为:
scheduleWithFixedDelay 开始执行时间:15:12:53
scheduleWithFixedDelay执行花费时间=6m
scheduleWithFixedDelay执行完成时间:15:12:59
======================================
scheduleWithFixedDelay 开始执行时间:15:13:04
scheduleWithFixedDelay执行花费时间=6m
scheduleWithFixedDelay执行完成时间:15:13:10
======================================
得出结论:当执行任务大于延迟时间时,第一个任务执行之后,延迟指定时间,然后开始执行第二个任务。
总之:此方法无论任务执行时间长短,都是当第一个任务执行完成之后,延迟指定时间再开始执行第二个任务。
参考文章:
《Java并发编程的艺术》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。