[SpringCloud] Hystrix Source Code Analysis

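Hystrix is a fault-tolerance component for microservices that provides resource isolation, service degradation (fallback), and circuit breaking. This chapter analyzes how Hystrix implements them.

1. Service degradation

When the server hosting a service instance is overloaded, or network problems prevent it from responding in time, requests block and pile up; in severe cases the whole system can go down. Hystrix provides a fault-tolerance mechanism for this: when a service instance cannot respond in time, the call is degraded and fails fast, which keeps the system stable.

Service degradation is bound to the @HystrixCommand annotation. Its source looks like this: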

    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    @Documented
    public @interface HystrixCommand {
        ...
        // Name of the method to call when the command is degraded
        String fallbackMethod() default "";
    }

The annotation itself reveals very little. To understand what it does, we need the class that processes it: HystrixCommandAspect

    @Aspect
    public class HystrixCommandAspect {
        ...
        // Around advice
        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
            Method method = AopUtils.getMethodFromTarget(joinPoint);
            Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
            if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
                throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
            } else {
                MetaHolderFactory metaHolderFactory = (MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
                MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
                HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
                ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
                try {
                    Object result;
                    if (!metaHolder.isObservable()) {
                        // Execute the method through the command (proxied execution)
                        result = CommandExecutor.execute(invokable, executionType, metaHolder);
                    } else {
                        result = this.executeObservable(invokable, executionType, metaHolder);
                    }
                    return result;
                } catch (HystrixBadRequestException var9) {
                    throw var9.getCause();
                } catch (HystrixRuntimeException var10) {
                    throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
                }
            }
        }
    }
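The interception idea behind the aspect can be illustrated without Hystrix or AspectJ. The following is a minimal sketch that uses a JDK dynamic proxy in place of the aspect; the annotation `WithFallback`, the `GreetingService` interface, and the `wrap` helper are all hypothetical names invented for this example, and the real HystrixCommandAspect builds a command object instead of invoking the fallback directly.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for @HystrixCommand; only fallbackMethod is modelled.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface WithFallback {
    String fallbackMethod() default "";
}

interface GreetingService {
    String greet(String name);
}

class FlakyGreetingService implements GreetingService {
    @Override
    @WithFallback(fallbackMethod = "greetFallback")
    public String greet(String name) {
        if (name.isEmpty()) {
            throw new IllegalStateException("backend unavailable");
        }
        return "hello " + name;
    }

    // Looked up by name through reflection; same parameter list as greet.
    public String greetFallback(String name) {
        return "fallback greeting";
    }
}

class FallbackProxyDemo {
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
            (proxy, method, args) -> {
                // Read the annotation from the implementation method, not the interface method.
                Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
                WithFallback ann = impl.getAnnotation(WithFallback.class);
                try {
                    return impl.invoke(target, args);
                } catch (InvocationTargetException e) {
                    if (ann == null || ann.fallbackMethod().isEmpty()) {
                        throw e.getCause(); // no fallback configured: rethrow the original failure
                    }
                    Method fallback = target.getClass().getMethod(ann.fallbackMethod(), method.getParameterTypes());
                    return fallback.invoke(target, args);
                }
            });
    }

    public static void main(String[] args) {
        GreetingService service = wrap(new FlakyGreetingService(), GreetingService.class);
        System.out.println(service.greet("bob")); // normal path
        System.out.println(service.greet(""));    // degraded path
    }
}
```

The caller only sees the interface; whether the real method or the fallback produced the result is decided inside the proxy, which is the same separation the aspect gives annotated methods.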

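The class name tells us this is an aspect, which means service degradation is implemented through an AOP proxy. Next, trace the execute method of CommandExecutor: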

    Call chain:
    -> CommandExecutor.execute
    -> castToExecutable(invokable, executionType).execute()
    -> HystrixCommand.execute
    -> this.queue().get()

    public Future<R> queue() {
        // Obtain the Future object
        final Future<R> delegate = this.toObservable().toBlocking().toFuture();
        Future<R> f = new Future<R>() {
            ...
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        };
        ...
    }
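The last step of the call chain, execute() being queue().get(), can be sketched with the standard library alone. This is an illustrative sketch, not Hystrix code: `BlockingCommandSketch` is a hypothetical class, and `CompletableFuture.supplyAsync` stands in for the Observable-to-Future conversion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

class BlockingCommandSketch<R> {
    private final Supplier<R> task;

    BlockingCommandSketch(Supplier<R> task) {
        this.task = task;
    }

    // Asynchronous entry point: start the work and hand back a Future.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(task);
    }

    // Synchronous entry point: the same work, but the caller blocks on get().
    R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        BlockingCommandSketch<Integer> command = new BlockingCommandSketch<Integer>(() -> 21 * 2);
        System.out.println(command.execute());
    }
}
```

Defining the synchronous call in terms of the asynchronous one means both paths share one execution pipeline; only the waiting strategy differs.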

The queue method of HystrixCommand returns a Future, the usual way to obtain the result of a task once it has finished on another thread. Here the Future is created by this.toObservable().toBlocking().toFuture(). The toObservable method returns an Observable:

    public Observable<R> toObservable() {
        ...
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            public Observable<R> call() {
                return
                    ((CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED) ? Observable.never() :
                    // Pass in the command and build its execution
                    AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);
            }
        };
        ...
        return Observable.defer(new Func0<Observable<R>>() {
            public Observable<R> call() {
                ...
                // The Observable is only created once a subscriber subscribes
                Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
                Observable afterCache;
                if (requestCacheEnabled && cacheKey != null) {
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, AbstractCommand.this);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        toCache.unsubscribe();
                        AbstractCommand.this.isResponseFromCache = true;
                        return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this);
                    }
                    afterCache = toCache.toObservable();
                } else {
                    afterCache = hystrixObservable;
                }
                return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
                ...
            }
        });
    }
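The effect of Observable.defer, where nothing is built until someone subscribes, can be imitated with a plain Supplier. The sketch below is a deliberately tiny model and not RxJava: `DeferredSource` is a hypothetical class that emits a single value and has no error or completion signals.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

class DeferredSource<T> {
    private final Supplier<T> factory;

    private DeferredSource(Supplier<T> factory) {
        this.factory = factory;
    }

    // Wrapping the factory does no work; it only remembers how to produce the value.
    static <T> DeferredSource<T> defer(Supplier<T> factory) {
        return new DeferredSource<>(factory);
    }

    // Each subscription runs the factory again, so every subscriber gets a fresh execution.
    void subscribe(Consumer<? super T> subscriber) {
        subscriber.accept(factory.get());
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        DeferredSource<String> source = defer(() -> "run #" + created.incrementAndGet());
        System.out.println("factory calls before subscribe: " + created.get());
        source.subscribe(System.out::println);
        source.subscribe(System.out::println);
    }
}
```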

Creation of the Observable is delegated to the AbstractCommand.this.applyHystrixSemantics method:

    private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {
        this.executionHook.onStart(_cmd);
        // Is the request allowed? Checks the circuit breaker state
        if (this.circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = this.getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            // Release the semaphore at most once, whichever callback fires first
            Action0 singleSemaphoreRelease = new Action0() {
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
            Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                public void call(Throwable t) {
                    AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey);
                }
            };
            if (executionSemaphore.tryAcquire()) {
                try {
                    this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
                    // Execute the command
                    return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException var7) {
                    return Observable.error(var7);
                }
            } else {
                // No semaphore permit available: reject and fall back
                return this.handleSemaphoreRejectionViaFallback();
            }
        } else {
            // Circuit is open: run the fallback
            return this.handleShortCircuitViaFallback();
        }
    }
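The semaphore handling in applyHystrixSemantics follows a common pattern: try to take a permit without blocking, fall back if none is free, and guard the release with an AtomicBoolean so that it happens only once even when two callbacks fire. Below is a minimal sketch of that pattern using java.util.concurrent.Semaphore; `SemaphoreGuardSketch` and its method names are hypothetical, and Hystrix uses its own TryableSemaphore rather than this class.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class SemaphoreGuardSketch {
    private final Semaphore permits;

    SemaphoreGuardSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    <R> R call(Supplier<R> task, Supplier<R> fallback) {
        if (!permits.tryAcquire()) {
            // No permit available: reject immediately instead of queueing.
            return fallback.get();
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        };
        try {
            return task.get();
        } finally {
            releaseOnce.run(); // plays the role of the terminate callback
            releaseOnce.run(); // plays the role of the unsubscribe callback; a no-op here
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        SemaphoreGuardSketch guard = new SemaphoreGuardSketch(1);
        // The inner call runs while the outer call still holds the only permit.
        String result = guard.<String>call(
            () -> guard.<String>call(() -> "inner ran", () -> "inner rejected"),
            () -> "outer rejected");
        System.out.println(result + ", permits left: " + guard.availablePermits());
    }
}
```

Without the AtomicBoolean guard, the two release calls would return two permits for one acquire and silently raise the concurrency limit.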

When this.circuitBreaker.allowRequest returns true, the circuit is not open and execution continues into the executeCommandAndObserve method:

    private Observable<R> executeCommandAndObserve(AbstractCommand<R> _cmd) {
        ...
        Observable execution;
        if ((Boolean)this.properties.executionTimeoutEnabled().get()) {
            // Timeout monitoring is added here
            execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
        } else {
            execution = this.executeCommandWithSpecifiedIsolation(_cmd);
        }
        ...
        // handleFallback: each kind of failure gets its own handler
        Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            public Observable<R> call(Throwable t) {
                Exception e = AbstractCommand.this.getExceptionFromThrowable(t);
                AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    // A timeout exception was thrown: handle the timeout
                    return AbstractCommand.this.handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return AbstractCommand.this.handleBadRequestByEmittingError(e);
                } else if (e instanceof HystrixBadRequestException) {
                    AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey);
                    return Observable.error(e);
                } else {
                    return AbstractCommand.this.handleFailureViaFallback(e);
                }
            }
        };
        ...
        return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted)
            // Errors are routed to handleFallback
            .onErrorResumeNext(handleFallback).doOnEach(setRequestContext);
    }

    private static class HystrixObservableTimeoutOperator<R> implements Observable.Operator<R, R> {
        final AbstractCommand<R> originalCommand;
        public HystrixObservableTimeoutOperator(AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            child.add(s);
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(this.originalCommand.concurrencyStrategy, new Runnable() {
                public void run() {
                    // 3. Emit the timeout exception
                    child.onError(new HystrixTimeoutException());
                }
            });
            HystrixTimer.TimerListener listener = new HystrixTimer.TimerListener() {
                // 1. Check whether the command has timed out
                public void tick() {
                    if (HystrixObservableTimeoutOperator.this.originalCommand.isCommandTimedOut.compareAndSet(AbstractCommand.TimedOutStatus.NOT_EXECUTED, AbstractCommand.TimedOutStatus.TIMED_OUT)) {
                        HystrixObservableTimeoutOperator.this.originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, HystrixObservableTimeoutOperator.this.originalCommand.commandKey);
                        s.unsubscribe();
                        // 2. Run the timeout task
                        timeoutRunnable.run();
                    }
                }
            };
            ...
        }
    }
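The core of the timeout operator is a race between a timer and the command, settled by a single compareAndSet on a status field: whichever side flips NOT_EXECUTED first decides the outcome. A minimal standard-library sketch of that race follows. `TimeoutFallbackSketch`, its `run` method, and the `COMPLETED` status are hypothetical names; Hystrix uses HystrixTimer and RxJava operators instead of the scheduler and CompletableFuture shown here, and this sketch treats every failure, not only timeouts, as a reason to fall back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class TimeoutFallbackSketch {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    static <R> R run(Supplier<R> task, Supplier<R> fallback, long timeoutMillis) {
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<R> result = new CompletableFuture<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Timer tick: wins only if the task has not finished yet.
            timer.schedule(() -> {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                    result.completeExceptionally(new TimeoutException("timed-out"));
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);

            // Task completion: wins only if the timer has not fired yet.
            CompletableFuture.supplyAsync(task).whenComplete((value, error) -> {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(value);
                    }
                }
            });

            try {
                return result.join();
            } catch (CompletionException e) {
                // Timeout or task failure: degrade to the fallback value.
                return fallback.get();
            }
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(() -> "fast result", () -> "fallback", 1000));
        System.out.println(run(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "late result";
        }, () -> "fallback", 50));
    }
}
```

Note that the slow task keeps running in the background after the timeout; the sketch only stops waiting for it, which is also why a timed-out call can still consume resources downstream.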

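executeCommandAndObserve adds timeout monitoring: if the command runs longer than the configured limit, a timeout exception is emitted, and the handleTimeoutViaFallback method handles it: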

    private Observable<R> handleTimeoutViaFallback() {
        // 1. Handle the failure according to its type
        return this.getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
    }

    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, HystrixEventType eventType, final HystrixRuntimeException.FailureType failureType, final String message, final Exception originalException) {
        ...
        // Obtain the fallback Observable
        fallbackExecutionChain = this.getFallbackObservable();
        ...
    }

    protected final Observable<R> getFallbackObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            public Observable<R> call() {
                try {
                    // Run the fallback method
                    return Observable.just(HystrixCommand.this.getFallback());
                } catch (Throwable var2) {
                    return Observable.error(var2);
                }
            }
        });
    }

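Here we finally reach the getFallback method. It invokes the method named by fallbackMethod in the annotation, so the call fails fast and still returns a response: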

    protected Object getFallback() {
        // Look up the fallback method declared in the annotation
        final CommandAction commandAction = this.getFallbackAction();
        if (commandAction != null) {
            try {
                return this.process(new AbstractHystrixCommand<Object>.Action() {
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable var3) {
                LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
                throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));
            }
        } else {
            return super.getFallback();
        }
    }

Back in AbstractCommand.this.applyHystrixSemantics: when this.circuitBreaker.allowRequest returns true the request is executed normally; when it returns false the circuit is open, and the open state also ends up in getFallback:

    Call chain:
    -> AbstractCommand.handleShortCircuitViaFallback
    -> getFallbackOrThrowException
    -> this.getFallbackObservable
    -> GenericCommand.getFallback

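2. Circuit breaking

Circuit breaking is a protection mechanism provided by Hystrix: when too many calls to a service fail within a period of time, Hystrix degrades the service and returns a failure immediately, so that accumulated pressure does not bring the service down.
Returning to the circuitBreaker.allowRequest method seen earlier, this is the method that decides whether a request may proceed: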

    public boolean allowRequest() {
        // Is the circuit forced open?
        if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {
            return false;
        // Is the circuit forced closed?
        } else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {
            // Still evaluate isOpen() so the breaker state keeps being updated
            this.isOpen();
            return true;
        } else {
            return !this.isOpen() || this.allowSingleTest();
        }
    }

    public boolean isOpen() {
        if (this.circuitOpen.get()) {
            return true;
        } else {
            HystrixCommandMetrics.HealthCounts health = this.metrics.getHealthCounts();
            // Below the request-volume threshold for the window: stay closed
            if (health.getTotalRequests() < (long)(Integer)this.properties.circuitBreakerRequestVolumeThreshold().get()) {
                return false;
            // Error percentage below the threshold: stay closed
            } else if (health.getErrorPercentage() < (Integer)this.properties.circuitBreakerErrorThresholdPercentage().get()) {
                return false;
            } else if (this.circuitOpen.compareAndSet(false, true)) {
                this.circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                return true;
            } else {
                return true;
            }
        }
    }

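isOpen applies two quantitative checks to recent requests: the circuit opens only when the request count has reached the volume threshold and the error percentage has reached the error threshold. Once it is open, requests are degraded.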
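The decision logic can be condensed into a small standalone class. This is a simplified sketch under stated assumptions, not the Hystrix implementation: `CircuitBreakerSketch` is a hypothetical class, it uses plain cumulative counters where Hystrix uses rolling-window metrics, the clock is passed in as a parameter to keep the example deterministic, and the sleep-window behaviour of `allowSingleTest` (let one trial request through after the circuit has been open for a while) is my understanding of Hystrix rather than something shown in the listing above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class CircuitBreakerSketch {
    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTestedTime = new AtomicLong();
    private final AtomicLong total = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    // Record the outcome of one request.
    void record(boolean success) {
        total.incrementAndGet();
        if (!success) {
            errors.incrementAndGet();
        }
    }

    boolean isOpen(long now) {
        if (circuitOpen.get()) {
            return true;
        }
        long requests = total.get();
        if (requests < requestVolumeThreshold) {
            return false; // too few requests to judge
        }
        if (errors.get() * 100 / requests < errorThresholdPercentage) {
            return false; // error rate still acceptable
        }
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTestedTime.set(now);
        }
        return true;
    }

    // After the sleep window, exactly one caller wins the CAS and may send a trial request.
    boolean allowSingleTest(long now) {
        long last = openedOrLastTestedTime.get();
        return circuitOpen.get()
            && now > last + sleepWindowMillis
            && openedOrLastTestedTime.compareAndSet(last, now);
    }

    boolean allowRequest(long now) {
        return !isOpen(now) || allowSingleTest(now);
    }

    // A successful trial request closes the circuit and resets the counters.
    void markSuccess() {
        if (circuitOpen.compareAndSet(true, false)) {
            total.set(0);
            errors.set(0);
        }
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(4, 50, 1000);
        for (int i = 0; i < 4; i++) {
            breaker.record(false);
        }
        System.out.println("allowed right after failures: " + breaker.allowRequest(200));
        System.out.println("allowed after sleep window: " + breaker.allowRequest(1300));
    }
}
```

Requiring a minimum request volume before looking at the error rate keeps a single early failure (one error out of one request) from opening the circuit.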

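3. Summary

Hystrix decides whether to execute a request based on the state of recent requests. When a request times out or fails for another reason, the fallback method runs; when failures become too frequent, the circuit opens and calls fail fast, which keeps the service from accumulating excessive load.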
