当前位置:   article > 正文

Hystrix源码分析_hystrix 源码分析

hystrix 源码分析

Hystrix源码分析

Hystrix是如何工作的

自定义注解GxHystrixCommand

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GxHystrixCommand {

    /**
     * 默认超时时间
     * @return
     */
    int timeout() default 1000;

    /**
     * 回退方法
     * @return
     */
    String fallback() default "";

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

自定义切面 GxHystrixCommandAspect

@Component
@Aspect
public class GxHystrixCommandAspect {

    ExecutorService executorService= Executors.newFixedThreadPool(10);

    @Pointcut(value = "@annotation(GxHystrixCommand)")
    public void pointCut(){}

    @Around(value = "pointCut()&&@annotation(hystrixCommand)")
    public Object doPointCut(ProceedingJoinPoint joinPoint,GxHystrixCommand hystrixCommand) throws InterruptedException, ExecutionException, TimeoutException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        int timeout=hystrixCommand.timeout();
        //前置的判断逻辑
        Future future=executorService.submit(()->{
            try {
                return joinPoint.proceed(); //执行目标方法
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            return null;
        });
        Object result;
        try {
            result=future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
            future.cancel(true);
            // ?
            if(StringUtils.isBlank(hystrixCommand.fallback())){
                throw e;
            }
            //调用fallback
            result=invokeFallback(joinPoint,hystrixCommand.fallback());
        }
        return result;
    }

    private Object invokeFallback(ProceedingJoinPoint joinPoint,String fallback) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        MethodSignature signature=(MethodSignature)joinPoint.getSignature();
        Method method=signature.getMethod();
        Class<?>[] parameterTypes=method.getParameterTypes();
        //以上是获取被代理的方法的参数和Method
        //得到fallback方法
        try {
            Method fallbackMethod=joinPoint.getTarget().getClass().getMethod(fallback,parameterTypes);
            fallbackMethod.setAccessible(true);
            //完成反射调用
            return fallbackMethod.invoke(joinPoint.getTarget(),joinPoint.getArgs());
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

自定义GxHystrixController,并且在调用方法头上添加@GxHystrixCommand注解

@RestController
public class GpHystrixController {

    @Autowired
    RestTemplate restTemplate;

    //设置降级方法,并且设置超时时间
    @GxHystrixCommand(fallback = "fallback",timeout = 3000)
    @GetMapping("/hystrix/test")
    public String test(){
        return restTemplate.getForObject("http://localhost:8082/orders",String.class);
    }

    public String fallback(){
        return "请求被降级";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在Order-service中调整睡眠时间

@RestController
public class OrderServiceImpl implements OrderService{

    @Override
    public String orders() {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Return All Orders");
        return "Return All Orders";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

可以看到结果,请求被降级

自定义HystrixCommandService 继承HystrixCommand

public class HystrixCommandService extends HystrixCommand<String>{
    int num;
    RestTemplate restTemplate;
    public HystrixCommandService(int num,RestTemplate restTemplate){
        super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("order-service")).
              andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
                                           withCircuitBreakerEnabled(true).
                                           withCircuitBreakerRequestVolumeThreshold(5))); //TODO
        this.num=num;
        this.restTemplate=restTemplate;
    }
    @Override
    protected String run() throws Exception {
        if(num%2==0){
            return "正常访问";
        }
        //发起远程请求
        return restTemplate.getForObject("http://localhost:8082/orders",String.class);
    }

    //如果Hystrix触发了降级,那么将会执行fallback方法
    @Override
    protected String getFallback() {
        return "请求被降级";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

定义HystrixCommandController

@RestController
public class HystrixCommandController {

    @Autowired
    RestTemplate restTemplate;

    @GetMapping("/hystrix/command/{num}")
    public String hystrixCommand(@PathVariable("num")int num){
        HystrixCommandService hystrixCommandService=new HystrixCommandService(num,restTemplate);
        return  hystrixCommandService.execute();//执行,会调用HystrixCommandService的run()方法,
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

Hystrix熔断的源码分析

Hystrix熔断的@HystrixCommand注解,是通过HystrixCommandAspect这个切面来处理的。

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    //获取目标方法信息
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                                        "annotations at the same time");
    }
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    //获取元数据,比如调用方法,HystrixProperty注解数据、方法参数等
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    //获取调用者,它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑
    //如果是异步,则创建GenericObservableCommand, 否则,则创建GenericCommand
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
        metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            //执行
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

CommandExecutor.execute

这个方法主要用来执行命令,从代码中可以看出这里有三个执行类型,分别是同步、异步、以及响应 式。其中,响应式又分为

ColdObservable(observable.toObservable()) 和 HotObservable(observable.observe())

默认的executionType=SYNCHRONOUS ,同步请求。

  • execute():同步执行,返回一个单一的对象结果,发生错误时抛出异常。
  • queue():异步执行,返回一个 Future 对象,包含着执行结束后返回的单一结果。
  • observe():这个方法返回一个 Observable 对象,它代表操作的多个结果,但是已经被订阅者消费 掉了。
  • toObservable():这个方法返回一个 Observable 对象,它代表操作的多个结果,需要咱们自己手 动订阅并消费掉。
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {
        case SYNCHRONOUS: {
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

HystrixCommand.execute()

接着调用HystrixCommand.execute()方法,这个方法中,首先调用queue(),这个方法会返回一个 future对象。

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

queue这个方法中,返回了一个Future对象,这个future对象的实现是f,f是以匿名内部类,它是 Java.util.concurrent中定一个的一个异步带返回值对象。当调用queue().get()方法时,最终是委派给了 delegate.get 方法。

public Future<R> queue() {
    /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
    final Future<R> delegate = toObservable().toBlocking().toFuture();

    final Future<R> f = new Future<R>() {

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (delegate.isCancelled()) {
                return false;
            }

            if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
            }

            final boolean res = delegate.cancel(interruptOnFutureCancel.get());

            if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                final Thread t = executionThread.get();
                if (t != null && !t.equals(Thread.currentThread())) {
                    t.interrupt();
                }
            }

            return res;
        }

        @Override
        public boolean isCancelled() {
            return delegate.isCancelled();
        }

        @Override
        public boolean isDone() {
            return delegate.isDone();
        }

        @Override
        public R get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }

        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }

    };

    /* special handling of error states that throw immediately */
    if (f.isDone()) {
        try {
            f.get();
            return f;
        } catch (Exception e) {
            Throwable t = decomposeException(e);
            if (t instanceof HystrixBadRequestException) {
                return f;
            } else if (t instanceof HystrixRuntimeException) {
                HystrixRuntimeException hre = (HystrixRuntimeException) t;
                switch (hre.getFailureType()) {
                    case COMMAND_EXCEPTION:
                    case TIMEOUT:
                        // we don't throw these types from queue() only from queue().get() as they are execution errors
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        throw hre;
                }
            } else {
                throw Exceptions.sneakyThrow(t);
            }
        }
    }

    return f;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

toObservable()

在RxJava中,分为几种角色

  • Observable(被观察者),它的主要作用是产生事件
  • Observer(观察者),它的作用是接收事件并作出相应
  • Subscribe(订阅),它用来连接被观察者和观察者
  • Event(事件),被观察者、观察者、沟通的载体

在queue中,调用toObservable()方法创建一个被观察者。

AbstractCommand.toObservable

通过Observable定义一个被观察者,这个被观察者会被 toObservable().toBlocking().toFuture() ,实际上就是返回可获得 run() 抽象方法执行结果的 Future 。 run() 方法由子类实现,执行正常的业务逻辑。在下面这段代码中,当存在subscriber时, 便会调用Func0#call() 方法,而这个subscriber是在 toBlocking() 中被订阅的。

  • 调用 isRequestCachingEnabled(); 判断请求结果缓存功能是否开启,如果开启并且命中了缓 存,则会以Observable形式返回一个缓存结果

  • 创建执行命令的Observable: hystrixObservable

  • 当缓存处于开启状态并且没有命中缓存时,则创建一个“订阅了执行命令的Observable”: HystrixCommandResponseFromCache

    • 创建存储到缓存的Observable: HystrixCachedObservable
    • 将toCache添加到缓存中,返回获取缓存的Observable:fromCache
    • 如果添加失败: fromCache!=null, 则调用 toCache.unsubscribe() 方法,取消 HystrixCachedObservable 的订阅
    • 如果添加成功,则调用 toCache.toObservable(); 获得缓存Observable
  • 当缓存特性没有开启时,则返回执行命令的Observable。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号