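Hystrix is a framework for service isolation in distributed systems. It provides concurrency limiting, fallback (degradation), and circuit breaking. It isolates the dependencies a service calls. For example, when the order service's response time soars or the service goes down, the calling service caps the number of requests or degrades its calls to the order service's endpoints, which keeps the calling service itself stable. Isolation is implemented with thread pools and semaphores. If no threadPoolKey is specified, thread pools are partitioned by groupKey by default, and each HystrixCommand implements run() to execute the business logic. Custom thread pools are supported, and thread pools are reused internally through a ConcurrentHashMap (CHM).

Hystrix also keeps HealthCounts, which track the calls made to the endpoint that a commandKey represents. A call can end as a success, a failure (execution threw an exception), a rejection (refused by the thread pool or semaphore), or a timeout (the command did not finish within its timeout, which also counts the time spent queued in the thread pool). All of these outcomes are reported to that commandKey's circuitBreaker.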
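Fallback (degradation) means that the fallback function runs whenever command execution fails, is rejected by the thread pool, times out, or is short-circuited by an open breaker.
The circuit breaker keeps a sliding time window. If, according to the configuration, the error rate within that window is too high, the breaker opens (short-circuits), and requests for that commandKey go straight to the fallback function. By default, after 5 s the breaker lets a single request through to probe the endpoint; if that request succeeds, the breaker closes and the command's metrics are reset.
Every exception thrown from run() except HystrixBadRequestException counts as a failure and triggers both the getFallback() fallback and the circuit-breaker logic.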
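To make the run()/getFallback() rule concrete, here is a minimal plain-Java sketch of that contract. It is not Hystrix code: MiniCommand and BadRequestException are hypothetical stand-ins for HystrixCommand and HystrixBadRequestException, and the sketch leaves out isolation, timeouts, and metrics entirely.

```java
// Hypothetical stand-in for HystrixBadRequestException: signals a caller error,
// so it is rethrown instead of triggering the fallback.
class BadRequestException extends RuntimeException {
    BadRequestException(String message) { super(message); }
}

// Hypothetical, heavily simplified stand-in for HystrixCommand (no threads, no timeouts)
abstract class MiniCommand<R> {
    private int failures; // in real Hystrix, failures would feed the circuit breaker

    // Business logic, implemented by subclasses
    protected abstract R run() throws Exception;

    // Default: no fallback available
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    public final R execute() {
        try {
            return run();
        } catch (BadRequestException e) {
            // Bad requests are neither counted as failures nor sent to the fallback
            throw e;
        } catch (Exception e) {
            // Any other exception is a failure and triggers the fallback
            failures++;
            return getFallback();
        }
    }

    public int failureCount() { return failures; }
}
```

Note that only ordinary exceptions reach getFallback(); a BadRequestException propagates to the caller unchanged and is not counted.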
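Article outline (covered through source-code analysis):
What problems thread-pool isolation has under high concurrency
Most of the basic configuration properties come from the HystrixCommandProperties object: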
```java
/* -------------- Metrics -------------- */
// Length of the rolling statistical window, default 10000 ms (10 s)
private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds;
// Number of buckets in the statistical window, default 10; with the default 10 s window each bucket covers 1 s
private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow
// Whether execution-latency percentiles are tracked, default true
private final HystrixProperty<Boolean> metricsRollingPercentileEnabled;
/* -------------- Circuit breaker -------------- */
// Minimum request volume before the breaker can trip, default 20: at least 20 requests
// within metricsRollingStatisticalWindowInMilliseconds (10 s by default) before the breaker takes effect
private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold;
// Sleep window, default 5 s. After rejecting requests for 5 s the breaker goes half-open and lets one request
// through as a trial; if it succeeds the breaker closes, otherwise it waits another sleep window
private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds;
// Whether the circuit breaker is enabled, default true
private final HystrixProperty<Boolean> circuitBreakerEnabled;
// Error threshold, default 50%: once the error rate in the 10 s rolling window reaches 50%, the breaker opens
private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage;
// Force the breaker open and block all requests, default false. When true, every request is rejected and goes straight to the fallback
private final HystrixProperty<Boolean> circuitBreakerForceOpen;
// Force the breaker closed so that errors are ignored, default false
private final HystrixProperty<Boolean> circuitBreakerForceClosed;
/* -------------- Semaphore -------------- */
// Maximum concurrent command executions under semaphore isolation, default 10
private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests;
// Maximum concurrent fallback executions under semaphore isolation, default 10
private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests;
/* -------------- Other -------------- */
// Isolation strategy, default thread isolation: ExecutionIsolationStrategy.THREAD
private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy;
// Execution timeout under thread isolation, default 1 s
private final HystrixProperty<Integer> executionIsolationThreadTimeoutInMilliseconds;
// Thread-pool key override; decides which thread pool the command runs in
private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride;
// Whether the fallback is enabled, default true
private final HystrixProperty<Boolean> fallbackEnabled;
// Under thread isolation, whether to interrupt (Thread.interrupt()) the thread of a timed-out command, default true
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout;
// Whether request logging is enabled, default true
private final HystrixProperty<Boolean> requestLogEnabled;
// Whether request caching is enabled, default true
private final HystrixProperty<Boolean> requestCacheEnabled;
```
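The same defaults can be written with the property names Hystrix reads from configuration (prefix `hystrix.command.default.`). The sketch below puts a few of them in a plain Map and derives two numbers mentioned in the comments: the bucket length (window / number of buckets) and the smallest error count that trips the breaker at the default volume and threshold. The key names are Hystrix's documented configuration keys; the HystrixDefaults class itself is only an illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a few Hystrix default values keyed by their configuration property names
class HystrixDefaults {
    static final String PREFIX = "hystrix.command.default.";

    static Map<String, String> defaults() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put(PREFIX + "metrics.rollingStats.timeInMilliseconds", "10000");
        m.put(PREFIX + "metrics.rollingStats.numBuckets", "10");
        m.put(PREFIX + "circuitBreaker.requestVolumeThreshold", "20");
        m.put(PREFIX + "circuitBreaker.sleepWindowInMilliseconds", "5000");
        m.put(PREFIX + "circuitBreaker.errorThresholdPercentage", "50");
        m.put(PREFIX + "execution.isolation.strategy", "THREAD");
        m.put(PREFIX + "execution.isolation.thread.timeoutInMilliseconds", "1000");
        m.put(PREFIX + "execution.isolation.semaphore.maxConcurrentRequests", "10");
        return m;
    }

    static int intValue(Map<String, String> m, String key) {
        return Integer.parseInt(m.get(PREFIX + key));
    }

    // Length of one metrics bucket: window / numBuckets (10000 / 10 = 1000 ms by default)
    static int bucketMillis(Map<String, String> m) {
        return intValue(m, "metrics.rollingStats.timeInMilliseconds")
                / intValue(m, "metrics.rollingStats.numBuckets");
    }

    // Smallest error count that trips the breaker when exactly requestVolumeThreshold
    // requests were seen: ceil(threshold% * volume / 100), computed in integers
    static int minErrorsToTrip(Map<String, String> m) {
        int volume = intValue(m, "circuitBreaker.requestVolumeThreshold");
        int threshold = intValue(m, "circuitBreaker.errorThresholdPercentage");
        return (threshold * volume + 99) / 100;
    }
}
```

With the defaults this gives 1000 ms buckets and 10 errors out of 20 requests as the smallest combination that opens the breaker.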
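Using Hystrix: after writing a HystrixCommand, you hand it to the framework to execute. The HystrixCommand object is wrapped into an Observable, and an Observer is created to subscribe to that Observable. Calling subscribe() executes the command asynchronously.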
Used for asynchronous execution of command with a callback by subscribing to the Observable. This lazily starts execution of the command once the Observable is subscribed to. An eager Observable can be obtained from observe().
See https://github.com/ReactiveX/RxJava/wiki for more information.
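According to the toObservable() documentation, once subscribe() is called, the command is handed to its thread pool for execution. When it finishes, onCompleted() or onError() is called depending on the outcome.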
```java
taskCommand.toObservable().subscribe(new Observer<Object>() {
    @Override
    public void onCompleted() {
        handleCompleted(logStr, cmd, taskCommand, starter, result);
    }

    @Override
    public void onError(Throwable e) {
        // Only called when the fallback function itself fails
        log.error("{} severe error e=", logStr, e);
    }

    @Override
    public void onNext(Object aVoid) {
        // nothing
    }
});
```
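The lazy behaviour described above (toObservable() runs nothing until subscribe(), while observe() starts eagerly) can be modelled without RxJava. In this sketch a CompletableFuture stands in for the Observable; LazyResult, subscribe() and eager() are hypothetical names, not Hystrix or RxJava APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a subscribe-to-start result, like toObservable()
class LazyResult<R> {
    private final Supplier<R> work;

    LazyResult(Supplier<R> work) { this.work = work; }

    // Nothing runs until subscribe(); each subscription starts the work asynchronously.
    // The returned future completes after the matching callback has run.
    CompletableFuture<Void> subscribe(Consumer<R> onNext, Consumer<Throwable> onError) {
        return CompletableFuture.supplyAsync(work).<Void>handle((r, e) -> {
            if (e != null) onError.accept(e); // e is a CompletionException wrapping the cause
            else onNext.accept(r);
            return null;
        });
    }

    // Eager variant, like observe(): the work starts immediately
    static <R> CompletableFuture<R> eager(Supplier<R> work) {
        return CompletableFuture.supplyAsync(work);
    }
}
```

Creating a LazyResult has no side effects; only subscribe() triggers the supplier, which mirrors why the command above does nothing until subscribe() is called.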
Hystrix can give each command its own thread pool (via threadPoolKey). First, the parameters of @HystrixCommand:
```java
public @interface HystrixCommand {
    // Name of the group the command belongs to; defaults to the name of the annotated method's class
    String groupKey() default "";

    // Command key; defaults to the name of the annotated method
    String commandKey() default "";

    // Thread-pool name; defaults to groupKey
    String threadPoolKey() default "";

    // Name of the fallback method; it must be in the same class as the Hystrix method
    String fallbackMethod() default "";

    // Hystrix command properties
    HystrixProperty[] commandProperties() default {};

    // Properties of the thread pool the command depends on
    HystrixProperty[] threadPoolProperties() default {};

    // Exceptions to ignore (they do not trigger the fallback). If raiseHystrixExceptions
    // contains RUNTIME_EXCEPTION, they are wrapped in HystrixRuntimeException
    Class<? extends Throwable>[] ignoreExceptions() default {};

    // Execution mode of Hystrix observable commands; see ObservableExecutionMode for the options
    ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER;

    // Exceptions to raise: if this includes RUNTIME_EXCEPTION, exceptions thrown by the method
    // are wrapped in HystrixRuntimeException
    HystrixException[] raiseHystrixExceptions() default {};

    // Default fallback method: it takes no parameters, and its return type must be compatible with the command's
    String defaultFallback() default "";
}
```
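The key defaults above (groupKey falls back to the class name, commandKey to the method name, threadPoolKey to groupKey) can be illustrated with a small reflection-based resolver. @MiniHystrixCommand, KeyResolver and OrderClient are hypothetical; they only mirror the defaulting rules and are not javanica's implementation. Using the class's simple name for groupKey is an assumption.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotation carrying only the three key attributes
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MiniHystrixCommand {
    String groupKey() default "";
    String commandKey() default "";
    String threadPoolKey() default "";
}

class KeyResolver {
    // Returns {groupKey, commandKey, threadPoolKey} after applying the defaults
    static String[] resolve(Class<?> type, String methodName) {
        for (Method m : type.getDeclaredMethods()) {
            MiniHystrixCommand a = m.getAnnotation(MiniHystrixCommand.class);
            if (a == null || !m.getName().equals(methodName)) continue;
            String group = a.groupKey().isEmpty() ? type.getSimpleName() : a.groupKey();
            String command = a.commandKey().isEmpty() ? m.getName() : a.commandKey();
            // No threadPoolKey: fall back to groupKey, so all commands of a group share one pool
            String pool = a.threadPoolKey().isEmpty() ? group : a.threadPoolKey();
            return new String[] {group, command, pool};
        }
        throw new IllegalArgumentException("No annotated method named " + methodName);
    }
}

// Hypothetical client: one method uses the defaults, one gets a dedicated pool
class OrderClient {
    @MiniHystrixCommand
    String queryOrder(long id) { return "order-" + id; }

    @MiniHystrixCommand(threadPoolKey = "orderCreatePool")
    String createOrder() { return "created"; }
}
```

Here queryOrder ends up in the "OrderClient" pool shared by the whole group, while createOrder is isolated in its own pool.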
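Inside HystrixThreadPoolDefault, the thread pool is ultimately built the familiar way: new ThreadPoolExecutor().
That settles thread-pool isolation: by default, thread pools are partitioned by HystrixCommand's groupKey (unless a threadPoolKey is set), and each CommandGroup is served by its own thread pool. Is the dependent service the right granularity, with all endpoints of service A sharing one pool? It can be finer: give important, heavily loaded RPC endpoints their own thread pools so that they cannot affect one another.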
```java
/*
 * Initialize the thread-pool key.
 * If no key is given, the HystrixCommandGroup name is used as the key by default.
 */
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
    if (threadPoolKeyOverride == null) {
        // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
        if (threadPoolKey == null) {
            /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
            return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
        } else {
            return threadPoolKey;
        }
    } else {
        // we have a property defining the thread-pool so use it instead
        return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
    }
}

/*
 * Initialize the thread pool.
 * HystrixThreadPool keeps all thread pools in a ConcurrentHashMap.
 */
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
    if (fromConstructor == null) {
        // get the default implementation of HystrixThreadPool
        return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
    } else {
        return fromConstructor;
    }
}

/**
 * Look the thread pool up in the map; if it is absent, create one and store it.
 */
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();
    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }
    // Lock so that concurrent creation within this JVM is safe
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            // The pool itself is built by HystrixThreadPoolDefault
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}
```
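The lookup pattern above (a lock-free get, then a synchronized check-and-put) can also be expressed with ConcurrentHashMap.computeIfAbsent, which creates at most one value per key atomically. This is a swapped-in technique, not what Hystrix does; ThreadPoolRegistry is hypothetical, and the pool settings (10 threads, 1-minute keep-alive, a SynchronousQueue as a direct hand-off) are assumed to approximate Hystrix's defaults.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical registry: one ThreadPoolExecutor per thread-pool key
class ThreadPoolRegistry {
    private final ConcurrentMap<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    ThreadPoolExecutor get(String threadPoolKey, String groupKey) {
        // Same rule as initThreadPoolKey(): no explicit key means "use the group key"
        String key = threadPoolKey != null ? threadPoolKey : groupKey;
        // computeIfAbsent runs the factory at most once per key, even under contention
        return pools.computeIfAbsent(key, k -> new ThreadPoolExecutor(
                10, 10, 1, TimeUnit.MINUTES,
                // Direct hand-off: a task is rejected when all 10 threads are busy
                new SynchronousQueue<>()));
    }

    int size() { return pools.size(); }

    void shutdownAll() { pools.values().forEach(ThreadPoolExecutor::shutdown); }
}
```

Commands without a threadPoolKey share their group's pool, while an explicit key gets a separate pool, which is the finer-grained isolation suggested above for heavily loaded RPC endpoints.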
Also in the AbstractCommand constructor is the circuit-breaker initialization logic:
```java
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);

/*
 * Initialize through the HystrixCircuitBreaker factory method
 */
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixCircuitBreaker
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            return fromConstructor;
        }
    } else {
        // Breaker disabled: the no-op breaker lets every request through
        return new NoOpCircuitBreaker();
    }
}
```
Likewise, all circuit breakers are stored in a local map:
```java
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    // this should find it for all but the first time
    HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
    if (previouslyCached != null) {
        return previouslyCached;
    }
    HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
    if (cbForCommand == null) {
        // putIfAbsent stored our new instance, so read it back from the map
        return circuitBreakersByCommand.get(key.name());
    } else {
        // another thread won the race and stored its instance first; reuse that one
        return cbForCommand;
    }
}
```
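The key thing toObservable() does is build the execution chain: it adds the anonymous inner class that applies the circuit breaker to the chain, and that class's call() is invoked just before the task executes. An Observable represents the (possibly multiple) results of an operation, which we have to subscribe to and consume ourselves.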
```java
// User code submitting the task; taskCommand is an instance of a HystrixCommand subclass
taskCommand.toObservable().subscribe(new Observer<Object>() {
    @Override
    public void onCompleted() {
        handleCompleted(logStr, param, result, requestPipelineContext, curContext, handler, taskCommand);
    }

    @Override
    public void onError(Throwable e) {
        // Only called when the fallback function itself fails
        log.error("severe error e=", e);
    }

    @Override
    public void onNext(Object aVoid) {
        // nothing
    }
});
```
```java
public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    // ... (omitted)

    // applyHystrixSemantics() applies the circuit breaker (HystrixCircuitBreaker). An anonymous Func0
    // is created here first; it is added to the execution chain and invoked later
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    // ... (omitted)
}
```
The entry point that runs the user's task is also in applyHystrixSemantics(), reached once the circuit breaker has decided the request may proceed:
```java
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
    executionHook.onStart(_cmd);

    /* The circuit breaker decides whether execution may continue */
    if (circuitBreaker.allowRequest()) {
        // Under thread-pool isolation this returns the default TryableSemaphoreNoOp, whose tryAcquire() always returns true.
        // Under semaphore isolation it returns TryableSemaphoreActual, whose tryAcquire() checks the available permits
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        // With thread-pool isolation, tryAcquire() always returns true here
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                // executeCommandAndObserve() starts executing the user's HystrixCommand
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // The breaker is open: degrade straight to the fallback
        return handleShortCircuitViaFallback();
    }
}
```
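The decision order in applyHystrixSemantics() (circuit breaker first, then the execution semaphore, then run with a release-exactly-once guard) can be sketched synchronously with java.util.concurrent.Semaphore. SemaphoreGate, the BooleanSupplier standing in for circuitBreaker.allowRequest(), and the string results are hypothetical; the AtomicBoolean guard mirrors semaphoreHasBeenReleased. As noted in the code above, under thread-pool isolation Hystrix's semaphore is a no-op, so this path only matters for semaphore isolation.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical, synchronous sketch of the applyHystrixSemantics() decision order
class SemaphoreGate {
    private final Semaphore permits;
    private final BooleanSupplier circuitAllows; // stands in for circuitBreaker.allowRequest()

    SemaphoreGate(int maxConcurrent, BooleanSupplier circuitAllows) {
        this.permits = new Semaphore(maxConcurrent);
        this.circuitAllows = circuitAllows;
    }

    String execute(Supplier<String> run, Supplier<String> fallback) {
        if (!circuitAllows.getAsBoolean()) {
            return fallback.get(); // short-circuited, like handleShortCircuitViaFallback()
        }
        if (!permits.tryAcquire()) {
            return fallback.get(); // like handleSemaphoreRejectionViaFallback()
        }
        AtomicBoolean released = new AtomicBoolean(false);
        // Hystrix may reach the release action from both the terminate and the unsubscribe path;
        // the CAS makes sure the permit is returned only once
        Runnable releaseOnce = () -> {
            if (released.compareAndSet(false, true)) permits.release();
        };
        try {
            return run.get();
        } catch (RuntimeException e) {
            return fallback.get(); // execution failure goes to the fallback
        } finally {
            releaseOnce.run();
            releaseOnce.run(); // simulates the second path: a no-op thanks to the guard
        }
    }

    int availablePermits() { return permits.availablePermits(); }
}
```

Without the guard, the second release would leave the semaphore with more permits than configured, silently raising the concurrency limit.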
The important part of executeCommandAndObserve() is the following, which chooses the execution mode from the command's configuration: thread pool or semaphore.
```java
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
    // executionTimeoutEnabled is true by default, so this branch is the usual one;
    // inside, thread-pool or semaphore isolation is chosen
    execution = executeCommandWithSpecifiedIsolation(_cmd)
            .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
    execution = executeCommandWithSpecifiedIsolation(_cmd);
}
```

Inside executeCommandWithSpecifiedIsolation():

```java
/**
 * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
 */
try {
    // Invoke the execution hooks
    executionHook.onThreadStart(_cmd);
    executionHook.onRunStart(_cmd);
    executionHook.onExecutionStart(_cmd);
    // Call into HystrixCommand to get the Observable that performs the execution
    return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
    return Observable.error(ex);
}
```
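From here on we are in HystrixCommand's own code: execution is again wrapped in an anonymous inner class, which calls the subclass's run(). That completes the path from the toObservable().subscribe() entry point to the execution of the user's task.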
```java
@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                // The pool thread runs the user's task: the subclass's run()
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
```
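getExecutionObservable() records the thread that runs run() (executionThread) so that, with executionIsolationThreadInterruptOnTimeout enabled, a timed-out command's thread can be interrupted later. The sketch below shows that idea in plain Java with an ExecutorService; InterruptibleRun is hypothetical, and for determinism it starts the timeout only after the task has begun, whereas Hystrix's timer also counts queueing time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: run a task on a pool thread, remember that thread,
// and interrupt it if the task does not finish within the timeout.
class InterruptibleRun {
    // Returns true if the task was interrupted because it timed out
    static boolean runWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<Thread> executionThread = new AtomicReference<>();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(() -> {
            executionThread.set(Thread.currentThread()); // like executionThread.set(...)
            started.countDown();
            try {
                Thread.sleep(taskMillis); // stands in for a slow run()
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                done.countDown();
            }
        });
        try {
            started.await();
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                executionThread.get().interrupt(); // like interruptOnTimeout = true
                done.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return interrupted.get();
    }
}
```

Interrupting only helps if run() reacts to interruption (blocking I/O or sleeps that throw InterruptedException); CPU-bound code keeps running after the timeout.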
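The circuit breaker keeps its statistics per command. The execution flow above shows where applyHystrixSemantics() hooks in the breaker: circuitBreaker.allowRequest().
The breaker's default implementation is the HystrixCircuitBreakerImpl class: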
```java
@Override
public boolean allowRequest() {
    // If forced-open is configured, every user task is rejected and goes to the fallback instead. Default false
    if (properties.circuitBreakerForceOpen().get()) {
        // properties have asked us to force the circuit open so we will allow NO requests
        return false;
    }
    // If forced-closed is configured, every user task runs, even if the command's endpoint is failing a lot. Default false
    if (properties.circuitBreakerForceClosed().get()) {
        // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
        isOpen();
        // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
        return true;
    }
    // Normally neither forced mode is configured; the breaker adapts to the endpoint's call statistics.
    // isOpen() computes whether the breaker is open; if it is, allowSingleTest() lets one request through
    // per sleep window (5 s by default) to probe whether the endpoint has recovered
    return !isOpen() || allowSingleTest();
}

public boolean allowSingleTest() {
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // 1) if the circuit is open
    // 2) and it's been longer than 'sleepWindow' since we opened the circuit
    // The breaker is open: if more than sleepWindow (5 s by default) has passed since it opened
    // or was last tested, let one request probe whether the endpoint has recovered
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
        // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
        // Update the last-tested time before letting the request through
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            // if this returns true that means we set the time so we'll return true to allow the singleTest
            // if it returned false it means another thread raced us and allowed the singleTest before we did
            return true;
        }
    }
    // Still inside the sleep window: do not let the request through; the fallback will run instead
    return false;
}

@Override
public boolean isOpen() {
    if (circuitOpen.get()) {
        // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
        return true;
    }
    // HystrixCommandMetrics holds each HystrixCommand's call statistics
    HystrixCommandMetrics.HealthCounts health = metrics.getHealthCounts();
    // check if we are past the statisticalWindowVolumeThreshold
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
        return false;
    }
    // The error rate has not reached the threshold yet
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // The error rate has reached the configured threshold (50% by default): open the breaker
        if (circuitOpen.compareAndSet(false, true)) {
            // if the previousValue was false then we want to set the currentTime
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            // Mark the breaker as open: calls back the avenager framework's listener, which sets a flag saying the breaker is open
            if (null != listener) {
                try {
                    listener.markCircuitBreakerOpen();
                } catch (Throwable e) {}
            }
            return true;
        } else {
            // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
            // caused another thread to set it to true already even though we were in the process of doing the same
            // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
            return true;
        }
    }
}
```
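The open/half-open logic above (isOpen() checks the request volume and the error rate; allowSingleTest() lets one request through per sleep window via a CAS on the timestamp) can be reproduced in a small, testable class. MiniCircuitBreaker is hypothetical: it takes the health counts and the current time as parameters instead of reading HystrixCommandMetrics and System.currentTimeMillis(), and markSuccess() is a simplified stand-in for closing the breaker after a successful probe (it does not reset any metrics).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplification of HystrixCircuitBreakerImpl with an injectable clock
class MiniCircuitBreaker {
    private final int volumeThreshold;    // circuitBreakerRequestVolumeThreshold
    private final int errorThresholdPct;  // circuitBreakerErrorThresholdPercentage
    private final long sleepWindowMillis; // circuitBreakerSleepWindowInMilliseconds
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTested = new AtomicLong();

    MiniCircuitBreaker(int volumeThreshold, int errorThresholdPct, long sleepWindowMillis) {
        this.volumeThreshold = volumeThreshold;
        this.errorThresholdPct = errorThresholdPct;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    boolean isOpen(long total, long errors, long now) {
        if (circuitOpen.get()) return true;
        if (total < volumeThreshold) return false; // not enough traffic to judge
        int errorPct = (int) ((double) errors / total * 100);
        if (errorPct < errorThresholdPct) return false;
        // Only the thread that flips the flag records the opening time
        if (circuitOpen.compareAndSet(false, true)) openedOrLastTested.set(now);
        return true;
    }

    boolean allowSingleTest(long now) {
        long last = openedOrLastTested.get();
        // After the sleep window, exactly one caller wins the CAS and may probe
        return circuitOpen.get() && now > last + sleepWindowMillis
                && openedOrLastTested.compareAndSet(last, now);
    }

    boolean allowRequest(long total, long errors, long now) {
        return !isOpen(total, errors, now) || allowSingleTest(now);
    }

    // Simplified: a successful probe closes the circuit
    void markSuccess() { circuitOpen.set(false); }
}
```

With the defaults (20 requests, 50%, 5000 ms), a breaker that opens at t = 1000 rejects everything up to t = 6000, admits exactly one probe just after that, and then rejects again until the next window or a successful probe.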
In AbstractCommand.executeCommandAndObserve():
```java
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
    execution = executeCommandWithSpecifiedIsolation(_cmd)
            // RxJava functional style: executeCommandWithSpecifiedIsolation(_cmd) returns an Observable;
            // lift() wraps it with a HystrixObservableTimeoutOperator, i.e. adds the timeout-detection logic
            .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
    execution = executeCommandWithSpecifiedIsolation(_cmd);
}
```
HystrixObservableTimeoutOperator.call() runs before the original Observable executes. It creates a TimerListener whose tick() method handles the execution timeout by setting the command's timeout status to TIMED_OUT.
```java
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
    HystrixTimer.TimerListener listener = new HystrixTimer.TimerListener() {
        @Override
        public void tick() {
            // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
            // otherwise it means we lost a race and the run() execution completed or did not start
            // CAS the command status to TIMED_OUT: if the worker thread has not changed the status before the
            // timeout, the HystrixTimer thread wins this CAS, which means the command timed out
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // report timeout failure
                // HystrixEventNotifier.markEvent() is a no-op by default
                originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                // shut down the original request
                s.unsubscribe();
                timeoutRunnable.run();
                //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
            }
        }

        @Override
        public int getIntervalTimeInMilliseconds() {
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };
    // Register the TimerListener with HystrixTimer, whose thread pool schedules it
    final Reference<HystrixTimer.TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
    // set externally so execute/queue can see this
    originalCommand.timeoutTimer.set(tl);

    // ... (omitted: s, timeoutRunnable and the returned Subscriber are declared elsewhere in this method)
}
```
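With the timeout-marking logic in place, how is the check itself triggered?
HystrixTimer.getInstance().addTimerListener(listener) wraps the TimerListener in a job and hands it to a scheduled thread pool, which runs it periodically with the command's timeout as the interval: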
```java
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    // Lazily initialize HystrixTimer's scheduled thread pool
    startThreadIfNeeded();
    // The job that drives timeout marking
    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };
    // Submit to the scheduled thread pool: periodic, with the command's timeout as both the initial delay and the period
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
```
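When the task finishes, it tries to change the command's TimedOutStatus to COMPLETED. If the task gets there before the HystrixTimer thread, the status becomes COMPLETED; otherwise the timer has already set it to TIMED_OUT.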
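The race between the worker and the HystrixTimer thread comes down to two compareAndSet calls on the same status: whichever side moves it away from NOT_EXECUTED first wins. The sketch below reproduces that with an AtomicReference and a ScheduledExecutorService. TimeoutRace and its enum are hypothetical, and unlike HystrixTimer it uses a one-shot schedule() rather than scheduleAtFixedRate(); the millisecond values are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the TimedOutStatus race between the worker and the timer
class TimeoutRace {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    static TimedOutStatus run(long taskMillis, long timeoutMillis) {
        AtomicReference<TimedOutStatus> status = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        // Timer side, like TimerListener.tick(): NOT_EXECUTED -> TIMED_OUT
        ScheduledFuture<?> tick = timer.schedule(() -> {
            status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT);
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Worker side: after run() finishes, NOT_EXECUTED -> COMPLETED
        worker.execute(() -> {
            try {
                Thread.sleep(taskMillis); // stands in for run()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
                tick.cancel(false); // the worker won: the timer job is no longer needed
            }
        });
        worker.shutdown();
        try {
            worker.awaitTermination(taskMillis + timeoutMillis + 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.shutdownNow();
        return status.get();
    }
}
```

Because both sides use compareAndSet from the same starting value, exactly one of them wins, so a command can never end up both COMPLETED and TIMED_OUT.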
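On timeout, listener.tick() calls timeoutRunnable.run(), which emits a HystrixTimeoutException. The exception eventually reaches handleFallback, an anonymous inner class created in executeCommandAndObserve(). handleFallback is attached to the Observable, and its call() is invoked when the Observable encounters an error: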
```java
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
    @Override
    public Observable<R> call(Throwable t) {
        Exception e = getExceptionFromThrowable(t);
        executionResult = executionResult.setExecutionException(e);
        if (e instanceof RejectedExecutionException) {
            return handleThreadPoolRejectionViaFallback(e);
        } else if (t instanceof HystrixTimeoutException) {
            // The timeout exception emitted via HystrixTimer is handled here
            return handleTimeoutViaFallback();
        } else if (t instanceof HystrixBadRequestException) {
            return handleBadRequestByEmittingError(e);
        } else {
            /*
             * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
             */
            if (e instanceof HystrixBadRequestException) {
                eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                return Observable.error(e);
            }
            return handleFailureViaFallback(e);
        }
    }
};
```
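handleFallback routes each error to a different handler depending on its type. The sketch below reduces that routing to a pure function that returns which path would be taken. FallbackRouter, its Route enum, and the nested Timeout and BadRequest classes (standing in for HystrixTimeoutException and HystrixBadRequestException) are hypothetical; RejectedExecutionException is the real java.util.concurrent class that a ThreadPoolExecutor throws when it rejects a task.

```java
import java.util.concurrent.RejectedExecutionException;

// Hypothetical reduction of handleFallback's dispatch to a pure function
class FallbackRouter {
    // Stand-ins for HystrixTimeoutException and HystrixBadRequestException
    static class Timeout extends RuntimeException {}
    static class BadRequest extends RuntimeException {}

    enum Route { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST, FAILURE }

    static Route route(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Route.THREAD_POOL_REJECTED; // -> handleThreadPoolRejectionViaFallback
        } else if (t instanceof Timeout) {
            return Route.TIMEOUT;              // -> handleTimeoutViaFallback
        } else if (t instanceof BadRequest) {
            return Route.BAD_REQUEST;          // -> error emitted, no fallback
        }
        return Route.FAILURE;                  // -> handleFailureViaFallback
    }

    // Every route except BAD_REQUEST ends in the fallback
    static boolean runsFallback(Throwable t) {
        return route(t) != Route.BAD_REQUEST;
    }
}
```

The order of the checks matters only if one exception type could match several branches; with these disjoint types, each error maps to exactly one route.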
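Stepping through with a debugger shows that at this point a TIMEOUT event is added to the command's result object.
eventCounts is backed by a BitSet, and HystrixEventType is an enum: each event sets the bit at its enum ordinal (its declaration order), so callers can tell which events happened to the command.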
```java
public boolean isResponseTimedOut() {
    // Timed out if the result's event bits include TIMEOUT
    return getCommandResult().getEventCounts().contains(HystrixEventType.TIMEOUT);
}
```
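Recording events as bits indexed by enum ordinal can be sketched with java.util.BitSet. EventFlags and its enum are hypothetical: the enum lists only a handful of HystrixEventType's constants, so its ordinals do not match Hystrix's.

```java
import java.util.BitSet;

// Hypothetical sketch of the BitSet idea behind ExecutionResult's event counts
class EventFlags {
    enum EventType { SUCCESS, FAILURE, TIMEOUT, SHORT_CIRCUITED, THREAD_POOL_REJECTED, FALLBACK_SUCCESS }

    private final BitSet events = new BitSet(EventType.values().length);

    // Set the bit at the event's declaration order (its ordinal)
    EventFlags add(EventType type) {
        events.set(type.ordinal());
        return this;
    }

    boolean contains(EventType type) {
        return events.get(type.ordinal());
    }

    // Equivalent of isResponseTimedOut()
    boolean isResponseTimedOut() {
        return contains(EventType.TIMEOUT);
    }
}
```

A command that timed out and then succeeded in its fallback would carry both the TIMEOUT and FALLBACK_SUCCESS bits.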
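So AbstractCommand decides whether a command timed out by checking whether the TIMEOUT bit in eventCounts is set. Because the timer is registered when the command is subscribed, not when run() starts, the timeout covers the interval [command submitted, command finished]. What does that imply?
That interval includes not only the task's own execution time but also the time it waits in the thread pool; if the pool is overloaded, the task may starve and time out before it has even run.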
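The effect of measuring from submission can be seen with a single-threaded pool: the second task's own work is short, but its submit-to-finish time also includes waiting for the first task. QueueWaitDemo is hypothetical, does not use Hystrix, and only measures elapsed time; the 100 ms sleeps are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: queueing time counts toward a "timeout" measured from submission
class QueueWaitDemo {
    // Returns {run time of the second task, submit-to-finish time of the second task} in ms
    static long[] measure(long sleepMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // one worker, so tasks queue up
        try {
            pool.submit(() -> sleep(sleepMillis)); // occupies the only thread
            long submitted = System.nanoTime();
            Future<Long> second = pool.submit(() -> {
                long start = System.nanoTime();
                sleep(sleepMillis);
                return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            });
            long runMillis;
            try {
                runMillis = second.get();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            long totalMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - submitted);
            return new long[] {runMillis, totalMillis};
        } finally {
            pool.shutdown();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The second task runs for roughly 100 ms, but about 200 ms pass between its submission and its completion; a 150 ms timeout measured from submission would already have fired.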
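Finally, let's look at how the scheduled thread pool picks up jobs, because the timeout check is built on ScheduledThreadPoolExecutor, whose delayed execution is implemented by DelayedWorkQueue.
DelayedWorkQueue is a blocking queue whose underlying data structure is a min-heap ordered by each job's scheduled execution time, so the top of the heap is the job due soonest. ScheduledThreadPoolExecutor extends ThreadPoolExecutor, so its Worker logic is the same: keep taking tasks from the queue and running them.
Applied to Hystrix's execution timeout: the HystrixTimer thread keeps running the take() logic below against the DelayedWorkQueue. If a command has not changed its TimedOutStatus to COMPLETED within its timeout, the HystrixTimer thread takes the TimerListener job and sets the status to TIMED_OUT, and a HystrixTimeoutException is emitted. handleFallback then records a TIMEOUT event in executionResult's eventCounts, which is how callers learn that the task timed out.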
```java
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            // No job at the top of the heap: wait
            if (first == null)
                available.await();
            else {
                // Remaining delay of the earliest job: scheduled time - now
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // Due now: remove the heap top and restore the min-heap (sift down)
                    return finishPoll(first);
                // The earliest job is not due yet: wait
                first = null; // don't retain ref while waiting
                // leader is the thread already waiting for the heap-top job
                if (leader != null)
                    // Another thread is the leader: wait and let it handle the heap top
                    available.await();
                else {
                    // This thread becomes the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // Wait until the earliest job is due,
                        // or until another thread signals
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
```
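The same min-heap plus leader/follower behaviour is available in the public java.util.concurrent.DelayQueue, whose take() likewise blocks until the element with the smallest remaining delay expires. The sketch uses DelayQueue because DelayedWorkQueue is an internal class of ScheduledThreadPoolExecutor; DelayedJob is a hypothetical element type and the delays are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical job whose position in the heap is its due time
class DelayedJob implements Delayed {
    final String name;
    private final long dueNanos;

    DelayedJob(String name, long delayMillis) {
        this.name = name;
        this.dueNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    // Remaining delay = due time - now; <= 0 means the job can run
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Heap order: earlier due time first (difference form is safe against nanoTime overflow)
    @Override
    public int compareTo(Delayed other) {
        long diff = (other instanceof DelayedJob)
                ? dueNanos - ((DelayedJob) other).dueNanos
                : getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
    }

    // Take all jobs in due-time order, blocking until each one expires
    static List<String> drainInOrder(List<DelayedJob> jobs) {
        DelayQueue<DelayedJob> queue = new DelayQueue<>(jobs);
        List<String> order = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                order.add(queue.take().name); // heap top = earliest due job
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

Jobs inserted in the order c (60 ms), a (20 ms), b (40 ms) come out as a, b, c, regardless of insertion order.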
The error rate that isOpen() compares against the threshold is computed by HealthCounts:

```java
public HealthCounts plus(long[] eventTypeCounts) {
    long updatedTotalCount = totalCount;
    long updatedErrorCount = errorCount;

    long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
    long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
    long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
    long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
    long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

    updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    // Errors include: execution failures, timeouts, thread-pool rejections, semaphore rejections
    updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    return new HealthCounts(updatedTotalCount, updatedErrorCount);
}

// Construct HealthCounts
HealthCounts(long total, long error) {
    this.totalCount = total;
    this.errorCount = error;
    if (totalCount > 0) {
        // Error rate = errors / total; by default the breaker opens at 50%
        this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
    } else {
        this.errorPercentage = 0;
    }
}
```
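As a worked example of plus() and the constructor above, here is the same arithmetic in a standalone class. HealthMath is hypothetical and takes the five counters directly instead of an eventTypeCounts array indexed by HystrixEventType ordinals.

```java
// Hypothetical standalone version of the HealthCounts arithmetic
class HealthMath {
    final long total;
    final long errors;
    final int errorPercentage;

    HealthMath(long success, long failure, long timeout, long threadPoolRejected, long semaphoreRejected) {
        total = success + failure + timeout + threadPoolRejected + semaphoreRejected;
        // Every counted outcome except success is an error
        errors = failure + timeout + threadPoolRejected + semaphoreRejected;
        // Same truncating cast as HealthCounts: 33.3% becomes 33
        errorPercentage = total > 0 ? (int) ((double) errors / total * 100) : 0;
    }
}
```

For 10 successes, 5 failures, 3 timeouts, 1 thread-pool rejection and 1 semaphore rejection, total is 20 and errors is 10, so errorPercentage is 50. That meets both default conditions (at least 20 requests, at least 50%), so isOpen() would open the breaker.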
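The thread stacks show that adding a TimerListener to the delay queue requires acquiring the queue's lock. When many threads enqueue at the same time, many of them are parked, which causes a large amount of context switching.
According to Hystrix's comments, HystrixCommand uses these TimerListeners to handle timeouts of asynchronous execution: they fire when a command times out and deliver the timeout result. Under heavy call volume, however, registering a TimerListener blocks on that lock, which blocks the calling (main) thread and slows the service's responses. The wait can even exceed the command's own execution timeout before the command has executed at all.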