当前位置:   article > 正文

并发编程工具 - 线程池的使用和自己的封装_ttlexecutors.getttlscheduledexecutorservice

ttlexecutors.getttlscheduledexecutorservice

目录

线程池的使用

线程池参数设置

自己封装的线程池工具


     根据阿姆达尔(Amdal)定律可知[并发编程基础 - 安全性、活跃性、性能问题#性能问题],若串行度为5%那么无论使用任何技术,性能最高提升20%,所以减小串行度(减小锁粒度、算法优化等)是前提。但是除此之外使用多线程并行任务也是比较常用的手段,效果也非常明显,只是使用不好坑会非常多,所以最好深入理解其运行原理,这当然也是面试的高频。

线程池的使用

    上面的多线程并行是最理想的状态,真正执行的时候与CPU的核数(并行度)以及上下文的切换,GC垃圾回收线程(特别是Stop The World阶段)的运行等综合的结果。理论上串行那么就是所有耗时的总和,并行使用合理则耗时为最长一个的时间;结合具体的线程池工作原理,为了方便理解执行相同耗时的任务比如上图,真实耗时介于1到三倍中间,即最差的耗时也就相当于串行。这种情况下我们可以使用下面线程池封装的executeAll方法,底层是调用了ThreadPoolExecutor的invokeAll接口。类似这样的方法,我们还可以使用Java提供了CompletionService从队列中获取;或者CompletableFuture#allOf方法;当然也可以使用execute方法结合CountDownLatch,后面专门分析和项目使用。针对更复杂的情况,CompletableFuture提供了强大的API,自己仅仅是在项目上使用部分。

    比如项目上,自己将获取一个分段的字符串,每段使用线程池任务获取,只是多线程处理的结果在合并到主线程(当前主线程一般是比如Tomcat线程池中的一个线程)时,需要注意数据的安全性,我下面使用了ConcurrentHahsMap。最终放入结合下面的线程池工具类如下:

  1. private void aaa(StringBuilder orderCode) {
  2. final ConcurrentHashMap<StateConfigEnum, String> resultMap = new ConcurrentHashMap<>(16);
  3. List<Callable<String>> taskList = Lists.<Callable<String>>newArrayList(
  4. () -> {
  5. SendPayDTO sendPayDTO = getInstance().get().sendPayDTO;
  6. DefSaleOrderDTO defSaleOrderDTO = defSaleOrderService.sendPayDTOBySoCode(sendPayDTO.getSoTypeCode());
  7. return resultMap.put(SALE_DEFINITION, defSaleOrderDTO.getCursorCode());
  8. },
  9. () -> resultMap.put(SALE_AUDIT, confSaleOrderAuditService.sendPayStage()),
  10. () -> resultMap.put(SHIPPING_CONDITION, confShippingConditionService.sendPayStage())
  11. );
  12. // 阻塞获取结果
  13. ExecutorService executor = TtlExecutors.getTtlExecutorService(SimpleThreadPool.THREAD_POOL_EXECUTOR_MAP.get(SEND_PAY.name()));
  14. SimpleThreadPool.executeAll(executor, taskList).forEach(this::getFuture);
  15. orderCode.append(BEFORE_SALE_DEFINITION_DEFAULT)
  16. .append(resultMap.get(SALE_DEFINITION))
  17. .append(AFTER_SALE_DEFINITION_DEFAULT)
  18. .append(resultMap.get(SALE_AUDIT))
  19. .append(resultMap.get(SHIPPING_CONDITION));
  20. }

    使用CountDownLatch实现相同的效果,上面的方法可以修改为:

  1. private void byCountDownLatch(StringBuilder orderCode) throws InterruptedException {
  2. final ConcurrentHashMap<StateConfigEnum, String> resultMap = new ConcurrentHashMap<>(16);
  3. CountDownLatch countDownLatch = new CountDownLatch(3);
  4. ExecutorService executor = TtlExecutors.getTtlExecutorService(SimpleThreadPool.THREAD_POOL_EXECUTOR_MAP.get(SEND_PAY.name()));
  5. executor.execute(() -> {
  6. SendPayDTO sendPayDTO = getInstance().get().sendPayDTO;
  7. DefSaleOrderDTO defSaleOrderDTO = defSaleOrderService.sendPayDTOBySoCode(sendPayDTO.getSoTypeCode());
  8. resultMap.put(SALE_DEFINITION, defSaleOrderDTO.getCursorCode());
  9. countDownLatch.countDown();
  10. });
  11. executor.submit(() -> {
  12. resultMap.put(SALE_AUDIT, confSaleOrderAuditService.sendPayStage());
  13. countDownLatch.countDown();
  14. });
  15. executor.submit(() -> {
  16. resultMap.put(SHIPPING_CONDITION, confShippingConditionService.sendPayStage());
  17. countDownLatch.countDown();
  18. });
  19. // 阻塞获取结果
  20. countDownLatch.await();
  21. orderCode.append(BEFORE_SALE_DEFINITION_DEFAULT)
  22. .append(resultMap.get(SALE_DEFINITION))
  23. .append(AFTER_SALE_DEFINITION_DEFAULT)
  24. .append(resultMap.get(SALE_AUDIT))
  25. .append(resultMap.get(SHIPPING_CONDITION));
  26. }

 

线程池参数设置

    CPU密集型: 一般设置为CPU核(或者核线程,如果我们现在电脑是4核8线程)+ 1;增加1是为了防止线程偶发性的缺页中断或者其他原因导致的任务暂停,导致资源使用不充分。

    I/O密集型:一般设置为CPU核 * 2

上面是极端的情况,很多时候我们的任务是综合型的,那么可以根据下面公式大概计算(我们可以使用VisualVM等来查询WT/ST):

    线程数=N(CPU核数)*(1+WT(线程等待时间)/ST(线程时间运行时间))

    但是不管怎么计算,上面的只是一个基础参数值,具体的还需要根据压测来判断。理论上,随着参数的增加性能会提升,替身到一定的值后会出现拐点,再比较机具的向下走,所以压测就是为了找到拐点是的参数值。

    因为上面也提到了,还跟GC情况,上下文切换,CPU和内存动态等有关,所以压测是不仅要看Tps值,还要随时注意CPU和内存的使用情况,GC的回收耗时和情况,必要也可以查看线程上下文的切换[并发编程基础 - 多线程的上下文切换问题]。

线程池监控

    使用线程池时,一般情况下只有等到问题出现时再去dump等,当然ThreadPoolExecutor提供了调用的API,比如查看处理的任务数,当前的线程数等,那么我们可以基于定时任务打印到日志中,前面还提到了每个线程执行Work#runWorker时,在调用我们的写的Runnable的run方法前后都允许定义回调任务。并且上一篇看到,Tomcat的ThreadPoolExecutor类也重写了后置回调方法。

    1、定时任务打印线程池情况到日志

这种方式在下面工具的ThreadPoolInit中也有体现,具体使用了juc的单线程的ScheduledThreadPoolExecutor线程池,如下:

  1. public static final ScheduledExecutorService SINGLE_POOL = Executors.newSingleThreadScheduledExecutor();
  2. private static void printScheduledThreadStats() {
  3. SINGLE_POOL.scheduleAtFixedRate(() -> {
  4. THREAD_POOL_EXECUTOR_MAP.forEach((name, threadPool) -> {
  5. log.info("{} Pool Size: {}", name, threadPool.getPoolSize());
  6. log.info("{} Active Threads: {}", name, threadPool.getActiveCount());
  7. log.info("{} Number of Tasks Completed: {}", name, threadPool.getCompletedTaskCount());
  8. log.info("{} Number of Tasks in Queue: {}", name, threadPool.getQueue().size());
  9. });
  10. }, 0L, 5L, TimeUnit.SECONDS);
  11. }

    2、每个我们的Runnable#run方法前后打印

这样需要像Tomcat一下继承自juc的ThreadPoolExecutor,并且按需要重新前置和后置方法(如果我们的任务执行非常快,并且量非常大则肯定不能使用该方式,否则打印太多了):

  1. @Slf4j
  2. public class MyThreadPoolExecutor extends ThreadPoolExecutor {
  3. @Override
  4. protected void beforeExecute(Thread thread, Runnable runnable) {}
  5. @Override
  6. protected void afterExecute(Runnable runnable, Throwable throwable) {
  7. log.info("{} Pool Size: {}", super.getPoolSize());
  8. log.info("{} Active Threads: {}", super.getActiveCount());
  9. log.info("{} Number of Tasks Completed: {}", super.getCompletedTaskCount());
  10. log.info("{} Number of Tasks in Queue: {}", super.getQueue().size());
  11. }
  12. // 省略重写父类的构造。。。
  13. }

自己封装的线程池工具

    1、自己封装的线程池管理工具,仅仅是为了统一管理,以及多个微服务项目需要使用时候,提高代码的复用性。

    2、使用时发现一个项目启动了5个及以上的线程池,那么波峰是无法估计的,也比较危险,压测也只压一个线程池或者方法。之前一直不理解怎么去控制,后面理解了如果一个项目要启动那么多线程池,其实项目本身也早就该拆分了(拆分到不同的项目JVM               中)。

    3、而每一个应用的线程池性质本身;服务器的配置情况只有开发的人自己清楚,所以使用了Java原生的Spi机制实现

  1. public class ThreadPoolImpl implements ThreadPool {
  2. @Override
  3. public List<ThreadPoolEntity> appendThreadPool() {
  4. return Lists.newArrayList(
  5. new ThreadPoolEntity(ThreadPoolEnum.TRANSFER_ORDER.name(), SimpleThreadPool.PoolModel.FAST_IO, Boolean.FALSE, Boolean.TRUE,
  6. "****线程池【Per PO Per Thread】", 5, 8, 30, TimeUnit.SECONDS,
  7. new TaskQueue(50), new ThreadPoolExecutor.AbortPolicy())
  8. );
  9. }
  10. }

    4、定义的线程池类型枚举,CPU型、IO型、FAST_IO型【就是原生的Tomcat类型,先创建最大线程数,再放入队列,详细见编发编程工具 - Tomcat对juc线程池的扩展

代码如下(欢迎讨论):

  1. @Slf4j
  2. @Component
  3. @SuppressWarnings("ALL")
  4. public class SimpleThreadPool extends ThreadPoolInit implements EnvironmentAware {
  5. /**
  6. * 默认最大的超时时间
  7. */
  8. private static int DEFAULT_TIMEOUT = 50000;
  9. @Override
  10. public void setEnvironment(Environment environment) {
  11. // 执行父类中的环境设置
  12. super.setEnvironment(environment);
  13. String[] activeProfiles = environment.getActiveProfiles();
  14. if (activeProfiles == null) {
  15. return;
  16. }
  17. String pro = "prod";
  18. for (String activeProfile : activeProfiles) {
  19. if (pro.equals(activeProfile)) {
  20. if (log.isInfoEnabled()) {
  21. log.info("active spring profile prod");
  22. }
  23. DEFAULT_TIMEOUT = 2000;
  24. }
  25. }
  26. if (log.isInfoEnabled()) {
  27. log.info("SimpleThreadPool DEFAULT_TIMEOUT = {} ms", DEFAULT_TIMEOUT);
  28. }
  29. }
  30. /**
  31. * 执行没有返回值的任务
  32. * @param key 线程池枚举
  33. * @param runnable 任务
  34. */
  35. public static void execute(String key, Runnable runnable) {
  36. if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
  37. throw new IllegalArgumentException("未找到线程池:" + key);
  38. }
  39. // 执行任务
  40. THREAD_POOL_EXECUTOR_MAP.get(key).execute(runnable);
  41. }
  42. /**
  43. * 执行有返回值任务
  44. *
  45. * @param key 线程池名称
  46. * @param callable 需要执行的任务可变数组
  47. * @param <T> 任务类型
  48. * @return 任务结果
  49. */
  50. public static <T> List<Future<T>> execute(String key, Callable<T>... callable) {
  51. if (callable == null || callable.length == 0) {
  52. throw new IllegalArgumentException("任务不能为空!");
  53. }
  54. List<Callable<T>> taskList = Arrays.stream(callable).collect(Collectors.toList());
  55. return executeAll(key, taskList);
  56. }
  57. /**
  58. * 批量执行并行任务, 使用默认的最大超时时间,单位毫秒
  59. * @param key 线程池名称
  60. * @param callableList 任务列表
  61. * @return 结果
  62. */
  63. public static <T> List<Future<T>> executeAll(String key, List<Callable<T>> callableList) {
  64. return executeAll(key, DEFAULT_TIMEOUT, callableList);
  65. }
  66. /**
  67. * 批量执行并行任务, 使用默认的最大超时时间,单位毫秒
  68. * @param key 线程池名称
  69. * @param callableList 任务列表
  70. * @return 结果
  71. */
  72. public static <T> List<Future<T>> executeAll(ExecutorService service, List<Callable<T>> callableList) {
  73. return executeAll(service, DEFAULT_TIMEOUT, callableList);
  74. }
  75. /**
  76. * 批量执行并行任务
  77. * @param key 线程池名称
  78. * @param maxTimeout 最大超时时间,没有取默认值,单位2000毫秒
  79. * @param callableList 任务列表
  80. * @return 结果
  81. */
  82. public static <T> List<Future<T>> executeAll(ExecutorService service, int maxTimeout, List<Callable<T>> callableList) {
  83. try {
  84. return service.invokeAll(callableList, maxTimeout, TimeUnit.MILLISECONDS);
  85. } catch (InterruptedException e) {
  86. log.error("线程池批量执行任务异常失败", e);
  87. } catch (Exception e) {
  88. log.error("线程池批量执行任务异常失败", e);
  89. }
  90. return new ArrayList<>();
  91. }
  92. /**
  93. * 批量执行并行任务
  94. * @param key 线程池名称
  95. * @param callableList 任务列表
  96. * @return 结果
  97. */
  98. public static <T> List<Future<T>> executeAllUntil(String key, List<Callable<T>> callableList) {
  99. if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
  100. throw new IllegalArgumentException("未配置线程池" + key);
  101. }
  102. try {
  103. return THREAD_POOL_EXECUTOR_MAP.get(key).invokeAll(callableList);
  104. } catch (InterruptedException e) {
  105. log.error("线程池批量执行任务异常失败", e);
  106. } catch (Exception e) {
  107. log.error("线程池批量执行任务异常失败", e);
  108. }
  109. return new ArrayList<>();
  110. }
  111. /**
  112. * 批量执行并行任务
  113. * @param key 线程池名称
  114. * @param maxTimeout 最大超时时间,没有取默认值,单位2000毫秒
  115. * @param callableList 任务列表
  116. * @return 结果
  117. */
  118. public static <T> List<Future<T>> executeAll(String key, int maxTimeout, List<Callable<T>> callableList) {
  119. if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
  120. throw new IllegalArgumentException("未配置线程池" + key);
  121. }
  122. try {
  123. return THREAD_POOL_EXECUTOR_MAP.get(key).invokeAll(callableList, maxTimeout, TimeUnit.MILLISECONDS);
  124. } catch (InterruptedException e) {
  125. log.error("线程池批量执行任务异常失败", e);
  126. } catch (Exception e) {
  127. log.error("线程池批量执行任务异常失败", e);
  128. }
  129. return new ArrayList<>();
  130. }
  131. /**
  132. * @param key 线程池名称
  133. * @param callableList 任务列表
  134. * @return 结果
  135. */
  136. public static void executeRunnable(String key, Runnable... r) {
  137. if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
  138. throw new IllegalArgumentException("未配置线程池" + key);
  139. }
  140. ThreadPoolExecutor executor = THREAD_POOL_EXECUTOR_MAP.get(key);
  141. for (int i = 0; i < r.length; i++) {
  142. executor.submit(r[i]);
  143. }
  144. }
  145. }
  1. @Slf4j
  2. public class ThreadPoolInit implements EnvironmentAware {
  3. /**
  4. * 定时执行任务的单线程, 所有任务公用
  5. * 当前使用的有: 打印线程信息,装运点等定时刷新任务
  6. */
  7. public static final ScheduledExecutorService SINGLE_POOL = Executors.newSingleThreadScheduledExecutor();
  8. /**
  9. * 线程池容器
  10. */
  11. public static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR_MAP = new ConcurrentHashMap<>(16);
  12. /**
  13. * 是否打印
  14. */
  15. private static Boolean printThreadPoolInfoInterval = Boolean.TRUE;
  16. static {
  17. // 初始化线程池
  18. ServiceLoader<ThreadPool> load = ServiceLoader.load(ThreadPool.class);
  19. load.forEach(threadPool -> threadPool.appendThreadPool().forEach(SimpleThreadPool::putThreadPool));
  20. }
  21. @Override
  22. public void setEnvironment(Environment environment) {
  23. String[] activeProfiles = environment.getActiveProfiles();
  24. String pro = "dev";
  25. for (String activeProfile : activeProfiles) {
  26. if (pro.equals(activeProfile)) {
  27. return;
  28. }
  29. }
  30. // 初始化线程池状态信息订单打印
  31. if (printThreadPoolInfoInterval) {
  32. printScheduledThreadStats();
  33. }
  34. }
  35. /**
  36. * 线程池类型, 只是作为标识当前任务是CPU型还是IO型为主
  37. */
  38. @SuppressWarnings("unused")
  39. public enum PoolModel {
  40. /** io型 */
  41. IO,
  42. /** CPU型 */
  43. CPU,
  44. /** io型,Tomcat扩展的 juc线程池 */
  45. FAST_IO
  46. }
  47. /**
  48. * 线程工厂
  49. */
  50. static class DefaultThreadFactory implements ThreadFactory {
  51. /**
  52. * 定义线程组
  53. */
  54. static ThreadGroup threadGroup;
  55. /**
  56. * 定义每个线程池中每个线程的名称后缀数字
  57. */
  58. static final AtomicInteger THREAD_NUMBER = new AtomicInteger(1);
  59. /**
  60. * 定义每个线程词的名称前缀
  61. */
  62. static String namePrefix;
  63. public DefaultThreadFactory(String name) {
  64. final SecurityManager securityManager = System.getSecurityManager();
  65. threadGroup = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
  66. namePrefix = name + "-thread-";
  67. }
  68. @Override
  69. public Thread newThread(Runnable runnable) {
  70. Thread thread = new Thread(threadGroup, runnable, namePrefix + THREAD_NUMBER.getAndIncrement(), 0);
  71. if (thread.isDaemon()) {
  72. thread.setDaemon(false);
  73. }
  74. if(thread.getPriority() != Thread.NORM_PRIORITY){
  75. thread.setPriority(Thread.NORM_PRIORITY);
  76. }
  77. return thread;
  78. }
  79. }
  80. /**
  81. * 启动打印线程
  82. */
  83. private static void printScheduledThreadStats() {
  84. SINGLE_POOL.scheduleAtFixedRate(() -> THREAD_POOL_EXECUTOR_MAP.forEach((name, threadPool) -> {
  85. log.info("{} Pool Size: {}", name, threadPool.getPoolSize());
  86. log.info("{} Active Threads: {}", name, threadPool.getActiveCount());
  87. log.info("{} Number of Tasks Completed: {}", name, threadPool.getCompletedTaskCount());
  88. log.info("{} Number of Tasks in Queue: {}", name, threadPool.getQueue().size());
  89. }), 0, 5, TimeUnit.SECONDS);
  90. }
  91. /**
  92. * 往线程池容器中放入线程池池
  93. * @param threadPoolEntity 线程池定义对象
  94. */
  95. public static void putThreadPool(ThreadPoolEntity threadPoolEntity) {
  96. ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(threadPoolEntity);
  97. log.info("name: {}, threadPoolExecutor = {}", threadPoolEntity.taskName, threadPoolExecutor);
  98. THREAD_POOL_EXECUTOR_MAP.put(threadPoolEntity.taskName, threadPoolExecutor);
  99. }
  100. /**
  101. * 根据枚举获取线程池
  102. * @param entity 线程池枚举
  103. */
  104. private static ThreadPoolExecutor getThreadPoolExecutor(ThreadPoolEntity entity) {
  105. ThreadPoolExecutor executor;
  106. if (entity.poolModel == PoolModel.FAST_IO) {
  107. if (!(entity.blockingQueue instanceof TaskQueue)) {
  108. throw new RuntimeException("PoolModel.FAST_IO 类型的线程池,只能创建 " + TaskQueue.class.getName() + " 类型的队列!");
  109. }
  110. TaskQueue taskQueue = (TaskQueue)entity.blockingQueue;
  111. ThreadPoolExecutorImpl threadPoolExecutor = new ThreadPoolExecutorImpl(entity.corePoolNum, entity.maxPoolNum, entity.deleteThreadNum,
  112. entity.deleteTreadUnit, taskQueue, new TaskThreadFactory(entity.taskName), entity.rejectedExecutionHandler);
  113. // 设置父类,用于判断线程的对列表是否真的满了
  114. taskQueue.setParent(threadPoolExecutor);
  115. executor = threadPoolExecutor;
  116. log.info("init ThreadPoolExecutorImpl and TaskQueue!");
  117. } else {
  118. executor = new ThreadPoolExecutor(entity.corePoolNum, entity.maxPoolNum, entity.deleteThreadNum, entity.deleteTreadUnit,
  119. entity.blockingQueue, new DefaultThreadFactory(entity.taskName), entity.rejectedExecutionHandler);
  120. // 是否预热核心线程
  121. if (entity.preStartAllCoreThreads) {
  122. executor.prestartAllCoreThreads();
  123. }
  124. log.info("init ThreadPoolExecutor and {}}!", entity.blockingQueue.getClass().getSimpleName());
  125. }
  126. try {
  127. // 是否允许核心线程超时
  128. if (entity.allowsCoreThreadTimeOut) {
  129. executor.allowsCoreThreadTimeOut();
  130. }
  131. } catch (NullPointerException e) {
  132. log.error("初始化线程池错误:" + e);
  133. }
  134. return executor;
  135. }
  136. }

    SPI机制使用的接口和定义如下:

  1. @FunctionalInterface
  2. public interface ThreadPool {
  3. /**
  4. * 添加线程池
  5. * @return 线程池定义对象
  6. */
  7. List<ThreadPoolEntity> appendThreadPool();
  8. }
  1. @AllArgsConstructor
  2. public final class ThreadPoolEntity {
  3. /** 先池名称 */
  4. public final String taskName;
  5. /** 线程池任务类型 */
  6. public final SimpleThreadPool.PoolModel poolModel;
  7. /** 是否允许核心线程超时 */
  8. public final Boolean allowsCoreThreadTimeOut;
  9. /** 是否预热核心线程池 */
  10. public final Boolean preStartAllCoreThreads;
  11. /** 线程池说明 */
  12. public final String detail;
  13. /** 核心线程数 */
  14. public final int corePoolNum;
  15. /** 最大线程数 */
  16. public final int maxPoolNum;
  17. /** 超时时间 */
  18. public final int deleteThreadNum;
  19. /** 超时单位 */
  20. public final TimeUnit deleteTreadUnit;
  21. /** 任务队列,没有特殊理由不能使用无界队列 */
  22. public final BlockingQueue<Runnable> blockingQueue;
  23. /** 拒绝策略 */
  24. public final RejectedExecutionHandler rejectedExecutionHandler;
  25. }

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/763619
推荐阅读
相关标签
  

闽ICP备14008679号