赞
踩
线程Thread是一个重量级资源,线程的创建、启动以及销毁都是比较耗费系统资源的,同时受限于系统资源的限制,线程的数量与系统性能是一种抛物线的关系,因此对线程的管理,是一种非常好的程序设计习惯,自JDK1.5起,utils包提供了ExecutorService[ɪɡˈzɛkjətɚ]线程池的实现。通俗的将:为了避免重复的创建线程,线程池的出现可以让线程进行复用。当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用。
一个线程池包括以下四个基本组成部分:
(1)、降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
(2)、提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
(3)方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场))。
(4)提供更强大的功能,延时定时线程池。
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。
其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4、TIDYING : 2 << COUNT_BITS,即高3位为010,该状态表示线程池对线程进行整理优化;
5、TERMINATED: 3 << COUNT_BITS,即高3位为011,该状态表示线程池停止工作;
首先创建一个自定义的线程,模拟并发任务。
- public class MyThread extends Thread {
- private int i;
- public MyThread(int in) {
- this.i = in;
- }
- public void run() {
- try {
- this.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(currentThread().getName()+"正在打印:"+i);
- }
- }
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- public class MyThreadText {
- public static void main(String[] args) {
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- for (int i = 0; i < 10; i++) {
- executorService.execute(new MyThread(i));
- }
- executorService.shutdown();
- }
- }
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- public class MyThreadText {
- public static void main(String[] args) {
- ExecutorService executorService = Executors.newFixedThreadPool(5);
- for (int i = 0; i < 10; i++) {
- executorService.execute(new MyThread(i));
- }
- executorService.shutdown();
- }
- }
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- public class MyThreadText {
- public static void main(String[] args) {
- ExecutorService executorService = Executors.newCachedThreadPool();
- for (int i = 0; i < 10; i++) {
- executorService.execute(new MyThread(i));
- }
- executorService.shutdown();
- }
- }
- package com.zhw.learning.thread;
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
- /**
- * @author zhw
- * 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行
- *
- * Executors.newScheduledThreadPool(3):
- * public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- * return new ScheduledThreadPoolExecutor(corePoolSize);
- * }
- * public ScheduledThreadPoolExecutor(int corePoolSize) {
- * super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- * new DelayedWorkQueue());
- * }
- */
- public class ScheduledThreadPoolTest {
-
- public static void main(String[] args) throws InterruptedException {
- final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
-
- ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
-
- System.out.println("提交时间: " + sdf.format(new Date()));
-
- //延迟3秒钟后执行任务
- // scheduledThreadPool.schedule(new Runnable() {
- // @Override
- // public void run() {
- // System.out.println("运行时间: " + sdf.format(new Date()));
- // }
- // }, 3, TimeUnit.SECONDS);
-
- //延迟1秒钟后每隔3秒执行一次任务
- scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- System.out.println("运行时间: " + sdf.format(new Date()));
- }
- }, 1, 3, TimeUnit.SECONDS);
- Thread.sleep(10000);
-
- scheduledThreadPool.shutdown();
- }
-
- }
-
延迟3秒钟后执行任务,运行结果如下:
延迟1秒钟后每隔3秒执行一次任务,运行结果如下:
ThreadPoolExecutor执行execute方法分下面4种情况。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤 需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。 3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。 4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。 ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能 地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后 (当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而 步骤2不需要获取全局锁。
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;然后就将任务也分配给这4个临时工人做; 如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。10个人是全职工人,4个人是零时工,全职工人和工作队列满的时候才会找临时工,零食工在一定时间内没工作那么就辞掉它
- public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线 程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任 务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。
2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几 个阻塞队列。
·ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原 则对元素进行排序。
·LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通 常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
·SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用 移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工 厂方法Executors.newCachedThreadPool使用了这个队列。
·PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并 且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如 果使用了无界的任务队列这个参数就没什么效果。
4)ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线 程设置有意义的名字,代码如下。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状 态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法 处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。
·AbortPolicy:直接抛出异常。
·CallerRunsPolicy:只用调用者所在线程来运行任务。
·DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
,并执行当前任务。
·DiscardPolicy:不处理,丢弃掉。
当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录 日志或持久化存储不能处理的任务。
6.·keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以, 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
7.·TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。
9.2.2 向线程池提交任务 可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。
handler有四个选择:
策略1:ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常 ,示例如下:
小疑问:笔者对运行结果还是有一些疑问,为什么异常在下面第3行就出现了
策略2:ThreadPoolExecutor.CallerRunsPolicy
如果任务被拒绝了,则由调用线程(提交任务的线程)直接执行此任务,我们可以通过代码来验证这一点:
解释:上面zxai
策略3:RejectedExecutionHandler handler =
new ThreadPoolExecutor.DiscardOldestPolicy();
这样运行结果就不会有100个线程全部被执行。处理源码如下:
Java中Queue的一些常用方法:
poll() 移除并返问队列头部的元素
解释:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。(问:为什么是弹出队列的头元素呢?答:是因为队列的先进先出原则,所以队列的头元素肯定是最老的一个请求,而DiscardOldestPolicy策略就是把最老的请求废除,把这个机会(相当于抢了最老的请求的吃到嘴边的肉)留给当前新提交到线程池里面的线程(这个时候线程池已经满了,队列也已经满了,如果不满的话,也不会有拒绝策略了!))
策略4:ThreadPoolExecutor.DiscardPolicy
用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。
运行结果也不会全部执行100个线程。
源码如下,实际就是对线程不执行操作:
public class ExecutorsDemo {
private static ExecutorService executor = Executors.newFixedThreadPool(15);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(new SubThread());
}
}
}
class SubThread implements Runnable {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
}
}
从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作 线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这 个工作队列里。如果工作队列满了,则进入下个流程。
3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程 来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
·
·keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以, 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用 移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工 厂方法Executors.newCachedThreadPool使用了这个队列。
10.2 ThreadPoolExecutor详解 Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组
件构成。
·corePool:核心线程池的大小。
·maximumPool:最大线程池的大小。
·BlockingQueue:用来暂时保存任务的工作队列。
·RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和 时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler。
·通过Executor框架的工具类Executors,可以创建3种类型的ThreadPoolExecutor。 ·FixedThreadPool。
·SingleThreadExecutor。
·CachedThreadPool。
下面将分别介绍这3种ThreadPoolExecutor。
10.2.1 FixedThreadPool详解 FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实
现。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指 定的参数nThreads。
当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的 最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余 的空闲线程会被立即终止。
FixedThreadPool的execute()方法的运行示意图如图10-4所示。
图10-4 FixedThreadPool的execute()的运行示意图 对图10-4的说明如下。
1)如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。 2)在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入
LinkedBlockingQueue。
3)线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。
FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响。
1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中 的线程数不会超过corePoolSize。
2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。
3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
4)由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或 shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。
10.2.2 SingleThreadExecutor详解 SingleThreadExecutor是使用单个worker线程的Executor。下面是SingleThreadExecutor的源
代码实现。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与 FixedThreadPool相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工 作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列 对线程池带来的影响与FixedThreadPool相同,这里就不赘述了。
SingleThreadExecutor的运行示意图如图10-5所示。
图10-5 SingleThreadExecutor的execute()的运行示意图 对图10-5的说明如下。
1)如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线 程来执行任务。
2)在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入Linked- BlockingQueue。
3)线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来 执行。
10.2.3 CachedThreadPool详解 CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThread-
Pool的源代码。
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着 CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被 终止。
FixedThreadPool和SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的 工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
CachedThreadPool的execute()方法的执行示意图如图10-6所示。
图10-6 CachedThreadPool的execute()的运行示意图 对图10-6的说明如下。
1)首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程 正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方 法执行完成;否则执行下面的步骤2)。
2)当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1)将失 败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。
3)在步骤2)中新创建的线程将任务执行完后,会执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线
程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执 行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于 空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
前面提到过,SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一 个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的 任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图如图10-7所示。
图10-7 CachedThreadPool的任务传递示意图
10.3 ScheduledThreadPoolExecutor详解
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运 行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但 ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而 ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。