【SpringCloud】Hystrix源码解析_hystrix 源码

  1. @Target({ElementType.METHOD}) // may only be placed on methods
  2. @Retention(RetentionPolicy.RUNTIME) // retained at runtime
  3. @Inherited
  4. @Documented
  5. public @interface HystrixCommand {
  6. ...
  7. String fallbackMethod() default ""; // defaults to the empty string
  8. }


  1. @Aspect
  2. public class HystrixCommandAspect {
  3. ...
  4. // Around advice
  5. @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
  6. public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
  7. Method method = AopUtils.getMethodFromTarget(joinPoint);
  8. Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
  9. if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { // both annotations on one method is rejected
  10. throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
  11. } else {
  12. MetaHolderFactory metaHolderFactory = (MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
  13. MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
  14. HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
  15. ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); // collapser execution type is used when the collapser annotation is present
  16. try {
  17. Object result;
  18. if (!metaHolder.isObservable()) {
  19. // Execute the method through the command executor (proxied execution)
  20. result = CommandExecutor.execute(invokable, executionType, metaHolder);
  21. } else {
  22. result = this.executeObservable(invokable, executionType, metaHolder);
  23. }
  24. return result;
  25. } catch (HystrixBadRequestException var9) {
  26. throw var9.getCause(); // rethrow the underlying cause rather than the wrapper
  27. } catch (HystrixRuntimeException var10) {
  28. throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
  29. }
  30. }
  31. }
  32. }


  1. 调用链:
  2. -> CommandExecutor.execute
  3. -> castToExecutable(invokable, executionType).execute()
  4. -> HystrixCommand.execute
  5. -> this.queue().get()
  1. public Future<R> queue() {
  2. // Obtain the Future object from the command's Observable
  3. final Future<R> delegate = this.toObservable().toBlocking().toFuture();
  4. Future<R> f = new Future<R>() { // wrapper whose get() calls forward to the delegate
  5. ...
  6. public R get() throws InterruptedException, ExecutionException {
  7. return delegate.get();
  8. }
  9. public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  10. return delegate.get(timeout, unit);
  11. }
  12. };
  13. ...
  14. }


  1. public Observable<R> toObservable() {
  2. ...
  3. final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
  4. public Observable<R> call() {
  5. return // Observable.never() when the command state is UNSUBSCRIBED, otherwise apply the Hystrix semantics
  6. ((CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED) ? Observable.never() :
  7. // Pass in the command and run the task
  8. AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);
  9. }
  10. };
  11. ...
  12. return Observable.defer(new Func0<Observable<R>>() {
  13. public Observable<R> call() {
  14. ...
  15. // The Observable is only created once a subscriber subscribes
  16. Observable<R> hystrixObservable =
  17. Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
  18. Observable afterCache;
  19. if (requestCacheEnabled && cacheKey != null) {
  20. HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, AbstractCommand.this);
  21. HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache);
  22. if (fromCache != null) { // treated as a request-cache hit: discard toCache and emit from the cached entry
  23. toCache.unsubscribe();
  24. AbstractCommand.this.isResponseFromCache = true;
  25. return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this);
  26. }
  27. afterCache = toCache.toObservable();
  28. } else {
  29. afterCache = hystrixObservable;
  30. }
  31. return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
  32. ...
  33. }
  34. });
  35. }


  1. private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {
  2. this.executionHook.onStart(_cmd);
  3. // Is the request allowed? Checks the circuit-breaker state
  4. if (this.circuitBreaker.allowRequest()) {
  5. final TryableSemaphore executionSemaphore = this.getExecutionSemaphore();
  6. final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
  7. Action0 singleSemaphoreRelease = new Action0() {
  8. public void call() {
  9. if (semaphoreHasBeenReleased.compareAndSet(false, true)) { // guard so release() runs at most once
  10. executionSemaphore.release();
  11. }
  12. }
  13. };
  14. Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
  15. public void call(Throwable t) {
  16. AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey);
  17. }
  18. };
  19. if (executionSemaphore.tryAcquire()) {
  20. try {
  21. this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
  22. // Execute the task
  23. return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);
  24. } catch (RuntimeException var7) {
  25. return Observable.error(var7);
  26. }
  27. } else {
  28. return this.handleSemaphoreRejectionViaFallback(); // semaphore could not be acquired
  29. }
  30. } else {
  31. // Circuit is open (short-circuited): run the fallback task
  32. return this.handleShortCircuitViaFallback();
  33. }
  34. }


  1. private Observable<R> executeCommandAndObserve(AbstractCommand<R> _cmd) {
  2. ...
  3. Observable execution;
  4. if ((Boolean)this.properties.executionTimeoutEnabled().get()) {
  5. // Timeout monitoring is added here
  6. execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
  7. } else {
  8. execution = this.executeCommandWithSpecifiedIsolation(_cmd);
  9. }
  10. ...
  11. // handleFallback: different failure situations are routed to different handlers
  12. Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
  13. public Observable<R> call(Throwable t) {
  14. Exception e = AbstractCommand.this.getExceptionFromThrowable(t);
  15. AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e);
  16. if (e instanceof RejectedExecutionException) {
  17. return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e);
  18. } else if (t instanceof HystrixTimeoutException) {
  19. // When a timeout exception was thrown, perform timeout handling
  20. return AbstractCommand.this.handleTimeoutViaFallback();
  21. } else if (t instanceof HystrixBadRequestException) {
  22. return AbstractCommand.this.handleBadRequestByEmittingError(e);
  23. } else if (e instanceof HystrixBadRequestException) {
  24. AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey);
  25. return Observable.error(e);
  26. } else {
  27. return AbstractCommand.this.handleFailureViaFallback(e);
  28. }
  29. }
  30. };
  31. ...
  32. return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted)
  33. // Invoke handleFallback to deal with errors
  34. .onErrorResumeNext(handleFallback).doOnEach(setRequestContext);
  35. }
  1. private static class HystrixObservableTimeoutOperator<R> implements Observable.Operator<R, R> {
  2. final AbstractCommand<R> originalCommand;
  3. public HystrixObservableTimeoutOperator(AbstractCommand<R> originalCommand) {
  4. this.originalCommand = originalCommand;
  5. }
  6. public Subscriber<? super R> call(final Subscriber<? super R> child) {
  7. final CompositeSubscription s = new CompositeSubscription();
  8. child.add(s);
  9. final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(this.originalCommand.concurrencyStrategy, new Runnable() {
  10. public void run() {
  11. // 3. Signal the timeout exception to the child subscriber
  12. child.onError(new HystrixTimeoutException());
  13. }
  14. });
  15. HystrixTimer.TimerListener listener = new HystrixTimer.TimerListener() {
  16. // 1. Check whether the command has timed out
  17. public void tick() {
  18. if (HystrixObservableTimeoutOperator.this.originalCommand.isCommandTimedOut.compareAndSet(AbstractCommand.TimedOutStatus.NOT_EXECUTED, AbstractCommand.TimedOutStatus.TIMED_OUT)) { // only proceeds if the status was still NOT_EXECUTED
  19. HystrixObservableTimeoutOperator.this.originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, HystrixObservableTimeoutOperator.this.originalCommand.commandKey);
  20. s.unsubscribe();
  21. // 2. Run the timeout task
  22. timeoutRunnable.run();
  23. }
  24. }
  25. };
  26. } // NOTE(review): excerpt looks abridged here - no return statement is shown and `listener` is never used; confirm against the full source
  27. }


  1. private Observable<R> handleTimeoutViaFallback() {
  2. // 1. Handle the exception according to its type (here: TIMEOUT event / TIMEOUT failure type)
  3. return this.getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
  4. }
  5. private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, HystrixEventType eventType, final HystrixRuntimeException.FailureType failureType, final String message, final Exception originalException) {
  6. ...
  7. // Obtain the fallback Observable
  8. fallbackExecutionChain = this.getFallbackObservable();
  9. ...
  10. }
  11. protected final Observable<R> getFallbackObservable() {
  12. return Observable.defer(new Func0<Observable<R>>() {
  13. public Observable<R> call() {
  14. try {
  15. // Execute the fallback method and wrap its result
  16. return Observable.just(HystrixCommand.this.getFallback());
  17. } catch (Throwable var2) {
  18. return Observable.error(var2); // a failing fallback is surfaced as an error Observable
  19. }
  20. }
  21. });
  22. }


  1. protected Object getFallback() {
  2. // Get the fallback method information declared in the annotation
  3. final CommandAction commandAction = this.getFallbackAction();
  4. if (commandAction != null) {
  5. try {
  6. return this.process(new AbstractHystrixCommand<Object>.Action() {
  7. Object execute() {
  8. MetaHolder metaHolder = commandAction.getMetaHolder();
  9. Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
  10. return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
  11. }
  12. });
  13. } catch (Throwable var3) {
  14. LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
  15. throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));
  16. }
  17. } else {
  18. return super.getFallback(); // no fallback action available: defer to the superclass implementation
  19. }
  20. }


  1. 调用链:
  2. -> AbstractCommand.handleShortCircuitViaFallback
  3. -> getFallbackOrThrowException
  4. -> this.getFallbackObservable
  5. -> GenericCommand.getFallback



  1. public boolean allowRequest() {
  2. // Is the circuit breaker forced open?
  3. if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {
  4. return false;
  5. // Is the circuit breaker forced closed?
  6. } else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {
  7. this.isOpen(); // result is ignored; the request is allowed regardless
  8. return true;
  9. } else {
  10. return !this.isOpen() || this.allowSingleTest(); // allowed when the circuit is not open, or when a single test request is permitted
  11. }
  12. }
  13. public boolean isOpen() {
  14. if (this.circuitOpen.get()) {
  15. return true;
  16. } else {
  17. HystrixCommandMetrics.HealthCounts health = this.metrics.getHealthCounts();
  18. // Has the number of requests reached the request-volume threshold for the time window?
  19. if (health.getTotalRequests() < (long)(Integer)this.properties.circuitBreakerRequestVolumeThreshold().get()) {
  20. return false;
  21. // Percentage of failed requests compared with the error threshold
  22. } else if (health.getErrorPercentage() < (Integer)this.properties.circuitBreakerErrorThresholdPercentage().get()) {
  23. return false;
  24. } else if (this.circuitOpen.compareAndSet(false, true)) { // this caller flipped the flag to open, so it records the timestamp
  25. this.circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
  26. return true;
  27. } else {
  28. return true;
  29. }
  30. }
  31. }




