
Hystrix Source Code Analysis

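Hystrix is a framework for isolating dependencies in distributed systems. It provides rate limiting, fallback (degradation) and circuit breaking. It is used to isolate calls to downstream services. For example, when the order service becomes slow or goes down, the calling service limits the number of requests it sends there or falls back on the order-service endpoints, which keeps the calling service stable. Isolation is implemented with thread pools or semaphores. If no threadPoolKey is specified, thread pools are partitioned by groupKey by default. Each HystrixCommand implements run() to carry the business logic. Custom thread pools are supported, and Hystrix reuses pools internally through a ConcurrentHashMap (CHM). Hystrix also keeps HealthCounts that track the calls made to the endpoint a commandKey represents. Each call ends in one of these outcomes: success, failure (run() threw an exception), rejection (the thread pool or semaphore refused the execution), or timeout (the total time, including any time spent queued in the thread pool, exceeded the configured timeout). All of these outcomes are reported to that commandKey's circuit breaker.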

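Fallback means the fallback function runs whenever command execution fails, is rejected by the thread pool, times out, or is short-circuited by an open circuit breaker.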

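The circuit breaker keeps a sliding time window. If the error rate within that window exceeds the configured threshold, the circuit opens and every request for that commandKey goes straight to the fallback. By default, after 5 s one request is let through to probe the endpoint; if it succeeds, the breaker closes and the command's metrics are reset.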

Every exception thrown from run(), except HystrixBadRequestException, counts as a failure and triggers both the fallback (getFallback()) and the circuit-breaker logic.

[Figure: Hystrix command execution flow]

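Outline. Working through the source code, we will cover:

  • Hystrix's basic property configuration
  • Circuit breaker and thread-pool initialization
  • The command execution flow
  • How the circuit breaker works
  • How command timeouts are monitored
  • What goes wrong with thread-pool isolation under high concurrency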

Hystrix's basic property configuration

Most of the basic properties live in the HystrixCommandProperties object:

```java
/* -------------- Metrics ------------------ */
// Length of the rolling statistical window. Default: 10000 ms
private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds;
// Number of buckets in the statistical window. Default: 10, i.e. one bucket per second over a 10 s window
private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow
// Whether latency percentiles are tracked. Default: true
private final HystrixProperty<Boolean> metricsRollingPercentileEnabled;
/* -------------- Circuit breaker ------------------ */
// Minimum number of requests in the rolling window before the breaker can trip. Default: 20.
// Within metricsRollingStatisticalWindowInMilliseconds (10 s by default) at least 20 requests are needed before the breaker takes effect
private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold;
// Sleep window. Default: 5000 ms. After rejecting requests for 5 s the breaker goes half-open and lets one request through;
// if that request succeeds the breaker closes, otherwise it waits another sleep window
private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds;
// Whether the circuit breaker is enabled. Default: true
private final HystrixProperty<Boolean> circuitBreakerEnabled;
// Error-percentage threshold. Default: 50. Once the error rate in the 10 s window reaches 50%, the breaker opens
private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage;
// Force the breaker open so that every request is rejected and goes straight to the fallback. Default: false
private final HystrixProperty<Boolean> circuitBreakerForceOpen;
// Force the breaker closed so that every request passes regardless of errors. Default: false
private final HystrixProperty<Boolean> circuitBreakerForceClosed;
/* -------------- Semaphores ------------------ */
// Maximum concurrent executions under semaphore isolation. Default: 10
private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests;
// Maximum concurrent getFallback() calls (fallbacks are limited by a semaphore). Default: 10
private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests;
/* -------------- Other ------------------ */
// Isolation strategy. Default: thread isolation, ExecutionIsolationStrategy.THREAD
private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy;
// Execution timeout under thread isolation. Default: 1000 ms
private final HystrixProperty<Integer> executionIsolationThreadTimeoutInMilliseconds;
// Thread-pool key override: decides which thread pool the command runs in
private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride;
// Whether the fallback is enabled. Default: true
private final HystrixProperty<Boolean> fallbackEnabled;
// Under thread isolation, whether to interrupt (Thread.interrupt()) the thread of a timed-out command. Default: true
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout;
// Whether request logging is enabled. Default: true
private final HystrixProperty<Boolean> requestLogEnabled;
// Whether request caching is enabled. Default: true
private final HystrixProperty<Boolean> requestCacheEnabled;
```
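To make the window/bucket arithmetic concrete (10000 ms split into 10 buckets gives 1000 ms per bucket), here is a minimal bucketed rolling counter. It is a simplified sketch, not Hystrix's own rolling-number implementation; the class name RollingCounter and the explicit nowMillis parameter (used instead of a real clock so results are deterministic) are assumptions for illustration.

```java
import java.util.Arrays;

/** Hypothetical, simplified rolling counter: a ring of time buckets covering one window. */
final class RollingCounter {
    private final long bucketSizeMs;
    private final long[] counts;       // one slot per bucket
    private final long[] bucketStart;  // start time of the bucket currently stored in each slot

    RollingCounter(long windowMs, int buckets) {
        if (windowMs % buckets != 0) throw new IllegalArgumentException("window must divide evenly");
        this.bucketSizeMs = windowMs / buckets; // e.g. 10000 / 10 = 1000 ms
        this.counts = new long[buckets];
        this.bucketStart = new long[buckets];
        Arrays.fill(bucketStart, Long.MIN_VALUE); // no bucket used yet
    }

    /** Records one event at the given time. */
    synchronized void increment(long nowMillis) {
        long start = nowMillis - Math.floorMod(nowMillis, bucketSizeMs);
        int slot = (int) Math.floorMod(start / bucketSizeMs, (long) counts.length);
        if (bucketStart[slot] != start) { // slot holds an expired bucket: reuse it
            bucketStart[slot] = start;
            counts[slot] = 0;
        }
        counts[slot]++;
    }

    /** Sums only the buckets that still fall inside the window ending at nowMillis. */
    synchronized long sum(long nowMillis) {
        long currentStart = nowMillis - Math.floorMod(nowMillis, bucketSizeMs);
        long oldestStart = currentStart - bucketSizeMs * (counts.length - 1);
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (bucketStart[i] >= oldestStart && bucketStart[i] <= currentStart) total += counts[i];
        }
        return total;
    }
}
```

With a 10 s window, an event recorded at t = 0 stops counting at t = 10000, when its bucket slides out of the window and its slot is reused.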

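To use Hystrix, you write a HystrixCommand and hand it to the framework. The HystrixCommand is wrapped in an Observable; you then create an Observer and subscribe it to that Observable. Calling subscribe() executes the command asynchronously.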

Used for asynchronous execution of command with a callback by subscribing to the Observable. This lazily starts execution of the command once the Observable is subscribed to. An eager Observable can be obtained from observe().

See https://github.com/ReactiveX/RxJava/wiki for more information.

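As the toObservable() Javadoc explains, once subscribe() is called, the command is handed to its thread pool for execution. When execution finishes, onCompleted or onError is called back depending on the result.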

```java
taskCommand.toObservable().subscribe(new Observer<Object>() {
    @Override
    public void onCompleted() {
        handleCompleted(logStr, cmd, taskCommand, starter, result);
    }

    @Override
    public void onError(Throwable e) {
        // Only reached when the command failed and its fallback also failed (or no fallback was available)
        log.error("{} critical error e=", logStr, e);
    }

    @Override
    public void onNext(Object aVoid) {
        // nothing
    }
});
```
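The subscribe contract above can be modeled without RxJava. The sketch below is a hypothetical LazyCommand (not a Hystrix class) with a minimal Callback interface standing in for rx.Observer: nothing runs until subscribe() is called; on success it calls onNext and then onCompleted; if run throws, the fallback's value is delivered instead; onError is reached only when the fallback also fails. For simplicity it runs in the caller's thread, whereas Hystrix would normally run the command on its thread pool.

```java
import java.util.function.Supplier;

/** Minimal stand-in for rx.Observer (hypothetical). */
interface Callback<R> {
    void onNext(R value);
    void onCompleted();
    void onError(Throwable e);
}

/** Hypothetical lazy command: execution starts only on subscribe(). */
final class LazyCommand<R> {
    private final Supplier<R> run;
    private final Supplier<R> fallback;
    private int executions = 0; // how many times run was invoked

    LazyCommand(Supplier<R> run, Supplier<R> fallback) {
        this.run = run;
        this.fallback = fallback;
    }

    int executions() { return executions; }

    void subscribe(Callback<R> observer) {
        R value;
        try {
            executions++;
            value = run.get();
        } catch (RuntimeException runFailure) {
            try {
                value = fallback.get();            // degrade to the fallback result
            } catch (RuntimeException fallbackFailure) {
                observer.onError(fallbackFailure);  // only here does onError fire
                return;
            }
        }
        observer.onNext(value);
        observer.onCompleted();
    }
}
```

Creating the command does nothing by itself; only subscribe() triggers run, which mirrors the lazy behavior of toObservable().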

Circuit breaker and thread-pool initialization

Hystrix can run each command in a dedicated thread pool, selected by its threadPoolKey. First, here are the parameters of @HystrixCommand:

```java
public @interface HystrixCommand {
    // Name of the group the command belongs to. Default: the name of the annotated method's class
    String groupKey() default "";
    // Command key. Default: the name of the annotated method
    String commandKey() default "";
    // Thread-pool name. Default: groupKey
    String threadPoolKey() default "";
    // Name of the fallback method; it must be in the same class as the command method
    String fallbackMethod() default "";
    // Command properties
    HystrixProperty[] commandProperties() default {};
    // Properties of the thread pool the command depends on
    HystrixProperty[] threadPoolProperties() default {};
    // Exceptions to ignore: they are not treated as failures and do not trigger the fallback
    Class<? extends Throwable>[] ignoreExceptions() default {};
    // Execution mode for observable commands; see ObservableExecutionMode
    ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER;
    // Hystrix exceptions to raise: if this includes RUNTIME_EXCEPTION, exceptions are wrapped in HystrixRuntimeException
    HystrixException[] raiseHystrixExceptions() default {};
    // Default fallback method: it takes no parameters, and its return type must be compatible with the command's
    String defaultFallback() default "";
}
```

Each thread pool is built the familiar way, with new ThreadPoolExecutor().

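That covers thread-pool isolation: unless a threadPoolKey is set, thread pools are partitioned by HystrixCommand.groupKey, and each command group is served by its own pool. A common choice is one pool per dependency, i.e. all endpoints of service A share one pool. You can go finer than that and give important, high-load RPC endpoints their own pools so they do not interfere with each other.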

```java
/*
 * Initialize the thread-pool key.
 * If no key is given, the HystrixCommandGroup name is used as the key.
 */
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
    if (threadPoolKeyOverride == null) {
        // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
        if (threadPoolKey == null) {
            /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
            return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
        } else {
            return threadPoolKey;
        }
    } else {
        // we have a property defining the thread-pool so use it instead
        return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
    }
}

/*
 * Initialize the thread pool.
 * HystrixThreadPool keeps all thread pools in a ConcurrentHashMap.
 */
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
    if (fromConstructor == null) {
        // get the default implementation of HystrixThreadPool
        return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
    } else {
        return fromConstructor;
    }
}

/**
 * Look the pool up in the map; if it is absent, build one and store it.
 */
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();
    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }
    // Lock so that concurrent callers in this JVM create at most one pool per key
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            // The pool is built by HystrixThreadPoolDefault
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}
```
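The key resolution and the get-or-create cache above can be condensed into the sketch below. It is hypothetical: PoolRegistry, resolvePoolKey and the pool sizes are illustrative, and it swaps Hystrix's double-checked synchronized block for ConcurrentHashMap.computeIfAbsent, which gives the same create-at-most-once guarantee per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical registry mirroring initThreadPoolKey + HystrixThreadPool.Factory.getInstance. */
final class PoolRegistry {
    private final ConcurrentMap<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    /** The override property wins, then an explicit threadPoolKey, then the group key. */
    static String resolvePoolKey(String threadPoolKey, String groupKey, String override) {
        if (override != null) return override;
        return threadPoolKey != null ? threadPoolKey : groupKey;
    }

    /** Returns the pool for the key, creating it at most once even under concurrent calls. */
    ThreadPoolExecutor getOrCreate(String key) {
        return pools.computeIfAbsent(key, k -> new ThreadPoolExecutor(
                10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100))); // illustrative sizes
    }

    int size() { return pools.size(); }

    void shutdownAll() { pools.values().forEach(ThreadPoolExecutor::shutdown); }
}
```

computeIfAbsent removes the need for an explicit lock; the trade-off is that the mapping function runs while the map holds a lock on that bin, so it should stay cheap, as constructing an idle ThreadPoolExecutor is.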

The circuit-breaker initialization also lives in the AbstractCommand constructor:

```java
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);

/*
 * Delegates initialization to the HystrixCircuitBreaker factory
 */
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixCircuitBreaker
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            return fromConstructor;
        }
    } else {
        return new NoOpCircuitBreaker();
    }
}
```

Likewise, all circuit breakers are kept in a local map, keyed by commandKey:

```java
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    // this should find it for all but the first time
    HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
    if (previouslyCached != null) {
        return previouslyCached;
    }
    HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
    if (cbForCommand == null) {
        // putIfAbsent just stored our new breaker, so read it back and return it
        return circuitBreakersByCommand.get(key.name());
    } else {
        // another thread won the race and stored its breaker first; return that one
        return cbForCommand;
    }
}
```

From task submission to task execution in Hystrix

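The key part of toObservable() is that it builds the execution chain: it adds an anonymous inner class that applies the circuit breaker to the chain, and that class's call() is invoked just before the task runs. The resulting Observable represents the (possibly multiple) results of the operation, and it is up to us to subscribe to it and consume them.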

```java
// User code: submit the task for execution. taskCommand is a HystrixCommand subclass instance
taskCommand.toObservable().subscribe(new Observer<Object>() {
    @Override
    public void onCompleted() {
        handleCompleted(logStr, param, result, requestPipelineContext, curContext, handler, taskCommand);
    }

    @Override
    public void onError(Throwable e) {
        // Only reached when the command failed and its fallback also failed
        log.error("critical error e=", e);
    }

    @Override
    public void onNext(Object aVoid) {
        // nothing
    }
});

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
    // ... (elided)
    // applyHystrixSemantics() applies the HystrixCircuitBreaker. An anonymous inner class is created here
    // and added to the execution chain later
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    // ... (elided)
}
```

applyHystrixSemantics() is also the entry point for running the user's task; execution proceeds once the circuit breaker allows the request:

```java
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
    executionHook.onStart(_cmd);
    /* Ask the circuit breaker whether execution may proceed */
    if (circuitBreaker.allowRequest()) {
        // Under thread isolation this returns the default TryableSemaphoreNoOp, whose tryAcquire() always returns true.
        // Under semaphore isolation it returns TryableSemaphoreActual, whose tryAcquire() checks the available permits
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        // Under thread isolation, tryAcquire() always returns true here
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                // executeCommandAndObserve() starts executing the user's HystrixCommand
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // The circuit breaker is open: go straight to the fallback
        return handleShortCircuitViaFallback();
    }
}
```
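Under semaphore isolation, tryAcquire() caps concurrency, and the AtomicBoolean guard makes sure the permit is released exactly once even though both doOnTerminate and doOnUnsubscribe invoke the release action. Below is a hypothetical sketch of those two pieces; CountingSemaphore and Permit are illustrative names, and the internals of Hystrix's own TryableSemaphoreActual may differ.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical try-only semaphore: never blocks, just refuses when full. */
final class CountingSemaphore {
    private final int maxConcurrent;
    private final AtomicInteger inUse = new AtomicInteger();

    CountingSemaphore(int maxConcurrent) { this.maxConcurrent = maxConcurrent; }

    boolean tryAcquire() {
        // Optimistically take a permit; give it back if that pushed us over the limit
        if (inUse.incrementAndGet() > maxConcurrent) {
            inUse.decrementAndGet();
            return false;
        }
        return true;
    }

    void release() { inUse.decrementAndGet(); }

    int inUse() { return inUse.get(); }
}

/** Wraps one acquired permit so that repeated release calls only release once. */
final class Permit {
    private final CountingSemaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    Permit(CountingSemaphore semaphore) { this.semaphore = semaphore; }

    void release() {
        if (released.compareAndSet(false, true)) { // same idea as singleSemaphoreRelease
            semaphore.release();
        }
    }
}
```

A rejected tryAcquire() corresponds to the handleSemaphoreRejectionViaFallback() branch above.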

The important part of executeCommandAndObserve() is below: it picks the execution mode, thread pool or semaphore, according to the command's configuration.

```java
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
    // Execution timeouts are enabled by default, so this is the usual branch;
    // executeCommandWithSpecifiedIsolation() chooses between thread-pool and semaphore isolation
    execution = executeCommandWithSpecifiedIsolation(_cmd)
            .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
    execution = executeCommandWithSpecifiedIsolation(_cmd);
}
```

Inside executeCommandWithSpecifiedIsolation():

```java
/**
 * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
 */
try {
    // Fire the execution hooks
    executionHook.onThreadStart(_cmd);
    executionHook.onRunStart(_cmd);
    executionHook.onExecutionStart(_cmd);
    // Call into HystrixCommand to get the Observable that will actually run the task
    return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
    return Observable.error(ex);
}
```

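This brings us to HystrixCommand's own method, which again wraps the work in an anonymous inner class that calls the subclass's run(). That completes the path from the toObservable().subscribe() entry point to running the user's task.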

```java
@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                // The thread runs the user's task: the subclass's run()
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
```

How command execution uses the circuit breaker

Circuit breakers are kept per command (per commandKey). In the execution flow above, applyHystrixSemantics() is where the circuit breaker comes in, through circuitBreaker.allowRequest().

The default circuit-breaker implementation is the HystrixCircuitBreakerImpl class:

```java
@Override
public boolean allowRequest() {
    // If force-open is configured, every user task is rejected and goes to the fallback instead. Default: false
    if (properties.circuitBreakerForceOpen().get()) {
        // properties have asked us to force the circuit open so we will allow NO requests
        return false;
    }
    // If force-closed is configured, every user task runs, even if the endpoint behind this command fails a lot. Default: false
    if (properties.circuitBreakerForceClosed().get()) {
        // we still want to allow isOpen() to perform its calculations so we simulate normal behavior
        isOpen();
        // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
        return true;
    }
    // Normally neither is configured, and the breaker adapts to the endpoint's call statistics.
    // isOpen() computes whether the breaker is open; if it is, allowSingleTest() lets one request through
    // per sleep window (default 5 s) to probe whether the endpoint has recovered
    return !isOpen() || allowSingleTest();
}

public boolean allowSingleTest() {
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // 1) if the circuit is open
    // 2) and it's been longer than 'sleepWindow' since we opened the circuit
    // The breaker is open and more than sleepWindow (default 5 s) has passed since it opened or was last tested: probe the endpoint
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
        // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
        // Update the last-tested time before letting the request through
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            // if this returns true that means we set the time so we'll return true to allow the singleTest
            // if it returned false it means another thread raced us and allowed the singleTest before we did
            return true;
        }
    }
    // Still inside the sleep window: reject the request; the fallback runs instead
    return false;
}

@Override
public boolean isOpen() {
    if (circuitOpen.get()) {
        // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
        return true;
    }
    // HystrixCommandMetrics holds each HystrixCommand's call statistics
    HystrixCommandMetrics.HealthCounts health = metrics.getHealthCounts();
    // check if we are past the statisticalWindowVolumeThreshold
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
        return false;
    }
    // The error percentage is still below the threshold
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // The error percentage has reached the threshold (default 50%): open the breaker
        if (circuitOpen.compareAndSet(false, true)) {
            // if the previousValue was false then we want to set the currentTime
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            // Mark the circuit as open: this calls back a listener from the avenager framework (not part of stock Hystrix),
            // which sets a flag indicating that the breaker is open
            if (null != listener) {
                try {
                    listener.markCircuitBreakerOpen();
                } catch (Throwable e) {}
            }
            return true;
        } else {
            // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
            // caused another thread to set it to true already even though we were in the process of doing the same
            // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
            return true;
        }
    }
}
```
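The allowRequest()/isOpen()/allowSingleTest() logic can be condensed into a small state machine. In this sketch, SimpleCircuitBreaker and its methods are hypothetical; the health numbers and the current time are passed in explicitly instead of coming from HystrixCommandMetrics and System.currentTimeMillis(), and markSuccess() stands in for closing the breaker after a successful probe. The thresholds in the usage match the defaults above (20 requests, 50%, 5000 ms).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical simplified breaker modeled on HystrixCircuitBreakerImpl. */
final class SimpleCircuitBreaker {
    private final int requestVolumeThreshold;   // e.g. 20
    private final int errorThresholdPercentage; // e.g. 50
    private final long sleepWindowMs;           // e.g. 5000
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTestedAt = new AtomicLong();

    SimpleCircuitBreaker(int volume, int errorPct, long sleepWindowMs) {
        this.requestVolumeThreshold = volume;
        this.errorThresholdPercentage = errorPct;
        this.sleepWindowMs = sleepWindowMs;
    }

    /** Allowed if closed, or if open but due for a single probe request. */
    boolean allowRequest(long totalRequests, int errorPercentage, long nowMs) {
        return !isOpen(totalRequests, errorPercentage, nowMs) || allowSingleTest(nowMs);
    }

    boolean isOpen(long totalRequests, int errorPercentage, long nowMs) {
        if (circuitOpen.get()) return true;
        if (totalRequests < requestVolumeThreshold) return false;     // not enough traffic yet
        if (errorPercentage < errorThresholdPercentage) return false; // healthy enough
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTestedAt.set(nowMs); // remember when the breaker tripped
        }
        return true;
    }

    private boolean allowSingleTest(long nowMs) {
        long last = openedOrLastTestedAt.get();
        // Only one caller per sleep window wins the CAS and gets to probe
        return circuitOpen.get() && nowMs > last + sleepWindowMs
                && openedOrLastTestedAt.compareAndSet(last, nowMs);
    }

    /** Called when the probe succeeds: close the breaker (Hystrix also resets the metrics). */
    void markSuccess() { circuitOpen.set(false); }
}
```

With 19 failing requests the breaker stays closed; at 20 requests and 50% errors it trips, rejects everything for 5000 ms, then lets exactly one probe through.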

How command execution timeouts are implemented

In AbstractCommand.executeCommandAndObserve():

```java
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
    execution = executeCommandWithSpecifiedIsolation(_cmd)
            // RxJava functional style: executeCommandWithSpecifiedIsolation(_cmd) returns an Observable;
            // lift() wraps it with a HystrixObservableTimeoutOperator, which adds the timeout detection
            .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
    execution = executeCommandWithSpecifiedIsolation(_cmd);
}
```

HystrixObservableTimeoutOperator.call() runs when the original Observable is subscribed, before it executes. It creates a TimerListener whose tick() handles command timeouts by switching the command's timeout status to TIMED_OUT.

```java
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
    // (elided: s, the CompositeSubscription to the original execution, is created earlier in call())
    HystrixTimer.TimerListener listener = new HystrixTimer.TimerListener() {
        @Override
        public void tick() {
            // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
            // otherwise it means we lost a race and the run() execution completed or did not start
            // CAS the status to TIMED_OUT: if the worker thread has not changed the status before the timeout,
            // this CAS on the HystrixTimer thread succeeds, which means the command timed out
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // report timeout failure
                // HystrixEventNotifier.markEvent() is a no-op by default
                originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                // shut down the original request
                s.unsubscribe();
                // (elided: timeoutRunnable delivers a HystrixTimeoutException to child.onError)
                timeoutRunnable.run();
                //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
            }
        }

        @Override
        public int getIntervalTimeInMilliseconds() {
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };
    // Register the TimerListener with HystrixTimer, which schedules it on its thread pool
    final Reference<HystrixTimer.TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
    // set externally so execute/queue can see this
    originalCommand.timeoutTimer.set(tl);
    // (elided: the parent Subscriber that forwards events to child is created and returned)
}
```

With the logic that marks a timeout in place, how is the check itself triggered?

HystrixTimer.getInstance().addTimerListener(listener) wraps the TimerListener in a job and hands it to a scheduled thread pool, which runs it periodically with the command's timeout as the interval:

```java
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    // Initialize HystrixTimer's scheduled thread pool if needed
    startThreadIfNeeded();
    // The job that drives the timeout marking
    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };
    // Submit to the scheduled thread pool; it runs periodically with the command's timeout as the interval
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
```
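The race between the worker and the HystrixTimer thread comes down to a CAS on a shared status. The sketch below is a hypothetical simplification: it uses a one-shot ScheduledExecutorService.schedule() instead of Hystrix's scheduleAtFixedRate, and TimeoutRace and its Status enum are illustrative names. Because the timer is armed at submission time, time spent queued in the worker pool counts against the timeout.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the NOT_EXECUTED -> COMPLETED / TIMED_OUT race. */
final class TimeoutRace {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private final ExecutorService workers;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    TimeoutRace(ExecutorService workers) { this.workers = workers; }

    /** Submits the task and arms the timeout at submission time; completes with the winning status. */
    CompletableFuture<Status> submit(Runnable task, long timeoutMs) {
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<Status> result = new CompletableFuture<>();
        timer.schedule(() -> {
            // Timer thread: wins only if the worker has not finished yet
            if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                result.complete(Status.TIMED_OUT); // Hystrix would route a HystrixTimeoutException to the fallback here
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        workers.execute(() -> {
            task.run();
            // Worker thread: wins only if the timer has not fired yet
            if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                result.complete(Status.COMPLETED);
            }
        });
        return result;
    }

    void shutdown() { timer.shutdownNow(); }
}
```

If a single worker thread is busy for 500 ms, a trivial task submitted behind it with a 100 ms timeout still ends as TIMED_OUT, even though its own run time is close to zero.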

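When the task finishes, the worker changes the command's TimedOutStatus to COMPLETED. Both sides race with a CAS from NOT_EXECUTED: if the task finishes before the HystrixTimer thread fires, the status becomes COMPLETED; otherwise it becomes TIMED_OUT.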

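After a timeout, listener.tick() calls timeoutRunnable.run(), which emits a HystrixTimeoutException. That exception eventually reaches handleFallback, an anonymous inner class created in executeCommandAndObserve(). handleFallback is attached to the Observable, and its call() is invoked when the Observable's execution hits an error: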

```java
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
    @Override
    public Observable<R> call(Throwable t) {
        Exception e = getExceptionFromThrowable(t);
        executionResult = executionResult.setExecutionException(e);
        if (e instanceof RejectedExecutionException) {
            return handleThreadPoolRejectionViaFallback(e);
        } else if (t instanceof HystrixTimeoutException) {
            // The timeout exception emitted via HystrixTimer is handled here
            return handleTimeoutViaFallback();
        } else if (t instanceof HystrixBadRequestException) {
            return handleBadRequestByEmittingError(e);
        } else {
            /*
             * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
             */
            if (e instanceof HystrixBadRequestException) {
                eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                return Observable.error(e);
            }
            return handleFailureViaFallback(e);
        }
    }
};
```
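handleFallback's dispatch can be read as a classification from Throwable to outcome. The sketch below is hypothetical: it uses the JDK's RejectedExecutionException and TimeoutException as stand-ins (Hystrix checks its own HystrixTimeoutException), a made-up BadRequestException in place of HystrixBadRequestException, and an Outcome enum for readability.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for HystrixBadRequestException. */
final class BadRequestException extends RuntimeException {
    BadRequestException(String msg) { super(msg); }
}

/** Hypothetical classifier mirroring the branches of handleFallback. */
final class FailureClassifier {
    enum Outcome { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST, FAILURE }

    static Outcome classify(Throwable t) {
        if (t instanceof RejectedExecutionException) return Outcome.THREAD_POOL_REJECTED;
        if (t instanceof TimeoutException) return Outcome.TIMEOUT;
        if (t instanceof BadRequestException) return Outcome.BAD_REQUEST;
        return Outcome.FAILURE;
    }

    /** Every outcome except BAD_REQUEST goes to the fallback; bad requests are re-emitted as errors. */
    static boolean triggersFallback(Outcome o) {
        return o != Outcome.BAD_REQUEST;
    }
}
```

Keeping bad requests out of the fallback path matches the earlier rule that HystrixBadRequestException is the one exception from run() that does not count as a failure.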

Stepping through in a debugger shows that this is where a TIMEOUT event is added to the command's result object.

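eventCounts is backed by a BitSet. HystrixEventType is an enum, and each event sets the bit at its enum ordinal (declaration order), so callers can tell which events happened to the command.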

```java
public boolean isResponseTimedOut() {
    // A timeout is detected by checking whether the result's event bits include TIMEOUT
    return getCommandResult().getEventCounts().contains(HystrixEventType.TIMEOUT);
}
```
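The enum-ordinal-to-bit idea behind eventCounts fits in a few lines. This is a hypothetical sketch: EventType lists only a few of HystrixEventType's constants and EventBits is an illustrative name; Hystrix's own event-count class also keeps per-event counters.

```java
import java.util.BitSet;

/** Hypothetical subset of HystrixEventType. */
enum EventType { EMIT, SUCCESS, FAILURE, TIMEOUT, FALLBACK_SUCCESS }

/** Hypothetical event set: one bit per enum constant, indexed by ordinal(). */
final class EventBits {
    private final BitSet events = new BitSet(EventType.values().length);

    EventBits add(EventType type) {
        events.set(type.ordinal());
        return this;
    }

    boolean contains(EventType type) {
        return events.get(type.ordinal());
    }
}
```

A timed-out command whose fallback succeeded would carry both the TIMEOUT and FALLBACK_SUCCESS bits, and isResponseTimedOut() only needs to test the first.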

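So AbstractCommand decides whether execution timed out by checking whether the TIMEOUT bit in eventCounts is set. The timeout therefore covers the interval [command submitted, command finished]. What does that imply?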

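The interval covers not only the task's own execution time but also the time it spends waiting in the thread pool. If the pool is overloaded, a task can be starved and time out before it even runs.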

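Finally, let's see how the scheduled thread pool picks up tasks, since the timeout check is built on a ScheduledThreadPoolExecutor. Delayed execution in that pool is implemented with a DelayedWorkQueue.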

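DelayedWorkQueue is a blocking queue whose underlying data structure is a min-heap ordered by each job's scheduled execution time, so the root of the heap is the job that becomes runnable soonest. ScheduledThreadPoolExecutor extends ThreadPoolExecutor, so its Worker logic is the same: it keeps taking tasks from the queue and running them.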
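The JDK's public java.util.concurrent.DelayQueue works on the same idea as the package-private DelayedWorkQueue (a priority heap ordered by trigger time, with a leader thread waiting on the head), so it can demonstrate the ordering. In this sketch TimedJob is a hypothetical Delayed implementation; jobs come out in deadline order regardless of insertion order.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical job that becomes available at a fixed System.nanoTime() deadline. */
final class TimedJob implements Delayed {
    final String name;
    private final long deadlineNanos;

    TimedJob(String name, long delayMillis) {
        this.name = name;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time = deadline - now; <= 0 means the job can be taken
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Min-heap ordering: the earliest deadline sits at the head
        if (other instanceof TimedJob) {
            return Long.compare(deadlineNanos, ((TimedJob) other).deadlineNanos);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    /** Takes n jobs, blocking until each is due, and returns their names in take order. */
    static String takeInOrder(DelayQueue<TimedJob> queue, int n) {
        StringBuilder order = new StringBuilder();
        try {
            for (int i = 0; i < n; i++) order.append(queue.take().name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order.toString();
    }
}
```

Putting jobs due in 60, 20 and 40 ms (in that order) and then taking all three yields them in 20, 40, 60 ms order, because take() always removes the heap root once its delay has expired.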

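Putting this together with Hystrix's timeout mechanism: the HystrixTimer thread keeps running the take() logic below on the DelayedWorkQueue. If the command does not change its TimedOutStatus to COMPLETED within the timeout, the HystrixTimer thread takes the TimerListener job, sets the status to TIMED_OUT and emits a HystrixTimeoutException; handleFallback then records TIMEOUT in executionResult's eventCounts, and callers learn that the task timed out.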

```java
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            // No task at the root of the heap: wait
            if (first == null)
                available.await();
            else {
                // Remaining delay of the earliest task (its trigger time minus now)
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // Ready to run: remove the root and restore the min-heap (sift down)
                    return finishPoll(first);
                // The earliest task is not due yet: wait
                first = null; // don't retain ref while waiting
                // leader is the thread already waiting for the head task
                if (leader != null)
                    // Another thread is the leader: wait and let it handle the head task
                    available.await();
                else {
                    // This thread is the first to arrive: it becomes the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // Wait until the head task is due,
                        // or until another thread signals
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
```

How are a HystrixCommand's HealthCounts computed?

```java
public HealthCounts plus(long[] eventTypeCounts) {
    long updatedTotalCount = totalCount;
    long updatedErrorCount = errorCount;
    long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
    long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
    long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
    long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
    long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
    updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    // Errors include: execution failures, timeouts, thread-pool rejections and semaphore rejections
    updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    return new HealthCounts(updatedTotalCount, updatedErrorCount);
}

// Build the HealthCounts
HealthCounts(long total, long error) {
    this.totalCount = total;
    this.errorCount = error;
    if (totalCount > 0) {
        // Error percentage = errors / total; by default the breaker opens at 50%
        this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
    } else {
        this.errorPercentage = 0;
    }
}
```
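To see how the default 50% threshold is reached, here is the same arithmetic as a standalone sketch. Health is a hypothetical name and the event counts in the usage are made up for illustration. As in plus() above, successes, failures, timeouts and both kinds of rejection count toward the total, and everything except success counts as an error.

```java
/** Hypothetical standalone version of HealthCounts.plus() plus the errorPercentage calculation. */
final class Health {
    final long total;
    final long errors;
    final int errorPercentage;

    Health(long success, long failure, long timeout, long threadPoolRejected, long semaphoreRejected) {
        this.total = success + failure + timeout + threadPoolRejected + semaphoreRejected;
        this.errors = failure + timeout + threadPoolRejected + semaphoreRejected;
        // Same integer truncation as Hystrix: (int) (errors / total * 100)
        this.errorPercentage = total > 0 ? (int) ((double) errors / total * 100) : 0;
    }
}
```

For example, 10 successes, 6 failures, 2 timeouts and one rejection of each kind give 20 requests with 10 errors, i.e. exactly 50%, which meets both the default volume threshold and the default error threshold.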

 

What goes wrong with HystrixCommand thread-pool isolation under high concurrency?

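The thread stacks show that adding a TimerListener to the delay queue requires acquiring a lock. When many threads enqueue concurrently, many of them block on that lock, which causes heavy thread context switching.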

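Hystrix's comments explain that these TimerListeners are how HystrixCommand handles timeouts of asynchronous threads: they fire when a command times out and deliver the timeout result. Under heavy load, registering these TimerListeners blocks on the lock, which in turn blocks the calling thread and slows the service's responses. The delay can even exceed the command's own timeout before the command has started executing.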

Source: wpsshop blog, https://www.wpsshop.cn/w/代码探险家/article/detail/978671. Copyright belongs to the original author.