当前位置:   article > 正文

Hystrix源码分析一1_hystrix 源码分析

hystrix 源码分析

Hystrix服务调用的内部逻辑

在这里插入图片描述

Hystrix 在SpringCloud的使用

给HystrixCommand设值

@GetMapping("/{id}")
@HystrixCommand(fallbackMethod = "errorCallBack")
public Book getBook(@PathVariable("id")Integer id){
    show();
    Book book = bookService.getBook(id);
    if(book == null ){
        throw  new RuntimeException("查无此产品");
    }
    return book;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

启用断路器

@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableCircuitBreaker   //启用断路器
public class ProHystrixApp {
    public static void main(String[] args) {
        SpringApplication.run(ProHystrixApp.class,args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Hystrix源码分析

我们先看看@HystrixCommand

在注解中我们可以设置HystrixCommand的回退方法等各种参数

在这里插入图片描述

然后我们再看看看HystrixCommand一些参数

在这里插入图片描述

之后我们再来看看@EnableCircuitBreaker,这里引入了一个@Import(EnableCircuitBreakerImportSelector.class) 类,翻译类的名字就是 , 开启熔断器的导入选择器

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

EnableCircuitBreakerImportSelector的作用就是去导入熔断器的配置 ,即会自动加载 jar包 spring-cloud-netflix-core 中的META-INF/spring.factories 中的Hystrix相关的自动配置类

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
      extends SpringFactoryImportSelector<EnableCircuitBreaker> {
   @Override
   protected boolean isEnabled() { //最后会返回一个true
      return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
            Boolean.class, Boolean.TRUE);
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
现在让我们来看看他是怎么实现的吧

EnableCircuitBreakerImportSelector继承了SpringFactoryImportSelector类,且它父类的泛型是EnableCircuitBreaker

通过构造方法,获得了一个EnableCircuitBreaker的对象

@SuppressWarnings("unchecked")
protected SpringFactoryImportSelector() {//获得一个EnableCircuitBreaker对象
   this.annotationClass = (Class<T>) GenericTypeResolver
         .resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
}
  • 1
  • 2
  • 3
  • 4
  • 5

然后通过selectImports方法 ,在META-INF/spring.factories 目录下找到 的EnableCircuitBreaker 为key 对应的类HystrixCircuitBreakerConfiguration,然后把他加入到spring容器中。
在这里插入图片描述

HystrixCircuitBreakerConfiguration 就是针对于 Hystrix熔断器的配置

@Configuration
public class HystrixCircuitBreakerConfiguration {
   @Bean
   public HystrixCommandAspect hystrixCommandAspect() {
      return new HystrixCommandAspect();
   }
   @Bean
   public HystrixShutdownHook hystrixShutdownHook() {
      return new HystrixShutdownHook();
   }
   @Bean
   public HasFeatures hystrixFeature() {
      return HasFeatures
            .namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
   }
   private class HystrixShutdownHook implements DisposableBean {
      @Override
      public void destroy() throws Exception {
         // Just call Hystrix to reset thread pool etc.
         Hystrix.reset();
      }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

我们发现这个类中有HystrixCommandAspect,这就是我们联想到了之前的@HystrixCommand注解了,点进去看看

发现这有一个切入@HystrixCommand的切入点表达式

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
  • 1

还有对这个切入点表达式处理的环绕增强

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
  • 1

接下来我们仔细看下这个增强函数

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint);//获取目标函数
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    //不能同时使用@HystrixCommand 和 @HystrixCollapser 两个注解
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    //metaHolder 封装了目标方法的返回、参数、proxy、同时使用setFallbackMethod,和				setDefaultProperties保存了一些Command的配置
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint); 
    
    //最后得到了一个GenericCommand
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    //获取执行类型
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
	
    Object result;
    try {
        if (!metaHolder.isObservable()) {
            //执行这个
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

追踪metaHolderFactory.create(joinPoint);,发现metaHolderFactory是一个接口,选择它的实现类CommandMetaHolderFactory

public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
    HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
    //获取执行的形式
    ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
    MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
    if (isCompileWeaving()) {
        builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
    }
    return builder.defaultCommandKey(method.getName())
                    .hystrixCommand(hystrixCommand)
                    .observableExecutionMode(hystrixCommand.observableExecutionMode())
                    .executionType(executionType)
                    .observable(ExecutionType.OBSERVABLE == executionType)
                    .build();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

追踪:ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());

public static ExecutionType getExecutionType(Class<?> type) {//函数的返回值一般只是普通的对象
    if (Future.class.isAssignableFrom(type)) {
        return ExecutionType.ASYNCHRONOUS;
    } else if (Observable.class.isAssignableFrom(type)) {
        return ExecutionType.OBSERVABLE;
    } else {
        return ExecutionType.SYNCHRONOUS;//默认同步
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这里我们知道了,ExecutionType是SYNCHRONOUS

回到增强函数那

我们追踪:

HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
  • 1

发现HystrixInvokable是一个空接口,他的目的只是用来标记可被执行,那么他是如何创建的,我们先看create()方法,最终返回的是一个GenericCommand,实现了HystrixCommand

public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
    if (metaHolder.isCollapserAnnotationPresent()) {
        executable = new CommandCollapser(metaHolder);
    } else if (metaHolder.isObservable()) {
        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    } else {
    	//最终会进入这,返回一个GenericCommand
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return executable;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

追踪GenericCommand

@ThreadSafe
public class GenericCommand extends AbstractHystrixCommand<Object> {

    private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);

    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", getCommandKey().name());
        return process(new Action() {
            @Override
            Object execute() {
                return getCommandAction().execute(getExecutionType());
            }
        });
    }
    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

它本身对目标方法的正常执行和对 fallback方法的 执行做了实现 。
GenericCommand.this.getCommandAction().execute(…)获取到目标方法并执行,底层会交给 MethodExecutionAction 使用反射去执行方法

在这里插入图片描述

之后回到 HystrixCommandAspect的methodsAnnotatedWithHystrixCommand方法中,这是一个命令模式执行我们的方法.

Object result;
        try {
             是否响应式,默认为非响应式
            if (!metaHolder.isObservable()) {
                //载入 Command、运行类型等
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

我们追踪下execute()方法

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    //非空判断
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);
	//三种运行类型 同步、异步、观察
    switch (executionType) {
        case SYNCHRONOUS: {
            //我们进入默认的castToExecutable.execute()
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

进入同步模式的castToExecutable(invokable, executionType),发现是一个返回值是接口HystrixExecutable,

它的目的就是把HystrixInvokable类型转换成HystrixExecutable类型。

private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
    if (invokable instanceof HystrixExecutable) {
        return (HystrixExecutable) invokable;
    }
    throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

并调用execute方法执行 ,跟踪execute方法进入HystrixCommand.execute方法中

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这里通过 queue() 返回一个 Future对象,这个future的实际处理委派给 f 实现,f是匿名内部类,当调用queue().get()方法时,最终调用 delegate.get() 方法

public Future<R> queue() {
    // 创建一个委派对象
     final Future<R> delegate = toObservable().toBlocking().toFuture();
     final Future<R> f = new Future<R>() {
       	......
     };
     .....
     return f;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

delegate 对象由 toObservable() 创建,toObservable() 中调用了 applyHystrixSemantics() 方法

final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            return Observable.never();
        }
        return applyHystrixSemantics(_cmd);//进入这里
    }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

applyHystrixSemantics 中先通过 circuitBreaker.allowRequest() 判断是否允许当前请求,如果允许执行后续逻辑;否则 调用 handleShortCircuitViaFallback 执行 fallback 方法。

参考:https://blog.csdn.net/u014656201/article/details/105745575/

参考:https://blog.csdn.net/u010139373/article/details/108181757

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/978749
推荐阅读
相关标签
  

闽ICP备14008679号