当前位置:   article > 正文

简历写着熟悉 Dubbo,居然连 Dubbo 线程池监控都不知道?

dubbo怎么在简历上呈现

点击关注公众号,实用技术文章及时了解9f6c501a3a698dc7341adcb41310ab93.png

Dubbo 是一款优秀的微服务框架,它以其高性能、简单易用、易扩展等特点,广泛应用于互联网、金融保险、科技公司、制造业、零售物流等多个领域。如今,Dubbo 框架已经成了互联网开发中比较常用的技术框架。

在Dubbo框架中,当客户端调用服务端的时候,请求抵达了服务端之后,会有专门的线程池去接收参数并且处理。所以如果要实现Dubbo的线程池监控,就需要先了解下Dubbo底层对于业务线程池的实现原理。

Dubbo底层对于线程池的查看

这里我所使用的框架是 Dubbo 2.7.8 版本,它在底层对于线程池的管理是通过一个叫做ExecutorRepository 的类处理的,这个类负责创建并管理 Dubbo 中的线程池,通过该扩展接口,我们可以获取到Dubbo再实际运行中的业务线程池对象。

具体的处理逻辑部分如下所示:

  1. package org.idea.dubbo.monitor.core.collect;
  2. import org.apache.dubbo.common.extension.ExtensionLoader;
  3. import org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository;
  4. import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
  5. import java.lang.reflect.Field;
  6. import java.util.concurrent.ConcurrentMap;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.ThreadPoolExecutor;
  9. /**
  10.  * @Author idea
  11.  * @Date created in 7:04 下午 2022/6/29
  12.  */
  13. public class DubboThreadPoolCollector {
  14.     /**
  15.      * 获取Dubbo的线程池
  16.      * @return
  17.      */
  18.     public static ThreadPoolExecutor getDubboThreadPoolInfo(){
  19.         //dubbo线程池数量监控
  20.         try {
  21.             ExtensionLoader<ExecutorRepository> executorRepositoryExtensionLoader = ExtensionLoader.getExtensionLoader(ExecutorRepository.class);
  22.             DefaultExecutorRepository defaultExecutorRepository = (DefaultExecutorRepository) executorRepositoryExtensionLoader.getDefaultExtension();
  23.             Field dataField = defaultExecutorRepository.getClass().getDeclaredField("data");
  24.             dataField.setAccessible(true);
  25.             ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = (ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) dataField.get(defaultExecutorRepository);
  26.             ConcurrentMap<Integer, ExecutorService> executorServiceConcurrentMap = data.get("java.util.concurrent.ExecutorService");
  27.             //获取到默认的线程池模型
  28.             ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorServiceConcurrentMap.get(9090);
  29.             return threadPoolExecutor;
  30.         } catch (Exception e) {
  31.             e.printStackTrace();
  32.         }
  33.         return null;
  34.     }
  35. }

好了,现在我们知道如何在代码中实时查看Dubbo线程池的信息了,那么接下来要做的就是如何采集这些线程池的数据,并且进行上报,最后将上报存储的数据通过统计图的方式展示出来。

下边我们按照采集,上报,展示三个环节来展示数据。

采集数据

在采集数据这块,有两种思路去采集,分别如下:

  • 后台开启一个定时任务,然后每秒都查询一下线程池的参数信息。

  • 每次有请求抵达provider的时候,就查看一些线程池的参数信息。

采用两种不同的模式采集出来的数据,可能会有些差异,下边是两种方式的比对:

统计方式实现难度可能存在的问题
定时任务采集数据简单定时任务执行间隙中的数据无法采集,导致数据失真。
请求抵达是采集数据稍为复杂一些在每次请求的时候都需要采集数据,会对性能有一定损耗。

通过对实际的业务场景分析,其实第二种方式对应用的性能损耗极微,甚至可以忽略,所以使用这种方式去采集数据的话会比较合适。

下边让我们一起来看看这种方式采集数据的话,该如何实现。

首先我们需要自己定义一个filter过滤器:

  1. package org.idea.dubbo.monitor.core.filter;
  2. import org.apache.dubbo.common.constants.CommonConstants;
  3. import org.apache.dubbo.common.extension.Activate;
  4. import org.apache.dubbo.rpc.*;
  5. import org.idea.dubbo.monitor.core.DubboMonitorHandler;
  6. import java.util.concurrent.ThreadPoolExecutor;
  7. import static org.idea.dubbo.monitor.core.config.CommonCache.DUBBO_INFO_STORE_CENTER;
  8. /**
  9.  * @Author idea
  10.  * @Date created in 2:33 下午 2022/7/1
  11.  */
  12. @Activate(group = CommonConstants.PROVIDER)
  13. public class DubboRecordFilter implements Filter {
  14.     @Override
  15.     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
  16.         ThreadPoolExecutor threadPoolExecutor = DubboMonitorHandler.getDubboThreadPoolInfo();
  17.         //请求的时候趣统计线程池,当请求量太小的时候,这块的数据可能不准确,但是如果请求量大的话,就接近准确了
  18.         DUBBO_INFO_STORE_CENTER.reportInfo(9090,threadPoolExecutor.getActiveCount(),threadPoolExecutor.getQueue().size());
  19.         return invoker.invoke(invocation);
  20.     }
  21. }

关于DUBBO_INFO_STORE_CENTER的代码如下所示:

并且在dubbo的spi配置文件中指定好它们:

dubboRecordFilter=org.idea.dubbo.monitor.core.filter.DubboRecordFilter

当provider加入了这个过滤器以后,若有请求抵达服务端,则会通过这个filter触发采集操作。

  1. package org.idea.dubbo.monitor.core.collect;
  2. import org.idea.dubbo.monitor.core.bo.DubboInfoStoreBO;
  3. import java.util.Map;
  4. import java.util.concurrent.ConcurrentHashMap;
  5. /**
  6.  * Dubbo数据存储中心
  7.  *
  8.  * @Author idea
  9.  * @Date created in 11:15 上午 2022/7/1
  10.  */
  11. public class DubboInfoStoreCenter {
  12.     private static Map<Integer, DubboInfoStoreBO> dubboInfoStoreBOMap = new ConcurrentHashMap<>();
  13.     public void reportInfo(Integer port, Integer corePoolSize, Integer queueLength) {
  14.         synchronized (this) {
  15.             DubboInfoStoreBO dubboInfoStoreBO = dubboInfoStoreBOMap.get(port);
  16.             if (dubboInfoStoreBO != null) {
  17.                 boolean hasChange = false;
  18.                 int currentMaxPoolSize = dubboInfoStoreBO.getMaxCorePoolSize();
  19.                 int currentMaxQueueLength = dubboInfoStoreBO.getMaxCorePoolSize();
  20.                 if (corePoolSize > currentMaxPoolSize) {
  21.                     dubboInfoStoreBO.setMaxCorePoolSize(corePoolSize);
  22.                     hasChange = true;
  23.                 }
  24.                 if (queueLength > currentMaxQueueLength) {
  25.                     dubboInfoStoreBO.setMaxQueueLength(queueLength);
  26.                     hasChange = true;
  27.                 }
  28.                 if (hasChange) {
  29.                     dubboInfoStoreBOMap.put(port, dubboInfoStoreBO);
  30.                 }
  31.             } else {
  32.                 dubboInfoStoreBO = new DubboInfoStoreBO();
  33.                 dubboInfoStoreBO.setMaxQueueLength(queueLength);
  34.                 dubboInfoStoreBO.setMaxCorePoolSize(corePoolSize);
  35.                 dubboInfoStoreBOMap.put(port, dubboInfoStoreBO);
  36.             }
  37.         }
  38.     }
  39.     public DubboInfoStoreBO getInfo(Integer port){
  40.         return dubboInfoStoreBOMap.get(port);
  41.     }
  42.     public void cleanInfo(Integer port) {
  43.         dubboInfoStoreBOMap.remove(port);
  44.     }
  45. }

注意这个采集类只会采集一段时间的数据,然后定期会清空重置。

之所以这么做,是希望用这个map统计指定时间内的最大线程数和最大队列数,接着当这些峰值数据被上报到存储中心后就进行清空。

关于DubboInfoStoreCenter对象的定义,我将它放置在了一个叫做CommonCache的类里面,具体如下:

  1. package org.idea.dubbo.monitor.core.config;
  2. import org.idea.dubbo.monitor.core.store.DubboInfoStoreCenter;
  3. /**
  4.  * @Author idea
  5.  * @Date created in 12:15 下午 2022/7/1
  6.  */
  7. public class CommonCache {
  8.     public static DubboInfoStoreCenter DUBBO_INFO_STORE_CENTER = new DubboInfoStoreCenter();
  9. }

所以在上边的过滤器中,我们才可以直接通过静态类引用去调用它的采集接口。

好了,现在整体来看,我们已经实现了在过滤器中去实时采集线程池的数据,并且将它暂存在了一个Map表中,这个map的数据主要是记录了某段时间内的线程池峰值,供采集器角色去使用。

那么接下来,我们就来看看上报器模块主要做了哪些操作。

上报数据

上报数据前,最重要的就是选择合适的存储组件了。首先上报的数据本身体量并不大,我们可以将采集时间短设置为15秒,那么设计一个上报任务,每隔15秒采集一次dubbo线程池的数据。那么一天的时间就需上报5760次,假设一次上报存储一条记录的话,那么一天下来所需要存储的数据也并不是特别多。

并且存储下来的服务数据实际上也并不需要保留太长的时间,一般存储个一周时间也就足够了,所以最终我选用啦Redis进行这方面的存储。


9fa2d4f2f781e7a602be3daa03121f27.png

我们实际每次关注的数据字段主要有三个,关于它们的定义我整理成了下边这个对象:

  1. package org.idea.dubbo.monitor.core.bo;
  2. /**
  3.  * @Author idea
  4.  * @Date created in 7:17 下午 2022/6/29
  5.  */
  6. public class ThreadInfoBO {
  7.     private Integer activePoolSize;
  8.     private Integer queueLength;
  9.     private long saveTime;
  10.     public Integer getActivePoolSize() {
  11.         return activePoolSize;
  12.     }
  13.     public void setActivePoolSize(Integer activePoolSize) {
  14.         this.activePoolSize = activePoolSize;
  15.     }
  16.     public Integer getQueueLength() {
  17.         return queueLength;
  18.     }
  19.     public void setQueueLength(Integer queueLength) {
  20.         this.queueLength = queueLength;
  21.     }
  22.     public long getSaveTime() {
  23.         return saveTime;
  24.     }
  25.     public void setSaveTime(long saveTime) {
  26.         this.saveTime = saveTime;
  27.     }
  28.     @Override
  29.     public String toString() {
  30.         return "ThreadInfoBO{" +
  31.                 ", queueLength=" + queueLength +
  32.                 ", saveTime=" + saveTime +
  33.                 '}';
  34.     }
  35. }

接着会开启一个线程任务,每间隔15秒就会执行一轮上报数据的动作:

  1. package org.idea.dubbo.monitor.core.report;
  2. import com.alibaba.fastjson.JSON;
  3. import org.idea.dubbo.monitor.core.bo.DubboInfoStoreBO;
  4. import org.idea.dubbo.monitor.core.bo.ThreadInfoBO;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.CommandLineRunner;
  9. import java.util.concurrent.ExecutorService;
  10. import java.util.concurrent.Executors;
  11. import static org.idea.dubbo.monitor.core.config.CommonCache.DUBBO_INFO_STORE_CENTER;
  12. /**
  13.  * @Author idea
  14.  * @Date created in 12:13 下午 2022/7/1
  15.  */
  16. public class DubboInfoReportHandler implements CommandLineRunner {
  17.     @Autowired
  18.     private IReportTemplate reportTemplate;
  19.     private static final Logger LOGGER = LoggerFactory.getLogger(DubboInfoReportHandler.class);
  20.     public static ExecutorService executorService = Executors.newFixedThreadPool(1);
  21.     public static int DUBBO_PORT = 9090;
  22.     @Override
  23.     public void run(String... args) throws Exception {
  24.         executorService.submit(new Runnable() {
  25.             @Override
  26.             public void run() {
  27.                 while (true) {
  28.                     try {
  29.                         Thread.sleep(10000);
  30.                         DubboInfoStoreBO dubboInfoStoreBO = DUBBO_INFO_STORE_CENTER.getInfo(DUBBO_PORT);
  31.                         ThreadInfoBO threadInfoBO = new ThreadInfoBO();
  32.                         threadInfoBO.setSaveTime(System.currentTimeMillis());
  33.                         if(dubboInfoStoreBO!=null){
  34.                             threadInfoBO.setQueueLength(dubboInfoStoreBO.getMaxQueueLength());
  35.                             threadInfoBO.setActivePoolSize(dubboInfoStoreBO.getMaxCorePoolSize());
  36.                         } else {
  37.                            //这种情况可能是对应的时间段内没有流量请求到provider上
  38.                             threadInfoBO.setQueueLength(0);
  39.                             threadInfoBO.setActivePoolSize(0);
  40.                         }
  41.                         //这里是上报器上报数据到redis中
  42.                         reportTemplate.reportData(JSON.toJSONString(threadInfoBO));
  43.                         //上报之后,这里会重置map中的数据
  44.                         DUBBO_INFO_STORE_CENTER.cleanInfo(DUBBO_PORT);
  45.                         LOGGER.info(" =========== Dubbo线程池数据上报 =========== ");
  46.                     } catch (Exception e) {
  47.                         e.printStackTrace();
  48.                     }
  49.                 }
  50.             }
  51.         });
  52.     }
  53. }

这类要注意下,Dubbo应用的线程池上报任务应当等整个SpringBoot应用启动成功之后再去触发,否则可能会有些许数据不准确性。所以再定义Bean初始化线程的时候,我选择了CommandLineRunner接口。

细心查看代码的你可能会看到这么一个类:

org.idea.dubbo.monitor.core.report.IReportTemplate

这个类定义了数据上报器的基本动作,下边是它的具体代码:

  1. package org.idea.dubbo.monitor.core.report;
  2. /**
  3.  * 上报模版
  4.  *
  5.  * @Author idea
  6.  * @Date created in 7:10 下午 2022/6/29
  7.  */
  8. public interface IReportTemplate {
  9.     /**
  10.      * 上报数据
  11.      *
  12.      * @return
  13.      */
  14.     boolean reportData(String json);
  15. }

实现类部分如下所示:

  1. package org.idea.dubbo.monitor.core.report.impl;
  2. import org.idea.dubbo.monitor.core.report.IReportTemplate;
  3. import org.idea.qiyu.cache.redis.service.IRedisService;
  4. import org.springframework.stereotype.Component;
  5. import javax.annotation.Resource;
  6. import java.time.LocalDate;
  7. import java.util.concurrent.TimeUnit;
  8. /**
  9.  * @Author idea
  10.  * @Date created in 7:12 下午 2022/6/29
  11.  */
  12. @Component
  13. public class RedisTemplateImpl implements IReportTemplate {
  14.     @Resource
  15.     private IRedisService redisService;
  16.     private static String queueKey = "dubbo:threadpool:info:";
  17.     @Override
  18.     public boolean reportData(String json) {
  19.         redisService.lpush(queueKey + LocalDate.now().toString(), json);
  20.         redisService.expire(queueKey + LocalDate.now().toString(),7, TimeUnit.DAYS);
  21.         return true;
  22.     }
  23. }

这里面我采用的是list的结构去存储这些数据指标,设定了一个过期时间为一周,最终存储到redis之后的格式如下所示:

ece0223949cdae6eee8adaf3b109881f.png

数据展示

好了,现在我们已经完成了对线程池的监控,最后只需要设计一个管理台,从缓存中提取上报的数据并且进行页面的展示即可。

实现的逻辑比较简单,只需要定义好统计图所需要的数据结构,然后在controller曾返回即可,例如下图所示:

c19916695ddf3deb223c1729ad20744d.png

最终展现出来的效果如下图:

aef9df9093b5c15ce461d5784f8bf851.png

随着请求dubbo接口的量发生变化,统计图可以展示出dubbo线程池的数据变动情况。如果希望统计图以实时的方式展示数据的话,其实只需要在js中写一个定时调用的函数即可。

这里我是使用的是echart插件做的图表渲染,我选用的是最简单的统计图类型,大家也可以根据自己的具体所需在echart的官网上选择合适的模型进行渲染,下边这是echart的官网地址:

https://echarts.apache.org/examples/zh/index.html

推荐

Java面试题宝典

技术内卷群,一起来学习!!

b58038d262da02a73e2df846c9b3ffbf.jpeg

PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/169697
推荐阅读
相关标签
  

闽ICP备14008679号