赞
踩
1、Executor接口:顶层抽象接口最大的作用就是解耦了任务和任务的执行
只有一个void execute(Runnable command);
方法,该方法用来执行任务,因为是接口方法,根据重写方式不同的不同,就会导致该任务的实现方式不同
2、ExecutorService接口:继承了Executor接口,增加了任务的管理与异步运行(Future模式:比如<T> Future<T> submit(Runnable task, T result);
)
3、ScheduledExecutorService接口:添加了处理延迟执行或者周期任务。
4、AbstractExecutorService子类:实现ExecutorService接口,为各类执行器类提供基础
5、ThreadPoolExecutor子类:ExecutorService的具体线程池实现。
6、ScheduledThreadPoolExecutor子类:增加了延迟执行和异步执行的ExecutorService的具体线程池实现。
shutdown与shutdownNow区别:
(1)shutdown调用后,不可以再submit新的task,已经submit的将继续执行。终止空闲线程
(2)shutdownNow试图停止当前正执行的task,并返回尚未执行的task的list,终止所有线程
isShutDown与isTerminated区别:
(1)isShutDown当调用shutdown()或shutdownNow()方法后返回为true。
(2)isTerminated当调用shutdown()方法后,并且所有提交的任务完成后返回为true;
(3)isTerminated当调用shutdownNow()方法后,成功停止后返回为true;
(4)如果线程池任务正常完成,都为false
2、监视执行器的状态;
3、拓展了提交有返回值的任务,提供对异步任务的支持,主要体现在返回值为Future
(Future模式在我另一篇篇博客:https://blog.csdn.net/qq_32679835/article/details/90743846 )
4、提供对批处理任务的支持。
继承ExecutorService接口,主要是针对周期或者延迟执行的任务
1、scheduleAtFixedRate() 与scheduleWithFixedDelay()区别
scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
scheduleWithFixedDelay,是以上一个任务结束时开始计时,period时间过去后,立即执行。
两者开始的时间点不同
2、返回ScheduledFuture:是在future基础上添加延迟功能
interface ScheduledFuture<V> extends Delayed, Future<V>
对ExecutorService接口的默认实现
1、invokeAny方法,通过调用内部实现的doInvokeAny,能够实现无超时等待的版本和超时等待的版本
2、invokeAll方法,通过调用内部实现的doInvokeAny,能够实现无超时等待的版本和超时等待的版本
invokeAny与invokeAll应用场景
1、invokeAny查找海量数据中的其中某一个资源,多个线程同时查找,有一个线程执行完成,就会立即返回,其他线程就会推出,不再占用CPU资源
2、invokeAll查找海量数据的所有资源,多个线程同时查找,需要全部查找出来
3、submit方法
首先将提交的任务通过newTaskFor封装为RunnableFuture对象,之后execute提交任务,返回一步计算结果对象
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
最终返回的是FutureTask对象,以让调用方以异步方式获取任务的执行结果
(futuretask讲解:https://blog.csdn.net/qq_32679835/article/details/90743846#FutureTask_115)
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
Executor执行器,能够将任务和任务的执行分离,此时就会导致执行任务的方式会千差万别,当然自己也能够通过重写execute()构造任务执行方式,因此 Doug Lea给出了两种常用的Executor执行方式,线程池执行器分别是:ThreadPoolExecutor和ScheduledThreadPoolExecutor
//低29位表示工作线程数
//高3位表示线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 高位3bit表示的运行状态
private static final int RUNNING = -1 << COUNT_BITS;RUNNING : 接受新任务, 且处理已经进入阻塞队列的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;SHUTDOWN : 不接受新任务, 但处理已经进入阻塞队列的任务
private static final int STOP = 1 << COUNT_BITS;STOP : 不接受新任务, 且不处理已经进入阻塞队列的任务, 同时中断正在运行的任务
private static final int TIDYING = 2 << COUNT_BITS;TIDYING : 所有任务都已终止, 工作线程数为0, 线程转化为TIDYING状态并准备调用terminated方法
private static final int TERMINATED = 3 << COUNT_BITS;TERMINATED : terminated方法已经执行完成
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//线程工厂
}
关联的线程通过线程工厂(接口)newThread来创建,Executors(可以看做是执行器的工厂)中的内部静态类实现线程工厂
创建的Thread将Worker自身作为任务,所以当调用Thread的start方法时,最终实际是调用了Worker.run()方法,该方法内部委托给runWorker方法执行任务
public void run() {
runWorker(this);
}
有四个构造函数,这是参数最全的构造函数,都是调用该构造函数
/** * 使用给定的参数创建ThreadPoolExecutor. * * @param corePoolSize 核心线程池中的最大线程数 * @param maximumPoolSize 总线程池中的最大线程数 * @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 * @param unit keepAliveTime的单位 * @param workQueue 任务队列, 保存已经提交但尚未被执行的线程 * @param threadFactory 线程工厂(用于指定如果创建一个线程) * @param handler 拒绝策略 (当任务太多导致工作队列满时的处理策略) */ public ThreadPoolExecutor(int corePoolSize,//核心池线程容量 int maximumPoolSize,//最大线程容量 long keepAliveTime,// TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //... }
AbstractExecutorService中的submit方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
execute()控制执行任务的核心方法,将代码转化为流程图:
1、如果工作线程数小于核心线程池上限(CorePoolSize),则直接新建一个工作线程并执行任务;
2、如果工作线程数大于等于CorePoolSize,则尝试将任务加入到队列等待以后执行。如果加入队列失败了(比如队列已满的情况),则在总线程池未满的情况下(CorePoolSize ≤ 工作线程数 < maximumPoolSize)新建一个工作线程立即执行任务,否则执行拒绝策略。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {//小于核心线程池数,添加工作线程并执行 if (addWorker(command, true))//true表示线程绑定任务,绑定任务的线程直接执行该任务 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//工作线程数>=核心线程数,插入到阻塞队列 int recheck = ctl.get(); //不是RUnning状态,移除任务并拒绝 if (! isRunning(recheck) && remove(command)) reject(command); //线程池非running状态,但是线程池中依旧存在任务,需要添加一个线程处理任务 else if (workerCountOf(recheck) == 0) addWorker(null, false);//false表示线程未绑定任务,直接从阻塞队列中获取任务来执行 } else if (!addWorker(command, false)) reject(command); }
addWorker(null, false);这一行,这要结合addWorker一起来看。 主要目的是防止HUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
(1)第一部分如果状态不适合接受新任务,或者工作线程数超出了限制,则直接返回false。
(2)第二部分才真正去创建工作线程并执行任务,可以分为两类线程,第一类:与firftTask任务相关联,处理完关联任务处理其他任务,第二类:针对RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务,直接去清理任务
//firstTask:新增一个线程并执行这个任务,可空,增加的线程从队列获取任务; private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);//线程池状态 // 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 满足下列调价则直接返回false,线程创建失败: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束 // rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务 // rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null,false),任务为null && 队列为空 // 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么? // 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务 // 但是此时队列不为空,那么还得创建线程把任务给执行完才行。 if (rs >= SHUTDOWN && //不再接受任务 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //队列为空,不再接受任务,不需要再创建工作线程 return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || //工作线程数超过最大线程 wc >= (core ? corePoolSize : maximumPoolSize))//true:核心线程,线程数超过核心线程 //false:非核心线程,线程数超过非核心线程数 return false; if (compareAndIncrementWorkerCount(c))//工作线程数+1 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)//线程池状态改变 continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//绑定线程与任务 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //加入到线程集合 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//启动线程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//创建线程失败,回滚操作 } return workerStarted; }
线程startt()–>Worker的run()—委托–>runWorker()方法
第一次启动会执行初始化传进来的任务firstTask;
然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。
1、循环获取任务
2、控制执行线程的中断状态,保证如果线程池正在停止,则线程必须是中断状态,否则线程必须不是中断状态;
2、run()执行任务
final void runWorker(Worker w) { Thread wt = Thread.currentThread();//执行任务的线程 Runnable task = w.firstTask;//任务,null的时候获取任务 w.firstTask = null; w.unlock(); //允许线程中断 boolean completedAbruptly = true;//中断退出 try { while (task != null || (task = getTask()) != null) {//不断获取任务 w.lock(); // 1.保证当线程池状态为STOP/TIDYING/TERMINATED时,当前执行任务的线程wt是中断状态(因为线程池处于上述任一状态时,均不能再执行新任务) // 2.保证当线程池状态为RUNNING/SHUTDOWN时,当前执行任务的线程wt不是中断状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//钩子函数,任务执行前插入一些操作 Throwable thrown = null; try { task.run();//运行 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false;//没有携带任务,也无法获取任务 } finally { processWorkerExit(w, completedAbruptly);//工作线程退出 } }
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) //中断,工作线程数-1,// 正常的话再runWorker的getTask方法workerCount已经被减一了 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks;//线程池完成任务数增加 workers.remove(w);// 从线程池中移除超时或者出现异常的线程 } finally { mainLock.unlock(); } tryTerminate();//线程池状态判断是否需要终止线程池 int c = ctl.get(); // runState为RUNNING或SHUTDOWN if (runStateLessThan(c, STOP)) { // 线程不是异常结束 if (!completedAbruptly) { // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务 if (min == 0 && !workQueue.isEmpty()) min = 1; // 线程池还不为空那就不用担心了 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.线程异常退出 // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理,虽然当前线程异常退出,但可能任务队列依旧存在任务,需要去执行 addWorker(null, false); } }
循环在阻塞队列中获取任务,获取失败返回null
private Runnable getTask() { boolean timedOut = false; // 上次获取任务是否超时 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 1、stop状态之上,线程池已停止,不允许获取任务 //2、shutdown状态,但是任务队列为空,不允许在获取任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //一般核心线程不设置超时限制,设置了超时限制,否则不会超时回收timed = true || false //非核心线程,一定超时回收,timed = false(可以忽略) || true;因为wc > corePoolSize条件一定满足 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;//超时未获取到任务 } catch (InterruptedException retry) { timedOut = false; } } }
触发条件:(1)核心线程池满,阻塞队列满,非核心线程池满(2)ThreadPoolExecutor关闭
ScheduleExecutoService接口的实现类,继承自ThreadPoolExecutor,主要任务:处理周期、延时任务
作为ThreadPoolExecutor的子类,构造方法内部调用了父类的构造方法,阻塞队列选择DelayedWorkQueue(内部静态类,与DelayedQueue类似)
Delayed接口:主要处理延时任务
RunnableScheduledFuture接口:主要时提供了一个周期方法
Future:异步接口
FutureTask:主要处理异步任务(是一个类)
综合:继承自FutureTask,实现了RunnableScheduledFuture,可以异步执行任务(周期/延时任务)。重写了延迟getDelay()和compareTo()和isPeriodic()方法
与DelayedQueue类似,自己实现了对排序
1、offer()添加元素
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;//返回RunnableScheduledFuture任务 final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow();//扩容 size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e);//上浮 } if (queue[0] == e) { // 当前元素是首个元素 leader = null; available.signal(); // 唤醒一个等待线程 } } finally { lock.unlock(); } return true; }
2、take()方法
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0];//获取队首元素 if (first == null)//队首元素为空,则等待 available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0)//队首元素到期,弹出队首元素 return finishPoll(first);//重新调整堆 first = null; //队首元素还未到期 //有leader则等待 if (leader != null) available.await(); //无leader设置为当前线程 else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { //结束之后leader设置为空 if (leader == thisThread) leader = null; } } } } } finally { //此时其他线程可以竞争 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
实现了ScheduledExecutorService方法作为调度流程:
1、schedule()
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//Runnable任务包装成ScheduledFutureTask,用户可以根据自己的需要覆写该方法
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);//执行
return t;
}
decorateTask()可以重写,默认只是将task简单的返回
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
2、 delayedExecute(t);
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())//线程池关闭,则拒绝策略
reject(task);
else {
super.getQueue().add(task);//获取任务
// 如果线程池已关闭且该任务是非周期任务, 则将其从队列移除
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
ensurePrestart();//添加工作线程
}
}
ensurePrestart添加工作线程的方式与ThreadPoolExecutor不同:核心线程池未满,则添加到核心线程池,核心线程池满,不会去创建工作线程
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)//核心线程池未满,则添加到核心线程池
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);//核心线程池为0,必须保证任务运行,添加一个线程到非核心线程池
}
ThreadPoolExecutor与ScheduledThreadPoolExecutor区别:
1、总体调度流程不同,ScheduledThreadPoolExecutor:先往队列添加任务,然后创建工作线程执行任务;ThreadPoolExecutor创建Worker关联线程,线程执行当前任务,如果线程没有关联任务,则直接从队列获取任务
2、任务队列不同,ThreadPoolExecutor选择比较多;ScheduledThreadPoolExecutor选择DelayedWorkQueue
3、执行方式不同:ScheduledThreadPoolExecutor只有核心线程执行,只有设置coreSize=0时,才会有一个非核心线程去执行;ThreadPoolExecutor会存在核心线程与非核心线程同时执行
4、返回异步任务:ThreadPoolExecutor返回FutureTask对象;ScheduledThreadPoolExecutor返回ScheduledFutureTask
返回ThreadPoolExecutor实例对象,指定线程容量
1、ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
:需要传递线程工厂
2、ExecutorService newFixedThreadPool(int nThreads)
:使用默认的线程工厂DefaultThreadFactory
1、ExecutorService newSingleThreadExecutor()
和ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
2、通过无参构造方法讲解:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
1、会发现此时新建了一个FinalizableDelegatedExecutorService,核心继承DelegatedExecutorService,内部实现委托给ExecutorService,添加中间层的作用在于:ThreadPoolExecutor包含一些设置线程池大小的方法——比如setCorePoolSize,对于只有单个线程的线程池来说,我们是不希望用户通过强转的方式使用这些方法的,所以需要一个包装类,只暴露ExecutorService本身的方法。
2、使用链表阻塞队列
1、ExecutorService newCachedThreadPool()
和ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
2、通过一个构造方法说明:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
只要没有线程可用就可以添加线程,注意两点
1、核心线程容量等于0,非核心线程池容量为Integer.MAX_VALUE
2、会有一个空闲线程等待时间,为60秒
3、使用同步队列
1、ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
和 ScheduledExecutorService newScheduledThreadPool
新建ScheduledThreadPoolExecutor对象,可安排任务在指定延迟后或周期性地执行.
package ThreadPoolDemo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class Executorfactory { public static Executorfactory factory = new Executorfactory(); private ExecutorService executors; public Executorfactory() { // TODO Auto-generated constructor stub } public static Executorfactory getFactory() { return factory; } public ExecutorService createSingePool() { executors = Executors.newSingleThreadExecutor(new MyThreadFactory()); return executors; } /** * 无界队列,内部使用同步阻塞队列 * * @return */ public ExecutorService createCachePool() { executors = Executors.newCachedThreadPool(new MyThreadFactory()); return executors; } /** * 产生固定线程数的线程管理器 * * @param count * @return */ public ExecutorService createFixedPool(int count) { executors = Executors.newFixedThreadPool(count); return executors; } /** * 构建带有延迟,周期功能的线程管理器 * * @return */ public ExecutorService createScheduledPool() { // cpu数目 int cpu = Runtime.getRuntime().availableProcessors(); executors = Executors.newScheduledThreadPool(cpu * 10, new MyThreadFactory()); return executors; } class MyThreadFactory implements ThreadFactory { AtomicInteger atomic = new AtomicInteger(); public MyThreadFactory() { } public Thread newThread(Runnable r) { SecurityManager s = new SecurityManager(); // 线程组是为了更加安全 ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); Thread thread = new Thread(group, r); thread.setName("任务线程-" + atomic.incrementAndGet()); return thread; } } }
注意任务提交只是调用了抽象ExecutorServise实现的submit函数与ThreadPoolExecutor实现的execute函数
package ThreadPoolDemo; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; public class ExecutorProcess { private ExecutorService executor; private static ExecutorProcess process = new ExecutorProcess(); private int ThreadMax = 10; public ExecutorProcess() { System.out.println("创建最大线程数为:"+ThreadMax+"的线程池"); executor = Executorfactory.getFactory().createFixedPool(ThreadMax); } public static ExecutorProcess getProcess() { return process; } //关闭线程池 public void shutdown( ) { executor.shutdown(); } //有返回值的任务 public Future<?> submit(Runnable task) { return executor.submit(task);//submit方法在AbstractExecutorService中实现 } //有返回值的任务 public Future<?> submit(Callable<?> task) { return executor.submit(task); } //无返回结果的任务 public void execute(Runnable task) { executor.execute(task);//execute方法在ThreadPoolExecutor实现 } }
分别测试两种类型:Runnable没有返回值,Callable具有返回值
package ThreadPoolDemo; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) { ExecutorProcess pool = ExecutorProcess.getProcess();//单例模式 for (int i = 0; i < 50; i++) { String name = "线程"+i; pool.execute(new TaskRunnable(name)); } for (int i = 50; i < 100; i++) { String name = "线程"+i; Future<?> future = pool.submit(new TaskCall(name)); try { System.out.println(future.get());//获取结果 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } pool.shutdown(); } static class TaskRunnable implements Runnable { private String name; public TaskRunnable(String name) { this.name = name; } @Override public void run() { try { //休眠10秒之内的时间 TimeUnit.SECONDS.sleep((int)Math.random()*10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Runnable任务"+name+"执行业务逻辑"); } } //由于是静态main方法调用,因此需要将该类设置为static static class TaskCall implements Callable<String> { private String name; public TaskCall(String name) { this.name = name; } @Override //有返回值 public String call() throws Exception { try { //休眠10秒之内的时间 TimeUnit.SECONDS.sleep((int)Math.random()*10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Callable任务"+name+"执行业务逻辑"); return "线程返回值,线程名称:"+name; } } }
【1】AbstractExecutorService:https://www.jianshu.com/p/7418bedd520f
【2】ThreadPoolExecutor:https://segmentfault.com/a/1190000016629668
注释更加通俗: https://segmentfault.com/a/1190000010353461
【3】ScheduledThreadPoolExecutor:https://segmentfault.com/a/1190000016672638
图片:https://blog.csdn.net/luanmousheng/article/details/77816412
【4】Executor工厂:https://segmentfault.com/a/1190000016586578#articleHeader4
【5】Executor Demo实例:https://shanhy.blog.csdn.net/article/details/50180949
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。