当前位置:   article > 正文

Executors全面解析

Executors全面解析

一、类结构图

在这里插入图片描述
1、Executor接口:顶层抽象接口最大的作用就是解耦了任务和任务的执行
只有一个void execute(Runnable command);方法,该方法用来执行任务,因为是接口方法,根据重写方式不同的不同,就会导致该任务的实现方式不同
2、ExecutorService接口:继承了Executor接口,增加了任务的管理与异步运行(Future模式:比如<T> Future<T> submit(Runnable task, T result);
3、ScheduledExecutorService接口:添加了处理延迟执行或者周期任务。
4、AbstractExecutorService子类:实现ExecutorService接口,为各类执行器类提供基础
5、ThreadPoolExecutor子类:ExecutorService的具体线程池实现。
6、ScheduledThreadPoolExecutor子类:增加了延迟执行和异步执行的ExecutorService的具体线程池实现。

二、ExecutorService

  • 实现功能
    1、关闭执行器,禁止任务提交

shutdown与shutdownNow区别:
(1)shutdown调用后,不可以再submit新的task,已经submit的将继续执行。终止空闲线程
(2)shutdownNow试图停止当前正执行的task,并返回尚未执行的task的list,终止所有线程

isShutDown与isTerminated区别:
(1)isShutDown当调用shutdown()或shutdownNow()方法后返回为true。
(2)isTerminated当调用shutdown()方法后,并且所有提交的任务完成后返回为true;
(3)isTerminated当调用shutdownNow()方法后,成功停止后返回为true;
(4)如果线程池任务正常完成,都为false

2、监视执行器的状态;
3、拓展了提交有返回值的任务,提供对异步任务的支持,主要体现在返回值为Future
(Future模式在我另一篇篇博客:https://blog.csdn.net/qq_32679835/article/details/90743846
4、提供对批处理任务的支持。

三、ScheduledExecutorService

继承ExecutorService接口,主要是针对周期或者延迟执行的任务

1、scheduleAtFixedRate() 与scheduleWithFixedDelay()区别
scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
scheduleWithFixedDelay,是以上一个任务结束时开始计时,period时间过去后,立即执行。
两者开始的时间点不同
在这里插入图片描述
2、返回ScheduledFuture:是在future基础上添加延迟功能
interface ScheduledFuture<V> extends Delayed, Future<V>

四、AbstractExecutorService

对ExecutorService接口的默认实现
1、invokeAny方法,通过调用内部实现的doInvokeAny,能够实现无超时等待的版本和超时等待的版本
2、invokeAll方法,通过调用内部实现的doInvokeAny,能够实现无超时等待的版本和超时等待的版本

invokeAny与invokeAll应用场景
1、invokeAny查找海量数据中的其中某一个资源,多个线程同时查找,有一个线程执行完成,就会立即返回,其他线程就会推出,不再占用CPU资源
2、invokeAll查找海量数据的所有资源,多个线程同时查找,需要全部查找出来

3、submit方法
首先将提交的任务通过newTaskFor封装为RunnableFuture对象,之后execute提交任务,返回一步计算结果对象

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

最终返回的是FutureTask对象,以让调用方以异步方式获取任务的执行结果
(futuretask讲解:https://blog.csdn.net/qq_32679835/article/details/90743846#FutureTask_115)

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
  • 1
  • 2
  • 3

在这里插入图片描述

五、ThreadPoolExecutor

Executor执行器,能够将任务和任务的执行分离,此时就会导致执行任务的方式会千差万别,当然自己也能够通过重写execute()构造任务执行方式,因此 Doug Lea给出了两种常用的Executor执行方式,线程池执行器分别是:ThreadPoolExecutor和ScheduledThreadPoolExecutor

(一)属性

//低29位表示工作线程数
//高3位表示线程池状态
 private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 高位3bit表示的运行状态
    private static final int RUNNING    = -1 << COUNT_BITS;RUNNING : 接受新任务, 且处理已经进入阻塞队列的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;SHUTDOWN : 不接受新任务, 但处理已经进入阻塞队列的任务
    private static final int STOP       =  1 << COUNT_BITS;STOP : 不接受新任务, 且不处理已经进入阻塞队列的任务, 同时中断正在运行的任务
    private static final int TIDYING    =  2 << COUNT_BITS;TIDYING : 所有任务都已终止, 工作线程数为0, 线程转化为TIDYING状态并准备调用terminated方法
    private static final int TERMINATED =  3 << COUNT_BITS;TERMINATED : terminated方法已经执行完成
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

(二)、内部类

  • Worker通过AQS框架实现,线程池中的每一个Work关联一个工作线程,用来处理提交的任务
 Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);//线程工厂
        }
  • 1
  • 2
  • 3
  • 4
  • 5

关联的线程通过线程工厂(接口)newThread来创建,Executors(可以看做是执行器的工厂)中的内部静态类实现线程工厂
创建的Thread将Worker自身作为任务,所以当调用Thread的start方法时,最终实际是调用了Worker.run()方法,该方法内部委托给runWorker方法执行任务

public void run() {
            runWorker(this);
    }
  • 1
  • 2
  • 3

(三)构造方法

有四个构造函数,这是参数最全的构造函数,都是调用该构造函数

/**
 * 使用给定的参数创建ThreadPoolExecutor.
 *
 * @param corePoolSize    核心线程池中的最大线程数
 * @param maximumPoolSize 总线程池中的最大线程数
 * @param keepAliveTime   当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
 * @param unit            keepAliveTime的单位
 * @param workQueue       任务队列, 保存已经提交但尚未被执行的线程
 * @param threadFactory   线程工厂(用于指定如果创建一个线程)
 * @param handler         拒绝策略 (当任务太多导致工作队列满时的处理策略)
 */
public ThreadPoolExecutor(int corePoolSize,//核心池线程容量
                              int maximumPoolSize,//最大线程容量
                              long keepAliveTime,//
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
//...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

(四)运行流程

AbstractExecutorService中的submit方法:

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

execute()控制执行任务的核心方法,将代码转化为流程图:
1、如果工作线程数小于核心线程池上限(CorePoolSize),则直接新建一个工作线程并执行任务;
2、如果工作线程数大于等于CorePoolSize,则尝试将任务加入到队列等待以后执行。如果加入队列失败了(比如队列已满的情况),则在总线程池未满的情况下(CorePoolSize ≤ 工作线程数 < maximumPoolSize)新建一个工作线程立即执行任务,否则执行拒绝策略。

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//小于核心线程池数,添加工作线程并执行
            if (addWorker(command, true))//true表示线程绑定任务,绑定任务的线程直接执行该任务
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//工作线程数>=核心线程数,插入到阻塞队列
            int recheck = ctl.get();
            //不是RUnning状态,移除任务并拒绝
            if (! isRunning(recheck) && remove(command))
                reject(command);
              //线程池非running状态,但是线程池中依旧存在任务,需要添加一个线程处理任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//false表示线程未绑定任务,直接从阻塞队列中获取任务来执行
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

addWorker(null, false);这一行,这要结合addWorker一起来看。 主要目的是防止HUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
在这里插入图片描述

1、创建工作线程

(1)第一部分如果状态不适合接受新任务,或者工作线程数超出了限制,则直接返回false。
(2)第二部分才真正去创建工作线程并执行任务,可以分为两类线程,第一类:与firftTask任务相关联,处理完关联任务处理其他任务,第二类:针对RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务,直接去清理任务

//firstTask:新增一个线程并执行这个任务,可空,增加的线程从队列获取任务;
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//线程池状态
            // 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
            // workQueue.isEmpty())
            // 满足下列调价则直接返回false,线程创建失败:
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束
            // rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务
            // rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null,false),任务为null && 队列为空
            // 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么?
            // 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务
            // 但是此时队列不为空,那么还得创建线程把任务给执行完才行。
            if (rs >= SHUTDOWN && //不再接受任务
                ! (rs == SHUTDOWN &&  firstTask == null && ! workQueue.isEmpty())) //队列为空,不再接受任务,不需要再创建工作线程
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||  //工作线程数超过最大线程
                 wc >= (core ? corePoolSize : maximumPoolSize))//true:核心线程,线程数超过核心线程
                 																				//false:非核心线程,线程数超过非核心线程数
                    return false;
                if (compareAndIncrementWorkerCount(c))//工作线程数+1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)//线程池状态改变
                    continue retry;
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//绑定线程与任务
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); //加入到线程集合
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//创建线程失败,回滚操作
        }
        return workerStarted;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

2、工作线程执行

线程startt()–>Worker的run()—委托–>runWorker()方法
第一次启动会执行初始化传进来的任务firstTask;
然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。

1、循环获取任务
2、控制执行线程的中断状态,保证如果线程池正在停止,则线程必须是中断状态,否则线程必须不是中断状态;
2、run()执行任务

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();//执行任务的线程
        Runnable task = w.firstTask;//任务,null的时候获取任务
        w.firstTask = null;
        w.unlock(); //允许线程中断
        boolean completedAbruptly = true;//中断退出
        try {
            while (task != null || (task = getTask()) != null) {//不断获取任务
                w.lock();
            //  1.保证当线程池状态为STOP/TIDYING/TERMINATED时,当前执行任务的线程wt是中断状态(因为线程池处于上述任一状态时,均不能再执行新任务)
             // 2.保证当线程池状态为RUNNING/SHUTDOWN时,当前执行任务的线程wt不是中断状态
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
                try {
                    beforeExecute(wt, task);//钩子函数,任务执行前插入一些操作
                    Throwable thrown = null;
                    try {
                        task.run();//运行
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;//没有携带任务,也无法获取任务
        } finally {
            processWorkerExit(w, completedAbruptly);//工作线程退出
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

3、工作线程清理

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) //中断,工作线程数-1,// 正常的话再runWorker的getTask方法workerCount已经被减一了
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;//线程池完成任务数增加
            workers.remove(w);// 从线程池中移除超时或者出现异常的线程
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//线程池状态判断是否需要终止线程池
        int c = ctl.get();
      // runState为RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 线程不是异常结束
            if (!completedAbruptly) {
                // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 线程池还不为空那就不用担心了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.线程异常退出
            // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理,虽然当前线程异常退出,但可能任务队列依旧存在任务,需要去执行
            addWorker(null, false);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

4、任务获取

循环在阻塞队列中获取任务,获取失败返回null

private Runnable getTask() {
        boolean timedOut = false; // 上次获取任务是否超时
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 1、stop状态之上,线程池已停止,不允许获取任务
            //2、shutdown状态,但是任务队列为空,不允许在获取任务
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            //一般核心线程不设置超时限制,设置了超时限制,否则不会超时回收timed = true || false
            //非核心线程,一定超时回收,timed = false(可以忽略) || true;因为wc > corePoolSize条件一定满足
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
            //获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;//超时未获取到任务
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

5、拒绝策略

触发条件:(1)核心线程池满,阻塞队列满,非核心线程池满(2)ThreadPoolExecutor关闭

  • AbortPolicy(默认):抛异常
  • CallerRunsPolicy:以自身线程来执行任务,这样可以减缓新任务提交的速度。
  • DiscardOldestPolicy:阻塞队列丢弃最近任务,执行当前任务
  • DiscardPolicy:空运转,什么都不干

六、ScheduledThreadPoolExecutor

ScheduleExecutoService接口的实现类,继承自ThreadPoolExecutor,主要任务:处理周期、延时任务

(一)构造方法

作为ThreadPoolExecutor的子类,构造方法内部调用了父类的构造方法,阻塞队列选择DelayedWorkQueue(内部静态类,与DelayedQueue类似)

(二)内部类

1、ScheduledFutureTask

在这里插入图片描述
Delayed接口:主要处理延时任务
RunnableScheduledFuture接口:主要时提供了一个周期方法
Future:异步接口
FutureTask:主要处理异步任务(是一个类)
综合:继承自FutureTask,实现了RunnableScheduledFuture,可以异步执行任务(周期/延时任务)。重写了延迟getDelay()和compareTo()和isPeriodic()方法

2、DelayedWorkQueue

与DelayedQueue类似,自己实现了对排序
1、offer()添加元素

public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;//返回RunnableScheduledFuture任务
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();//扩容
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);//上浮
                }
                 if (queue[0] == e) {    // 当前元素是首个元素
            leader = null;
            available.signal(); // 唤醒一个等待线程
        }   
            } finally {
                lock.unlock();
            }
            return true;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2、take()方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];//获取队首元素
                    if (first == null)//队首元素为空,则等待
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)//队首元素到期,弹出队首元素
                            return finishPoll(first);//重新调整堆
                        first = null; //队首元素还未到期
                        //有leader则等待
                        if (leader != null)
                            available.await();
                            //无leader设置为当前线程
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                            //结束之后leader设置为空
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
            //此时其他线程可以竞争
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

(三)运行流程

实现了ScheduledExecutorService方法作为调度流程:
在这里插入图片描述
1、schedule()

 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
            //Runnable任务包装成ScheduledFutureTask,用户可以根据自己的需要覆写该方法
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);//执行
        return t;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

decorateTask()可以重写,默认只是将task简单的返回

protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }
  • 1
  • 2
  • 3
  • 4

2、 delayedExecute(t);

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())//线程池关闭,则拒绝策略
            reject(task);
        else {
            super.getQueue().add(task);//获取任务
            // 如果线程池已关闭且该任务是非周期任务, 则将其从队列移除
            if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) &&  remove(task))
                task.cancel(false);
            else
                ensurePrestart();//添加工作线程
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

ensurePrestart添加工作线程的方式与ThreadPoolExecutor不同:核心线程池未满,则添加到核心线程池,核心线程池满,不会去创建工作线程

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)//核心线程池未满,则添加到核心线程池
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);//核心线程池为0,必须保证任务运行,添加一个线程到非核心线程池
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 对于核心线程池中的工作线程来说,如果没有超时设置(allowCoreThreadTimeOut == false),则会使用阻塞方法take获取任务(因为没有超时限制,所以会一直等待直到队列中有任务);如果设置了超时,则会使用poll方法(方法入参需要超时时间),超时还没拿到任务的话,该工作线程就会被回收。
  • 对于非工作线程来说,都是调用poll获取队列元素,超时取不到任务就会被回收。

ThreadPoolExecutor与ScheduledThreadPoolExecutor区别:
1、总体调度流程不同,ScheduledThreadPoolExecutor:先往队列添加任务,然后创建工作线程执行任务;ThreadPoolExecutor创建Worker关联线程,线程执行当前任务,如果线程没有关联任务,则直接从队列获取任务
2、任务队列不同,ThreadPoolExecutor选择比较多;ScheduledThreadPoolExecutor选择DelayedWorkQueue
3、执行方式不同:ScheduledThreadPoolExecutor只有核心线程执行,只有设置coreSize=0时,才会有一个非核心线程去执行;ThreadPoolExecutor会存在核心线程与非核心线程同时执行
4、返回异步任务:ThreadPoolExecutor返回FutureTask对象;ScheduledThreadPoolExecutor返回ScheduledFutureTask

七、Executors:静态Executor工厂

(一)构建固定线程数的线程池

返回ThreadPoolExecutor实例对象,指定线程容量
1、ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory):需要传递线程工厂
2、ExecutorService newFixedThreadPool(int nThreads):使用默认的线程工厂DefaultThreadFactory

(二)构建单个线程的线程池

1、ExecutorService newSingleThreadExecutor()ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
2、通过无参构造方法讲解:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1、会发现此时新建了一个FinalizableDelegatedExecutorService,核心继承DelegatedExecutorService,内部实现委托给ExecutorService,添加中间层的作用在于:ThreadPoolExecutor包含一些设置线程池大小的方法——比如setCorePoolSize,对于只有单个线程的线程池来说,我们是不希望用户通过强转的方式使用这些方法的,所以需要一个包装类,只暴露ExecutorService本身的方法。
2、使用链表阻塞队列

(三)构建可缓存的线程池

1、ExecutorService newCachedThreadPool()ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
2、通过一个构造方法说明:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 1
  • 2
  • 3
  • 4
  • 5

只要没有线程可用就可以添加线程,注意两点
1、核心线程容量等于0,非核心线程池容量为Integer.MAX_VALUE
2、会有一个空闲线程等待时间,为60秒
3、使用同步队列

(三)构建可延时/周期的线程池

1、ScheduledExecutorService newScheduledThreadPool(int corePoolSize)ScheduledExecutorService newScheduledThreadPool新建ScheduledThreadPoolExecutor对象,可安排任务在指定延迟后或周期性地执行.

八、线程池简单实例

1、线程池工厂(创建各种类型的线程池)

package ThreadPoolDemo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Executorfactory {
	public static  Executorfactory factory = new Executorfactory();
	private ExecutorService executors;

	public Executorfactory() {
		// TODO Auto-generated constructor stub
	}
	public static Executorfactory getFactory() {
		return factory;
	}

	public ExecutorService createSingePool() {
		executors = Executors.newSingleThreadExecutor(new MyThreadFactory());
		return executors;
	}

	/**
	 * 无界队列,内部使用同步阻塞队列
	 * 
	 * @return
	 */
	public ExecutorService createCachePool() {
		executors = Executors.newCachedThreadPool(new MyThreadFactory());
		return executors;
	}

	/**
	 * 产生固定线程数的线程管理器
	 * 
	 * @param count
	 * @return
	 */
	public ExecutorService createFixedPool(int count) {
		executors = Executors.newFixedThreadPool(count);
		return executors;
	}

	/**
	 * 构建带有延迟,周期功能的线程管理器
	 * 
	 * @return
	 */
	public ExecutorService createScheduledPool() {
		// cpu数目
		int cpu = Runtime.getRuntime().availableProcessors();
		executors = Executors.newScheduledThreadPool(cpu * 10, new MyThreadFactory());
		return executors;
	}

	class MyThreadFactory implements ThreadFactory {
		AtomicInteger atomic = new AtomicInteger();

		public MyThreadFactory() {

		}

		public Thread newThread(Runnable r) {
			SecurityManager s = new SecurityManager();
			// 线程组是为了更加安全
			ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
			Thread thread = new Thread(group, r);
			thread.setName("任务线程-" + atomic.incrementAndGet());
			return thread;
		}

	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

2、线程池(提供了Callable任务提交和Runnable任务提交)

注意任务提交只是调用了抽象ExecutorServise实现的submit函数与ThreadPoolExecutor实现的execute函数

package ThreadPoolDemo;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class ExecutorProcess {
	private ExecutorService executor;
	private static ExecutorProcess process = new ExecutorProcess();
	private int ThreadMax = 10;
	public ExecutorProcess() {
		System.out.println("创建最大线程数为:"+ThreadMax+"的线程池");
		executor = Executorfactory.getFactory().createFixedPool(ThreadMax);
	}
	public static ExecutorProcess getProcess() {
		return process;
	}
	//关闭线程池
	public void shutdown( ) {
		executor.shutdown();
	}
	//有返回值的任务
	public Future<?> submit(Runnable task) {
		return executor.submit(task);//submit方法在AbstractExecutorService中实现
	}
	//有返回值的任务
	public Future<?> submit(Callable<?> task) {
		return executor.submit(task);
	}
	//无返回结果的任务
	public void execute(Runnable task) {
		executor.execute(task);//execute方法在ThreadPoolExecutor实现
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

3、Demo测试

分别测试两种类型:Runnable没有返回值,Callable具有返回值

package ThreadPoolDemo;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Test {
	public static void main(String[] args) {
		ExecutorProcess pool = ExecutorProcess.getProcess();//单例模式
		for (int i = 0; i < 50; i++) {
			String name = "线程"+i;
			pool.execute(new TaskRunnable(name));
			
		}
		for (int i = 50; i < 100; i++) {
			String name = "线程"+i;
			Future<?> future =  pool.submit(new TaskCall(name));
			try {
				System.out.println(future.get());//获取结果
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (ExecutionException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		pool.shutdown();
	}
	static class TaskRunnable implements Runnable {
		private String name;
		public TaskRunnable(String name) {
			this.name = name;
		}

		@Override
		public void run() {
			try {
				//休眠10秒之内的时间
				TimeUnit.SECONDS.sleep((int)Math.random()*10);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			System.out.println("Runnable任务"+name+"执行业务逻辑");
		}
		
	}
	//由于是静态main方法调用,因此需要将该类设置为static
	static class TaskCall implements Callable<String> {
		private String name;
		public TaskCall(String name) {
			this.name = name;
		}

		@Override
		//有返回值
		public String call() throws Exception {
			try {
				//休眠10秒之内的时间
				TimeUnit.SECONDS.sleep((int)Math.random()*10);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			System.out.println("Callable任务"+name+"执行业务逻辑");
			return "线程返回值,线程名称:"+name;
		}
		
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

引用

【1】AbstractExecutorService:https://www.jianshu.com/p/7418bedd520f
【2】ThreadPoolExecutor:https://segmentfault.com/a/1190000016629668
                      注释更加通俗: https://segmentfault.com/a/1190000010353461
【3】ScheduledThreadPoolExecutor:https://segmentfault.com/a/1190000016672638
                                                     图片:https://blog.csdn.net/luanmousheng/article/details/77816412
【4】Executor工厂:https://segmentfault.com/a/1190000016586578#articleHeader4
【5】Executor Demo实例:https://shanhy.blog.csdn.net/article/details/50180949

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/353687
推荐阅读
相关标签
  

闽ICP备14008679号