赞
踩
ThreadPoolExecutor
1.什么是线程池?
(首先要理解什么是线程)
线程池,thread pool,是一种线程使用模式,线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
2.为什么使用线程池?
为了减少创建和销毁线程的次数,让每个线程都可以多次的使用,可以根据系统情况调整线程的数量,防止消耗过多内存。在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,使用线程池就可以优化。
线程池主要用来解决线程生命周期开销问题和资源不足问题。通过对多个任务重复使用线程,线程创建的开销就被分摊到了多个任务上了,而且由于在请求到达时线程已经存在,所以消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使用应用程序响应更快。另外,通过适当的调整线程中的线程数目可以防止出现资源不足的情况。
3.线程池的核心参数
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//线程空闲时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//拒绝策略)
{
...
}
4.线程池的执行顺序
线程池按以下行为执行任务
5.线程池的参数详解
6.ThreadPoolExecutor和spring封装的ThreadPoolTaskExecutor案例
ThreadPoolExecutor是Java的线程池
ThreadPoolTaskExecutor是spring封装的线程池
package com.thgy.bc.common.config; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.*; @Slf4j @Configuration public class ThreadPoolConfig { @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); int i = Runtime.getRuntime().availableProcessors(); //核心线程数目 executor.setCorePoolSize(i * 2); //指定最大线程数 executor.setMaxPoolSize(i * 2); //队列中最大的数目 executor.setQueueCapacity(i * 2 * 10); //线程名称前缀 executor.setThreadNamePrefix("ThreadPoolTaskExecutor-"); //rejection-policy:当pool已经达到max size的时候,如何处理新任务 //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 //对拒绝task的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //当调度器shutdown被调用时等待当前被调度的任务完成 executor.setWaitForTasksToCompleteOnShutdown(true); //线程空闲后的最大存活时间 executor.setKeepAliveSeconds(60); //加载 executor.initialize(); log.info("初始化线程池成功"); return executor; } @Bean public ThreadPoolExecutor threadPoolExecutor() { //获取cpu核心数 int i = Runtime.getRuntime().availableProcessors(); //核心线程数 int corePoolSize = i * 2; //最大线程数 int maximumPoolSize = i * 2; //线程无引用存活时间 long keepAliveTime = 60; //时间单位 TimeUnit unit = TimeUnit.SECONDS; //任务队列,接收一个整型的参数,这个整型参数指的是队列的长度, //ArrayBlockingQueue(int,boolean),boolean类型的参数是作为可重入锁的参数进行初始化,默认false,另外初始化了notEmpty、notFull两个信号量。 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(i * 2 * 10); //1. 同步阻塞队列 (put,take),直接提交。直接提交策略表示线程池不对任务进行缓存。新进任务直接提交给线程池,当线程池中没有空闲线程时,创建一个新的线程处理此任务。 // 这种策略需要线程池具有无限增长的可能性。实现为:SynchronousQueue //2. 有界队列。当线程池中线程达到corePoolSize时,新进任务被放在队列里排队等待处理。有界队列(如ArrayBlockingQueue)有助于防止资源耗尽, // 但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销, // 但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小, // CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。 //3. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。 // 这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时, // 适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。 //线程工厂 //defaultThreadFactory() //返回用于创建新线程的默认线程工厂。 //privilegedThreadFactory() //返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。 ThreadFactory threadFactory =Executors.defaultThreadFactory(); //拒绝执行处理器 RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); //创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); return threadPoolExecutor; } }
7.jdk自带的四种线程池创建方式
// 第一种线程池:固定个数的线程池,可以为每个CPU核绑定一定数量的线程数
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(processors * 2);
// 缓存线程池,无上限
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 单一线程池,永远会维护存在一条线程
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
// 固定个数的线程池,可以执行延时任务,也可以执行带有返回值的任务。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
8.队列的详解
参考:https://www.cnblogs.com/aspirant/p/8657801.html
9.调用线程池的线程案例
public List<AccountRecordVO> requestTest() throws ExecutionException, InterruptedException { List<String> ids = Lists.newArrayList(); ids.add("1"); ids.add("2"); ids.add("3"); ids.add("4"); //有返回值的情况,定义接收返回值 List<AccountRecordVO> futureList2 = Lists.newArrayList(); //分布式计数器,若业务不需要则可以不定义 CountDownLatch countDownLatch = new CountDownLatch(ids.size()); for (String id : ids) { //调用线程池的线程执行任务 threadPoolTaskExecutor.submit(new Runnable() { @Override public void run() { test(Lists.newArrayList(id),futureList2); //计数器-1 countDownLatch.countDown(); } }); } //await阻塞,直到计数器为0 countDownLatch.await(); System.out.println("主线程"+"===="); return futureList2; } public List<AccountRecordVO> test(List<String> ids, List<AccountRecordVO> list2){ //随便写的业务逻辑代码,无实际意义,仅作演示 System.out.println("线程体" + "===="); List<AccountRecordVO> accountRecordVOS = Lists.newArrayList(); int i = 0; AccountRecordVO accountRecordVO = new AccountRecordVO(); accountRecordVO.setUserId("123"); accountRecordVO.setAmount(12333); for (String id : ids){ accountRecordVOS.add(accountRecordVO); list2.add(accountRecordVO); } try{ Thread.sleep(Long.valueOf("1000")); }catch (Exception e){ log.error(e.getMessage()); } System.out.println("线程体结束" + "===="); return accountRecordVOS; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。