Configuring HystrixCommand
@GetMapping("/{id}")
@HystrixCommand(fallbackMethod = "errorCallBack")
public Book getBook(@PathVariable("id") Integer id) {
    show();
    Book book = bookService.getBook(id);
    if (book == null) {
        throw new RuntimeException("No such product");
    }
    return book;
}
Enabling the circuit breaker
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableCircuitBreaker // enable the circuit breaker
public class ProHystrixApp {
    public static void main(String[] args) {
        SpringApplication.run(ProHystrixApp.class, args);
    }
}
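In the @HystrixCommand annotation we can set the fallback method along with various other command parameters.
Next, let's look at @EnableCircuitBreaker. It carries @Import(EnableCircuitBreakerImportSelector.class); translated literally, the class name means "import selector for enabling the circuit breaker".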
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
The job of EnableCircuitBreakerImportSelector is to import the circuit breaker configuration: it loads the Hystrix-related configuration classes declared in META-INF/spring.factories inside the spring-cloud-netflix-core jar.
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
        extends SpringFactoryImportSelector<EnableCircuitBreaker> {
    @Override
    protected boolean isEnabled() { // defaults to true when the property is not set
        return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                Boolean.class, Boolean.TRUE);
    }
}
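Now let's see how it is implemented.
EnableCircuitBreakerImportSelector extends SpringFactoryImportSelector, with EnableCircuitBreaker as the parent's type argument.
The parent constructor resolves that type argument and stores the Class object for EnableCircuitBreaker (not an instance of it) in annotationClass.
@SuppressWarnings("unchecked")
protected SpringFactoryImportSelector() { // resolves the Class object for EnableCircuitBreaker
    this.annotationClass = (Class<T>) GenericTypeResolver
            .resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
}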
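To make the constructor's trick concrete, here is a minimal plain-Java sketch of reading a superclass type argument by reflection. It does not use Spring's GenericTypeResolver and only handles a direct subclass; the names Selector, Marker and MarkerSelector are hypothetical stand-ins for SpringFactoryImportSelector, EnableCircuitBreaker and EnableCircuitBreakerImportSelector.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

final class TypeArgumentSketch {
    public static void main(String[] args) {
        // The subclass fixed the type argument, so the parent can recover it at runtime.
        System.out.println(new MarkerSelector().annotationClass.getName());
    }
}

// Hypothetical stand-in for SpringFactoryImportSelector<T>.
abstract class Selector<T> {
    final Class<T> annotationClass;

    @SuppressWarnings("unchecked")
    protected Selector() {
        // getGenericSuperclass() preserves the argument written in "extends Selector<...>".
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("subclass must fix the type argument");
        }
        Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
        this.annotationClass = (Class<T>) arg;
    }
}

// Hypothetical stand-in for @EnableCircuitBreaker.
@interface Marker {}

// Hypothetical stand-in for EnableCircuitBreakerImportSelector.
class MarkerSelector extends Selector<Marker> {}
```

The point is that what gets stored is a Class token, which is later used as a lookup key, rather than an annotation instance.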
Then the selectImports method looks in the META-INF/spring.factories file for the classes registered under the EnableCircuitBreaker key, finds HystrixCircuitBreakerConfiguration, and registers it in the Spring container.
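The lookup itself is simple: spring.factories is a properties-format file whose values are comma-separated class names. The sketch below imitates that lookup with java.util.Properties only. It is not Spring's SpringFactoriesLoader, the helper name loadFactoryNames is hypothetical, and the file content in main is an assumption about what the entry looks like.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class FactoriesLookupSketch {

    // Returns the class names registered under the given key, in declaration order.
    static List<String> loadFactoryNames(String factoriesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumed content, modelled on the entry described in the text.
        String text =
            "org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\\\n"
          + "org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration\n";
        System.out.println(loadFactoryNames(text,
            "org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker"));
    }
}
```

The key is the fully qualified name of the annotation class resolved in the constructor, which is why that Class token matters.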
HystrixCircuitBreakerConfiguration is the configuration class for the Hystrix circuit breaker.
@Configuration
public class HystrixCircuitBreakerConfiguration {

    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }

    @Bean
    public HystrixShutdownHook hystrixShutdownHook() {
        return new HystrixShutdownHook();
    }

    @Bean
    public HasFeatures hystrixFeature() {
        return HasFeatures
                .namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
    }

    private class HystrixShutdownHook implements DisposableBean {
        @Override
        public void destroy() throws Exception {
            // Just call Hystrix to reset thread pool etc.
            Hystrix.reset();
        }
    }
}
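This class declares a HystrixCommandAspect bean, which brings to mind the @HystrixCommand annotation from earlier. Let's open it.
It defines a pointcut expression that matches @HystrixCommand:
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
and an around advice that handles this pointcut:
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
Let's take a closer look at this advice method.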
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint); // resolve the target method
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    // @HystrixCommand and @HystrixCollapser cannot be used on the same method
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    // metaHolder wraps the target method's return type, arguments and proxy; it also keeps
    // command configuration set through setFallbackMethod and setDefaultProperties
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    // this ends up being a GenericCommand
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    // determine the execution type
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    Object result;
    try {
        if (!metaHolder.isObservable()) {
            // this branch is taken
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}
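The overall shape of this advice (intercept an annotated method, run it, and divert to a fallback on failure) can be tried out without AspectJ or Hystrix. The sketch below uses a JDK dynamic proxy in place of the aspect, which is a different mechanism from the one javanica uses. The annotation WithFallback and the types BookApi and BookApiImpl are hypothetical, and none of Hystrix's isolation, timeout or circuit breaking is modelled.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

final class AroundAdviceSketch {

    // Hypothetical annotation playing the role of @HystrixCommand(fallbackMethod = ...).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface WithFallback {
        String fallbackMethod();
    }

    interface BookApi {
        @WithFallback(fallbackMethod = "errorCallBack")
        String getBook(Integer id);

        String errorCallBack(Integer id);
    }

    static class BookApiImpl implements BookApi {
        public String getBook(Integer id) {
            if (id == null || id < 0) {
                throw new RuntimeException("no such product");
            }
            return "book-" + id;
        }

        public String errorCallBack(Integer id) {
            return "fallback-for-" + id;
        }
    }

    // Wraps the target so that every call passes through the handler, like an around advice.
    static BookApi wrap(BookApi target) {
        return (BookApi) Proxy.newProxyInstance(
            BookApi.class.getClassLoader(),
            new Class<?>[] {BookApi.class},
            (proxy, method, args) -> {
                WithFallback meta = method.getAnnotation(WithFallback.class);
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    if (meta == null) {
                        throw e.getCause(); // not annotated: propagate the original failure
                    }
                    // Annotated: look up the fallback by name with the same parameter types.
                    Method fallback = BookApi.class.getMethod(
                        meta.fallbackMethod(), method.getParameterTypes());
                    return fallback.invoke(target, args);
                }
            });
    }

    public static void main(String[] args) {
        BookApi api = wrap(new BookApiImpl());
        System.out.println(api.getBook(1));
        System.out.println(api.getBook(-1));
    }
}
```

The second call in main fails inside getBook and is answered by errorCallBack instead, which is the behaviour the fallbackMethod attribute asks for.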
Tracing metaHolderFactory.create(joinPoint): MetaHolderFactory is an abstract class, so we follow its subclass CommandMetaHolderFactory.
public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
    HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
    // determine how the command will be executed
    ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
    MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
    if (isCompileWeaving()) {
        builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
    }
    return builder.defaultCommandKey(method.getName())
            .hystrixCommand(hystrixCommand)
            .observableExecutionMode(hystrixCommand.observableExecutionMode())
            .executionType(executionType)
            .observable(ExecutionType.OBSERVABLE == executionType)
            .build();
}
Trace: ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
public static ExecutionType getExecutionType(Class<?> type) { // the return type is usually just a plain object
    if (Future.class.isAssignableFrom(type)) {
        return ExecutionType.ASYNCHRONOUS;
    } else if (Observable.class.isAssignableFrom(type)) {
        return ExecutionType.OBSERVABLE;
    } else {
        return ExecutionType.SYNCHRONOUS; // synchronous by default
    }
}
Since getBook returns a plain Book, the ExecutionType here is SYNCHRONOUS.
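As a quick check of this rule, the following sketch applies the same return-type test to two methods of a hypothetical BookService. The OBSERVABLE branch is left out because Observable comes from RxJava rather than the JDK; Mode and modeOf are illustrative names, not Hystrix API.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

final class ExecutionTypeSketch {

    enum Mode { SYNCHRONOUS, ASYNCHRONOUS }

    // Hypothetical service with one plain and one Future-returning method.
    static class BookService {
        public String getBook(Integer id) {
            return "book-" + id;
        }

        public Future<String> getBookAsync(Integer id) {
            return CompletableFuture.completedFuture("book-" + id);
        }
    }

    // Same test as getExecutionType, minus the Observable branch.
    static Mode modeOf(Class<?> returnType) {
        return Future.class.isAssignableFrom(returnType) ? Mode.ASYNCHRONOUS : Mode.SYNCHRONOUS;
    }

    // Resolves the method by name and classifies it by its declared return type.
    static Mode modeOf(String methodName) {
        try {
            Method m = BookService.class.getMethod(methodName, Integer.class);
            return modeOf(m.getReturnType());
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(modeOf("getBook"));
        System.out.println(modeOf("getBookAsync"));
    }
}
```

Only the declared return type matters, so a method that returns Future is treated as asynchronous even if the Future it hands back is already completed.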
Back to the advice method.
We trace:
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
HystrixInvokable turns out to be an empty marker interface whose only purpose is to mark something as executable. To see how the instance is created, look at create(): it ends up returning a GenericCommand, which is a subclass of HystrixCommand.
public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
    if (metaHolder.isCollapserAnnotationPresent()) {
        executable = new CommandCollapser(metaHolder);
    } else if (metaHolder.isObservable()) {
        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    } else {
        // we end up here and get a GenericCommand
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return executable;
}
Trace GenericCommand:
@ThreadSafe
public class GenericCommand extends AbstractHystrixCommand<Object> {

    private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);

    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", getCommandKey().name());
        return process(new Action() {
            @Override
            Object execute() {
                return getCommandAction().execute(getExecutionType());
            }
        });
    }

    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }
}
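It implements both the normal execution of the target method (run) and the execution of the fallback method (getFallback).
GenericCommand.this.getCommandAction().execute(…) obtains the target method and runs it; underneath, the call is handed to MethodExecutionAction, which invokes the method via reflection.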
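The run/getFallback split is easy to reproduce in miniature. The sketch below is a simplified command object that invokes a target method reflectively and falls back to a second method when the first one throws. Command, BookController and find are hypothetical names. Unlike the real GenericCommand, the fallback decision is made inside the command itself (in Hystrix the core command class makes it), and there is no thread pool, timeout or circuit breaker.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class ReflectiveCommandSketch {

    // Hypothetical target with a primary method and a same-signature fallback.
    public static class BookController {
        public String getBook(Integer id) {
            if (id > 100) {
                throw new RuntimeException("no such product");
            }
            return "book-" + id;
        }

        public String errorCallBack(Integer id) {
            return "fallback-" + id;
        }
    }

    // Simplified command: run() is the normal path, getFallback() the degraded path.
    static final class Command {
        private final Object target;
        private final Method action;
        private final Method fallbackAction;
        private final Object[] args;

        Command(Object target, String method, String fallback, Object... args) {
            this.target = target;
            this.args = args;
            this.action = find(target.getClass(), method, args.length);
            this.fallbackAction = find(target.getClass(), fallback, args.length);
        }

        // Picks the first public method with the given name and parameter count.
        private static Method find(Class<?> type, String name, int arity) {
            for (Method m : type.getMethods()) {
                if (m.getName().equals(name) && m.getParameterCount() == arity) {
                    return m;
                }
            }
            throw new IllegalArgumentException("no method " + name);
        }

        // Reflective call that rethrows the target's own exception where possible.
        private Object invoke(Method m) throws Exception {
            try {
                return m.invoke(target, args);
            } catch (InvocationTargetException e) {
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw e;
            }
        }

        Object run() throws Exception {
            return invoke(action);
        }

        Object getFallback() throws Exception {
            return invoke(fallbackAction);
        }

        Object execute() {
            try {
                return run();
            } catch (Exception primaryFailure) {
                try {
                    return getFallback();
                } catch (Exception e) {
                    throw new IllegalStateException("fallback failed", e);
                }
            }
        }
    }

    public static void main(String[] args) {
        BookController controller = new BookController();
        System.out.println(new Command(controller, "getBook", "errorCallBack", 7).execute());
        System.out.println(new Command(controller, "getBook", "errorCallBack", 101).execute());
    }
}
```

Packaging the call as an object is what lets the caller decide later how to run it, which is where the execution types from earlier come in.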
Back in the methodsAnnotatedWithHystrixCommand method of HystrixCommandAspect, our method is now executed through the command pattern.
Object result;
try {
    // reactive (observable) or not; non-reactive by default
    if (!metaHolder.isObservable()) {
        // pass in the command, the execution type, and so on
        result = CommandExecutor.execute(invokable, executionType, metaHolder);
    } else {
        result = executeObservable(invokable, executionType, metaHolder);
    }
} catch (HystrixBadRequestException e) {
    throw e.getCause();
} catch (HystrixRuntimeException e) {
    throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
Let's trace the execute() method.
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    // null checks
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);
    // three execution types: synchronous, asynchronous, observable
    switch (executionType) {
        case SYNCHRONOUS: {
            // we take the default path: castToExecutable(...).execute()
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}
Stepping into castToExecutable(invokable, executionType) on the synchronous path, we see that its return type is the HystrixExecutable interface.
Its only purpose is to cast the HystrixInvokable to a HystrixExecutable.
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
    if (invokable instanceof HystrixExecutable) {
        return (HystrixExecutable) invokable;
    }
    throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
The execute method is then called on the result; following it leads into HystrixCommand.execute.
public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
Here queue() returns a Future named f, an anonymous inner class that hands the actual work to a delegate. When queue().get() is called, the call ultimately reaches delegate.get().
public Future<R> queue() {
    // create the delegate
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    final Future<R> f = new Future<R>() {
        ......
    };
    .....
    return f;
}
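The elided body of f is not reproduced here, but the delegation pattern it follows can be sketched with JDK types alone. In the sketch below a CompletableFuture stands in for the Future produced by toObservable().toBlocking().toFuture(); wrap and execute are illustrative names, and the extra handling that Hystrix adds around cancellation and errors is deliberately left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DelegatingFutureSketch {

    // Returns a Future whose methods all forward to the delegate, mirroring the shape of queue().
    static <R> Future<R> wrap(final Future<R> delegate) {
        return new Future<R>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return delegate.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        };
    }

    // Synchronous execution is then just "queue and block", as in execute().
    static <R> R execute(Future<R> queued) {
        try {
            return wrap(queued).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        Future<String> queued = CompletableFuture.supplyAsync(() -> "book-1");
        System.out.println(execute(queued));
    }
}
```

Wrapping the delegate gives the command a place to intercept cancel and get without changing how the underlying work is produced.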
The delegate object is created by toObservable(), which in turn calls the applyHystrixSemantics() method.
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            return Observable.never();
        }
        return applyHystrixSemantics(_cmd); // step into this call
    }
};
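applyHystrixSemantics first calls circuitBreaker.allowRequest() to decide whether the current request may proceed. If it may, the rest of the execution logic runs; otherwise handleShortCircuitViaFallback is called to run the fallback method.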
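That gate-then-fallback decision can be illustrated with a toy breaker. The sketch below opens after a fixed number of consecutive failures and then short-circuits every call to the fallback. This is a deliberate simplification and an assumption for illustration: the real Hystrix breaker works from rolling error statistics and can close again after a sleep window, neither of which is modelled. Only the method name allowRequest is borrowed from the text; everything else is hypothetical.

```java
import java.util.function.Supplier;

final class CircuitBreakerSketch {
    private final int failureThreshold;
    private int consecutiveFailures;

    CircuitBreakerSketch(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    // Closed circuit: requests pass. Open circuit: requests are rejected.
    synchronized boolean allowRequest() {
        return consecutiveFailures < failureThreshold;
    }

    synchronized void markSuccess() {
        consecutiveFailures = 0;
    }

    synchronized void markFailure() {
        consecutiveFailures++;
    }

    // Gate first, then either run the primary or go straight to the fallback.
    <R> R apply(Supplier<R> primary, Supplier<R> fallback) {
        if (!allowRequest()) {
            return fallback.get(); // short-circuit: the primary is not invoked at all
        }
        try {
            R result = primary.get();
            markSuccess();
            return result;
        } catch (RuntimeException e) {
            markFailure();
            return fallback.get();
        }
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2);
        Supplier<String> failing = () -> {
            throw new RuntimeException("service down");
        };
        for (int i = 0; i < 3; i++) {
            System.out.println(breaker.apply(failing, () -> "fallback")
                + " (circuit allows requests: " + breaker.allowRequest() + ")");
        }
    }
}
```

Once the threshold is reached, the failing supplier is no longer called, which is the protection a short circuit is meant to give the downstream service.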
Reference: https://blog.csdn.net/u014656201/article/details/105745575/
Reference: https://blog.csdn.net/u010139373/article/details/108181757