赞
踩
摘要: 原创出处 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道源码」欢迎转载,保留摘要,谢谢!
断路器概述
HystrixCircuitBreaker存在三种状态:
CLOSED :关闭
OPEN :打开
HALF_OPEN :半开
当断路器处于OPEN状态时,链路处于非健康状态,命令执行时,直接调用回退逻辑,跳过正常逻辑。
HystrixCircuitBreaker 状态变迁如下图:
①红线 :初始时,断路器处于CLOSED状态,链路处于健康状态。当满足如下条件,断路器从CLOSED变成OPEN状态:
1.周期( 可配,HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms )内,总请求数超过一定量( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20 ) 。
2.错误请求占总请求数超过一定比例( 可配,HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50% ) 。
②绿线:断路器处于OPEN 状态,命令执行时,若当前时间超过断路器开启时间一定时间( HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds = 5000 ms ),断路器变成HALF_OPEN状态,尝试调用正常逻辑,根据执行是否成功,打开或关闭熔断器【蓝线】。
HystrixCircuitBreaker
在创建AbstractCommand时,初始化HystrixCircuitBreaker对象。
- abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
-
- /**
- * 断路器定义
- */
- protected final HystrixCircuitBreaker circuitBreaker;
- protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
- HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
- HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
- HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
- // ... 省略无关代码
- // 初始化 断路器
- this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
- // ... 省略无关代码
- }
- private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
- HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
- HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
- if (enabled) {
- if (fromConstructor == null) {
- // get the default implementation of HystrixCircuitBreaker
- return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
- } else {
- return fromConstructor;
- }
- } else {
- return new NoOpCircuitBreaker();
- }
- }
- }
看一下断路器com.netflix.hystrix.HystrixCircuitBreaker的内部定义
- public interface HystrixCircuitBreaker {
- boolean allowRequest();
-
- boolean isOpen();
-
- void markSuccess();
-
- void markNonSuccess();
-
- boolean attemptExecution();
-
- class Factory {...}
-
- class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {...}
-
- static class NoOpCircuitBreaker implements HystrixCircuitBreaker {...}
-
- }
HystrixCircuitBreaker定义了五个接口和三个内部类:
com.netflix.hystrix.HystrixCircuitBreaker.Factory:HystrixCircuitBreaker 工厂,主要用于
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
NoOpCircuitBreaker:是HystrixCircuitBreaker的子类,是一个空的断路器实现,用于不开启断路器功能的情况,即它允许所有请求,断路器始终闭合。
HystrixCircuitBreakerImpl:是HystrixCircuitBreaker的子类,是一个完整的断路器实现。
HystrixCircuitBreakerImpl类内部定义了断路器的四个核心对象和一个断路器状态枚举类:
- private final HystrixCommandProperties properties;
- private final HystrixCommandMetrics metrics;
-
- enum Status {
- CLOSED, OPEN, HALF_OPEN;
- }
-
- private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
- private final AtomicLong circuitOpened = new AtomicLong(-1);
- private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
isOpen():判断断路器的状态(打开/关闭)。
subscribeToStream():HystrixCircuitBreakerImpl定义的方法,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。
- private Subscription subscribeToStream() {
- 1: private Subscription subscribeToStream() {
- 2: /*
- 3: * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
- 4: */
- 5: return metrics.getHealthCountsStream()
- 6: .observe()
- 7: .subscribe(new Subscriber<HealthCounts>() {
- 8: @Override
- 9: public void onCompleted() {
- 10:
- 11: }
- 12:
- 13: @Override
- 14: public void onError(Throwable e) {
- 15:
- 16: }
- 17:
- 18: @Override
- 19: public void onNext(HealthCounts hc) {
- 20: System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用于调试
- 21: // check if we are past the statisticalWindowVolumeThreshold
- 22: if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
- 23: // we are not past the minimum volume threshold for the stat window,
- 24: // so no change to circuit status.
- 25: // if it was CLOSED, it stays CLOSED
- 26: // if it was half-open, we need to wait for a successful command execution
- 27: // if it was open, we need to wait for sleep window to elapse
- 28: } else {
- 29: if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
- 30: //we are not past the minimum error threshold for the stat window,
- 31: // so no change to circuit status.
- 32: // if it was CLOSED, it stays CLOSED
- 33: // if it was half-open, we need to wait for a successful command execution
- 34: // if it was open, we need to wait for sleep window to elapse
- 35: } else {
- 36: // our failure rate is too high, we need to set the state to OPEN
- 37: if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
- 38: circuitOpened.set(System.currentTimeMillis());
- 39: }
- 40: }
- 41: }
- 42: }
- 43: });
- 44: }
第 5 至 7 行 :向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。
第 22 行 :判断周期( 可配,HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms )内,总请求数超过一定量( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20 ) 。这里要注意下,请求次数统计的是周期内,超过周期的不计算在内。例如说,00:00 内发起了 N 个请求,00:11 不计算这 N 个请求。
第 29 行 :错误请求占总请求数超过一定比例( 可配,HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50% ) 。
第 37 至 39 行 :满足断路器打开条件,CAS 修改状态( CLOSED => OPEN ),并设置打开时间( circuitOpened ) 。
目前该方法有两处调用 :
构造方法内,在创建 HystrixCircuitBreakerImpl 时,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。固定间隔,计算断路器是否要关闭( CLOSE )。
markSuccess()内,清空 Hystrix Metrics 对请求量统计 Observable 的统计信息,取消原有订阅,并发起新的订阅。
attemptExecution():判断是否可以执行正常逻辑。
如下是 AbstractCommand#applyHystrixSemantics(_cmd) 方法,对 HystrixCircuitBreakerImpl#attemptExecution 方法的调用的代码 :
- private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
-
- // ... 省略无关代码
-
- /* determine if we're allowed to execute */
- if (circuitBreaker.attemptExecution()) {
- // 执行【正常逻辑】
- } else {
- // 执行【回退逻辑】
- }
- }
- @Override
- 2: public boolean attemptExecution() {
- 3: // 强制 打开
- 4: if (properties.circuitBreakerForceOpen().get()) {
- 5: return false;
- 6: }
- 7: // 强制 关闭
- 8: if (properties.circuitBreakerForceClosed().get()) {
- 9: return true;
- 10: }
- 11: // 打开时间为空
- 12: if (circuitOpened.get() == -1) {
- 13: return true;
- 14: } else {
- 15: // 满足间隔尝试断路器时间
- 16: if (isAfterSleepWindow()) {
- 17: //only the first request after sleep window should execute
- 18: //if the executing command succeeds, the status will transition to CLOSED
- 19: //if the executing command fails, the status will transition to OPEN
- 20: //if the executing command gets unsubscribed, the status will transition to OPEN
- 21: if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
- 22: return true;
- 23: } else {
- 24: return false;
- 25: }
- 26: } else {
- 27: return false;
- 28: }
- 29: }
- 30: }
第 4 至 6 行 :当 HystrixCommandProperties.circuitBreakerForceOpen = true ( 默认值 :false) 时,即断路器强制打开,返回 false 。当该配置接入配置中心后,可以动态实现打开熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。
第 8 至 10 行 :当 HystrixCommandProperties.circuitBreakerForceClose = true ( 默认值 :false) 时,即断路器强制关闭,返回 true 。当该配置接入配置中心后,可以动态实现关闭熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。
第 12 至 13 行 :断路器打开时间( circuitOpened ) 为”空”,返回 true 。
第 16 至 28 行 :调用 #isAfterSleepWindow() 方法,判断是否满足尝试调用正常逻辑的间隔时间。当满足,使用 CAS 方式修改断路器状态( OPEN => HALF_OPEN ),从而保证有且仅有一个线程能够尝试调用正常逻辑。
isAfterSleepWindow():在当前时间超过断路器打开时间 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds ( 默认值,5000 ms ),返回 true 。
- private boolean isAfterSleepWindow() {
- final long circuitOpenTime = circuitOpened.get();
- final long currentTime = System.currentTimeMillis();
- final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
- return currentTime > circuitOpenTime + sleepWindowTime;
- }
markSuccess():当尝试调用正常逻辑成功时,调用此方法,关闭断路器。
- @Override
- 2: public void markSuccess() {
- 3: if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
- 4: // 清空 Hystrix Metrics 对请求量统计 Observable 的**统计信息**
- 5: //This thread wins the race to close the circuit - it resets the stream to start it over from 0
- 6: metrics.resetStream();
- 7: // 取消原有订阅
- 8: Subscription previousSubscription = activeSubscription.get();
- 9: if (previousSubscription != null) {
- 10: previousSubscription.unsubscribe();
- 11: }
- 12: // 发起新的订阅
- 13: Subscription newSubscription = subscribeToStream();
- 14: activeSubscription.set(newSubscription);
- 15: // 设置断路器打开时间为空
- 16: circuitOpened.set(-1L);
- 17: }
- 18: }
第 3 行 :使用 CAS 方式,修改断路器状态( HALF_OPEN => CLOSED )。
第 6 行 :清空 Hystrix Metrics 对请求量统计 Observable 的统计信息。
第 8 至 14 行 :取消原有订阅,发起新的订阅。
第 16 行 :设置断路器打开时间为”空” 。
markNonSuccess():当尝试调用正常逻辑失败时,调用此方法,重新打开断路器。
- @Override
- 2: public void markNonSuccess() {
- 3: if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
- 4: //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
- 5: circuitOpened.set(System.currentTimeMillis());
- 6: }
- 7: }
第 3 行 :使用 CAS 方式,修改断路器状态( HALF_OPEN => OPEN )。
第 5 行 :设置设置断路器打开时间为当前时间。这样,#attemptExecution() 过一段时间,可以再次尝试执行正常逻辑。
allowRequest():allowRequest() 和 attemptExecution() 方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态( CLOSE => HALF-OPEN ),而后者会。
下图展示了 HystrixCommand 或 HystrixObservableCommand 如何与 HystrixCircuitBreaker 进行交互,以及 HystrixCircuitBreaker 的决策逻辑流程,包括熔断器内部计数器如何工作。
总结来说,线路的开路闭路详细逻辑如下:
假设线路内的容量(请求QPS)达到一定阈值(通过 HystrixCommandProperties.circuitBreakerRequestVolumeThreshold() 配置)
同时,假设线路内的错误率达到一定阈值(通过 HystrixCommandProperties.circuitBreakerErrorThresholdPercentage() 配置)
熔断器将从『闭路』转换成『开路』
若此时是『开路』状态,熔断器将短路后续所有经过该熔断器的请求,这些请求直接走『失败回退逻辑』
经过一定时间(即『休眠窗口』,通过 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds() 配置),后续第一个请求将会被允许通过熔断器(此时熔断器处于『半开』状态),若该请求失败,熔断器将又进入『开路』状态,且在休眠窗口内保持此状态;若该请求成功,熔断器将进入『闭路』状态,回到逻辑1循环往复。
依赖隔离
Hystrix 通过使用『舱壁模式』(注:将船的底部划分成一个个的舱室,这样一个舱室进水不会导致整艘船沉没。将系统所有依赖服务隔离起来,一个依赖延迟升高或者失败,不会导致整个系统失败)来隔离依赖服务,并限制访问这些依赖服务的并发度它会为每一个依赖服务创建一个独立的线程池,这样就算某个依赖服务出现延迟过高的情况,也只是对该依赖服务的调用产生影响,不会拖慢其他的依赖服务。
当然,也可以不使用线程池来使你的系统免受依赖服务失效的影响,这需要你小心的设置网络连接/读取超时时间和重试配置,并保证这些配置能正确正常的运作,以使这些依赖服务在失效时,能快速返回错误。
将依赖服务请求通过使用不同的线程池隔离,其优势如下:
简而言之,通过线程池提供的依赖服务隔离,可以使得我们能在不停止服务的情况下,更加优雅地应对客户端库和子系统性能上的变化。
注:尽管线程池能提供隔离性,但你仍然需要对你的依赖服务客户端代码增加超时逻辑,并且/或者处理线程中断异常,以使这些代码不会无故地阻塞或者拖慢 Hystrix 线程池。
你可能担心为每一个依赖服务创建线程池会增加的开销,其实这个开销是很小的,不用过于担心。对于那些本来延迟就比较小的请求(例如访问本地缓存成功率很高的请求)来说,线程池带来的开销是非常高的,这时,你可以考虑采用其他方法,例如非阻塞信号量(不支持超时),来实现依赖服务的隔离,使用信号量的开销很小。但绝大多数情况下,Netflix 更偏向于使用线程池来隔离依赖服务,因为其带来的额外开销可以接受,并且能支持包括超时在内的所有功能。
信号量
除了线程池,队列之外,你可以使用信号量(或者叫计数器)来限制单个依赖服务的并发度。Hystrix 可以利用信号量,而不是线程池,来控制系统负载,但信号量不允许我们设置超时和异步化,如果你对客户端库有足够的信任(延迟不会过高),并且你只需要控制系统负载,那么你可以使用信号量。
HystrixCommand 和 HystrixObservableCommand 在两个地方支持使用信号量:
你可以通过动态配置(即热部署)来决定信号量的大小,以控制并发线程的数量,信号量大小的估计和使用线程池进行并发度估计一样(仅访问内存数据的请求,一般能达到耗时在 1ms 以内,且能达到 5000rps,这样的请求对应的信号量可以设置为 1 或者 2。默认值为 10)。
注意:如果依赖服务使用信号量来进行隔离,当依赖服务出现高延迟,其调用线程也会被阻塞,直到依赖服务的网络请求超时。
信号量在达到上限时,会拒绝后续请求的访问,同时,设置信号量的线程也无法异步化(即像线程池那样,实现提交-做其他工作-得到结果模式)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。