当前位置:   article > 正文

@Async详解(二)

@async

Spring异步注解@Async

@Async为什么要使用自定义线程池

通过上一篇对@Async的初步分析,我们可以知道当使用@Async不指定线程池时,Spring会默认使用SimpleAsyncTaskExecutor线程池,那么SimpleAsyncTaskExecutor有什么缺点呢?下面我们通过源码分析SimpleAsyncTaskExecutor为什么不建议使用。

SimpleAsyncTaskExecutor

此类特点概括为以下几点:

  • 为每个任务启动一个新线程,异步执行它。

  • 支持通过“concurrencyLimit”bean 属性限制并发线程。默认情况下,并发线程数是无限的。

  • 注意:此实现不重用线程!

  • 考虑一个线程池 TaskExecutor 实现,特别是用于执行大量短期任务。

默认情况下,SimpleAsyncTaskExecutor不会限制线程创建的个数,这会导致资源耗尽。这个线程池和我们印象中的的线程池可以说是相悖的。如果需要使用SimpleAsyncTaskExecutor,则需指定线程上限(调用setConcurrencyLimit方法),避免在极端情况下出现资源耗尽的问题。另外,该任务执行器并没有执行拒绝策略,这也是在线上环境需谨慎使用的原因之一。

@Async使用自定义线程池

自定义线程池配置:

  1. @Slf4j
  2. @Configuration
  3. @EnableConfigurationProperties({TaskExecutorProperties.class,ExportExecutorProperties.class})
  4. public class ExecutePoolConfig {
  5. // 普通任务线程池配置
  6. private final TaskExecutorProperties taskExecutorProperties;
  7. // 导出任务线程池配置
  8. private final ExportExecutorProperties exportExecutorProperties;
  9. @Autowired
  10. public ExecutePoolConfig(TaskExecutorProperties taskExecutorProperties,ExportExecutorProperties exportExecutorProperties) {
  11. this.taskExecutorProperties = taskExecutorProperties;
  12. this.exportExecutorProperties = exportExecutorProperties;
  13. }
  14. @Bean("taskExecutor")
  15. public Executor taskExecutor() {
  16. return buildExecutor(taskExecutorProperties);
  17. }
  18. @Bean("exportExecutor")
  19. public Executor exportExecutor() {
  20. return buildExecutor(exportExecutorProperties);
  21. }
  22. /**
  23. * 构建线程池
  24. *
  25. * @param executorProperties ExecutorBaseProperties
  26. * @return {@link Executor}
  27. */
  28. private Executor buildExecutor(ExecutorBaseProperties executorProperties) {
  29. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  30. // 设置核心线程数
  31. executor.setCorePoolSize(executorProperties.getCorePoolSize());
  32. // 设置最大线程数
  33. executor.setMaxPoolSize(executorProperties.getMaxPoolSize());
  34. // 设置队列大小
  35. executor.setQueueCapacity(executorProperties.getQueueCapacity());
  36. // 设置线程池维护线程所允许的空闲时间
  37. executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());
  38. // 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
  39. executor.setWaitForTasksToCompleteOnShutdown(true);
  40. // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁
  41. executor.setAwaitTerminationSeconds(30);
  42. // 设置线程池中的线程的名称前缀
  43. executor.setThreadNamePrefix(executorProperties.getThreadNamePrefix());
  44. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
  45. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
  46. RejectedExecutionHandler rejectedExecutionHandler = null;
  47. try {
  48. Class<?> clazz = Class
  49. .forName("java.util.concurrent.ThreadPoolExecutor$" + executorProperties.getRejectionPolicyName());
  50. rejectedExecutionHandler = (RejectedExecutionHandler)clazz.newInstance();
  51. } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
  52. log.error("获取rejection-policy异常,请查看配置文件", e);
  53. return null;
  54. }
  55. executor.setRejectedExecutionHandler(rejectedExecutionHandler);
  56. // 执行初始化
  57. executor.initialize();
  58. return executor;
  59. }
  60. }
  61. @Data
  62. @Component
  63. @ConfigurationProperties(prefix = "task.executor.conf")
  64. public class TaskExecutorProperties extends ExecutorBaseProperties {
  65. /**
  66. * 核心线程数
  67. */
  68. private int corePoolSize = 10;
  69. /**
  70. * 最大线程数
  71. */
  72. private int maxPoolSize = 20;
  73. /**
  74. * 队列大小
  75. */
  76. private int queueCapacity = 200;
  77. /**
  78. * 线程池维护线程所允许的空闲时间
  79. */
  80. private int keepAliveSeconds = 60;
  81. /**
  82. * 线程的名称前缀
  83. */
  84. private String threadNamePrefix = "taskExecutor-";
  85. /**
  86. * 拒绝策略
  87. */
  88. private String rejectionPolicyName = "CallerRunsPolicy";
  89. }
  90. @Data
  91. public class ExecutorBaseProperties {
  92. /**
  93. * 核心线程数
  94. */
  95. private int corePoolSize = 10;
  96. /**
  97. * 最大线程数
  98. */
  99. private int maxPoolSize = 20;
  100. /**
  101. * 队列大小
  102. */
  103. private int queueCapacity = 200;
  104. /**
  105. * 线程池维护线程所允许的空闲时间
  106. */
  107. private int keepAliveSeconds = 60;
  108. /**
  109. * 线程的名称前缀
  110. */
  111. private String threadNamePrefix = "taskExecutor-";
  112. /**
  113. * 拒绝策略
  114. */
  115. private String rejectionPolicyName = "CallerRunsPolicy";
  116. }

线程池的使用:

  1. @RestController
  2. public class TestController {
  3. @Resource
  4. private AsyncTestService asyncTestService;
  5. @RequestMapping("/exportTest")
  6. public String exportTest() {
  7. System.out.println("父线程userId:" + UserUtils.getUserId());
  8.        System.out.println("父线程traceId:" + MDC.get("traceId"));
  9. for (int i = 0; i < 10; i++) {
  10. asyncTestService.exportTest();
  11. }
  12. return "OK";
  13. }
  14. @RequestMapping("/taskTest")
  15. public String taskTest() {
  16. for (int i = 0; i < 10; i++) {
  17. asyncTestService.taskTest();
  18. }
  19. return "OK";
  20. }
  21. }
  22. public interface AsyncTestService {
  23. /**
  24. * exportTest
  25. */
  26. void exportTest();
  27. /**
  28. * taskTest
  29. */
  30. void taskTest();
  31. }
  32. @Service
  33. public class AsyncTestServiceImpl implements AsyncTestService {
  34. @Async("exportExecutor")
  35. @Override
  36. public void exportTest() {
  37. System.out.println("子线程userId:" + UserUtils.getUserId());
  38.         System.out.println("子线程traceId:" + MDC.get("traceId"));
  39. System.out.println("我是自定义类型的线程池:" + Thread.currentThread().getName());
  40. }
  41. @Async
  42. @Override
  43. public void taskTest() {
  44. System.out.println("我是beanName使用taskExecutor名字的线程:" + Thread.currentThread().getName());
  45. }
  46. }
  47. /**
  48. *使用ThreadLocal存储共享的数据变量,如登录的用户信息
  49. */
  50. public class UserUtils {
  51. private static final ThreadLocal<String> userLocal = new ThreadLocal<>();
  52. public static String getUserId() {
  53. return userLocal.get();
  54. }
  55. public static void setUserId(String userId) {
  56. userLocal.set(userId);
  57. }
  58. public static void clear() {
  59. userLocal.remove();
  60. }
  61. }

执行结果:

可以看到两种线程池都生效了,通过自定义配置可以让我们达到对@Async使用的掌控,以避免使用默认线程池带来的弊端。

但是也来了新的问题,在实际开发过程中我们需要父子线程之间传递一些数据,比如用户信息,分布式系统中用来链路追踪的tranceId等。通过上方的执行结果我们可以看到在子线程中是无法正常获取到父线程的线程数据的。

@Async实现父子线程之间的数据传递

多线程的情况下实现父子线程之间的数据传递方式有多种,这里我们介绍两种方式实现父子线程之间数据传递。

阿里巴巴TransmittableThreadLocal实现父子线程之间的用户数据传递

  1. public class UserUtils {
  2. private static final ThreadLocal<String> userLocal = new TransmittableThreadLocal<>();
  3. public static String getUserId() {
  4. return userLocal.get();
  5. }
  6. public static void setUserId(String userId) {
  7. userLocal.set(userId);
  8. }
  9. public static void clear() {
  10. userLocal.remove();
  11. }
  12. }

MDC实现线程池tranceId全链路传递

1:首先定义一个自定义线程池类继承ThreadPoolTaskExecutor

  1. // 首先定义一个自定义线程池类继承ThreadPoolTaskExecutor
  2. public class ThreadPoolExecutorMdcWrapper extends ThreadPoolTaskExecutor {
  3. @Override
  4. public void execute(Runnable task) {
  5. super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
  6. }
  7. @Override
  8. public <T> Future<T> submit(Callable<T> task) {
  9. return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
  10. }
  11. @Override
  12. public Future<?> submit(Runnable task) {
  13. return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
  14. }
  15. }

2:编写tranceid包装工具类,判断当前线程对应MDC的上下文是否存在,存在则是子线程,设置MDC中的traceId值,不存在则生成新的tranceid,再执行run方法,执行结束之后清除线程tranceId

  1. // 编写tranceid包装工具类
  2. // 判断当前线程对应MDC的上下文是否存在,存在则是子线程,设置MDC中的traceId值
  3. // 不存在则生成新的tranceid,再执行run方法,执行结束之后清除线程tranceId
  4. public class ThreadMdcUtil {
  5. private final static String MDC_TRACE_ID = "traceId";
  6. public static void setTraceIdIfAbsent() {
  7. if (MDC.get(MDC_TRACE_ID) == null) {
  8. MDC.put(MDC_TRACE_ID, UUID.randomUUID().toString().replaceAll("-", ""));
  9. }
  10. }
  11. public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
  12. return () -> {
  13. if (context == null) {
  14. MDC.clear();
  15. } else {
  16. MDC.setContextMap(context);
  17. }
  18. setTraceIdIfAbsent();
  19. try {
  20. return callable.call();
  21. } finally {
  22. MDC.clear();
  23. }
  24. };
  25. }
  26. public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
  27. return () -> {
  28. if (context == null) {
  29. MDC.clear();
  30. } else {
  31. MDC.setContextMap(context);
  32. }
  33. setTraceIdIfAbsent();
  34. try {
  35. runnable.run();
  36. } finally {
  37. MDC.clear();
  38. }
  39. };
  40. }
  41. }

3:使用ThreadPoolExecutorMdcWrapper改造自定义线程池

  1. // 初始化自定义线程池
  2. @Slf4j
  3. @Configuration
  4. @EnableConfigurationProperties({TaskExecutorProperties.class,ExportExecutorProperties.class})
  5. public class ExecutePoolConfig {
  6. // 普通任务线程池配置
  7. private final TaskExecutorProperties taskExecutorProperties;
  8. // 导出任务线程池配置
  9. private final ExportExecutorProperties exportExecutorProperties;
  10. @Autowired
  11. public ExecutePoolConfig(TaskExecutorProperties taskExecutorProperties,ExportExecutorProperties exportExecutorProperties) {
  12. this.taskExecutorProperties = taskExecutorProperties;
  13. this.exportExecutorProperties = exportExecutorProperties;
  14. }
  15. @Bean("taskExecutor")
  16. public Executor taskExecutor() {
  17. return buildExecutor(taskExecutorProperties);
  18. }
  19. @Bean("exportExecutor")
  20. public Executor exportExecutor() {
  21. return buildExecutor(exportExecutorProperties);
  22. }
  23. /**
  24. * 构建线程池
  25. *
  26. * @param executorProperties ExecutorBaseProperties
  27. * @return {@link Executor}
  28. */
  29. private Executor buildExecutor(ExecutorBaseProperties executorProperties) {
  30. ThreadPoolExecutorMdcWrapper executor = new ThreadPoolExecutorMdcWrapper();
  31. // 设置核心线程数
  32. executor.setCorePoolSize(executorProperties.getCorePoolSize());
  33. // 设置最大线程数
  34. executor.setMaxPoolSize(executorProperties.getMaxPoolSize());
  35. // 设置队列大小
  36. executor.setQueueCapacity(executorProperties.getQueueCapacity());
  37. // 设置线程池维护线程所允许的空闲时间
  38. executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());
  39. // 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
  40. executor.setWaitForTasksToCompleteOnShutdown(true);
  41. // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁
  42. executor.setAwaitTerminationSeconds(30);
  43. // 设置线程池中的线程的名称前缀
  44. executor.setThreadNamePrefix(executorProperties.getThreadNamePrefix());
  45. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
  46. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
  47. RejectedExecutionHandler rejectedExecutionHandler = null;
  48. try {
  49. Class<?> clazz = Class
  50. .forName("java.util.concurrent.ThreadPoolExecutor$" + executorProperties.getRejectionPolicyName());
  51. rejectedExecutionHandler = (RejectedExecutionHandler)clazz.newInstance();
  52. } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
  53. log.error("获取rejection-policy异常,请查看配置文件", e);
  54. return null;
  55. }
  56. executor.setRejectedExecutionHandler(rejectedExecutionHandler);
  57. // 执行初始化
  58. executor.initialize();
  59. //return TtlExecutors.getTtlExecutor(executor);
  60. return executor;
  61. }
  62. }

4:执行结果


参考文章--MDC实现线程池tranceId全链路传递

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/102640?site
推荐阅读
相关标签
  

闽ICP备14008679号