当前位置:   article > 正文

熔断器 Hystrix 源码解析 —— 执行命令方式

熔断器调用的方法名

摘要: 原创出处 www.iocoder.cn/Hystrix/com… 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


???关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 执行命令方法

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

在官方提供的示例中,我们看到 CommandHelloWorld 通过继承 HystrixCommand 抽象类,有四种调用方式:

方法
#execute()同步调用,返回直接结果
#queue()异步调用,返回 java.util.concurrent.Future
#observe()异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
#toObservable()未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果


推荐 Spring Cloud 书籍

2. 实现

  1. // AbstractCommand.java
  2. abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
  3. // ... 省略无关属性与方法
  4. public Observable<R> toObservable() {
  5. return Observable.defer(new Func0<Observable<R>>() {
  6. @Override
  7. public Observable<R> call() {
  8. // ....
  9. }
  10. }
  11. }
  12. public Observable<R> observe() {
  13. // us a ReplaySubject to buffer the eagerly subscribed-to Observable
  14. ReplaySubject<R> subject = ReplaySubject.create();
  15. // eagerly kick off subscription
  16. final Subscription sourceSubscription = toObservable().subscribe(subject);
  17. // return the subject that can be subscribed to later while the execution has already started
  18. return subject.doOnUnsubscribe(new Action0() {
  19. @Override
  20. public void call() {
  21. sourceSubscription.unsubscribe();
  22. }
  23. });
  24. }
  25. }
  26. // HystrixCommand.java
  27. public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
  28. // ... 省略无关属性与方法
  29. public Future<R> queue() {
  30. final Future<R> delegate = toObservable().toBlocking().toFuture();
  31. final Future<R> f = new Future<R>() {
  32. // ... 包装 delegate
  33. }
  34. // ...
  35. return f;
  36. }
  37. public R execute() {
  38. try {
  39. return queue().get();
  40. } catch (Exception e) {
  41. throw Exceptions.sneakyThrow(decomposeException(e));
  42. }
  43. }
  44. protected abstract R run() throws Exception;
  45. }复制代码
  • #toObservable() 方法 :做订阅,返回干净的 Observable 。这就是为什么上文说“未调用”
  • #observe() 方法 :调用 #toObservable() 方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject 发起订阅
  • #queue() 方法 :调用 #toObservable() 方法的基础上,调用:
    • Observable#toBlocking() 方法 :将 Observable 转换成阻塞rx.observables.BlockingObservable
    • BlockingObservable#toFuture() 方法 :返回可获得 #run() 抽象方法执行结果的 Future 。
      • #run() 方法 :子类实现该方法,执行正常的业务逻辑
  • #execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法,同步返回 #run() 的执行结果。
  • 整理四种调用方式如下:

    FROM 《【翻译】Hystrix文档-实现原理》

3. BlockingObservable

本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable 的实现,所以你可以选择:

《RxJava 源码解析 —— BlockingObservable》

666. 彩蛋

第一篇 Hystrix 正式的源码解析。

梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。

胖友,分享一波朋友圈可好!

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号