赞
踩
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用 线程池,必须对其实现原理了如指掌。
java中涉及到线程池的相关类均在jdk1.5开始的java.util.concurrent包中,涉及到的几个核心类及接口包括:Executor、Executors、ExecutorService、ThreadPoolExecutor、FutureTask、Callable、Runnable等
Executor接口
这个接口也是整个线程池中最顶层的接口,提供了一个无返回值的提交任务的方法
- public interface Executor {
-
- //提交运行任务,参数为Runnable接口对象,无返回值
- void execute(Runnable command);
- }
由于这个接口过于简单,我们无法得知线程池的执行结果数据,如果我们不再使用线程池,也无法通过Executor接口来关闭线程 池。此时,我们就需要ExecutorService接口的支持了。
ExecutorService接口
非定时任务类线程池的核心接口,通过ExecutorService接口能够向线程池中提交任务(支持有返回结果和无 返回结果两种方式)、关闭线程池、唤醒线程池中的任务等。ExecutorService接口的源码如下所示,这个接口也是我们在使用非定时任务类的线程池中最常使用的接口
- public interface ExecutorService extends Executor {
-
- //关闭线程池,线程池中不再接受新提交的任务,但是之前提交的任务继续运行,直到完成
- void shutdown();
-
- //关闭线程池,线程池中不再接受新提交的任务,会尝试停止线程池中正在执行的任务。
- List<Runnable> shutdownNow();
- //判断线程池是否已经关闭
-
- boolean isShutdown();
- //判断线程池中的所有任务是否结束,只有在调用shutdown或者shutdownNow方法之后调用此方法才会返回true。
-
- boolean isTerminated();
- //等待线程池中的所有任务执行结束,并设置超时时间
-
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- //提交一个Callable接口类型的任务,返回一个Future类型的结果
-
- <T> Future<T> submit(Callable<T> task);
-
- //提交一个Callable接口类型的任务,并且给定一个泛型类型的接收结果数据参数,返回一个Future类型的结果
- <T> Future<T> submit(Runnable task, T result);
- //提交一个Runnable接口类型的任务,返回一个Future类型的结果
-
- Future<?> submit(Runnable task);
- //批量提交任务并获得他们的future,Task列表与Future列表一一对应
-
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException;
-
- //批量提交任务并获得他们的future,并限定处理所有任务的时间
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException;
- //批量提交任务并获得一个已经成功执行的任务的结果
-
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException;
- //批量提交任务并获得一个已经成功执行的任务的结果,并限定处理任务的时间
-
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
AbstractExecutorService抽象类
派生自ExecutorService接口,实现了几个非常实现的方法,供子类进行调用。
- public abstract class AbstractExecutorService implements ExecutorService {
-
-
- //RunnableFuture类用于获取执行结果,在实际使用时,我们经常使用的是它的子类FutureTask,newTaskFor方法的作用就是将任务封装成FutureTask对象,后续将FutureTask对象提交到线程池。
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new FutureTask<T>(runnable, value);
- }
-
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return new FutureTask<T>(callable);
- }
-
-
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
-
- public <T> Future<T> submit(Runnable task, T result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
- return ftask;
- }
-
-
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
-
-
- private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
- boolean timed, long nanos)
- throws InterruptedException, ExecutionException, TimeoutException {
- //提交的任务为空,抛出空指针异常
- if (tasks == null)
- throw new NullPointerException();
- //记录待执行的任务的剩余数量
- int ntasks = tasks.size();
- //任务集合中的数据为空,抛出非法参数异常
- if (ntasks == 0)
- throw new IllegalArgumentException();
- ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
- //以当前实例对象作为参数构建ExecutorCompletionService对象
- // ExecutorCompletionService负责执行任务,后面调用用poll返回第一个执行结果
- ExecutorCompletionService<T> ecs =
- new ExecutorCompletionService<T>(this);
-
-
-
- try {
- // 记录可能抛出的执行异常
- ExecutionException ee = null;
- // 初始化超时时间
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- Iterator<? extends Callable<T>> it = tasks.iterator();
-
- //提交任务,并将返回的结果数据添加到futures集合中
- //提交一个任务主要是确保在进入循环之前开始一个任务
- futures.add(ecs.submit(it.next()));
- --ntasks;
- //记录正在执行的任务数量
- int active = 1;
-
- for (;;) {
- //从完成任务的BlockingQueue队列中获取并移除下一个将要完成的任务的结果。
- //如果BlockingQueue队列中中的数据为空,则返回null
- //这里的poll()方法是非阻塞方法
- Future<T> f = ecs.poll();
- //获取的结果为空
- if (f == null) {
- //集合中仍有未执行的任务数量
- if (ntasks > 0) {
- //未执行的任务数量减1
-
- --ntasks;
- //提交完成并将结果添加到futures集合中
- futures.add(ecs.submit(it.next()));
- //正在执行的任务数量加•1
-
- ++active;
- }
- //所有任务执行完成,并且返回了结果数据,则退出循环
- //之所以处理active为0的情况,是因为poll()方法是非阻塞方法,可能导致未返回结果时active为0
- else if (active == 0)
- break;
- //如果timed为true,则执行获取结果数据时设置超时时间,也就是超时获取结果表示
- else if (timed) {
- f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
- if (f == null)
- throw new TimeoutException();
- nanos = deadline - System.nanoTime();
- }
- //没有设置超时,并且所有任务都被提交了,则一直阻塞,直到返回一个执行结果
- else
- f = ecs.take();
- }
- //获取到执行结果,则将正在执行的任务减1,从Future中获取结果并返回
- if (f != null) {
- --active;
- try {
- return f.get();
- } catch (ExecutionException eex) {
- ee = eex;
- } catch (RuntimeException rex) {
- ee = new ExecutionException(rex);
- }
- }
- }
-
- if (ee == null)
- ee = new ExecutionException();
- throw ee;
-
- } finally {
- //如果从所有执行的任务中获取到一个结果数据,则取消所有执行的任务,不再向下执行
- for (int i = 0, size = futures.size(); i < size; i++)
- futures.get(i).cancel(true);
- }
- }
-
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- try {
- return doInvokeAny(tasks, false, 0);
- } catch (TimeoutException cannotHappen) {
- assert false;
- return null;
- }
- }
-
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return doInvokeAny(tasks, true, unit.toNanos(timeout));
- }
-
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- if (tasks == null)
- throw new NullPointerException();
- ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
- boolean done = false;
- try {
- for (Callable<T> t : tasks) {
- RunnableFuture<T> f = newTaskFor(t);
- futures.add(f);
- execute(f);
- }
- for (int i = 0, size = futures.size(); i < size; i++) {
- Future<T> f = futures.get(i);
- if (!f.isDone()) {
- try {
- f.get();
- } catch (CancellationException ignore) {
- } catch (ExecutionException ignore) {
- }
- }
- }
- done = true;
- return futures;
- } finally {
- if (!done)
- for (int i = 0, size = futures.size(); i < size; i++)
- futures.get(i).cancel(true);
- }
- }
-
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException {
- if (tasks == null)
- throw new NullPointerException();
- long nanos = unit.toNanos(timeout);
- ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
- boolean done = false;
- try {
- for (Callable<T> t : tasks)
- futures.add(newTaskFor(t));
-
- final long deadline = System.nanoTime() + nanos;
- final int size = futures.size();
-
- // Interleave time checks and calls to execute in case
- // executor doesn't have any/much parallelism.
- for (int i = 0; i < size; i++) {
- execute((Runnable)futures.get(i));
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L)
- return futures;
- }
-
- for (int i = 0; i < size; i++) {
- Future<T> f = futures.get(i);
- if (!f.isDone()) {
- if (nanos <= 0L)
- return futures;
- try {
- f.get(nanos, TimeUnit.NANOSECONDS);
- } catch (CancellationException ignore) {
- } catch (ExecutionException ignore) {
- } catch (TimeoutException toe) {
- return futures;
- }
- nanos = deadline - System.nanoTime();
- }
- }
- done = true;
- return futures;
- } finally {
- if (!done)
- for (int i = 0, size = futures.size(); i < size; i++)
- futures.get(i).cancel(true);
- }
- }
-
- }
这个方法是批量执行线程池的任务,最终返回一个结果数据的核心方法,通过源代码的分析,我们可以发现,这个方法只要获取到 一个结果数据,就会取消线程池中所有运行的任务,并将结果数据返回。这就好比是很多要进入一个居民小区一样,只要有一个人 有门禁卡,门卫就不再检查其他人是否有门禁卡,直接放行。
在上述代码中,我们看到提交任务使用的ExecutorCompletionService对象的submit方法,我们再来看下 ExecutorCompletionService类中的submit方法,在非定时任务类的线程池中提交任务时,本质上都是调用的Executor接口的execute方法
ScheduledExecutorService定时任务接口
派生自ExecutorService接口,拥有ExecutorService接口定义的全部方法,并扩展 了定时任务相关的方法
Executors线程池工具类
提供了几种快速创建线程池的方法
java中创建一个线程池通过ThreadPoolExecutor类实现,ThreadPoolExecutor类继承了AbstractExecutorService类,并提供了四个构造器
当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录 日志或持久化存储不能处理的任务。
所以根据上面分析我们可以看到,FixedThreadPool和SigleThreadExecutor中之所以用LinkedBlockingQueue无界队列,是因为设置了corePoolSize=maxPoolSize,线程数无法动态扩展,于是就设置了无界阻塞队列来应对不可知的任务量;
而CachedThreadPool则使用的是SynchronousQueue同步移交队列,为什么使用这个队列呢?因为CachedThreadPool设置了corePoolSize=0,maxPoolSize=Integer.MAX_VALUE,来一个任务就创建一个线程来执行任务,用不到队列来存储任务;
SchduledThreadPool用的是延迟队列DelayedWorkQueue。在实际项目开发中也是推荐使用手动创建线程池的方式,而不用默认方式,关于这点在《阿里巴巴开发规范》中是这样描述的:
上面说了使用Executors工具类创建的线程池有隐患,那如何使用才能避免这个隐患呢?建立自己的线程工厂类,灵活设置关键参数
//这里默认拒绝策略为AbortPolicy private static ExecutorService executor = new ThreadPoolExecutor(10,10,60L, TimeUnit.SECONDS,new ArrayBlockingQueue(10));
使用guava包中的ThreadFactoryBuilder工厂类来构造线程池:
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), threadFactory, new ThreadPoolExecutor.AbortPolicy());
- private static ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
-
- private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), threadFactory, new ThreadPoolExecutor.AbortPolicy());
通过guava的ThreadFactory工厂类还可以指定线程组名称,这对于后期定位错误时也是很有帮助的
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-d%").build();
向线程池提交的任务有两种:Runnable和Callable,二者的区别如下:
三种提交任务的方式:
任务停止
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线 程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务 可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线 程。 只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务 都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪 一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭 线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
监控任务
如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根 据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的 时候可以使用以下属性。 ·taskCount:线程池需要执行的任务数量。 ·completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。 ·largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。 ·getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。 ·getActiveCount:获取活动的线程数。 通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。 这几个方法在线程池里是空方法。
Running:运行状态,能接收新提交的任务,并且也能处理阻塞队列中的任务
shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略,当线程池处于Running状态时, 调用shutdown()方法会使线程池进入该状态
Stop(shutdownNow): 不能接收新任务,也不能处理阻塞队列中已经保存的任务,会中断正在处理任务的线程,如果线程池处于Running或 Shutdown状态,调用shutdownNow()方法,会使线程池进入该状态
Tidying: 如果所有的任务都已经终止,有效线程数为0(阻塞队列为空,线程池中的工作线程数量为0),线程池就会进入该状 态
isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true,处于Tidying状态的线程池调用terminated()方法,会使用线程池进入该状态
因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换。
阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。
当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。
使得在线程不至于一直占用cpu资源。
(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如下
while (task != null || (task = getTask()) != null) {})。
不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢?
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。
IO密集型任务
可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。
混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。
因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的 线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。
注意 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能 执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让 执行时间短的任务先执行。 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越 长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点 儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任 务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线 程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻 塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多, 有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所 有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是 出现这样问题时也会影响到其他任务
当前在JDK中默认使用的线程池 ThreadPoolExecutor,在具体使用场景中,有以下几个缺点
1.core线程一般不会timeOut
2.新任务提交时,如果工作线程数小于 coreSize,会自动先创建线程,即使当前工作线程已经空闲,这样会造成空闲线程浪费
3.设置的maxSize参数只有在队列满之后,才会生效,而默认情况下容器队列会很大(比如1000)
如一个coreSize为10,maxSize为100,队列长度为1000的线程池,在运行一段时间之后的效果会是以下2个效果:
1.系统空闲时,线程池中始终保持10个线程不变,有一部分线程在执行任务,另一部分线程一直wait中(即使设置allowCoreThreadTimeOut)
2.系统繁忙时,线程池中线程仍然为10个,但队列中有还没有执行的任务(不超过1000),存在任务堆积现象
本文将描述一下简单版本的线程池,参考于 Tomcat ThreadPoolExecutor, 实现以下3个目标
1.新任务提交时,如果有空闲线程,直接让空闲线程执行任务,而非创建新线程
2.如果coreSize满了,并且线程数没有超过maxSize,则优先创建线程,而不是放入队列
3.其它规则与ThreadPoolExecutor一致,如 timeOut机制
首先看一下ThreadPoolExecutor的执行逻辑, 其基本逻辑如下
1.如果线程数小于coreSize,直接创建新线程并执行(coreSize逻辑)
2.尝试放入队列
3.放入队列失败,则尝试创建新线程(maxSize逻辑)
而执行线程的任务执行逻辑,就是不断地从队列里面获取任务并执行,换言之,即如果有执行线程,直接往队列里面放任务,执行线程就会被通知到并直接执行任务
空闲线程优先
空闲线程优先在基本逻辑中,即如果线程数小于coreSize,但如果有空闲线程,就取消创建线程的逻辑. 在有空闲线程的情况下,直接将任务放入队列中,即达到任务执行的目的。
这里的逻辑即是直接调整默认的ThreadPoolExecutor逻辑,通过重载 execute(Runnable) 方法达到效果. 具体代码如下所示:
public void execute(Runnable command) { //此处优先处理有活跃线程的情况,避免在<coreSize时,直接创建线程 if(getActiveCount() < getPoolSize()) { if(pool1.offer(command)) { return; } } super.execute(command); }
coreSize满了优先创建线程
从之前的逻辑来看,如果放入队列失败,则尝试创建新线程。在这个时候,相应的coreSize肯定已经满了。那么,只需要处理一下逻辑,将其offer调整为false,即可以实现相应的目的。
这里的逻辑,即是重新定义一个BlockingDeque,重载相应的offer方法,相应的参考如下:
public boolean offer(Runnable o) { //这里的parent为ThreadPoolExecutor的引用 int poolSize = parent.getPoolSize(); int maxPoolSize = parent.getMaximumPoolSize(); //还没到最大值,先创建线程 if(poolSize < maxPoolSize) { return false; } //默认逻辑 return super.offer(o); }
在ThreadPoolExecutor类中有两个比较重要的方法引起了我们的注意:beforeExecute和afterExecute
这两个方法是protected修饰的,很显然是留给开发人员去重写方法体实现自己的业务逻辑,非常适合做钩子函数,在任务run方法的前后增加业务逻辑,比如添加日志、统计等。这个和我们springmvc中拦截器的preHandle和afterCompletion方法很类似,都是对方法进行环绕,类似于spring的AOP
- public class MyTest {
- public static void main(String[] args) throws InterruptedException {
- List<Integer> list = new Vector<>();
- ExecutorService executorService = Executors.newFixedThreadPool(1000);
- for (int i = 0; i < 1000; i++) {
- final int index = i;
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+"-index="+index);
- try {
- Thread.sleep(1000);
- list.add(index);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- });
- }
- //不在接收新的任务
- executorService.shutdown();
- // 等待所有线程执行完毕
- executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- System.out.println("all element size====================================================="+list.size());
- }
- }
Java中的线程池设计得非常巧妙,可以高效并发执行多个任务,但是在某些场景下需要对 线程池进行扩展才能更好地服务于系统。例如,如果一个任务仍进线程池之后,运行线程池的 程序重启了,那么线程池里的任务就会丢失。另外,线程池只能处理本机的任务,在集群环境 下不能有效地调度所有机器的任务。所以,需要结合线程池开发一个异步任务处理池。图11-2 为异步任务池设计图。
任务池的主要处理流程是,每台机器会启动一个任务池,每个任务池里有多个线程池,当 某台机器将一个任务交给任务池后,任务池会先将这个任务保存到数据中,然后某台机器上 的任务池会从数据库中获取待执行的任务,再执行这个任务。
每个任务有几种状态,分别是创建(NEW)、执行中(EXECUTING)、RETRY(重试)、挂起 (SUSPEND)、中止(TEMINER)和执行完成(FINISH)。
·创建:提交给任务池之后的状态。
·执行中:任务池从数据库中拿到任务执行时的状态。
·重试:当执行任务时出现错误,程序显式地告诉任务池这个任务需要重试,并设置下一次 执行时间。
·挂起:当一个任务的执行依赖于其他任务完成时,可以将这个任务挂起,当收到消息后, 再开始执行。
·中止:任务执行失败,让任务池停止执行这个任务,并设置错误消息告诉调用端。
·执行完成:任务执行结束。
任务池的任务隔离。异步任务有很多种类型,比如抓取网页任务、同步数据任务等,不同 类型的任务优先级不一样,但是系统资源是有限的,如果低优先级的任务非常多,高优先级的 任务就可能得不到执行,所以必须对任务进行隔离执行。使用不同的线程池处理不同的任务, 或者不同的线程池处理不同优先级的任务,如果任务类型非常少,建议用任务类型来隔离,如 果任务类型非常多,比如几十个,建议采用优先级的方式来隔离。
任务池的重试策略。根据不同的任务类型设置不同的重试策略,有的任务对实时性要求 高,那么每次的重试间隔就会非常短,如果对实时性要求不高,可以采用默认的重试策略,重 试间隔随着次数的增加,时间不断增长,比如间隔几秒、几分钟到几小时。每个任务类型可以 设置执行该任务类型线程池的最小和最大线程数、最大重试次数。
使用任务池的注意事项。任务必须无状态:任务不能在执行任务的机器中保存数据,比如 某个任务是处理上传的文件,任务的属性里有文件的上传路径,如果文件上传到机器1,机器2 获取到了任务则会处理失败,所以上传的文件必须存在其他的集群里,比如OSS或SFTP。
异步任务的属性。包括任务名称、下次执行时间、已执行次数、任务类型、任务优先级和 执行时的报错信息(用于快速定位问题)
线程池执行流程如下
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作 线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这 个工作队列里。如果工作队列满了,则进入下个流程。
3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程 来执行任务。如果已经满了,则交给饱和策略来处理这个任务
源码分析:上面的流程分析让我们很直观地了解了线程池的工作原理,让我们再通过源代 码来看看是如何实现的,线程池执行任务的方法如下
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- // 如果线程数小于基本线程数,则创建线程并执行当前任务
- if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
- // 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
- }
- // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,
- // 则创建一个线程执行任务。
- else if (!addIfUnderMaximumPoolSize(command))
- // 抛出RejectedExecutionException异常
- reject(command); // is shutdown or saturated
- }
- }
工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务 后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。
具体的服用原理
1.线程池里执行的是任务,核心逻辑在ThreadPoolExecutor类的execute方法中,同时ThreadPoolExecutor中维护了HashSet<Worker> workers;
2.addWorker()方法来创建线程执行任务,如果是核心线程的任务,会赋值给Worker的firstTask属性;
3.Worker实现了Runnable,本质上也是任务,核心在run()方法里;
4.run()方法的执行核心runWorker(),自旋拿任务while (task != null || (task = getTask()) != null)),task是核心线程Worker的firstTask或者getTask();
5.getTask()的核心逻辑:
1.若当前工作线程数量大于核心线程数->说明此线程是非核心工作线程,通过poll()拿任务,未拿到任务即getTask()返回null,然后会在processWorkerExit(w, completedAbruptly)方法释放掉这个非核心工作线程的引用;
2.若当前工作线程数量小于核心线程数->说明此时线程是核心工作线程,通过take()拿任务
3.take()方式取任务,如果队列中没有任务了会调用await()阻塞当前线程,直到新任务到来,所以核心工作线程不会被回收; 当执行execute方法里的workQueue.offer(command)时会调用Condition.singal()方法唤醒一个之前阻塞的线程,这样核心线程即可复用
ThreadPoolExecutor中线程执行任务的示意图
线程池中的线程执行任务分两种情况,如下。
1)在execute()方法中创建一个线程时,会让这个线程执行当前任务。
2)这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行
可以用Executors工具类根据不同场景创建对应的线程池
创建只有一个线程的线程池;
保证所有任务按照指 定顺序(先入先出或者优先级)执行;
如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它;
- public void singleThreadExecutorDemo(){
- ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
- for (int i = 0; i < 3; i++) {
- final int index = i;
-
- singleThreadExecutor.execute(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", index="+index);
- }
- });
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- pool-1-thread-1, index=0
- pool-1-thread-1, index=1
- pool-1-thread-1, index=2
从运行结果可以看出,所有任务都是在单一线程运行的。
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。线程池的大小上限为Integer.MAX_VALUE
- public void cachedThreadPoolDemo(){
- ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
- for (int i = 0; i < 5; i++) {
- final int index = i;
-
- cachedThreadPool.execute(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", index="+index);
- }
- });
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- pool-1-thread-1, index=0
- pool-1-thread-1, index=1
- pool-1-thread-1, index=2
- pool-1-thread-1, index=3
- pool-1-thread-1, index=4
-
从运行结果可以看出,整个过程都在同一个线程pool-1-thread-1中运行,后面线程复用前面的线程。
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,超出的线程会在队列中等待;如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
- public void fixedThreadPoolDemo(){
- ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
- for (int i = 0; i < 6; i++) {
- final int index = i;
-
- fixedThreadPool.execute(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", index="+index);
- }
- });
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- pool-1-thread-1, index=0
- pool-1-thread-2, index=1
- pool-1-thread-3, index=2
- pool-1-thread-1, index=3
- pool-1-thread-2, index=4
- pool-1-thread-3, index=5
从运行结果可以看出,线程池大小为3,每休眠1s后将任务提交给线程池的各个线程轮番交错地执行。线程池的大小设置,可参数Runtime.getRuntime().availableProcessors()。
创建一个定长的线程池,可定时执行或周期执行任务,该方法可指定线程池的核心线程个数
- public void scheduledThreadPoolDemo(){
- ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
- //定时执行一次的任务,延迟1s后执行
- scheduledThreadPool.schedule(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", delay 1s");
- }
- }, 1, TimeUnit.SECONDS);
-
- //周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
- scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+", every 3s");
- }
- }, 2, 3, TimeUnit.SECONDS);
- }
-
- pool-1-thread-1, delay 1s
- pool-1-thread-1, every 3s
- pool-1-thread-2, every 3s
- pool-1-thread-2, every 3s
ScheduledExecutorService功能强大,对于定时执行的任务,建议多采用该方法。
以上四种线程池,都是基于ThreadPoolExecutor创建的线程池,只是new ThreadPoolExecutor()的时候参数不同而已。
创建一个单线程化的线程池,支持定时、周期性的任务执行
JDK1.8新增线程池,一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。 newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现;由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。
Fork/Join框架是⼀个实现了ExecutorService接⼝的多线程处理器,也是Java 7开始引入的一种新的Fork/Join线程池。
它专为那些可 以通过递归分解成更细⼩的任务⽽设计,最⼤化的利⽤多核处理器来提⾼应⽤程序 的性能。
与其他ExecutorService相关的实现相同的是,Fork/Join框架会将任务分配给线程 池中的线程。⽽与之不同的是,Fork/Join框架在执⾏任务时使⽤了⼯作窃取算法。
fork在英⽂⾥有分叉的意思,join在英⽂⾥连接、结合的意思。顾名思义,fork就 是要使⼀个⼤任务分解成若⼲个⼩任务,⽽join就是最后将各个⼩任务的结果结合 起来得到⼤任务的结果。
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行
这就是Fork/Join任务的原理,Fork/Join的运⾏流程⼤致如下所示
需要注意的是,图⾥的次级⼦任务可以⼀直分下去,⼀直分到⼦任务⾜够⼩为⽌
⼯作窃取算法指的是在多线程执⾏不同任务队列的过程中,某个线程执⾏完⾃⼰队 列的任务后从其他线程的任务队列⾥窃取任务来执⾏。 ⼯作窃取流程如下图所示:
值得注意的是,当⼀个线程窃取另⼀个线程的时候,为了减少两个任务线程之间的 竞争,我们通常使⽤双端队列来存储任务。被窃取的任务线程都从双端队列的头部 拿任务执⾏,⽽窃取其他任务的线程从双端队列的尾部执⾏任务。
另外,当⼀个线程在窃取任务时要是没有其他可⽤的任务了,这个线程会进⼊阻塞 状态以等待再次“⼯作”。
综上Fork/Join包含以下两个操作
1、任务分割:首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果 子任务比较大的话还要对子任务进行继续分割
2、执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程 分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里, 启动一个线程从队列里取数据,然后合并这些数据。
在 Java 的 Fork/Join 框架中,使用两个类完成上述操作
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,
ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而 ForkJoinWorkerThread 负责执行这些任务。
前⾯我们说Fork/Join框架简单来讲就是对任务的分割与⼦任务的合并,所以要实现 这个框架,先得有任务。在Fork/Join框架⾥提供了抽象类 ForkJoinTask 来实现任 务。
ForkJoinTask是⼀个类似普通线程的实体,但是⽐普通线程轻量得多。
fork()⽅法:使⽤线程池中的空闲线程异步提交任务
- // 本⽂所有代码都引⾃Java 8
- public final ForkJoinTask<V> fork() {
- Thread t;
- // ForkJoinWorkerThread是执⾏ForkJoinTask的专有线程,由ForkJoinPool管理
- // 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列⾥去
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
- ((ForkJoinWorkerThread)t).workQueue.push(this);
- else
- // 如果不是则将线程加⼊队列
- // 没有显式创建ForkJoinPool的时候⾛这⾥,提交任务到默认的common线程池中
- ForkJoinPool.common.externalPush(this);
- return this;
- }
其实fork()只做了⼀件事,那就是把任务推⼊当前⼯作线程的⼯作队列⾥。
join()⽅法:等待处理任务的线程处理完毕,获得返回值。
我们在之前介绍过说Thread.join()会使线程阻塞,⽽ForkJoinPool.join()会使线程免 于阻塞,下⾯是ForkJoinPool.join()的流程图
RecursiveAction和RecursiveTask
通常情况下,在创建任务的时候我们⼀般不直接继承ForkJoinTask,⽽是继承它的 ⼦类RecursiveAction和RecursiveTask。 两个都是ForkJoinTask的⼦类,RecursiveAction可以看做是⽆返回值的 ForkJoinTask,RecursiveRask是有返回值的ForkJoinTask。
此外,两个⼦类都有执⾏主要计算的⽅法compute(),当然,RecursiveAction的 compute()返回void,RecursiveTask的compute()有具体的返回值。
ForkJoinPool是⽤于执⾏ForkJoinTask任务的执⾏(线程)池。 ForkJoinPool管理着执⾏池中的线程和任务队列,此外,执⾏池是否还接受任务, 显示线程的运⾏状态也是在这⾥处理。 我们来⼤致看下ForkJoinPool的源码:
- public class ForkJoinPool extends AbstractExecutorService {
- // 任务队列
- volatile WorkQueue[] workQueues;
- // 线程的运⾏状态
- volatile int runState;
- // 创建ForkJoinWorkerThread的默认⼯⼚,可以通过构造函数重写
- public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThread
- // 公⽤的线程池,其运⾏状态不受shutdown()和shutdownNow()的影响
- static final ForkJoinPool common;
- // 私有构造⽅法,没有任何安全检查和参数校验,由makeCommonPool直接调⽤
- // 其他构造⽅法都是源⾃于此⽅法
- // parallelism: 并⾏度,
- // 默认调⽤java.lang.Runtime.availableProcessors() ⽅法返回可⽤处理器的数量
- private ForkJoinPool(int parallelism,
- ForkJoinWorkerThreadFactory factory, // ⼯作线程⼯⼚
- UncaughtExceptionHandler handler, // 拒绝任务的handler
- int mode, // 同步模式
- String workerNamePrefix) { // 线程名prefix
- this.workerNamePrefix = workerNamePrefix;
- this.factory = factory;
- this.ueh = handler;
- this.config = (parallelism & SMASK) | mode;
- long np = (long)(-parallelism); // offset ctl counts
- this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK)
- }
- }
WorkQueue
双端队列,ForkJoinTask存放在这⾥。 当⼯作线程在处理⾃⼰的⼯作队列时,会从队列尾取任务来执⾏(LIFO);如果是 窃取其他队列的任务时,窃取的任务位于所属任务队列的队⾸(FIFO)。
ForkJoinPool与传统线程池最显著的区别就是它维护了⼀个⼯作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个⼯作线程都维护着⼀个⼯作队 列)。
runState
ForkJoinPool的运⾏状态。SHUTDOWN状态⽤负数表示,其他⽤2的幂次表示。
上⾯我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作, 所以要使⽤Fork/Join框架就离不开这两个类了,只是在实际开发中我们常⽤ ForkJoinTask的⼦类RecursiveTask 和RecursiveAction来替代ForkJoinTask。
案例1:Fork/Join对大数据进行并行求和:
- public class Main {
- public static void main(String[] args) throws Exception {
- // 创建2000个随机数组成的数组:
- long[] array = new long[2000];
- long expectedSum = 0;
- for (int i = 0; i < array.length; i++) {
- array[i] = random();
- expectedSum += array[i];
- }
- System.out.println("Expected sum: " + expectedSum);
- // fork/join:
- ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
- long startTime = System.currentTimeMillis();
- Long result = ForkJoinPool.commonPool().invoke(task);
- long endTime = System.currentTimeMillis();
- System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
- }
-
- static Random random = new Random(0);
-
- static long random() {
- return random.nextInt(10000);
- }
- }
-
- class SumTask extends RecursiveTask<Long> {
- static final int THRESHOLD = 500;
- long[] array;
- int start;
- int end;
-
- SumTask(long[] array, int start, int end) {
- this.array = array;
- this.start = start;
- this.end = end;
- }
-
- @Override
- protected Long compute() {
- if (end - start <= THRESHOLD) {
- // 如果任务足够小,直接计算:
- long sum = 0;
- for (int i = start; i < end; i++) {
- sum += this.array[i];
- // 故意放慢计算速度:
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- }
- }
- return sum;
- }
- // 任务太大,一分为二:
- int middle = (end + start) / 2;
- System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
- SumTask subtask1 = new SumTask(this.array, start, middle);
- SumTask subtask2 = new SumTask(this.array, middle, end);
- invokeAll(subtask1, subtask2);
- Long subresult1 = subtask1.join();
- Long subresult2 = subtask2.join();
- Long result = subresult1 + subresult2;
- System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
- return result;
- }
- }
Fork/Join框架在Java标准库中就有应用。Java标准库java.util.Arrays.parallelSort(array)
可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。
如果要计算的任务⽐较简单(⽐如我们案例中的斐波那契数列),那当然是直接使 ⽤单线程会更快⼀些。但如果要计算的东⻄⽐较复杂,计算机⼜是多核的情况下, 就可以充分利⽤多核CPU来提⾼计算速度。 另外,Java 8 Stream的并⾏操作底层就是⽤到了Fork/Join框架
ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接 捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法来检查 任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常。 getException 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException。如果任务没有完成或者没有抛出异常则返回 null。
从Java 8 开始,我们可以使⽤ Stream 接⼝以及lambda表达式进⾏“流式计算”。它 可以让我们对集合的操作更加简洁、更加可读、更加⾼效。 Stream接⼝有⾮常多⽤于集合计算的⽅法,⽐如判空操作empty、过滤操作filter、 求最max值、查找操作findFirst和findAny等等。
关于stream用法,参考前面文章
Stream接⼝默认是使⽤串⾏的⽅式,也就是说在⼀个线程⾥执⾏。下⾯举⼀个例 ⼦:
- public static void main(String[] args) {
- Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
- .reduce((a, b) -> {
- System.out.println(String.format("%s: %d + %d = %d",
- Thread.currentThread().getName(), a, b, a + b));
- return a + b;
- })
- .ifPresent(System.out::println);
- }
我们来理解⼀下这个⽅法。⾸先我们⽤整数1~9创建了⼀个 Stream 。这⾥的 Stream.of(T... values)⽅法是Stream接⼝的⼀个静态⽅法,其底层调⽤的是 Arrays.stream(T[] array)⽅法。 然后我们使⽤了 reduce ⽅法来计算这个集合的累加和。 reduce ⽅法这⾥做的是: 从前两个元素开始,进⾏某种操作(我这⾥进⾏的是加法操作)后,返回⼀个结 果,然后再拿这个结果跟第三个元素执⾏同样的操作,以此类推,直到最后的⼀个 元素。 我们来打印⼀下当前这个reduce操作的线程以及它们被操作的元素和返回的结果以 及最后所有reduce⽅法的结果,也就代表的是数字1到9的累加和
- main: 1 + 2 = 3
- main: 3 + 3 = 6
- main: 6 + 4 = 10
- main: 10 + 5 = 15
- main: 15 + 6 = 21
- main: 21 + 7 = 28
- main: 28 + 8 = 36
- main: 36 + 9 = 45
- 45
可以看到,默认情况下,它是在⼀个单线程运⾏的,也就是main线程。然后每次 reduce操作都是串⾏起来的,⾸先计算前两个数字的和,然后再往后依次计算。
我们思考上⾯⼀个例⼦,是不是⼀定要在单线程⾥进⾏串⾏地计算呢?假如我的计 算机是⼀个多核计算机,我们在理论上能否利⽤多核来进⾏并⾏计算,提⾼计算效 率呢?
当然可以,⽐如我们在计算前两个元素1 + 2 = 3的时候,其实我们也可以同时在另 ⼀个核计算 3 + 4 = 7。然后等它们都计算完成之后,再计算 3 + 7 = 10的操作。 是不是很熟悉这样的操作⼿法?没错,它就是ForkJoin框架的思想。
下⾯⼩⼩地修 改⼀下上⾯的代码,增加⼀⾏代码,使Stream使⽤多线程来并⾏计算:
- public static void main(String[] args) {
- Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
- .parallel()
- .reduce((a, b) -> {
- System.out.println(String.format("%s: %d + %d = %d",
- Thread.currentThread().getName(), a, b, a + b));
- return a + b;
- })
- .ifPresent(System.out::println);
- }
可以看到,与上⼀个案例的代码只有⼀点点区别,就是在reduce⽅法被调⽤之前, 调⽤了parallel()⽅法。下⾯来看看这个⽅法的输出
- ForkJoinPool.commonPool-worker-1: 3 + 4 = 7
- ForkJoinPool.commonPool-worker-5: 1 + 2 = 3
- ForkJoinPool.commonPool-worker-2: 8 + 9 = 17
- ForkJoinPool.commonPool-worker-4: 5 + 6 = 11
- ForkJoinPool.commonPool-worker-5: 3 + 7 = 10
- ForkJoinPool.commonPool-worker-2: 7 + 17 = 24
- ForkJoinPool.commonPool-worker-2: 11 + 24 = 35
- ForkJoinPool.commonPool-worker-2: 10 + 35 = 45
- 45
可以很明显地看到,它使⽤的线程是 ForkJoinPool ⾥⾯的 commonPool ⾥⾯的 worker线程。并且它们是并⾏计算的,并不是串⾏计算的。但由于Fork/Join框架 的作⽤,它最终能很好的协调计算结果,使得计算结果完全正确。
如果我们⽤Fork/Join代码去实现这样⼀个功能,那⽆疑是⾮常复杂的。但Java8提 供了并⾏式的流式计算,⼤⼤简化了我们的代码量,使得我们只需要写很少很简单 的代码就可以利⽤计算机底层的多核资源。
上⾯我们通过在控制台输出线程的名字,看到了Stream的并⾏计算底层其实是使⽤ 的Fork/Join框架。那它到底是在哪使⽤Fork/Join的呢?我们从源码上来解析⼀下上 述案例。
Stream.of ⽅法就不说了,它只是⽣成⼀个简单的Stream。先来看 看 parallel() ⽅法的源码。这⾥由于我的数据是 int 类型的,所以它其实是使⽤ 的 BaseStream 接⼝的 parallel() ⽅法。⽽ BaseStream 接⼝的JDK唯⼀实现类是⼀ 个叫 AbstractPipeline 的类。下⾯我们来看看这个类的 parallel() ⽅法的代码:
public final S parallel() { sourceStage.parallel = true; return (S) this; }
这个⽅法很简单,就是把⼀个标识 sourceStage.parallel 设置为 true 。然后返回 实例本身。 接着我们再来看 reduce 这个⽅法的内部实现。 Stream.reduce()⽅法的具体实现是交给了 ReferencePipeline 这个抽象类,它是继 承了 AbstractPipeline 这个类的:
- // ReferencePipeline抽象类的reduce⽅法
- @Override
- public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
- // 调⽤evaluate⽅法
- return evaluate(ReduceOps.makeRef(accumulator));
- }
- final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
- assert getOutputShape() == terminalOp.inputShape();
- if (linkedOrConsumed)
- throw new IllegalStateException(MSG_STREAM_LINKED);
- linkedOrConsumed = true;
- return isParallel() // 调⽤isParallel()判断是否使⽤并⾏模式
- ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOp
- : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getO
- }
- @Override
- public final boolean isParallel() {
- // 根据之前在parallel()⽅法设置的那个flag来判断。
- return sourceStage.parallel;
- }
可以看到,对应的是Stream的⼏种主要的计算操作。我们这⾥的示例代码使⽤的是 reduce计算,那我们就看看ReduceOp类的这个⽅法的源码:
- // java.util.stream.ReduceOps.ReduceOp.evaluateParallel
- @Override
- public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
- Spliterator<P_IN> spliterator) {
- return new ReduceTask<>(this, helper, spliterator).invoke().get();
- }
evaluateParallel⽅法创建了⼀个新的ReduceTask实例,并且调⽤了invoke()⽅法后 再调⽤get()⽅法,然后返回这个结果。那这个ReduceTask是什么呢?
它的invoke ⽅法内部⼜是什么呢? 追溯源码我们可以发现,ReduceTask类是ReduceOps类的⼀个内部类,它继承了 AbstractTask类,⽽AbstractTask类⼜继承了CountedCompleter类,⽽ CountedCompleter类⼜继承了ForkJoinTask类! 它们的继承关系如下:
ReduceTask -> AbstractTask -> CountedCompleter -> ForkJoinTask
这⾥的ReduceTask的invoke⽅法,其实是调⽤的ForkJoinTask的invoke⽅法,中间 三层继承并没有覆盖这个⽅法的实现。 所以这就从源码层⾯解释了Stream并⾏的底层原理是使⽤了Fork/Join框架。
我们可以在本地测试⼀下如果在多核情况下,Stream并⾏计算会给我们的程序带来 多⼤的效率上的提升。⽤以下示例代码来计算⼀千万个随机数的和:
- public class Test {
- public static void main(String[] args) {
- System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));
- // 产⽣100w个随机数(1 ~ 100),组成列表
- Random random = new Random();
- List<Integer> list = new ArrayList<>(1000_0000);
- for (int i = 0; i < 1000_000; i++) {
- list.add(random.nextInt(100));
- }
- long prevTime = getCurrentTime();
- list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);
- System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() -prevTime));
- prevTime = getCurrentTime();
- list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);
- System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() -prevTime));
- }
-
- private static long getCurrentTime() {
- return System.currentTimeMillis();
- }
-
- }
输出
- 本计算机的核数:8
- 495156156
- 单线程计算耗时:223
- 495156156
- 多线程计算耗时:95
所以在多核的情况下,使⽤Stream的并⾏计算确实⽐串⾏计算能带来很⼤效率上的 提升,并且也能保证结果计算完全准确。
本⽂⼀直在强调的“多核”的情况。其实可以看到,我的本地电脑有8核,但并⾏计算 耗时并不是单线程计算耗时除以8,因为线程的创建、销毁以及维护线程上下⽂的 切换等等都有⼀定的开销。所以如果你的服务器并不是多核服务器,那也没必要⽤ Stream的并⾏计算。因为在单核的情况下,往往Stream的串⾏计算⽐并⾏计算更 快,因为它不需要线程切换的开销。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。