当前位置:   article > 正文

ThreadPoolExecutor源码

ThreadPoolExecutor源码
java.util.concurrent.ThreadPoolExecutor extends AbstractExecutorService
  • 1

ThreadPoolExecutor类称为线程池。AbstractExecutorService是一个抽象类,它实现了ExecutorService接口
1.类变量&常量

//ctl是一个AtomicInteger类型的原子对象。ctl记录了"线程池中的任务数量"和"线程池状态"2个信息。
//ctl共包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。
//RUNNING    -- 对应的高3位值是111。
//SHUTDOWN   -- 对应的高3位值是000。
//STOP       -- 对应的高3位值是001。
//TIDYING    -- 对应的高3位值是010。
//TERMINATED -- 对应的高3位值是011。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
    private final BlockingQueue<Runnable> workQueue;//阻塞队列,当线程池中的线程数大约线程池大小事,新的任务会进入阻塞队列。
    private final ReentrantLock mainLock = new ReentrantLock(); //互斥锁
    /*Worker表示工作线程,workers表示多个线程的集合。当对应的线程池启动时,
    会执行workers中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一
    个阻塞的任务来放入workers中继续运行。workers可以使多个线程同时运行。*/
    private final HashSet<Worker> workers = new HashSet<Worker>(); 
    private final Condition termination = mainLock.newCondition(); //终止条件
    private int largestPoolSize;    //最大的线程池长度
    private long completedTaskCount;    //完成的任务数
    private volatile ThreadFactory threadFactory;   //线程工厂,用于创建线程
    private volatile RejectedExecutionHandler handler;  //拒绝策略
    private volatile long keepAliveTime;    //线程生存时间
    private volatile boolean allowCoreThreadTimeOut;    //允许的核心线程的超时时间

    /* 当新任务提交给线程池时(通过execute方法)。
        创建流程为:
        <corePoolSize   --> 创建新线程,放入线程池,处理请求
        >corePoolSize && < maximumPoolSize --> 放入阻塞队列中,若阻塞队列满,
        则创建新线程,并将任务交给该线程直接处理,加入线程池中
        > maximumPoolSize --> 执行相应的拒绝策略
        */
    private volatile int corePoolSize;  //核心池大小,也即Worker的最小数量,保证不超时
    private volatile int maximumPoolSize;   //线程池的最大长度

    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();  //默认的拒绝策略

    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");      
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

2.构造方法

    //采用默认的拒绝策略和使用默认的线程工程
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    //增加了拒绝策略的参数
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    //增加了线程工厂的参数,但无拒绝策略参数
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    //核心构造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || //线程核心池大小不能小于0
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

3.内部类

1.private final class Worker extends AbstractQueuedSynchronizer implements Runnable
执行任务的线程类,实现了Runnable接口,继承AQS类
其内部的run方法调用ThreadPoolExecutorrunWorker方法,初始构造方法设置标志位为-1
构造方法为
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
2.拒绝策略类
AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy四个类均实现了RejectedExecutionHandler方法

任务被线程池拒绝的主要原因有:任务数量超过了maximumPoolSize;线程池异常关闭
2.1 AbortPolicy类
该类的拒绝策略为直接抛出RejectedExecutionException异常

2.2 CallerRunsPolicy类
该类的拒绝策略为,若线程池未关闭,则直接在调用该task的线程中执行被拒绝的任务。若线程池关闭,则抛弃该task

2.3 DiscardOldestPolicy类
该类的拒绝策略为若线程池为关闭,则将阻塞队列的头部抛出,也即抛出最“老”的线程,并且将被拒绝的task加入到等待队列中

2.4 DiscardPolicy类
该类的拒绝策略为丢弃被拒绝的任务
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

4.重要方法

1.execute方法,执行任务
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
        int c = ctl.get();  
        //线程数小于corePoolSize,新建线程,并将任务command加入线程中
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        //大于corePoolSize,首先尝试加入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //判断线程池状态,若异常终止,则删除该任务,并执行相应的拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 否则,如果"线程池中任务数量"为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        //阻塞队列已满,则直接新建一个线程并将command任务加入到线程池中。若创建失败,则执行相应拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
2.  addWorker方法,将任务添加进线程池中。参数core为true,比较corePoolSize;core为false,则比较maxPoolSize
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 更新"线程池状态和计数"标记,即更新ctl。
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); //获取线程池状态

            // 线程池是否有效
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);  //线程池中Worker线程数量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;   //不能超出最大数量
                 // 通过CAS函数将c的值+1。操作失败的话,则退出循环。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  //重新得到ctl的值
                //两次结果需一致,若不同则从retry重新开始循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);  //添加任务到Worker线程中,作为firsttask
            final Thread t = w.thread;  //获取Worker对应的线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();    //获取锁
                try {
                    //再次确认ctl的状态
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);     //将新建的线程加入到workers容器中
                        int s = workers.size();
                        if (s > largestPoolSize)    //更新largestPoolSize
                            largestPoolSize = s;
                        workerAdded = true; 
                    }
                } finally {
                    mainLock.unlock();  
                }
                if (workerAdded) {  //若添加成功,则执行Worker赌赢的线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)    //为添加成功,则执行相应的addWorkerFailed方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    private void addWorkerFailed(Worker w) {    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);  //将该线程从workers中移除
            decrementWorkerCount(); //将工作线程数减1
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
3.submit方法,父类AbstractExecutorService实现
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
4.shutdown方法,不再接受新的任务,直到所有任务执行完后,关闭线程池
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // 检查终止线程池的“线程”是否有权限。
            advanceRunState(SHUTDOWN);  // 设置线程池的状态为关闭状态
            interruptIdleWorkers(); //中断线程池中空闲的线程。
            onShutdown(); //在ThreadPoolExecutor中没有任何动作,钩子方法
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    //尝试关闭所有正在执行的任务,暂停等待任务的处理,并返回等待执行的任务列表。
该方法并不能一定保证线程池一定能够关闭,因为其调用Thread.interrupt方法,有可能有中断响应的线程存在,故无法立刻关闭
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();   
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
5.线程池有五种状态
RUNNING   -- 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进
          -- 行处理
SHUTDOWN  -- 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务
STOP      -- 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会
          -- 中断正在处理的任务
TIDYING   -- 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING
          -- 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。
TERMINATED-- 线程池彻底终止,就变成TERMINATED状态

RUNNING: 线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RU
NNING状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
可知,线程池一开始ctl处于RUNNING状态


SHUTDOWN:调用shutdown方法,从RUNNING-->SHUTDOWN

STOP: 调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) --> 
STOPTIDYING: 当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空
时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYINGTERMINATED :处在TIDYING状态,执行完terminated()后,会由TIDYING --> TERMINATED
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

image

5.Executors静态工厂类中创建的几种不同的线程池

1.  newCachedThreadPool方法,创建一个corePoolSize=0,maxPoolSize=Integer.MAX_VALUE的可扩充线程池。
其超时时间为60s,采用SynchronousQueue队列(其为一个sync队列,默认为非公平锁,且没有长度界限)和默认的拒绝策略(DiscardPolicy类)
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
2.  newFixedThreadPool方法,创建一个corePoolSize = maxPoolSize的固定大小线程池,也即当阻塞队列满时,新加任务会被抛弃。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
3.  newScheduledThreadPool方法,新建一个ScheduledThreadPoolExecutor对象,其corePoolSize为传入参数的大小。
该线程池为延迟线程池,支持定时及周期性的任务执行的线程池
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
4.  newSingleThreadExecutor方法,创建一个一次只能有一个线程执行的线程池,可以用来进行线程调度   
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/944580
推荐阅读
  

闽ICP备14008679号