当前位置:   article > 正文

Hystrix之ThreadLocal上下文传播

hystrix 线程池传递上下文

引言

ThreadLocal

ThreadLocal这个类给线程提供了一个本地变量,这个变量是该线程自己拥有,各线程间不共享。在该线程存活和ThreadLocal实例能访问的时候,保存了对这个变量副本的引用。当线程消失的时候,所有的本地实例都会被GC。并且建议ThreadLocal最好是使用 private static 修饰。

InheritableThreadLocal

InheritableThreadLocal是为了解决子线程获得父线程本地变量的需求,继承自ThreadLocal。如果你使用它,那么保存的所有东西都已经不在原来的threadLocals里面,而是在一个新的叫inheritableThreadLocals变量中。意思就是说每个线程Thread里面还有一个Map变量,名叫inheritableThreadLocals,它保存的是需要传递的引用(通过InheritableThreadLocal设置的线程变量)。

这种父子传递的需求还是有些比较重要的应用场景,如上下文传递(用户标识、事务等),调用日志跟踪等。Log4j中的MDC就是基于InheritableThreadLocal实现。但一般来说我们用线程池比较多,线程池会缓存线程,重复使用,线程可能会执行不同的任务。这样一来它的上下文传递就达不到正确的效果。

TransmittableThreadLocal

阿里巴巴有个开源项目就是为了解决线程池中变量传递,它里面有个叫TransmittableThreadLocal,继承于InheritableThreadLocal。它通过包装返回Runnable的方式代理了run方法,在run之前copy装载线程变量,run之后清除线程变量,来实现此功能。TransmittableThreadLocal除了继承过来的线程Map,它还定义了一个名叫holder的InheritableThreadLocal静态变量,也就是说TransmittableThreadLocal有两套线程变量。

但事实上我们不可能所有的应用均采用InheritableThreadLocal,尽管他是一个不错的选择,但如何让ThreadLocal也实现在Hystrix应用场景下实现线程上下文的传播呢。这就是本章的重点了。

HystrixConcurrencystrategy

Hystrixconcurrencystrategy是Hystrix的线程池创建源码所在,并且提供方法wrapCallable来装饰线程池执行环境。所以我们我们可以自定义一个并发策略,即可于Hystrix应用场景内实现线程上下文的传播。附wrapCallable源码

  1. /**
  2. * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
  3. * <p>
  4. * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
  5. * <p>
  6. * <b>Default Implementation</b>
  7. * <p>
  8. * Pass-thru that does no wrapping.
  9. *
  10. * @param callable
  11. * {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
  12. * @return {@code Callable<T>} either as a pass-thru or wrapping the one given
  13. */
  14. public <T> Callable<T> wrapCallable(Callable<T> callable) {
  15. return callable;
  16. }

Ps:注释上有一句这么说的:This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).

拓展

既然这个方法可以装饰线程池回调,那么我们亦可定义一个装饰器接口,只要这个接口的实现类,都会通过上述并发策略装饰不同业务不同场景需要的线程变量。

装饰器接口定义

  1. /**
  2. * Hystrix CallBack 装饰器定义
  3. *
  4. * @author wangzhuhua
  5. * @date 2018/09/07 下午4:18
  6. **/
  7. public interface HystrixCallableWrapper {
  8. /**
  9. * 装饰 Callable实例
  10. *
  11. * @param callable
  12. * 待装饰实例
  13. * @param <T>
  14. * 返回类型
  15. * @return 装饰后的实例
  16. */
  17. <T> Callable<T> wrap(Callable<T> callable);
  18. }

HystrixConcurrencystrategy Custom

  1. /**
  2. * Hystrix并发策略
  3. *
  4. * @author wangzhuhua
  5. * @date 2018/09/07 下午4:06
  6. **/
  7. public class HystrixConcurrencyStrategyCustom extends HystrixConcurrencyStrategy {
  8. /** 装饰队列 */
  9. private final List<HystrixCallableWrapper> wrappers;
  10. public HystrixConcurrencyStrategyCustom(List<HystrixCallableWrapper> wrappers) {
  11. this.wrappers = wrappers;
  12. }
  13. @Override
  14. public <T> Callable<T> wrapCallable(Callable<T> callable) {
  15. return new CallableWrapperChain(callable, this.wrappers).wrapCallable();
  16. }
  17. /**
  18. * callback 调用链
  19. *
  20. * @author wangzhuhua
  21. * @date 2018/09/07 下午4:38
  22. **/
  23. private static class CallableWrapperChain<T> {
  24. /** 回调 */
  25. private final Callable<T> callable;
  26. /** 回调包装 */
  27. private final List<HystrixCallableWrapper> wrappers;
  28. CallableWrapperChain(Callable<T> callable, List<HystrixCallableWrapper> wrappers) {
  29. this.callable = callable;
  30. this.wrappers = wrappers;
  31. }
  32. /**
  33. * 装饰线程hystrix callable
  34. *
  35. * @return {@link Callable}
  36. */
  37. Callable<T> wrapCallable() {
  38. Callable<T> delegate = callable;
  39. for (HystrixCallableWrapper wrapper : wrappers) {
  40. delegate = wrapper.wrap(delegate);
  41. }
  42. return delegate;
  43. }
  44. }
  45. }

Configuration

  1. /**
  2. * Hystrix配置
  3. *
  4. * @author wangzhuhua
  5. * @date 2018/09/07 下午4:01
  6. **/
  7. @Configuration
  8. @ConditionalOnProperty(value = "feign.hystrix.enabled", havingValue = "true")
  9. public class HystrixConfiguration {
  10. @Autowired(required = false)
  11. private List<HystrixCallableWrapper> wrappers = new ArrayList<>();
  12. @PostConstruct
  13. public void init() {
  14. HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategyCustom(wrappers));
  15. }
  16. }

实现示例

  1. /**
  2. * Token 装饰器
  3. *
  4. * @author wangzhuhua
  5. * @date 2018/09/10 上午11:28
  6. **/
  7. @Component
  8. public class TokenWrapper implements HystrixCallableWrapper {
  9. @Override
  10. public <T> Callable<T> wrap(Callable<T> callable) {
  11. return new TokenAwareCallable(callable, TokenHandler.getToken());
  12. }
  13. /**
  14. * Token装饰
  15. *
  16. * @param <T>
  17. */
  18. static class TokenAwareCallable<T> implements Callable<T> {
  19. /** 回调代理 */
  20. private final Callable<T> delegate;
  21. /** Token */
  22. private final String token;
  23. TokenAwareCallable(Callable<T> callable, String token) {
  24. this.delegate = callable;
  25. this.token = token;
  26. }
  27. @Override
  28. public T call() throws Exception {
  29. try {
  30. // 填充当前线程变量
  31. TokenHandler.setToken(this.token);
  32. return delegate.call();
  33. } finally {
  34. TokenHandler.remove();
  35. }
  36. }
  37. }
  38. }

其中TokenHandler内使用线程变量存储


 

转载于:https://my.oschina.net/xiaominmin/blog/3052404

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号