赞
踩
这一篇我们主要来看下Hystrix
是怎样在OpenFiegn
上做进一层封装的,主要梳理Hystrix
封装逻辑。
我们配置了Hystrix
的使用逻辑后
@RequestMapping(value = "simple",method = RequestMethod.GET)
public String simpleMethod(){
return feignConsumerClient.producerSimple();
}
@FeignClient(value = "producer-server",fallback = FeignConsumerClientImp.class)
public interface FeignConsumerClient {
@RequestMapping(value = "producer/simple",method = RequestMethod.GET)
public String producerSimple();
}
这里就不是像我们上篇介绍的opengeign
那样,使用的ReflectiveFeign
的代理执行,而是使用hystrix
的代理执行HystrixInvocationHandler
。
@Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { if ("equals".equals(method.getName())) { ............ HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) { @Override protected Object run() throws Exception { try { return HystrixInvocationHandler.this.dispatch.get(method).invoke(args); } catch (Exception e) { throw e; } catch (Throwable t) { throw (Error) t; } } @Override protected Object getFallback() { if (fallbackFactory == null) { return super.getFallback(); } try { Object fallback = fallbackFactory.create(getExecutionException()); Object result = fallbackMethodMap.get(method).invoke(fallback, args); if (isReturnsHystrixCommand(method)) { return ((HystrixCommand) result).execute(); } else if (isReturnsObservable(method)) { // Create a cold Observable return ((Observable) result).toBlocking().first(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return ((Single) result).toObservable().toBlocking().first(); } else if (isReturnsCompletable(method)) { ((Completable) result).await(); return null; } else if (isReturnsCompletableFuture(method)) { return ((Future) result).get(); } else { return result; } ........ } }; if (Util.isDefault(method)) { return hystrixCommand.execute(); } else if (isReturnsHystrixCommand(method)) { return hystrixCommand; } else if (isReturnsObservable(method)) { // Create a cold Observable return hystrixCommand.toObservable(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return hystrixCommand.toObservable().toSingle(); } else if (isReturnsCompletable(method)) { return hystrixCommand.toObservable().toCompletable(); } else if (isReturnsCompletableFuture(method)) { return new ObservableCompletableFuture<>(hystrixCommand); } return hystrixCommand.execute(); }
这里hystrix
的执行逻辑主要是封装在HystrixCommand
中,并且hystrix
与openfeign
一样,同样是使用RxJava
封装调用关系。十一我们可以看到其在hystrixCommand.execute()
的执行返回中,会判断不同的你本身调用的不同类型。
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
..........
同时在HystrixCommand
的run
方法中return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
就是对
openfeign
逻辑的封装:
这里的SynchronousMethodHandler
就是我们上篇梳理的feign
的逻辑调用。
虽然这里表面是比较好理解,但内部是有很多通过Rxjava
封装的逻辑。例如失败统计数、是否需要熔断请求等。这里的逻辑是在HystrixCommand
的父类AbstractCommand
中。
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
............
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
..........
}
这里的主要执行方法就是applyHystrixSemantics(_cmd)
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); .......... if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }
通常我们刷面试体,例如hystrix
的执行熔断判断,例如hystrix
会有一个基础数量,然后在规定的单位中请求的数量要达到一定数量才会去触发熔断统计的逻辑,如果达到了就会根据成功、失败的请求触发熔断、已经之后的恢复逻辑。这里的逻辑描述判断主要是在circuitBreaker.allowRequest()
中,我们可以看到,如果返回false
的话,就表示不能访问、会直接执行handleShortCircuitViaFallback()
,直接熔断回调。
@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
// we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
isOpen();
// properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
return true;
}
return !isOpen() || allowSingleTest();
}
这里的properties
是HystrixCommandProperties
,这个是在配置文件中的关于熔断的一些判断数值配置
public abstract class HystrixCommandProperties {
private static final Logger logger = LoggerFactory.getLogger(HystrixCommandProperties.class);
/* defaults */
/* package */ static final Integer default_metricsRollingStatisticalWindow = 10000;// default => statisticalWindow: 10000 = 10 seconds (and default of 10 buckets so each bucket is 1 second)
private static final Integer default_metricsRollingStatisticalWindowBuckets = 10;// default => statisticalWindowBuckets: 10 = 10 buckets in a 10 second window so each bucket is 1 second
private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;// default => statisticalWindowVolumeThreshold: 20 requests in 10 seconds must occur before statistics matter
......
private static final ExecutionIsolationStrategy default_executionIsolationStrategy = ExecutionIsolationStrategy.THREAD;
private static final Boolean default_executionIsolationThreadInterruptOnTimeout = true;
private static final Boolean default_executionIsolationThreadInterruptOnFutureCancel = false;
private static final Boolean default_metricsRollingPercentileEnabled = true;
allowRequest()
首先是判断circuitBreakerForceOpen
断路器是否强制打开,如果打开就直接返回false
表示拒绝,在判断如果circuitBreakerForceClosed
强制关闭,就直接返回true
表示能通过。然后的话,就是通过isOpen()
来执行具体的判断逻辑了。
@Override public boolean isOpen() { if (circuitOpen.get()) { // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close return true; } // we're closed, so let's see if errors have made us so we should trip the circuit open HealthCounts health = metrics.getHealthCounts(); // check if we are past the statisticalWindowVolumeThreshold if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything return false; } if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false; } else { // our failure rate is too high, trip the circuit if (circuitOpen.compareAndSet(false, true)) { // if the previousValue was false then we want to set the currentTime circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true; } else { // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have // caused another thread to set it to true already even though we were in the process of doing the same // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open return true; } } } }
这个就是具体根据熔断的指标来判断是否需要熔断。这里还有metrics
,这个类是用来放统计过程中的各个统计指标数据。
private final HystrixCommandMetrics metrics;
然后上面的判断,例如当health.getTotalRequests()
也就是请求的总的数量要小于配置的请求数的话,是不会断路的,然后是计算的失败请求百分比与配置的比例阈值。如果比例没有到达,可能是目前已经熔断了,就再设置circuitOpen
来重新打开熔断,看之后请求是否正常。
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
在我们判断这个请求是否需要熔断后,下面是请求的策略处理
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); ........ if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }
protected TryableSemaphore getExecutionSemaphore() { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) { if (executionSemaphoreOverride == null) { TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // we didn't find one cache so setup executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return executionSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return executionSemaphoreOverride; } } else { // return NoOp implementation since we're not using SEMAPHORE isolation return TryableSemaphoreNoOp.DEFAULT; } }
protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();
这里就是先使用信号量判断,其是否已经达到请求的限制数,如果达到了就是handleSemaphoreRejectionViaFallback()
方法来处理拒绝策略。
这个commandKey.name
我们可以看到默认就是类名+方法名。获取信号量与获取线程池一样,都是解析了类级别的隔离。
如果我们的请求处理部署使用信号量而是使用线程的话,这里就会返回TryableSemaphoreNoOp.DEFAULT
。
public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
@Override
public boolean tryAcquire() {
return true;
}
我们可以看到其是直接返回true
。
在通过上面的逻辑后,最终会在这里选择对应的执行策略处理
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { @Override public Observable<R> call() { ............ }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { return Observable.defer(new Func0<Observable<R>>() { ............ } }
这里如果是线程执行的话,就是threadPool.getScheduler
处理到线程池中。而线程池的初始化,其也是通过HystrixThreadPoolKey
来解析隔离的
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
// get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work String key = threadPoolKey.name(); // this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } // if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); }
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
当然这里还有一些请求失败这些,还是很复杂,目前我们就梳理上面的这两个内容吧。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。