赞
踩
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface GxHystrixCommand { /** * 默认超时时间 * @return */ int timeout() default 1000; /** * 回退方法 * @return */ String fallback() default ""; }
@Component @Aspect public class GxHystrixCommandAspect { ExecutorService executorService= Executors.newFixedThreadPool(10); @Pointcut(value = "@annotation(GxHystrixCommand)") public void pointCut(){} @Around(value = "pointCut()&&@annotation(hystrixCommand)") public Object doPointCut(ProceedingJoinPoint joinPoint,GxHystrixCommand hystrixCommand) throws InterruptedException, ExecutionException, TimeoutException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { int timeout=hystrixCommand.timeout(); //前置的判断逻辑 Future future=executorService.submit(()->{ try { return joinPoint.proceed(); //执行目标方法 } catch (Throwable throwable) { throwable.printStackTrace(); } return null; }); Object result; try { result=future.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); future.cancel(true); // ? if(StringUtils.isBlank(hystrixCommand.fallback())){ throw e; } //调用fallback result=invokeFallback(joinPoint,hystrixCommand.fallback()); } return result; } private Object invokeFallback(ProceedingJoinPoint joinPoint,String fallback) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { MethodSignature signature=(MethodSignature)joinPoint.getSignature(); Method method=signature.getMethod(); Class<?>[] parameterTypes=method.getParameterTypes(); //以上是获取被代理的方法的参数和Method //得到fallback方法 try { Method fallbackMethod=joinPoint.getTarget().getClass().getMethod(fallback,parameterTypes); fallbackMethod.setAccessible(true); //完成反射调用 return fallbackMethod.invoke(joinPoint.getTarget(),joinPoint.getArgs()); } catch (Exception e) { e.printStackTrace(); throw e; } } }
@RestController public class GpHystrixController { @Autowired RestTemplate restTemplate; //设置降级方法,并且设置超时时间 @GxHystrixCommand(fallback = "fallback",timeout = 3000) @GetMapping("/hystrix/test") public String test(){ return restTemplate.getForObject("http://localhost:8082/orders",String.class); } public String fallback(){ return "请求被降级"; } }
@RestController
public class OrderServiceImpl implements OrderService{
@Override
public String orders() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Return All Orders");
return "Return All Orders";
}
}
可以看到结果,请求被降级
public class HystrixCommandService extends HystrixCommand<String>{ int num; RestTemplate restTemplate; public HystrixCommandService(int num,RestTemplate restTemplate){ super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("order-service")). andCommandPropertiesDefaults(HystrixCommandProperties.Setter(). withCircuitBreakerEnabled(true). withCircuitBreakerRequestVolumeThreshold(5))); //TODO this.num=num; this.restTemplate=restTemplate; } @Override protected String run() throws Exception { if(num%2==0){ return "正常访问"; } //发起远程请求 return restTemplate.getForObject("http://localhost:8082/orders",String.class); } //如果Hystrix触发了降级,那么将会执行fallback方法 @Override protected String getFallback() { return "请求被降级"; } }
@RestController
public class HystrixCommandController {
@Autowired
RestTemplate restTemplate;
@GetMapping("/hystrix/command/{num}")
public String hystrixCommand(@PathVariable("num")int num){
HystrixCommandService hystrixCommandService=new HystrixCommandService(num,restTemplate);
return hystrixCommandService.execute();//执行,会调用HystrixCommandService的run()方法,
}
}
Hystrix熔断的@HystrixCommand注解,是通过HystrixCommandAspect这个切面来处理的。
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { //获取目标方法信息 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); //获取元数据,比如调用方法,HystrixProperty注解数据、方法参数等 MetaHolder metaHolder = metaHolderFactory.create(joinPoint); //获取调用者,它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑 //如果是异步,则创建GenericObservableCommand, 否则,则创建GenericCommand HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) { //执行 result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; }
这个方法主要用来执行命令,从代码中可以看出这里有三个执行类型,分别是同步、异步、以及响应 式。其中,响应式又分为
ColdObservable(observable.toObservable()) 和 HotObservable(observable.observe())
默认的executionType=SYNCHRONOUS ,同步请求。
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) { case SYNCHRONOUS: { return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } }
接着调用HystrixCommand.execute()方法,这个方法中,首先调用queue(),这个方法会返回一个 future对象。
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
queue这个方法中,返回了一个Future对象,这个future对象的实现是f,f是以匿名内部类,它是 Java.util.concurrent中定一个的一个异步带返回值对象。当调用queue().get()方法时,最终是委派给了 delegate.get 方法。
public Future<R> queue() { /* * The Future returned by Observable.toBlocking().toFuture() does not implement the * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true; * thus, to comply with the contract of Future, we must wrap around it. */ final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { if (delegate.isCancelled()) { return false; } if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) { interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning); } final boolean res = delegate.cancel(interruptOnFutureCancel.get()); if (!isExecutionComplete() && interruptOnFutureCancel.get()) { final Thread t = executionThread.get(); if (t != null && !t.equals(Thread.currentThread())) { t.interrupt(); } } return res; } @Override public boolean isCancelled() { return delegate.isCancelled(); } @Override public boolean isDone() { return delegate.isDone(); } @Override public R get() throws InterruptedException, ExecutionException { return delegate.get(); } @Override public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } }; /* special handling of error states that throw immediately */ if (f.isDone()) { try { f.get(); return f; } catch (Exception e) { Throwable t = decomposeException(e); if (t instanceof HystrixBadRequestException) { return f; } else if (t instanceof HystrixRuntimeException) { HystrixRuntimeException hre = (HystrixRuntimeException) t; switch (hre.getFailureType()) { case COMMAND_EXCEPTION: case TIMEOUT: // we don't throw these types from queue() only from queue().get() as they are execution errors return f; default: // these are errors we throw from queue() as they as rejection type errors throw hre; } } else { throw Exceptions.sneakyThrow(t); } } } return f; }
在RxJava中,分为几种角色
在queue中,调用toObservable()方法创建一个被观察者。
通过Observable定义一个被观察者,这个被观察者会被 toObservable().toBlocking().toFuture() ,实际上就是返回可获得 run() 抽象方法执行结果的 Future 。 run() 方法由子类实现,执行正常的业务逻辑。在下面这段代码中,当存在subscriber时, 便会调用Func0#call() 方法,而这个subscriber是在 toBlocking() 中被订阅的。
调用 isRequestCachingEnabled(); 判断请求结果缓存功能是否开启,如果开启并且命中了缓 存,则会以Observable形式返回一个缓存结果
创建执行命令的Observable: hystrixObservable
当缓存处于开启状态并且没有命中缓存时,则创建一个“订阅了执行命令的Observable”: HystrixCommandResponseFromCache
当缓存特性没有开启时,则返回执行命令的Observable。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。