赞
踩
分析入口@EnableCircuitBreaker
开启熔断器功能
@EnableCircuitBreaker注解开启熔断器
/**
 * Marker annotation that switches on circuit breaker support for the
 * annotated type.
 *
 * <p>The annotation carries no attributes of its own; its effect comes from
 * the {@code @Import} of {@code EnableCircuitBreakerImportSelector}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
// Imports a selector (a Spring extension point) that chooses which configuration classes to register.
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
查看EnableCircuitBreakerImportSelector类
/**
 * Import selector behind {@code @EnableCircuitBreaker}.
 *
 * <p>The {@code EnableCircuitBreaker} annotation class is handed to the parent
 * {@code SpringFactoryImportSelector} through the generic type argument.
 */
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
        extends SpringFactoryImportSelector<EnableCircuitBreaker> {

    /**
     * Reports whether the circuit breaker is switched on.
     *
     * @return the value of the {@code spring.cloud.circuit.breaker.enabled}
     *         property, or {@code true} when the property is not set
     */
    @Override
    protected boolean isEnabled() {
        final Boolean enabled = getEnvironment().getProperty(
                "spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE);
        return enabled;
    }
}
查看SpringFactoryImportSelector类
protected SpringFactoryImportSelector() { //获取到子类传递到父类的泛型,这里就是@EnableCircuitBreaker注解类 this.annotationClass = (Class<T>) GenericTypeResolver .resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class); } /** *在selectImports该方法中根据传递进来的泛型全限定类名(@EnableCircuitBreaker就是该注解的全限定类名), *作为key去spring.factories中,查找对应配置类进行注入 */ @Override public String[] selectImports(AnnotationMetadata metadata) { if (!isEnabled()) { return new String[0]; } AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(this.annotationClass.getName(), true)); Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is " + metadata.getClassName() + " annotated with @" + getSimpleName() + "?"); // Find all possible auto configuration classes, filtering duplicates List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); if (factories.isEmpty() && !hasDefaultFactory()) { throw new IllegalStateException("Annotation @" + getSimpleName() + " found, but there are no implementations. Did you forget to include a starter?"); } if (factories.size() > 1) { // there should only ever be one DiscoveryClient, but there might be more than // one factory log.warn("More than one implementation " + "of @" + getSimpleName() + " (now relying on @Conditionals to pick one): " + factories); } return factories.toArray(new String[factories.size()]); }
spring.factories
找到对应的配置类 HystrixCircuitBreakerConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
# This is the relevant entry: the EnableCircuitBreaker key maps to the HystrixCircuitBreakerConfiguration class.
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
查看HystrixCircuitBreakerConfiguration配置类
/**
 * Configuration class registered for {@code @EnableCircuitBreaker}.
 */
@Configuration
public class HystrixCircuitBreakerConfiguration {

    /**
     * Registers the {@link HystrixCommandAspect} bean; Hystrix does its work
     * through this aspect.
     *
     * @return a new aspect instance
     */
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        final HystrixCommandAspect aspect = new HystrixCommandAspect();
        return aspect;
    }
}
查看切面类HystrixCommandAspect
@Aspect public class HystrixCommandAspect { private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP; static { META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder() .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory()) .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory()) .build(); } //定义切点 加了@HystrixCommand注解的方法 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } //环绕通知方法 @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { //获取原始目标方法 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } //根据方法类型获取到用于封装MetaHolder的工厂,这里从一个map中获取,这个map存着两种工厂 //(@HystrixCommand和@HystrixCollapser) MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); //获取封装方法的元数据(元数据指的是比如方法的签名,方法上加的注解信息,参数,注解上配置回退方法这些都属于元数据)封装成一个MetaHolder MetaHolder metaHolder = metaHolderFactory.create(joinPoint); //封装成一个可执行的组件 //在这里会进行一些资源初始化 进入查看该方法 HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? 
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) { result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; } }
HystrixCommandFactory 的create()方法
public class HystrixCommandFactory { private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory(); private HystrixCommandFactory() { } public static HystrixCommandFactory getInstance() { return INSTANCE; } public HystrixInvokable create(MetaHolder metaHolder) { HystrixInvokable executable; //这里会根据元数据类型进行一些判断 if (metaHolder.isCollapserAnnotationPresent()) {//@HystrixCollapser的 executable = new CommandCollapser(metaHolder); } else if (metaHolder.isObservable()) {//是否异步处理 executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } else {//一般会进入这个GenericCommand executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } return executable; } }
查看GenericCommand类
该类中有两个重要的方法 run()
和 getFallback()
public class GenericCommand extends AbstractHystrixCommand<Object> { //根据元数据信息找到目标原始方法 调用原始方法执行 @Override protected Object run() throws Exception { LOGGER.debug("execute command: {}", getCommandKey().name()); return process(new Action() { @Override Object execute() { return getCommandAction().execute(getExecutionType()); } }); } //根据元数据信息找到回退的方法 调用回退的方法 @Override protected Object getFallback() { final CommandAction commandAction = getFallbackAction(); if (commandAction != null) { try { return process(new Action() { @Override Object execute() { MetaHolder metaHolder = commandAction.getMetaHolder(); Object[] args = createArgsForFallback(metaHolder, getExecutionException()); return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args); } }); } catch (Throwable e) { LOGGER.error(FallbackErrorMessageBuilder.create() .append(commandAction, e).build()); throw new FallbackInvocationException(unwrapCause(e)); } } else { return super.getFallback(); } } }
同时,在GenericCommand
的父类构造器中会进行一些资源的初始化工作,继承链如下:
GenericCommand -> AbstractHystrixCommand -> HystrixCommand -> AbstractCommand
/**
 * Initializes the resources a command needs: keys, properties, metrics,
 * circuit breaker, thread pool, plugin strategies, request cache/log and the
 * optional semaphore overrides.
 */
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey,
        HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults,
        HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    // Resolve the thread pool key; the pool itself is initialized further down.
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup,
            this.properties.executionIsolationThreadPoolKeyOverride().get());
    // Initialize the metrics; per the article author they later drive decisions
    // such as whether to open the circuit.
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    // Initialize the circuit breaker.
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker,
            this.commandGroup, this.commandKey, this.properties, this.metrics);
    // Initialize the thread pool from the configuration carried in the metadata.
    // NOTE(review): the article author states that pools are cached in a
    // ConcurrentHashMap (looked up first, created on a miss); that is not
    // visible in this excerpt - confirm in initThreadPool.
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    //Strategies from plugins
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup,
            this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);

    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;

    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}
接下来回到之前的切面类HystrixCommandAspect
查看切面类HystrixCommandAspect
@Aspect public class HystrixCommandAspect { private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP; static { META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder() .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory()) .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory()) .build(); } //定义切点 加了@HystrixCommand注解的方法 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } //环绕通知方法 @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { //获取原始目标方法 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } //根据方法类型获取到用于封装MetaHolder的工厂,这里从一个map中获取,这个map存着两种工厂 //(@HystrixCommand和@HystrixCollapser) MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); //获取封装方法的元数据(元数据指的是比如方法的签名,方法上加的注解信息,参数,注解上配置回退方法这些都属于元数据)封装成一个MetaHolder MetaHolder metaHolder = metaHolderFactory.create(joinPoint); //封装成一个可执行的组件 //在这里会进行一些资源初始化 进入查看该方法(这里的逻辑在上面已经分析过了) HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? 
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); //返回结果 Object result; try { if (!metaHolder.isObservable()) {//是否是响应式的(由于我们这些都是同步的会走这个逻辑) result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; } }
查看CommandExecutor类
public class CommandExecutor { /** * Calls a method of {@link HystrixExecutable} in accordance with specified execution type. * * @param invokable {@link HystrixInvokable} * @param metaHolder {@link MetaHolder} * @return the result of invocation of specific method. * @throws RuntimeException */ public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) { case SYNCHRONOUS: { //同步进入该方法 //会进行一个类型转换 然后执行execute()方法 return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } } private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) { if (invokable instanceof HystrixExecutable) { return (HystrixExecutable) invokable; } throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode"); } }
查看HystrixCommand类
public R execute() { try { //queue()方法会返回Feture对象(封装异步处理的结果) return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } } public Future<R> queue() { /* * The Future returned by Observable.toBlocking().toFuture() does not implement the * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true; * thus, to comply with the contract of Future, we must wrap around it. */ //future的获取,业务逻辑的执行,异常后对回退方法的调用一系列的处理都是用RxJava响应式编程的内容,了解到这里就ok final Future<R> delegate = toObservable().toBlocking().toFuture(); .................................. }
GenericCommand
类中根据元数据信息等重写了run()
方法(对目标方法的调用)和
getFallback()
方法(对回退方法的调用),在RxJava处理过程中会完成对这两个方法的调用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。