赞
踩
Hystrix工作的流程图如下:
Hystrix的工作流程结合上图如下:
(1)每次调用都会创建一个HystrixCommand
(2)执行execute或queue做同步/异步调用
(3)判断熔断器是否打开,如果打开跳到步骤8,否则进入步骤4
(4)判断线程池/信号量是否跑满,如果跑满进入步骤8,否则进入步骤5
(5)调用HystrixCommand的run方法,如果调用超时进入步骤8
(6)判断是否调用成功,返回成功调用结果,如果失败进入步骤8
(7)计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态
(8)降级处理逻辑,根据上方的步骤可以得出以下四种情况会进入降级处理:熔断器打开、线程池/信号量跑满、调用超时、调用失败
(9)返回执行成功结果
Hystrix的依赖如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-core</artifactId>
</dependency>
新建一个Command:
public class SimpleCommand extends HystrixCommand<String> { private String name; private int timeIndex; public SimpleCommand(Setter setter, String name, int timeIndex) { super(setter); this.name = name; this.timeIndex = timeIndex; } // 当 excute()或queue()里面的command执行失败时,这个方法被触发返回降级数据 @Override protected String getFallback() { Throwable cause = getExecutionException(); ExecuteResultType failureType = CommandSupport.getFailureType(this); return "Fallback TimeMillSeconds Is :" + timeIndex + ", Failure Type Is :" + failureType.name(); } // 当 excute()或queue()里面的command被触发时,执行的代码 @Override protected String run() { try { Thread.sleep(this.getTimeIndex()); } catch (InterruptedException e) { } return "Succeed TimeMillSeconds Is :" + timeIndex; } public int getTimeIndex() { return timeIndex; } }
测试Hystrix:
public class SimpleCommandTest { public static void main(String[] args) { int count = 0; while (true) { String s = new SimpleCommand( HystrixCommand.Setter // 命令分组 .withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // 命令线程池(某些命令可能需要彼此隔离) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("threadpoolwithfallback")) .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10).withMaximumSize(10)) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() // 触发Hystrix服务降级处理的超时时间 .withExecutionTimeoutInMilliseconds(1000) // 熔断后的重试时间窗口 .withCircuitBreakerSleepWindowInMilliseconds(5000) // 如果在一个采样时间窗口内,失败率超过该配置,则自动打开熔断开关实现降级处理,即快速失败 .withCircuitBreakerErrorThresholdPercentage(50) // 在熔断开关闭合情况下,在进行失败率判断之前,一个采样周期内必须进行至少N个请求才能进行采样统计 // 目的是有足够的采样使得失败率计算正确,默认为20 .withCircuitBreakerRequestVolumeThreshold(2)), "ccc", (count % 20) * 100).execute(); System.out.println(s); count++; try { Thread.currentThread().sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } finally { } } } }
运行结果如下:
Succeed TimeMillSeconds Is :900
Succeed TimeMillSeconds Is :1000
Fallback TimeMillSeconds Is :1100, Failure Type Is :RESPONSE_TIMEDOUT
Fallback TimeMillSeconds Is :1200, Failure Type Is :RESPONSE_TIMEDOUT
Fallback TimeMillSeconds Is :1300, Failure Type Is :RESPONSE_TIMEDOUT
Fallback TimeMillSeconds Is :1400, Failure Type Is :RESPONSE_TIMEDOUT
Fallback TimeMillSeconds Is :1500, Failure Type Is :RESPONSE_TIMEDOUT
Fallback TimeMillSeconds Is :1600, Failure Type Is :RESPONSE_TIMEDOUT
Fallback TimeMillSeconds Is :1700, Failure Type Is :CIRCUITBREAK_EROPEN
Fallback TimeMillSeconds Is :1800, Failure Type Is :CIRCUITBREAK_EROPEN
可以看到当运行时间超过一秒后就开始响应超时处理,当错误数过多时就打开熔断了。
跟进一下源码,首先进入到execute():
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
…
进入到toObservable()方法:
… return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { /* this is a stateful object so can only be used once */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); //TODO make a new error type for this throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } //如果开启请求缓存则查询缓存是否存在 final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); /* try from cache first */ if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } … return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } }
在上面这个方法中会有一个缓存的判断,如果存在缓存的话直接返回结果,否则进入方法applyHystrixSemantics()方法:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ // 判断是否改处理熔断 if (circuitBreaker.allowRequest()) { // 获取信号量实例 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); // 执行并观察 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // 拒绝 return handleSemaphoreRejectionViaFallback(); } } else { // 失败 return handleShortCircuitViaFallback(); } }
在applyHystrixSemantics()方法中,首先会判断是否开启熔断器,如果开启则直接进入失败处理的逻辑,否则会尝试获取信号量(如果使用的是线程池的模式则默认获取成功),获取成功进入executeCommandAndObserve()方法:
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
进入到executeCommandWithSpecifiedIsolation():
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { // 进入方法会首先判断隔离策略,如果是使用的信号量模式则在当前线程上执行,否则进入下方的线程池逻辑 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); // 更改HystrixCommand的状态为USER_CODE_EXECUTED if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); // 判断HystrixCommand的超时状态,如果超时则抛出异常 if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { return Observable.error(new RuntimeException("timed out before executing run()")); } // 更改当前command的线程执行状态为STARTED if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); … } else { return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() { // 执行完毕后更改线程状态为TERMINAL @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { } } }).doOnUnsubscribe(new Action0() { // 当Observable被取消订阅,更改线程状态为TERMINAL @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { } } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); // 调用getUserExecutionObservable执行具体的业务逻辑,也就是我们实现的那个run方法 return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { return Observable.error(ex); } } }); } }
作者 Github : tojohnonly , 博客 : EnskDeCode
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。