赞
踩
这是本人学习的总结,主要学习资料如下
- 马士兵教育
我们知道Hystrix
是通过HystrixCommand
或HystrixObservableCommand
两个对象来处理服务请求,服务端以此来简单快速地实现熔断,限流,服务降级等功能。
这两个对象有四个方法execute(),queue(),observe(),toObservable()
,这篇文章就是讲解这四个方法如何使用,以及各自有什么特性。
这个Hytrix
的架构流程图。
HystrixCommand
都拥有这个方法。
execute()
是阻塞式的执行逻辑。从源码中能看出来它获取到Future
后直接执行了get()
方法,自然线程就会阻塞直到对应的job
执行完成返回结果。
public R execute() {
try {
return this.queue().get();
} catch (Exception var2) {
throw Exceptions.sneakyThrow(this.decomposeException(var2));
}
}
首先是写一个实现HytrixCommnad
的类,复写其中的run()
方法。实际开发中run()
方法里就应该写我们的逻辑代码。这里只是简单打印一下Hello World
。
为了验证execute()
是阻塞式执行,我特意让线程沉睡了800ms
。
public class CommandDemo extends HystrixCommand<String> {
String name;
public CommandDemo(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("command")));
this.name = name;
}
@Override
protected String run() throws Exception {
// 模拟业务逻辑
Thread.sleep(800);
String result = "Hello, World! I'm " + name;
return result;
}
}
这是测试代码,可以看到执行时间永远是大于800ms
,所以execute()
是阻塞式执行。
public class CommandTest {
@Test
public void execute() {
HystrixCommand<String> commandDemo = new CommandDemo("demo case");
long before = System.currentTimeMillis();
String execute = commandDemo.execute();
long after = System.currentTimeMillis();
System.out.println(after - before);
System.out.println(execute);
}
}
只有HystrixComand
拥有这个方法。
queue()
是阻塞式的。它返回一个Future
对象,get()
方法会阻塞我们的线程。
实现HystrixCommand
的代码不变,这里只展示调用queue()
的代码。
因为是非阻塞的,所以打印的时间基本不可能超过800ms
。
@Test
public void queue() throws Exception{
HystrixCommand<String> commandDemo = new CommandDemo("demo case");
long before = System.currentTimeMillis();
Future<String> queue = commandDemo.queue();
long after = System.currentTimeMillis();
System.out.println("queue方法执行的时间:" + (after - before) + "ms");
System.out.println(queue.get());
}
HystrixComand
和HystrixObserveCommand
拥有这个方法。
observe()
返回一个Observe
对象。它比较灵活,可以以阻塞式运行线程,也可以以非阻塞式运行线程。
toBlocking().single()
之类的方法,这些方法会阻塞线程直到结果返回。@Test
public void observe() throws Exception{
HystrixCommand<String> commandDemo = new CommandDemo("demo case");
long before = System.currentTimeMillis();
Observable<String> observe = commandDemo.observe();
System.out.println(observe.toBlocking().single());
long after = System.currentTimeMillis();
System.out.println("observe方法执行的时间:" + (after - before) + "ms");
}
subscribe(Subscriber subscriber)
方法,覆写参数Subscriber
中的onCompleted()
,onError()
和onNext()
三个方法。线程执行错误会调用onError()
,顺利执行结束后则会依次执行onNext()
和onCompleted()
在示例中,最后一行沉睡了主线程3000ms
。如果不沉睡的话主线程会早于observe
的线程结束,导致我们看不到运行结果。
@Test public void observe2() throws Exception{ HystrixCommand<String> commandDemo = new CommandDemo("demo case"); long before = System.currentTimeMillis(); Observable<String> observe = commandDemo.observe(); observe.subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("Complete..."); } @Override public void onError(Throwable throwable) { System.out.println("Error..."); } @Override public void onNext(String s) { System.out.println(s); long after = System.currentTimeMillis(); System.out.println("observe方法执行的时间:" + (after - before) + "ms"); } }); System.out.println("主线程的执行时间:" + (System.currentTimeMillis() - before) + "ms"); Thread.sleep(3000); }
HystrixComand
和HystrixObserveCommand
拥有这个方法。
toObservable()
返回一个Observe
对象,用法和observe()
一模一样,这里不重复展示。
observe()
和toObserve()
的区别主要是加载run()
方法的时机不同。
observe()
的执行顺序:
run()
方法。Subscribe
对象。run()
方法的结果注入到Subscribe
对象的onNext()
方法中。toObservable()
的执行顺序的一二步则是相反:
Subscribe
对象。run()
方法。run()
方法的结果注入到Subscribe
对象的onNext()
方法中。执行顺序的不同可以通过Subscribe
的onStart()
方法来验证,这个方法在onNext()
之前进行。
对于observe()
来说,onStart()
先执行;而toObservable()
则是run()
先执行。
这是验证代码,需要注意的是,在执行subscribe()
方法之前,主线程沉睡一秒,确保observe()
和toObservable()
有足够的时间完成注册。
@Test public void toObservable() throws Exception{ HystrixCommand<String> commandDemo = new CommandDemo("demo case"); long before = System.currentTimeMillis(); Observable<String> toObservable = commandDemo.toObservable(); // Observable<String> toObservable = commandDemo.observe(); Thread.sleep(1000); toObservable.subscribe(new Subscriber<String>() { @Override public void onStart() { System.out.println("Start..."); } @Override public void onCompleted() { System.out.println("Complete..."); } @Override public void onError(Throwable throwable) { System.out.println("Error..."); } @Override public void onNext(String s) { System.out.println(s); long after = System.currentTimeMillis(); System.out.println("observe方法执行的时间:" + (after - before) + "ms"); } }); System.out.println("主线程的执行时间:" + (System.currentTimeMillis() - before) + "ms"); Thread.sleep(3000); }
这是observe()
的执行结果,run()
方法执行在前。
这是toObservable()
的执行结果,onStart()
执行在前
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。