引言
ThreadLocal
ThreadLocal这个类给线程提供了一个本地变量,这个变量是该线程自己拥有,各线程间不共享。在该线程存活和ThreadLocal实例能访问的时候,保存了对这个变量副本的引用。当线程消失的时候,所有的本地实例都会被GC。并且建议ThreadLocal最好是使用 private static 修饰。
InheritableThreadLocal
InheritableThreadLocal是为了解决子线程获得父线程本地变量的需求,继承自ThreadLocal。如果你使用它,那么保存的所有东西都已经不在原来的threadLocals里面,而是在一个新的叫inheritableThreadLocals变量中。意思就是说每个线程Thread里面还有一个Map变量,名叫inheritableThreadLocals,它保存的是需要传递的引用(通过InheritableThreadLocal设置的线程变量)。
这种父子传递的需求还是有些比较重要的应用场景,如上下文传递(用户标识、事务等),调用日志跟踪等。Log4j中的MDC就是基于InheritableThreadLocal实现。但一般来说我们用线程池比较多,线程池会缓存线程,重复使用,线程可能会执行不同的任务。这样一来它的上下文传递就达不到正确的效果。
TransmittableThreadLocal
阿里巴巴有个开源项目就是为了解决线程池中变量传递,它里面有个叫TransmittableThreadLocal,继承于InheritableThreadLocal。它通过包装返回Runnable的方式代理了run方法,在run之前copy装载线程变量,run之后清除线程变量,来实现此功能。TransmittableThreadLocal除了继承过来的线程Map,它还定义了一个名叫holder的InheritableThreadLocal静态变量,也就是说TransmittableThreadLocal有两套线程变量。
但事实上我们不可能所有的应用均采用InheritableThreadLocal,尽管他是一个不错的选择,但如何让ThreadLocal也实现在Hystrix应用场景下实现线程上下文的传播呢。这就是本章的重点了。
HystrixConcurrencystrategy
Hystrixconcurrencystrategy是Hystrix的线程池创建源码所在,并且提供方法wrapCallable来装饰线程池执行环境。所以我们我们可以自定义一个并发策略,即可于Hystrix应用场景内实现线程上下文的传播。附wrapCallable源码
- /**
- * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
- * <p>
- * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
- * <p>
- * <b>Default Implementation</b>
- * <p>
- * Pass-thru that does no wrapping.
- *
- * @param callable
- * {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
- * @return {@code Callable<T>} either as a pass-thru or wrapping the one given
- */
- public <T> Callable<T> wrapCallable(Callable<T> callable) {
- return callable;
- }
Ps:注释上有一句这么说的:This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
拓展
既然这个方法可以装饰线程池回调,那么我们亦可定义一个装饰器接口,只要这个接口的实现类,都会通过上述并发策略装饰不同业务不同场景需要的线程变量。
装饰器接口定义
- /**
- * Hystrix CallBack 装饰器定义
- *
- * @author wangzhuhua
- * @date 2018/09/07 下午4:18
- **/
- public interface HystrixCallableWrapper {
-
- /**
- * 装饰 Callable实例
- *
- * @param callable
- * 待装饰实例
- * @param <T>
- * 返回类型
- * @return 装饰后的实例
- */
- <T> Callable<T> wrap(Callable<T> callable);
- }
HystrixConcurrencystrategy Custom
- /**
- * Hystrix并发策略
- *
- * @author wangzhuhua
- * @date 2018/09/07 下午4:06
- **/
- public class HystrixConcurrencyStrategyCustom extends HystrixConcurrencyStrategy {
-
- /** 装饰队列 */
- private final List<HystrixCallableWrapper> wrappers;
-
- public HystrixConcurrencyStrategyCustom(List<HystrixCallableWrapper> wrappers) {
- this.wrappers = wrappers;
- }
-
- @Override
- public <T> Callable<T> wrapCallable(Callable<T> callable) {
- return new CallableWrapperChain(callable, this.wrappers).wrapCallable();
- }
-
- /**
- * callback 调用链
- *
- * @author wangzhuhua
- * @date 2018/09/07 下午4:38
- **/
- private static class CallableWrapperChain<T> {
-
- /** 回调 */
- private final Callable<T> callable;
- /** 回调包装 */
- private final List<HystrixCallableWrapper> wrappers;
-
- CallableWrapperChain(Callable<T> callable, List<HystrixCallableWrapper> wrappers) {
- this.callable = callable;
- this.wrappers = wrappers;
- }
-
- /**
- * 装饰线程hystrix callable
- *
- * @return {@link Callable}
- */
- Callable<T> wrapCallable() {
- Callable<T> delegate = callable;
- for (HystrixCallableWrapper wrapper : wrappers) {
- delegate = wrapper.wrap(delegate);
- }
- return delegate;
- }
- }
-
- }
Configuration
- /**
- * Hystrix配置
- *
- * @author wangzhuhua
- * @date 2018/09/07 下午4:01
- **/
- @Configuration
- @ConditionalOnProperty(value = "feign.hystrix.enabled", havingValue = "true")
- public class HystrixConfiguration {
-
- @Autowired(required = false)
- private List<HystrixCallableWrapper> wrappers = new ArrayList<>();
-
- @PostConstruct
- public void init() {
- HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategyCustom(wrappers));
- }
-
- }
实现示例
- /**
- * Token 装饰器
- *
- * @author wangzhuhua
- * @date 2018/09/10 上午11:28
- **/
- @Component
- public class TokenWrapper implements HystrixCallableWrapper {
- @Override
- public <T> Callable<T> wrap(Callable<T> callable) {
- return new TokenAwareCallable(callable, TokenHandler.getToken());
- }
-
- /**
- * Token装饰
- *
- * @param <T>
- */
- static class TokenAwareCallable<T> implements Callable<T> {
-
- /** 回调代理 */
- private final Callable<T> delegate;
- /** Token */
- private final String token;
-
- TokenAwareCallable(Callable<T> callable, String token) {
- this.delegate = callable;
- this.token = token;
- }
-
- @Override
- public T call() throws Exception {
- try {
- // 填充当前线程变量
- TokenHandler.setToken(this.token);
- return delegate.call();
- } finally {
- TokenHandler.remove();
- }
- }
- }
- }
其中TokenHandler内使用线程变量存储