赞
踩
动态线程池是个什么鬼,可能有些读者看到这里会感到很懵逼,甚至有些人看到这个题目,心里面立马就是一个wocao,其实,动态线程池在美团内部应用是很广泛的呢,出这个东西也是有原因的,下面我就仔细的聊一聊,聊之前呢,咱们先把线程池的一些知识复习一把。
线程池相信大家都不陌生,简单点说就是一个生产者消费者模型,生产者就是咱们自己的业务线程,不断地生产任务,消费者就是线程池中的线程,不断的消费任务。看一看下面的图就知道了。
我相信大家对于创建线程池都不陌生,我们看过阿里爸爸的java开发手册的都知道,人家说,不能直接这么写:
ExecutorService threadPool = Executors.newFixedThreadPool(10);
原因就是这么写任务队列默认就是Integer.MAX_VALUE,队列过大可能会造成oom,但是笔者阅读过线程池源码以后,感觉队列过大,还会存在另外一个问题,在这先容我卖个关子(干啥啥不行,装逼第一名),一会再说。
哈哈,不扯闲篇了,进入正题,线程池的创建方式应该是这样的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
其中,这几个参数很重要:
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:maxPoolSize中空闲线程,销毁所需要的时间
unit:时间单位
workQueue:队列类型
handler:拒绝策略
那么,重点来了,这几个参数是如何相互配合起作用的呢,笔者看过源码以后知道。
当任务提交以后,先判断线程数量有没有达到corePoolSize,要是没有达到,就去创建线程并处理当前任务,要是达到了呢,就丢到任务队列中,但假如任务队列也满了呢,就再去创建线程跑当前任务,直到线程数量达到maximumPoolSize,如果队列满了,线程数量也达到maximumPoolSize了,这时候拒绝策略就会起作用了。
说到这里,咱们回头思考一下刚才笔者抛出的问题,队列过大,还会存在什么问题,很显然,maximumPoolSize这个参数设置可能就不起作用了,全靠corePoolSize去处理任务,而maximumPoolSize就在那静静的看着~~
这让我想起了凤凰传奇的两位歌手,请问这个场景下,谁是corePoolSize,谁是maximumPoolSize呢?
网上有很多资料在说这个问题,笔者曾经对他们深信不疑,有cpu密集型,有io密集型,甚至还有一些公式可以计算出来线程池的最理想的线程数量设置,看下面公式:
为保持处理器达到期望的使用率,最优的池的大小等于:
Nthreads = Ncpu x Ucpu x (1 + W/C)
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1
W/C = 等待时间与计算时间的比率
其实人家说的对,在忽悠面试官的时候非常管用,但是,咋测量呢,就算是设置好了,随着业务的迭代和系统的不断变化,这个设置还能一直保证是合理的吗?这是一个直击灵魂的拷问,那么接下来,我们就要引入动态线程池的概念了。
所谓的动态线程池其实很简单,就是要的控制线程池的几个核心参数是可被改变的,并且还可以实时监控到这几个参数的值,发现线程不够了,就改大点呗,线程过多了,担心上下文切换频频,就改小点呗,那么,我们怎么去实现呢,下面笔者就我自己在项目中的实现和大家分享一下哈,不喜勿喷!
首先,笔者自定义了一个线程池CustomThreadPoolExecutor,他继承自ThreadPoolExecutor,在CustomThreadPoolExecutor中,我们定义了一个
healthQuality变量,他是用来描述线程池的健康状态的。
@Slf4j public class CustomThreadPoolExecutor extends ThreadPoolExecutor { /** * 线程池健康状况 * 1:极佳 * 2:优良 * 3:欠佳 */ private int healthQuality = ThreadPoolHealthQualityEnum.BEST.getCode(); public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { // 初始化任务状态 // log.info("before task execute..."); } @Override protected void afterExecute(Runnable r, Throwable t) { // 更新任务执行完成状态 // log.info("after task execute..."); } public int getHealthQuality() { return healthQuality; } public void setHealthQuality(int healthQuality) { this.healthQuality = healthQuality; } }
我们定义了一个动态线程池管理器,他实现了InitializingBean 接口,实现这个接口以后,在bean对象属性设置完以后,我们的spring会自动调用afterPropertiesSet这个方法,这点知识建议大家去看一下spring源码深度解析这本书,郝佳老师写的,目前事我看过,解读spring源码最好的了。
在afterPropertiesSet方法中,我们创建了一个线程池,并且起了一个线程去定期检查所有的线程池的健康状态。
规则是:
1.活跃线程<核心线程 状态BEST
2.活跃线程等于核心线程 && 队列未满 状态还好
3.活跃线程等于核心线程 && 队列接近满 状态不太好
4.队列接近满&&活跃线程接近最大线程数 比较糟糕
当线程池状态比较糟糕时,调用通知接口,具体通知逻辑,需要业务方自己去实现。
@Slf4j @Component public class DynamicThreadPoolManager implements InitializingBean { @Autowired private KVConfig kvConfig; @Autowired private BizNotifyHolder bizNotifyHolder; private final String DEFAULT_THREAD_POOL_NAME = "default-thread-pool"; /** * 线程池映射关系:<线程池名称, 线程池实例> */ private Map<String, CustomThreadPoolExecutor> threadPoolMap = new HashMap<>(); @Override public void afterPropertiesSet() throws Exception { CustomThreadPoolExecutor defaultThreadPoolExecutor = new CustomThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new ResizableLinkedBlockIngQueue<Runnable>(1000)); threadPoolMap.put(DEFAULT_THREAD_POOL_NAME, defaultThreadPoolExecutor); // 启动线程池健康检查 new Thread(new Runnable() { @Override public void run() { while (true) { for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) { String threadPoolName = entry.getKey(); CustomThreadPoolExecutor executor = entry.getValue(); int corePoolSize = executor.getCorePoolSize(); long taskCount = executor.getTaskCount(); int maximumPoolSize = executor.getMaximumPoolSize(); int activeCount = executor.getActiveCount(); ResizableLinkedBlockIngQueue queue = (ResizableLinkedBlockIngQueue)executor.getQueue(); int capacity = queue.getCapacity(); int size = queue.size(); log.info("pool_name={}, core_size={}, max_size={}, active_count={}, queue_capacity={}, queue_size={}, task_count={}", threadPoolName, corePoolSize, maximumPoolSize, activeCount, queue.getCapacity(), queue.size(), taskCount); //还未达到核心线程数 状态极佳 if (activeCount < corePoolSize) { executor.setHealthQuality(ThreadPoolHealthQualityEnum.BEST.getCode()); } //达到了核心线程数&&队列还未满 状态还好 if (activeCount == corePoolSize && size < capacity) { executor.setHealthQuality(ThreadPoolHealthQualityEnum.GOOD.getCode()); } //达到了核心线程数&&队列接近满 状态不太好 if (activeCount == corePoolSize && nearBetween(size, capacity)) { executor.setHealthQuality(ThreadPoolHealthQualityEnum.WORSE.getCode()); } //队列接近满&&接近最大线程数 比较糟糕 if (nearBetween(size, capacity) && nearBetween(activeCount, maximumPoolSize)) { executor.setHealthQuality(ThreadPoolHealthQualityEnum.WORST.getCode()); //队列接近满&&接近最大线程数 比较糟糕 通知 if (nearBetween(size, capacity) && nearBetween(activeCount, maximumPoolSize)) { executor.setHealthQuality(ThreadPoolHealthQualityEnum.WORST.getCode()); notify(executor); } } } try { Thread.sleep(kvConfig.dynamicThreadPoolHealthCheckInterval); } catch (InterruptedException e) { } } } //通知 private void notify(CustomThreadPoolExecutor executor) { try{ WarnNotify notify = SpringContextUtil.getBean(WarnNotify.class); notify.notify(executor); }catch (Exception e){ log.error("通知异常",e); } } }).start(); } /** * 添加线程池映射关系 * * @param name * @param executor */ public void add(String name, CustomThreadPoolExecutor executor) { if (threadPoolMap.get(name) != null) { return; } threadPoolMap.put(name, executor); } /** * 指定线程池执行 * * @param task */ public void execute(String threadPoolName, Runnable task) { get(threadPoolName).execute(task); } /** * 不指定线程池:选择一个健康的线程池并执行 * * @param task */ public void execute(Runnable task) { String targetThreadPool = null; for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) { if (entry.getValue().getHealthQuality() == ThreadPoolHealthQualityEnum.BEST.getCode()) { targetThreadPool = entry.getKey(); break; } } if (StringUtils.isBlank(targetThreadPool)) { for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) { if (entry.getValue().getHealthQuality() == ThreadPoolHealthQualityEnum.GOOD.getCode()) { targetThreadPool = entry.getKey(); break; } } } if (StringUtils.isBlank(targetThreadPool)) { for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) { if (entry.getValue().getHealthQuality() == ThreadPoolHealthQualityEnum.WORSE.getCode()) { targetThreadPool = entry.getKey(); break; } } } if (StringUtils.isBlank(targetThreadPool)) { for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) { if (entry.getValue().getHealthQuality() == ThreadPoolHealthQualityEnum.WORST.getCode()) { targetThreadPool = entry.getKey(); break; } } } try { threadPoolMap.get(targetThreadPool).execute(task); } catch (Exception e) { log.error("线程池任务调度异常, targetThreadPool={}", targetThreadPool, e); BizNotifyContext context = new BizNotifyContext(); if (e instanceof RejectedExecutionException) { context.setTitle("线程池任务调度异常: 拒绝投递"); context.setNotifyType(BizNotifyTypeEnum.ZYL.getCode()); context.setDetail(targetThreadPool); bizNotifyHolder.handle(context); } else { context.setTitle("线程池任务调度异常: " + e.getLocalizedMessage()); context.setNotifyType(BizNotifyTypeEnum.ZYL.getCode()); context.setDetail(targetThreadPool); bizNotifyHolder.handle(context); } } } /** * 添加线程池映射关系 * * @param name */ public ThreadPoolExecutor get(String name) { CustomThreadPoolExecutor threadPoolExecutor = threadPoolMap.get(name); if (threadPoolExecutor == null) { return threadPoolMap.get(DEFAULT_THREAD_POOL_NAME); } return threadPoolExecutor; } /** * 返回默认线程池 * * @return */ public CustomThreadPoolExecutor get() { return threadPoolMap.get(DEFAULT_THREAD_POOL_NAME); } /** * 移除线程池映射关系 * * @param name */ public void remove(String name) { if (threadPoolMap.get(name) == null) { return; } threadPoolMap.remove(name); } /** * @description:获取线程池名称 * @date:2021/4/19 3:31 下午 */ public List<String> getThreadPoolName() { return Lists.newArrayList(this.threadPoolMap.keySet()); } 此处省略N行代码。。。 }
我们来看一下获取各个线程池状态值的方法,很简单,就是获取一下各个线程池的状态以及线程池的当前核心参数值。
public List<ThreadPoolStatusInfo> getThreadPoolStatus() { Map<String, CustomThreadPoolExecutor> threadPoolMap = this.threadPoolMap; List<ThreadPoolStatusInfo> data = Lists.newArrayListWithCapacity(threadPoolMap.size()); for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) { ThreadPoolStatusInfo threadPoolStatusInfo = new ThreadPoolStatusInfo(); String threadPoolName = entry.getKey(); CustomThreadPoolExecutor executor = entry.getValue(); threadPoolStatusInfo.setPool_name(threadPoolName); threadPoolStatusInfo.setCore_thread_size(executor.getCorePoolSize()); threadPoolStatusInfo.setMax_thread_size(executor.getMaximumPoolSize()); threadPoolStatusInfo.setActive_thread_size(executor.getActiveCount()); if (executor.getQueue() instanceof ResizableLinkedBlockIngQueue) { ResizableLinkedBlockIngQueue resizableLinkedBlockIngQueue = (ResizableLinkedBlockIngQueue)executor.getQueue(); threadPoolStatusInfo.setQueue_type("ResizableLinkedBlockIngQueue"); threadPoolStatusInfo.setQueue_capacity(resizableLinkedBlockIngQueue.getCapacity()); threadPoolStatusInfo.setQueue_size(resizableLinkedBlockIngQueue.size()); threadPoolStatusInfo.setHealth_quality(executor.getHealthQuality()); threadPoolStatusInfo.setTask_count(executor.getTaskCount()); threadPoolStatusInfo.setCompleted_task_count(executor.getCompletedTaskCount()); data.add(threadPoolStatusInfo); } } return data; }
接下来我们看一下线程池变量重置的方法,比较简单,就是修改一下参数值而已。
/** * 线程池实例重置 * * @param name * @param param */ public synchronized void reset(String name, ResetThreadPoolParam param) { ThreadPoolExecutor threadPoolExecutor = threadPoolMap.get(name); if (threadPoolExecutor == null) { return; } Integer corePoolSize = param.getCorePoolSize(); Integer maximumPoolSize = param.getMaximumPoolSize(); Integer queueCapacity = param.getQueueCapacity(); if (corePoolSize != null && corePoolSize <= 0) { throw new BusinessException("核心线程数配置非法"); } if (maximumPoolSize != null && maximumPoolSize <= 0) { throw new BusinessException("最大线程数配置非法"); } if (queueCapacity != null && queueCapacity <= 0) { throw new BusinessException("队列容量配置非法"); } if (corePoolSize != null && maximumPoolSize != null && corePoolSize > maximumPoolSize) { throw new BusinessException("核心线程数不能大于最大线程数"); } if (corePoolSize != null) { threadPoolExecutor.setCorePoolSize(corePoolSize); } if (maximumPoolSize != null) { threadPoolExecutor.setMaximumPoolSize(maximumPoolSize); } if (queueCapacity != null && queueCapacity > 0 && threadPoolExecutor.getQueue() instanceof ResizableLinkedBlockIngQueue) { ResizableLinkedBlockIngQueue resizableLinkedBlockIngQueue = (ResizableLinkedBlockIngQueue) threadPoolExecutor.getQueue(); resizableLinkedBlockIngQueue.setCapacity(queueCapacity); } }
那么线程池参数配置在哪里呢? 我们使用了apollo进行管理,代码如下:
@Slf4j @Service public class ThreadPoolConfigModifyWatcher { @Autowired private DynamicThreadPoolManager dynamicThreadPoolManager; @ApolloConfigChangeListener(value = {"thread-pool"}) public void watchConfigChange(ConfigChangeEvent changeEvent) { Set<String> changedKeys = changeEvent.changedKeys(); for (String threadPoolName : changedKeys) { String threadPoolConfig = changeEvent.getChange(threadPoolName).getNewValue(); ResetThreadPoolParam param = JacksonUtil.json2Bean(threadPoolConfig, ResetThreadPoolParam.class); try { dynamicThreadPoolManager.reset(threadPoolName, param); } catch (Exception e) { log.error("监听线程池配置变更重置线程池实例发生异常", e); } } } }
讲到这里,最核心的实现就已经讲完了,还剩下一些细枝末节就不说啦,感兴趣的笔者可以自己去研究哈。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。