当前位置:   article > 正文

动态线程池的设计和实现_resizablelinkedblockingqueue

resizablelinkedblockingqueue

动态线程池是个什么鬼,可能有些读者看到这里会感到很懵逼,甚至有些人看到这个题目,心里面立马就是一个wocao,其实,动态线程池在美团内部应用是很广泛的呢,出这个东西也是有原因的,下面我就仔细的聊一聊,聊之前呢,咱们先把线程池的一些知识复习一把。

1.线程池简介

线程池相信大家都不陌生,简单点说就是一个生产者消费者模型,生产者就是咱们自己的业务线程,不断地生产任务,消费者就是线程池中的线程,不断的消费任务。看一看下面的图就知道了。
在这里插入图片描述

2.线程池的核心参数

我相信大家对于创建线程池都不陌生,我们看过阿里爸爸的java开发手册的都知道,人家说,不能直接这么写:

ExecutorService threadPool = Executors.newFixedThreadPool(10);
  • 1

原因就是这么写任务队列默认就是Integer.MAX_VALUE,队列过大可能会造成oom,但是笔者阅读过线程池源码以后,感觉队列过大,还会存在另外一个问题,在这先容我卖个关子(干啥啥不行,装逼第一名),一会再说。
哈哈,不扯闲篇了,进入正题,线程池的创建方式应该是这样的:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

其中,这几个参数很重要:

corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:maxPoolSize中空闲线程,销毁所需要的时间
unit:时间单位
workQueue:队列类型
handler:拒绝策略
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

那么,重点来了,这几个参数是如何相互配合起作用的呢,笔者看过源码以后知道。
当任务提交以后,先判断线程数量有没有达到corePoolSize,要是没有达到,就去创建线程并处理当前任务,要是达到了呢,就丢到任务队列中,但假如任务队列也满了呢,就再去创建线程跑当前任务,直到线程数量达到maximumPoolSize,如果队列满了,线程数量也达到maximumPoolSize了,这时候拒绝策略就会起作用了。

说到这里,咱们回头思考一下刚才笔者抛出的问题,队列过大,还会存在什么问题,很显然,maximumPoolSize这个参数设置可能就不起作用了,全靠corePoolSize去处理任务,而maximumPoolSize就在那静静的看着~~
这让我想起了凤凰传奇的两位歌手,请问这个场景下,谁是corePoolSize,谁是maximumPoolSize呢?
在这里插入图片描述

3.线程池的参数如何设置呢

网上有很多资料在说这个问题,笔者曾经对他们深信不疑,有cpu密集型,有io密集型,甚至还有一些公式可以计算出来线程池的最理想的线程数量设置,看下面公式:

为保持处理器达到期望的使用率,最优的池的大小等于:
Nthreads = Ncpu x Ucpu x (1 + W/C)
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1
W/C = 等待时间与计算时间的比率
  • 1
  • 2
  • 3
  • 4
  • 5

其实人家说的对,在忽悠面试官的时候非常管用,但是,咋测量呢,就算是设置好了,随着业务的迭代和系统的不断变化,这个设置还能一直保证是合理的吗?这是一个直击灵魂的拷问,那么接下来,我们就要引入动态线程池的概念了。

4. 动态线程池

所谓的动态线程池其实很简单,就是要的控制线程池的几个核心参数是可被改变的,并且还可以实时监控到这几个参数的值,发现线程不够了,就改大点呗,线程过多了,担心上下文切换频频,就改小点呗,那么,我们怎么去实现呢,下面笔者就我自己在项目中的实现和大家分享一下哈,不喜勿喷!

首先,笔者自定义了一个线程池CustomThreadPoolExecutor,他继承自ThreadPoolExecutor,在CustomThreadPoolExecutor中,我们定义了一个
healthQuality变量,他是用来描述线程池的健康状态的。

@Slf4j
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * 线程池健康状况
     * 1:极佳
     * 2:优良
     * 3:欠佳
     */
    private int healthQuality = ThreadPoolHealthQualityEnum.BEST.getCode();

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // 初始化任务状态
        // log.info("before task execute...");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 更新任务执行完成状态
        // log.info("after task execute...");
    }

    public int getHealthQuality() {
        return healthQuality;
    }

    public void setHealthQuality(int healthQuality) {
        this.healthQuality = healthQuality;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

我们定义了一个动态线程池管理器,他实现了InitializingBean 接口,实现这个接口以后,在bean对象属性设置完以后,我们的spring会自动调用afterPropertiesSet这个方法,这点知识建议大家去看一下spring源码深度解析这本书,郝佳老师写的,目前事我看过,解读spring源码最好的了。
在afterPropertiesSet方法中,我们创建了一个线程池,并且起了一个线程去定期检查所有的线程池的健康状态。

规则是:
1.活跃线程<核心线程 状态BEST
2.活跃线程等于核心线程 && 队列未满 状态还好
3.活跃线程等于核心线程 && 队列接近满 状态不太好
4.队列接近满&&活跃线程接近最大线程数 比较糟糕
当线程池状态比较糟糕时,调用通知接口,具体通知逻辑,需要业务方自己去实现。

@Slf4j
@Component
public class DynamicThreadPoolManager implements InitializingBean {

    @Autowired
    private KVConfig kvConfig;
    @Autowired
    private BizNotifyHolder bizNotifyHolder;

    private final String DEFAULT_THREAD_POOL_NAME = "default-thread-pool";

    /**
     * 线程池映射关系:<线程池名称, 线程池实例>
     */
    private Map<String, CustomThreadPoolExecutor> threadPoolMap = new HashMap<>();

    @Override
    public void afterPropertiesSet() throws Exception {
        CustomThreadPoolExecutor defaultThreadPoolExecutor = new CustomThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new ResizableLinkedBlockIngQueue<Runnable>(1000));
        threadPoolMap.put(DEFAULT_THREAD_POOL_NAME, defaultThreadPoolExecutor);
        // 启动线程池健康检查
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) {
                        String threadPoolName = entry.getKey();
                        CustomThreadPoolExecutor executor = entry.getValue();
                        int corePoolSize = executor.getCorePoolSize();
                        long taskCount = executor.getTaskCount();
                        int maximumPoolSize = executor.getMaximumPoolSize();
                        int activeCount = executor.getActiveCount();
                        ResizableLinkedBlockIngQueue queue = (ResizableLinkedBlockIngQueue)executor.getQueue();
                        int capacity = queue.getCapacity();
                        int size = queue.size();
                        log.info("pool_name={}, core_size={}, max_size={}, active_count={}, queue_capacity={}, queue_size={}, task_count={}", threadPoolName, corePoolSize, maximumPoolSize, activeCount, queue.getCapacity(), queue.size(), taskCount);
                         //还未达到核心线程数 状态极佳
                        if (activeCount < corePoolSize) {
                            executor.setHealthQuality(ThreadPoolHealthQualityEnum.BEST.getCode());
                        }
                        //达到了核心线程数&&队列还未满 状态还好
                        if (activeCount == corePoolSize && size < capacity) {
                            executor.setHealthQuality(ThreadPoolHealthQualityEnum.GOOD.getCode());
                        }
                        //达到了核心线程数&&队列接近满 状态不太好
                        if (activeCount == corePoolSize && nearBetween(size, capacity)) {
                            executor.setHealthQuality(ThreadPoolHealthQualityEnum.WORSE.getCode());
                        }
                        //队列接近满&&接近最大线程数  比较糟糕
                        if (nearBetween(size, capacity) && nearBetween(activeCount, maximumPoolSize)) {
                            executor.setHealthQuality(ThreadPoolHealthQualityEnum.WORST.getCode());
                             //队列接近满&&接近最大线程数  比较糟糕 通知
                        if (nearBetween(size, capacity) && nearBetween(activeCount, maximumPoolSize)) {
                            executor.setHealthQuality(ThreadPoolHealthQualityEnum.WORST.getCode());
                            notify(executor);
                        }
                        }
                    }
                    try {
                        Thread.sleep(kvConfig.dynamicThreadPoolHealthCheckInterval);
                    } catch (InterruptedException e) {
                    }
                }
            }
            //通知
             private void notify(CustomThreadPoolExecutor executor) {
                try{
                    WarnNotify notify = SpringContextUtil.getBean(WarnNotify.class);
                    notify.notify(executor);
                }catch (Exception e){
                    log.error("通知异常",e);
                }
            }
        }).start();
    }

    /**
     * 添加线程池映射关系
     *
     * @param name
     * @param executor
     */
    public void add(String name, CustomThreadPoolExecutor executor) {
        if (threadPoolMap.get(name) != null) {
            return;
        }
        threadPoolMap.put(name, executor);
    }

    /**
     * 指定线程池执行
     *
     * @param task
     */
    public void execute(String threadPoolName, Runnable task) {
        get(threadPoolName).execute(task);
    }

    /**
     * 不指定线程池:选择一个健康的线程池并执行
     *
     * @param task
     */
    public void execute(Runnable task) {
        String targetThreadPool = null;
        for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) {
            if (entry.getValue().getHealthQuality() == ThreadPoolHealthQualityEnum.BEST.getCode()) {
                targetThreadPool = entry.getKey();
                break;
            }
        }
        if (StringUtils.isBlank(targetThreadPool)) {
            for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) {
                if (entry.getValue().getHealthQuality() == ThreadPoolHealthQualityEnum.GOOD.getCode()) {
                    targetThreadPool = entry.getKey();
                    break;
                }
            }
        }
        if (StringUtils.isBlank(targetThreadPool)) {
            for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) {
                if (entry.getValue().getHealthQuality() == ThreadPoolHealthQualityEnum.WORSE.getCode()) {
                    targetThreadPool = entry.getKey();
                    break;
                }
            }
        }
         if (StringUtils.isBlank(targetThreadPool)) {
            for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) {
                if (entry.getValue().getHealthQuality() == ThreadPoolHealthQualityEnum.WORST.getCode()) {
                    targetThreadPool = entry.getKey();
                    break;
                }
            }
        }
        try {
            threadPoolMap.get(targetThreadPool).execute(task);
        } catch (Exception e) {
            log.error("线程池任务调度异常, targetThreadPool={}", targetThreadPool, e);
            BizNotifyContext context = new BizNotifyContext();
            if (e instanceof RejectedExecutionException) {
                context.setTitle("线程池任务调度异常: 拒绝投递");
                context.setNotifyType(BizNotifyTypeEnum.ZYL.getCode());
                context.setDetail(targetThreadPool);
                bizNotifyHolder.handle(context);
            } else {
                context.setTitle("线程池任务调度异常: " + e.getLocalizedMessage());
                context.setNotifyType(BizNotifyTypeEnum.ZYL.getCode());
                context.setDetail(targetThreadPool);
                bizNotifyHolder.handle(context);
            }
        }
    }

    /**
     * 添加线程池映射关系
     *
     * @param name
     */
    public ThreadPoolExecutor get(String name) {
        CustomThreadPoolExecutor threadPoolExecutor = threadPoolMap.get(name);
        if (threadPoolExecutor == null) {
            return threadPoolMap.get(DEFAULT_THREAD_POOL_NAME);
        }
        return threadPoolExecutor;
    }

    /**
     * 返回默认线程池
     *
     * @return
     */
    public CustomThreadPoolExecutor get() {
        return threadPoolMap.get(DEFAULT_THREAD_POOL_NAME);
    }

    /**
     * 移除线程池映射关系
     *
     * @param name
     */
    public void remove(String name) {
        if (threadPoolMap.get(name) == null) {
            return;
        }
        threadPoolMap.remove(name);
    }

    /**
     * @description:获取线程池名称
     * @date:2021/4/19 3:31 下午
     */
    public List<String> getThreadPoolName() {
        return Lists.newArrayList(this.threadPoolMap.keySet());
    }

  此处省略N行代码。。。
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198

我们来看一下获取各个线程池状态值的方法,很简单,就是获取一下各个线程池的状态以及线程池的当前核心参数值。

 public List<ThreadPoolStatusInfo> getThreadPoolStatus() {
        Map<String, CustomThreadPoolExecutor> threadPoolMap = this.threadPoolMap;
        List<ThreadPoolStatusInfo> data = Lists.newArrayListWithCapacity(threadPoolMap.size());
        for (Map.Entry<String, CustomThreadPoolExecutor> entry : threadPoolMap.entrySet()) {
            ThreadPoolStatusInfo threadPoolStatusInfo = new ThreadPoolStatusInfo();
            String threadPoolName = entry.getKey();
            CustomThreadPoolExecutor executor = entry.getValue();
            threadPoolStatusInfo.setPool_name(threadPoolName);
            threadPoolStatusInfo.setCore_thread_size(executor.getCorePoolSize());
            threadPoolStatusInfo.setMax_thread_size(executor.getMaximumPoolSize());
            threadPoolStatusInfo.setActive_thread_size(executor.getActiveCount());
            if (executor.getQueue() instanceof ResizableLinkedBlockIngQueue) {
                ResizableLinkedBlockIngQueue resizableLinkedBlockIngQueue = (ResizableLinkedBlockIngQueue)executor.getQueue();
                threadPoolStatusInfo.setQueue_type("ResizableLinkedBlockIngQueue");
                threadPoolStatusInfo.setQueue_capacity(resizableLinkedBlockIngQueue.getCapacity());
                threadPoolStatusInfo.setQueue_size(resizableLinkedBlockIngQueue.size());
                threadPoolStatusInfo.setHealth_quality(executor.getHealthQuality());
                threadPoolStatusInfo.setTask_count(executor.getTaskCount());
                threadPoolStatusInfo.setCompleted_task_count(executor.getCompletedTaskCount());
                data.add(threadPoolStatusInfo);
            }
        }
        return data;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

接下来我们看一下线程池变量重置的方法,比较简单,就是修改一下参数值而已。

 /**
     * 线程池实例重置
     *
     * @param name
     * @param param
     */
    public synchronized void reset(String name, ResetThreadPoolParam param) {
        ThreadPoolExecutor threadPoolExecutor = threadPoolMap.get(name);
        if (threadPoolExecutor == null) {
            return;
        }
        Integer corePoolSize = param.getCorePoolSize();
        Integer maximumPoolSize = param.getMaximumPoolSize();
        Integer queueCapacity = param.getQueueCapacity();
        if (corePoolSize != null && corePoolSize <= 0) {
            throw new BusinessException("核心线程数配置非法");
        }
        if (maximumPoolSize != null && maximumPoolSize <= 0) {
            throw new BusinessException("最大线程数配置非法");
        }
        if (queueCapacity != null && queueCapacity <= 0) {
            throw new BusinessException("队列容量配置非法");
        }
        if (corePoolSize != null && maximumPoolSize != null && corePoolSize > maximumPoolSize) {
            throw new BusinessException("核心线程数不能大于最大线程数");
        }
        if (corePoolSize != null) {
            threadPoolExecutor.setCorePoolSize(corePoolSize);
        }
        if (maximumPoolSize != null) {
            threadPoolExecutor.setMaximumPoolSize(maximumPoolSize);
        }
        if (queueCapacity != null && queueCapacity > 0 && threadPoolExecutor.getQueue() instanceof ResizableLinkedBlockIngQueue) {
            ResizableLinkedBlockIngQueue resizableLinkedBlockIngQueue = (ResizableLinkedBlockIngQueue) threadPoolExecutor.getQueue();
            resizableLinkedBlockIngQueue.setCapacity(queueCapacity);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

那么线程池参数配置在哪里呢? 我们使用了apollo进行管理,代码如下:

@Slf4j
@Service
public class ThreadPoolConfigModifyWatcher {

    @Autowired
    private DynamicThreadPoolManager dynamicThreadPoolManager;

    @ApolloConfigChangeListener(value = {"thread-pool"})
    public void watchConfigChange(ConfigChangeEvent changeEvent) {
        Set<String> changedKeys = changeEvent.changedKeys();
        for (String threadPoolName : changedKeys) {
            String threadPoolConfig = changeEvent.getChange(threadPoolName).getNewValue();
            ResetThreadPoolParam param = JacksonUtil.json2Bean(threadPoolConfig, ResetThreadPoolParam.class);
            try {
                dynamicThreadPoolManager.reset(threadPoolName, param);
            } catch (Exception e) {
                log.error("监听线程池配置变更重置线程池实例发生异常", e);
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

讲到这里,最核心的实现就已经讲完了,还剩下一些细枝末节就不说啦,感兴趣的笔者可以自己去研究哈。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/156109
推荐阅读
相关标签
  

闽ICP备14008679号