当前位置:   article > 正文

Java 并发编程面试题——线程池_java线程池面试题

java线程池面试题

目录

参考文章:
Java 并发编程的艺术》
7000 字 + 24 张图带你彻底弄懂线程池

什么是线程池?有什么优缺点?

(1)线程池 (ThreadPool) 是一种用于管理和复用线程的机制,它是在程序启动时就预先创建一定数量的线程,将这些线程放入一个池中,并对它们进行有效的管理和复用,从而在需要执行任务时,可以从线程池中获取一个可用线程来执行任务,任务执行完毕后线程不会被销毁而是返回线程池,以便下次使用。

(2)线程池的优点主要有以下几个方面:

  • 降低资源消耗:线程的创建和销毁都需要一定的系统资源和时间,如果每个任务都单独创建一个线程,则会浪费大量的系统资源和时间。而使用线程池可以避免这种浪费,通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度:由于线程的创建和销毁需要一定的时间和系统资源,如果任务的数量很多,频繁地创建和销毁线程会造成系统的负载过重,导致性能下降。而线程池可以预先创建一定数量的线程,当任务到达时,任务可以不需要等到线程创建就能立即执行
  • 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是要做到合理利用线程池,必须对其实现原理了如指掌。

(3)线程池的缺点主要有以下几个方面:

  • 内存泄漏:如果线程池中的线程没有正确地释放或回收,可能会导致内存泄漏问题。
  • 阻塞问题:如果线程池中的线程都被占满,可能会导致任务被阻塞,从而降低应用程序的响应性。
  • 线程安全问题:如果线程池中的线程共享数据,可能会导致线程安全问题,例如竞态条件和死锁。
  • 性能问题:如果线程池的大小不合适或者线程的执行时间不均衡,可能会导致性能问题,例如响应延迟和系统负载过高。

创建线程池的方式有哪些?

通过 Executor 框架的工具类 Executors 来创建不同类型的线程池

(1)通过工具类 Executors,可以创建以下 3 种类型的 ThreadPoolExecutor

  • FixedThreadPool:FixedThreadPool 被称为可重用固定线程数的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
//创建一个固定大小的线程池,大小为 3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • SingleThreadExecutor: SingleThreadExecutor是使用单个 worker 线程的 Executor。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
//创建一个单线程的线程池
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • CachedThreadPool:CachedThreadPool 是一个会根据需要创建新线程的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
//创建一个单线程的线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

(2)通过工具类 Executors,可以创建下面的 ScheduledThreadPoolExecutor,它是一个用来在给定的延迟后运行任务或者定期执行任务的线程池,继承自 ThreadPoolExecutor,并且其内部使用DelayedWorkQueue 作为任务队列。

//创建一个定时执行的线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

具体示例如下:

class ScheduledThreadPoolExecutorExample {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);

        // 定时任务,每隔 1 秒执行一次
        executor.scheduleAtFixedRate(() -> {
            System.out.println("定时任务执行了:" + System.currentTimeMillis());
        }, 0, 1, TimeUnit.SECONDS);

        // 延时任务,延迟 2 秒后执行
        executor.schedule(() -> {
            System.out.println("延时任务执行了:" + System.currentTimeMillis());
        }, 2, TimeUnit.SECONDS);

        // 关闭线程池
        executor.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在这里插入图片描述

使用 ThreadPoolExecutor 类自定义线程池

ThreadPoolExecutor 是 Executor 框架的一个具体实现,通过自定义 ThreadPoolExecutor 类可以创建更加灵活、符合需求的线程池。ThreadPoolExecutor 的 4 个构造函数如下所示,有关每个参数的具体含义可参考面试题 3

在这里插入图片描述

✨注意事项

《阿里巴巴 Java 开发手册》中有如下规定:
(1)【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

(2)【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

(3)使用 Executors.newCachedThreadPool() 方法创建线程池可能会导致以下问题:

  • 线程数量过多newCachedThreadPool 是一个无界线程池,它会根据任务的需求动态地创建线程。如果任务提交的速度远远大于线程处理的速度,线程池将会不断地创建新线程,可能导致线程数量过多,从而占用大量系统资源(如内存和CPU),降低系统性能甚至引发内存溢出或系统崩溃。
  • 线程过多导致线程切换开销增加:线程切换是有开销的,当线程数量过多时,系统需要频繁地进行线程切换,导致额外的开销,降低了整体的执行效率。
  • 对长时间执行的任务不适用:由于 newCachedThreadPool 是为了快速处理任务而创建线程的,它会缓存线程一段时间以备重复利用,默认的空闲线程存活时间是 60s。这对于长时间执行的任务来说可能不适用,因为长时间运行的任务可能需要维持相对稳定的线程池大小,而不是频繁地创建和销毁线程。
  • 可能出现任务饱和:当任务数量超过线程池的处理能力时,剩余的任务将会排队等待执行。如果长时间保持任务提交速度高于线程处理速度,会导致任务队列越来越长,最终可能造成任务饱和,系统响应变得极慢。

因此,在使用 newCachedThreadPool 创建线程池时需要谨慎,根据实际场景合理地评估任务的特点和负载情况,选择适当的线程池类型和参数配置,以避免以上问题的发生,并保证系统的性能和稳定性。

✨自定义线程池时有哪些参数?它们各有说明含义?

构造函数

使用下面 ThreadPoolExecutor 类的构造函数可以自定义线程池,参数最全的构造函数如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
	
	//...

	public ThreadPoolExecutor(int corePoolSize,
	                          int maximumPoolSize,
	                          long keepAliveTime,
	                          TimeUnit unit,
	                          BlockingQueue<Runnable> workQueue,
	                          ThreadFactory threadFactory,
	                          RejectedExecutionHandler handler) {
	    if (corePoolSize < 0 ||
	        maximumPoolSize <= 0 ||
	        maximumPoolSize < corePoolSize ||
	        keepAliveTime < 0)
	        throw new IllegalArgumentException();
	    if (workQueue == null || threadFactory == null || handler == null)
	        throw new NullPointerException();
	    this.acc = System.getSecurityManager() == null ?
	            null :
	            AccessController.getContext();
	    this.corePoolSize = corePoolSize;
	    this.maximumPoolSize = maximumPoolSize;
	    this.workQueue = workQueue;
	    this.keepAliveTime = unit.toNanos(keepAliveTime);
	    this.threadFactory = threadFactory;
	    this.handler = handler;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

参数含义

其中的参数含义如下所示:

参数含义
corePoolSize线程池基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建
maximumPoolSize线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务,值得注意的是,如果使用了无界的任务队列,那么这个参数就没什么效果
keepAliveTime线程活动保持时间,线程池中的线程数量大于 corePoolSize 时,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime 才会被回收销毁
unit线程活动保持时间的单位,即 keepAliveTime 的时间单位
workQueue用于保存等待执行的任务的阻塞队列,新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在任务队列中。值得注意的是,如果使用了工作队列 SynchronousQueue,那么当任务数超过线程池的核心线程数时,该任务不会进入队列
threadFactory用于设置创建线程的工厂,决定了如何创建新线程,例如可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架 guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置有意义的名字
handler当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常

在这里插入图片描述

① 上图来源:《Java 性能调优实战》
② 上述提到的 7 个参数中,corePoolSizemaximumPoolSizeworkQueue 这 3 个参数是核心参数。
③ 上述 workQueue 就是 BlockingQueue,有关 BlockingQueue 的相关知识可以参考 Java 并发编程面试题——BlockingQueue 这篇文章。

使用示例

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;

class Solution {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //自定义线程池
        int corePoolSize = 2;
        int maximumPoolSize = 5;
        long keepAliveTime = 50;
        // keepAliveTime 的单位
        TimeUnit unit = TimeUnit.MICROSECONDS;
        //工作队列 workQueue
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(3);
        //使用开源框架 guava 提供的 ThreadFactoryBuilder 可以给线程池里的线程自定义名字
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("demo-task-%d").build();
        //饱和策略
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
        ThreadPoolExecutor threadsPool = new ThreadPoolExecutor(
                                             corePoolSize, maximumPoolSize,
                                             keepAliveTime, unit,
                                             blockingQueue, threadFactory,
                                             handler);
        //执行无返回值的任务
        Runnable taskWithoutRet = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is running");
            }
        };
        threadsPool.execute(taskWithoutRet);
    
        //执行有返回值的任务
        FutureTask<Integer> taskWithRet = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(Thread.currentThread().getName() + " is running");
                //线程睡眠 1000 ms
                Thread.sleep(1000);
                return 100;
            }
        });
        threadsPool.submit(taskWithRet);
        System.out.println("有返回值的任务的结果为: " + taskWithRet.get());
        
        //关闭线程池
        threadsPool.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

输出结果如下:

demo-task-0 is running
demo-task-1 is running
有返回值的任务的结果为: 100
  • 1
  • 2
  • 3

在 Java 8 中可以使用 lambda 表达式来对创建任务的代码进行简化:

Runnable taskWithoutRet = () -> System.out.println(Thread.currentThread().getName() + " is running");

FutureTask<Integer> taskWithRet = new FutureTask<>(() -> {
    System.out.println(Thread.currentThread().getName() + " is running");
    //线程睡眠 1000 ms
    Thread.sleep(1000);
    return 100;
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

与 FutureTask 有关的知识可以参考 Java 并发编程面试题——Future 这篇文章。

线程池的拒绝策略有哪些?如何自定义拒绝策略?

(1)如果线程到达 maximumPoolSize 仍然有新任务,这时线程池会执行拒绝策略。ThreadPoolExecutor 为接口 RejectedExecutionHandler 提供了以下 4 种实现,它们都是 ThreadPoolExecutor 中的静态内部类:

  • AbortPolicy:抛出 RejectedExecutionException 异常来拒绝新任务的处理,这也是默认的策略
  • CallerRunsPolicy只用调用者所在线程来运行任务,也就是直接在调用 execute 方法的线程中运行 (run) 被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,那么可以选择这个策略;
  • DiscardPolicy:不处理本次任务,选择直接放弃;
  • DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之;
    在这里插入图片描述

(2)在 Java 线程池中,可以通过实现 RejectedExecutionHandler 接口来自定义拒绝策略。拒绝策略定义了当线程池无法接受新任务时应该采取的操作。RejectedExecutionHandler 接口有一个方法 rejectedExecution,在线程池无法接受新任务时会调用该方法。以下是一个示例代码,演示如何自定义拒绝策略:

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        System.out.println("任务被拒绝: " + runnable.toString());

        // 执行自定义的拒绝策略,可以选择抛出异常、丢弃任务或阻塞等操作
        // 这里我们简单地抛出一个异常
        throw new RejectedExecutionException("线程池已满,无法接受新任务!");
    }
}

public class ThreadPoolExample {

    public static void main(String[] args) {
        // 创建线程池
        int corePoolSize = 5;
        int maxPoolSize = 10;
        long keepAliveTime = 1000;

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2), new CustomRejectedExecutionHandler());

        // 执行任务
        for (int i = 1; i <= 15; i++) {
            final int taskId = i;
            threadPool.execute(() -> {
                System.out.println("执行任务: " + taskId);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池
        threadPool.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

某次输出如下:

执行任务: 1
执行任务: 5
执行任务: 7
执行任务: 2
执行任务: 3
执行任务: 4
执行任务: 9
执行任务: 8
执行任务: 11
任务被拒绝: com.exam.ThreadPoolExample$$Lambda$1/2093631819@4f3f5b24
执行任务: 10
Exception in thread "main" java.util.concurrent.RejectedExecutionException: 线程池已满,无法接受新任务!
	at com.exam.CustomRejectedExecutionHandler.rejectedExecution(Main.java:73)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.exam.ThreadPoolExample.main(Main.java:91)
执行任务: 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在上述示例中,我们首先创建了一个自定义的 RejectedExecutionHandler 实现CustomRejectedExecutionHandler,其中的rejectedExecution方法简单地抛出一个 RejectedExecutionException 异常。然后,在创建线程池时,我们将上述自定义的拒绝策略传递给 ThreadPoolExecutor 的构造函数。当线程池无法接受新任务时,拒绝策略会被调用,我们在这个方法中抛出一个异常,显示任务被拒绝。你也可以根据自己的需求,实现不同的拒绝策略,例如将任务丢弃、阻塞等。请注意,当自定义拒绝策略抛出异常时,可能需要适当地处理异常,以免影响程序的执行。

✨线程池处理任务的流程是什么样的?

(1)线程池的主要处理流程如下图所示:

在这里插入图片描述

(2)从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  • 线程池判断当前工作线程数是否小于核心线程数 (其大小为 corePoolSize) 。如果小于,即核心线程池未满,则创建一个新的工作线程来执行任务。如果大于,即核心线程池里的线程都在执行任务,则进入下个流程;
  • 线程池判断工作队列 (workQueue) 是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程;
  • 线程池判断线程池的线程 (大小为 maximumPoolSize) 是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略 (RejectedExecutionHandler) 来处理这个任务;

✨为什么线程池要先把任务放在阻塞队列中,然后再调用非核心线程处理任务?

(1)线程池将任务放在阻塞队列中,并先调用非核心线程来处理任务的原因有以下几点:

  • 平衡资源利用率:通过将任务放在阻塞队列中,可以避免瞬间来临的大量任务导致创建过多的线程,从而平衡了系统资源的利用率。非核心线程可以限制线程数量,而阻塞队列可以缓冲任务,让系统在承载能力范围内保持稳定的负载。
  • 控制线程数量:通过将任务放在阻塞队列中,非核心线程可以通过轮询阻塞队列来获取任务并执行。这种方式可以限制线程的数量,防止线程过多占用过多资源,避免系统资源耗尽。
  • 避免任务丢失:当任务到达时,如果线程池中的非核心线程都已经被占用,那么新的任务将被放在阻塞队列中等待执行。这样可以避免任务丢失,尽可能确保所有任务都能得到处理

(2)综上所述,在线程池中先将任务放在阻塞队列中,然后再调用非核心线程来处理任务,可以实现对线程数量的控制、平衡资源利用率、提高响应速度和避免任务丢失等好处。这样设计可以更有效地管理和利用系统资源,并提供更高效的并发处理能力。

线程池中线程实现复用的原理是什么?

(1)线程池的核心功能就是实现了线程的重复利用,而在线程池内部其实是被封装成一个 Worker 对象:

public class ThreadPoolExecutor extends AbstractExecutorService {

	//...

	private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
		//...
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

从上述源码可知,Worker 继承了 AQS,即有一定锁的特性。创建线程来执行任务的方法上面提到是通过 addWorker 方法创建的。在创建 Worker 对象的时候,会把线程和任务一起封装到 Worker 内部,然后调用 runWorker 方法来让线程执行任务:

public class ThreadPoolExecutor extends AbstractExecutorService {

	//...

	final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

(2)runWorker 方法内部使用了 while 死循环,当第一个任务执行完之后,会不断地通过 getTask 方法获取任务:

  • 如果能获取到任务,就会调用run 方法,继续执行任务,这就是线程能够复用的主要原因。
  • 如果不能获取到任务,最后就会调用 finally 中的 processWorkerExit 方法,来将线程退出。

(3)需要注意的是,因为 Worker 继承了 AQS,每次在执行任务之前都会调用 lock 方法,执行完任务之后,会调用 unlock 方法,这样做的目的在于通过 Woker 的加锁状态就能判断出当前线程是否正在运行任务

(4)如果想知道线程是否正在运行任务,只需要调用 Woker 的 tryLock 方法,根据是否加锁成功来进行判断:加锁成功说明当前线程没有加锁,也就没有执行任务。在调用 shutdown 方法关闭线程池的时候,就是通过这种方式来判断线程有没有在执行任务,如果没有的话,来尝试打断没有执行任务的线程。

线程是如何获取任务的以及如何实现超时的?

线程在执行完任务之后,会继续从 getTask 方法中获取任务,获取不到就会退出。getTask 方法的源码如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

	//...

	private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

前面一部分是线程池的一些状态的判断,下面这行代码

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  • 1

是判断当前过来获取任务的线程是否可以超时退出。如果 allowCoreThreadTimeOut 设置为 true 或者线程池当前的线程数大于核心线程数,也就是 corePoolSize,那么该获取任务的线程就可以超时退出。那是怎么做到超时退出呢,就是这行核心代码

Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
  • 1
  • 2
  • 3

会根据是否允许超时来选择调用阻塞队列 workQueue 的 poll 方法或者 take 方法:

  • 如果允许超时,则会调用 poll 方法,传入 keepAliveTime,也就是构造线程池时传入的空闲时间,这个方法的意思就是从队列中阻塞 keepAliveTime 时间来获取任务,获取不到就会返回 null;
  • 如果不允许超时,就会调用 take 方法,这个方法会一直阻塞获取任务,直到从队列中获取到任务位置。

从这里可以看到 keepAliveTime 是如何使用的了。所以到这里应该就知道线程池中的线程为什么可以做到空闲一定时间就退出了。其实最主要的是利用了阻塞队列的 poll 方法的实现,这个方法可以指定超时时间,一旦线程达到了 keepAliveTime 还没有获取到任务,那么就会返回 null,前面提到 getTask 方法返回 null,线程就会退出

需要注意的是,判断当前获取任务的线程是否可以超时退出的时候,如果将 allowCoreThreadTimeOut 设置为 true,那么所有线程走到这个timed 都是 true,那么所有的线程,包括核心线程都可以做到超时退出。如果你的线程池需要将核心线程超时退出,那么可以通过allowCoreThreadTimeOut 方法将 allowCoreThreadTimeOut 变量设置为 true。

线程池的关闭是如何进行的?

线程池提供了 shutdownshutdownNow 两个方法来关闭线程池。

shutdown()

shutdown 方法将线程池的状态修改为 SHUTDOWN,然后尝试打断空闲的线程(如何判断空闲,前面有提到),也就是在阻塞等待任务的线程。

public class ThreadPoolExecutor extends AbstractExecutorService {

	//...
	
	public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

shutdownNow()

shutdownNow 方法将线程池的状态修改为 STOP,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么 shutdownNow 不能执行剩余任务的原因。

public class ThreadPoolExecutor extends AbstractExecutorService {

	//...
	
	public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

所以也可以看出 shutdown 方法和 shutdownNow 方法的主要区别就是:shutdown 之后还能处理在队列中的任务,shutdownNow 直接就将任务从队列中移除,线程池里的线程就不再处理了。

如果在线程池中配置了有限的最大线程数量、有界队列,那么当请求过多时,应该如何解决任务可能丢失的问题?

可以通过实现 RejectedExecutionHandler 接口来自定义拒绝策略,如果线程池无法执行更多的任务了,此时建议可以把任务信息持久化写入到磁盘里面去,后台专门启动一个线程,后续等待你的线程池的工作负荷降低了,再慢慢的从磁盘中读取之间持久化的任务,重新提交到线程池中去执行。

如果线上机器宕机,线程池中的阻塞队列可能还有未处理的任务,应该如何解决这个问题?

在提交一个任务到线程池里去之前,可以先将该任务的信息保存到数据库中,并记录其状态(状态包括未提交、已提交、已完成,初始时为未提交)。当该任务提交之后,更新其状态为已提交即可。系统重启之后后台线程去扫描数据库里的未提交和已提交的任务状态。可以把未提交、已提交的任务重新提交到线程池中继续执行即可。

线程池的队列满了之后可能会发生什么?

对于这个问题,我们可以分情况进行讨论:

  • 如果线程池中的队列满了并且创建的线程数达到了 corePoolSize 之后,会判断如果 maximumPoolSize 大于 coolPoolSize,此时会创建不会大于 maximumPoolSize 额外的线程处理队列中的请求。
  • 如果队列中的请求处理完了,额外的线程数会存活 keepAliveTime 时间后会自动销毁。
  • 如果队列满了,并且额外的线程也已经满负荷了。这个时候会执行对应的拒绝策略

ThreadPoolExecutor 执行 execute() 方法的执行流程是什么样的?

(1)ThreadPoolExecutor 执行 execute() 方法的执行流程如下如所示,我们可以将其看作线程池执行流程在代码中的具体表现。

在这里插入图片描述

(2)ThreadPoolExecutor 执行 execute 方法分下面 4 种情况:
① 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
② 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue
③ 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
④ 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。

ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute() 方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute() 方法调用都是执行步骤 ②,而步骤 ② 不需要获取全局锁。

如何给线程池命名?

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于定位问题。给线程池里的线程命名通常有下面两种方式:

使用 guava 的 ThreadFactoryBuilder

ThreadPoolExecutor 的构造函数中的参数 ThreadFactory 是用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架 guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置有意义的名字,代码如下:

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
  • 1

实现 ThreadFactory 接口

  • 线程池参数 threadFactory 的作用是为线程池创建新线程并提供线程的配置。通过自定义 threadFactory,可以灵活地指定线程的属性。当线程池需要创建新线程来执行任务时,会调用 threadFactory 的 newThread() 方法创建一个线程,并将其添加到线程池中。因此,通过自定义 threadFactory,我们可以对新创建的线程进行一些个性化的设置,例如线程名、优先级、是否为守护线程等。
  • 常见的线程池实现类(如 ThreadPoolExecutor)通常提供了一个默认的 threadFactory,如果不指定 threadFactory 参数,会使用默认的线程工厂。但是,我们也可以根据需求自己实现 ThreadFactory 接口,以创建满足特定要求的线程。
  • 总结起来,threadFactory 参数在线程池中的作用是用于创建新线程,并可以根据需要进行线程的个性化配置。通过自定义 threadFactory,我们能够更好地控制和管理线程池中的线程。
//带有前缀名称的线程工厂
public static class NamedThreadFactory implements ThreadFactory {
    
    //线程名前缀
    private final String prefix;
    //线程编号
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    
    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }
    
    //重写 newThread 方法
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(null, r, prefix + threadNumber.getAndIncrement());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

线程池的状态有哪些?

ThreadPoolExecutor 使用变量 ctl 的高 3 位来表示线程池状态,低 29 位表示线程数量。

public class ThreadPoolExecutor extends AbstractExecutorService {

	//...
	
	private static final int COUNT_BITS = Integer.SIZE - 3;	

	// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
状态名高 3 位接收新任务处理阻塞队列任务说明
RUNNING111YY线程池创建时就是这个状态,能够接收新任务,以及对已添加的任务进行处理。
SHUTDOWN000NY调用shutdown方法线程池就会转换成SHUTDOWN状态,此时线程池不再接收新任务,但能继续处理已添加的任务到队列中任务。
STOP001NN调用 shutdownNow 方法线程池就会转换成 STOP 状态,不接收新任务,也不能继续处理已添加的任务到队列中任务,并且会尝试中断正在处理的任务的线程。
TIDYING010NNSHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池会变为 TIDYING 状态。线程池在 STOP 状态,线程池中执行中任务为空时,线程池会变为 TIDYING 状态。
TERMINATED011NN线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会转变为 TERMINATED 状态。

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING。这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));

// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) {
	return rs | wc; 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

✨如何设置线程池的大小?

(1)首先,需要明确的是线程池大小不能设置地过大或者过小

  • 如果线程池设置地太大,那么大量的线程可能会同时在竞争 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
  • 如果线程池设置地太小,那么当同一时间有大量任务需要处理时,可能会导致它们在任务队列中排队等待执行,甚至会出现任务队列满了之后任务无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样一来,CPU 没有得到充分的利用。

(2)要想合理地配置线程池,可以从任务特性入手来进行分析:

  • 任务的性质,性质不同的任务可以用不同规模的线程池分开处理:
    • CPU 密集型任务:应配置尽可能小的线程,如配置 N c p u N_{cpu} Ncpu + 1 个线程的线程池;
    • I/O 密集型任务:由于 I/O 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2 ∗ N c p u 2 * N_{cpu} 2Ncpu
    • 混合型任务:如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 I/O 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。
  • 任务的优先级:高、中和低。优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。不过需要注意的是,如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
  • 任务的执行时间:长、中和短。执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU

如何判断是 CPU 密集任务还是 I/O 密集任务?
答:简单来说,CPU 密集型任务指需要利用 CPU 计算能力的任务,例如对大量数据进行运算。而 I/O 密集型指涉及到网络读取、文件读取等的任务,其特点在于 I/O 操作完成的时间远大于 CPU 计算耗费的时间,即大部分时间都花在了等待 I/O 操作完成上

Java 中如何获取当前 CPU 的核心数?
答:使用 int cores = Runtime.getRuntime().availableProcessors(); 这行代码即可获取当前 CPU 的核心数。

对于 CPU 密集型任务,线程池的大小为什么不设置为 n,而是 n + 1?
答:对于 CPU 密集型任务,在有 n 个处理器的系统上,当线程池的大小设置为 n + 1 时,能实现 CPU 的最优利用率,因为即使当 CPU 密集型的线程偶尔由于页缺失故障或者其他原因暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费,从未尽可能地利用 CPU 资源

为什么对于 I/O 密集型任务,线程数建议设置为 2 * n?
因为在 I/O 密集型任务中,大部分时间线程都在等待外部 I/O 操作完成,而不是进行计算密集型的工作,并且用于上下文切换的时间要远小于阻塞等待的时间。例如,当线程在从磁盘读取数据或等待网络响应时,它会处于阻塞状态,CPU 资源不会被过多占用。

假设在一个请求中,计算操作需要 5ms,I/O 操作需要 100 ms。对于一台有 8 核的机器,如果要求 CPU 利用率达到 100%,应该如何设置线程数?

  • 对于复合型任务,我们可以综合考虑 CPU 密集型和 I/O 密集型的特点进行设置。
  • 在这种情况下,如果计算操作需要 5ms(CPU 操作 5ms),而 I/O 操作需要 100ms,那么单核 CPU 的利用率就是 5 / (5+100)。
  • 为了达到理论上的 100% CPU 利用率,需要的线程数为 1 / (5 / (5 + 100)),即 21 个线程。
  • 然而,现在我们只有 8 个核可用,所以根据这个理论,需要的线程数是 168 个(8 核 * 21 线程)。
  • 不过,这是一个理论值,实际情况可能受其他因素的限制。

如何对线程池进行监控?

(1)如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性:

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
  • largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
  • getActiveCount:获取活动的线程数。

(2)此外,我们还可以通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecuteafterExecuteterminated 方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等,这几个方法在线程池里是空方法。

如何动态地修改线程池的参数?

ThreadPoolExecutor 类中提供了以下这些方法来动态修改参数:

在这里插入图片描述

线程池使用的注意事项有哪些?线程池中的线程是不是线程越多越好?

(1)在线程池的使用中,有一些注意事项需要考虑:

  • 合理配置线程池大小:线程池的大小应根据处理任务的类型和数量来进行配置。如果线程池的线程数量过少,可能无法及时处理所有的任务;而线程数量过多,则会消耗过多的系统资源。需要根据实际情况进行调整,确保线程池的大小可以适应任务负载。
  • 考虑线程的生命周期:线程池中的线程是可以重用的,所以在设计任务时,要考虑到线程的生命周期,避免出现线程不会自动终止的情况。如果线程的任务执行完毕后,没有新的任务可以执行,可以通过合理的配置线程存活时间、设置超时时间等机制来避免空闲线程一直存在。
  • 任务类型和任务队列:根据任务的特点选择适当的任务队列类型。例如,对于耗时较长的I/O密集型任务,可以使用无界队列;而对于CPU密集型任务,可能需要使用有界队列来限制任务的提交速度,避免系统资源被耗尽。选择适当的任务队列类型可以对系统性能产生重要影响。
  • 异常处理:在任务执行过程中,可能会出现异常。要确保及时捕获并处理异常,避免线程池因为异常情况而终止或发生其他意外。
  • 监控和调优:及时监控线程池的运行状态、任务执行情况和系统资源的使用情况,以便及时调整线程池的配置参数,提高系统性能和稳定性。

(2)关于线程数量的问题,更多不一定意味着更好。线程数量的增加会增加线程间的上下文切换开销,并且会占用更多的系统资源。当线程数量超出合理范围时,可能会导致系统负载过重,性能反而下降。因此,需要根据实际需求和系统资源进行评估,并选择适当的线程数量。通常情况下,可以依据CPU核心数和任务类型进行参考,一般不建议无限制增加线程数量。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/926021
推荐阅读
相关标签
  

闽ICP备14008679号