赞
踩
Hystrix 是 Netflix 针对微服务分布式系统采用的熔断保护中间件,相当于电路中的保险丝。在微服务架构下,很多服务都相互依赖,如果不能对依赖的服务进行隔离,那么服务本身也有可能发生故障,Hystrix 通过 HystrixCommand 对调用进行隔离,这样可以阻止故障的连锁反应,能够让接口调用快速失败并迅速恢复正常,或者回退并优雅降级
//hystrix 容错机制
compile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.5.18'
首先需要继承 HystrixCommand,通过构造函数设置一个 Groupkey。具体的逻辑在 run 方法中,返回了一个当前线程名称的值
public class MyHystrixCommand extends HystrixCommand<String> { private final String name; public MyHystrixCommand(String name){ super(HystrixCommandGroupKey.Factory.asKey("MyGroup")); this.name = name; } @Override protected String run() throws Exception { try { Thread.sleep(1000*10); }catch (InterruptedException e){ e.printStackTrace(); } return this.name + ":" + Thread.currentThread().getName(); } protected String getFallback(){ return "error, get FallBack"; } public static void main(String[] args) throws InterruptedException, ExecutionException { //同步 // String result = new MyHystrixCommand("zwt").execute(); // System.out.print(result); //异步 Future<String> future = new MyHystrixCommand("zwt").queue(); System.out.print(future.get()); } }
执行时间模拟调用失败
getFallback 方法返回回退内容
执行调用代码,可以发现返回的内容是“error, get FallBack”,证明已经触发了回退
public MyHystrixCommand(String name){
// super(HystrixCommandGroupKey.Factory.asKey("MyGroup"));
super(HystrixCommand.Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("MyGroup")
)
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE
)
));
this.name = name;
}
在 run 方法中输出了线程名称,通过这个名称就可以确定当前是线程隔离还是信号量隔离 。
系统默认采用隔离策略,可以通过 andThreadPoolPropertiesDefaults 配置线程池的一些参数
public MyHystrixCommand(String name){ // super(HystrixCommandGroupKey.Factory.asKey("MyGroup")); super(HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey("MyGroup") ) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter().withExecutionIsolationStrategy( HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE ) ) .andThreadPoolPropertiesDefaults( HystrixThreadPoolProperties.Setter() .withCoreSize(10) .withMaxQueueSize(100) .withMaximumSize(100) )); this.name = name; }
一般常用 Redis 第三方缓存数据库来进行对数据的缓存处理,Hystrix中提供了方法级别的缓存,通过重写 getCacheKey 来判断是否返回缓存的数据,getCacheKey 可以根据参数来生成。
增加 getCacheKey 方法,当创建对象时传进来的 name 参数作为缓存的 key
@Override
protected String getCacheKey(){
return String.valueOf(this.name);
}
@Override
protected String run() throws Exception {
System.err.println("get data");
return this.name + ":" + Thread.currentThread().getName();
}
缓存的处理取决于请求的上下文,我们必须初始化 HystrixRequestContext,在main方法中添加
public static void main(String[] args) throws InterruptedException, ExecutionException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
//同步
String result = new MyHystrixCommand("zwt").execute();
System.out.print(result);
//异步
Future<String> future = new MyHystrixCommand("zwt").queue();
System.out.print(future.get());
context.shutdown();
}
同步会走一次run方法,异步也会走一次run方法,但不初始化 HystrixRequestContext 会报错
当数据发生变动时,必须将缓存中的数据清除掉,不然就会产生脏数据,添加一个设置 andCommandKey 使之支持清除
private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("MyKey"); public MyHystrixCommand(String name){ super(HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey("MyGroup") ) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter().withExecutionIsolationStrategy( HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE ) ) .andThreadPoolPropertiesDefaults( HystrixThreadPoolProperties.Setter() .withCoreSize(10) .withMaxQueueSize(100) .withMaximumSize(100) ) .andCommandKey(GETTER_KEY)); this.name = name; }
增加一个清除的方法
//清除缓存 public static void flushCache(String name){ HystrixRequestCache.getInstance(GETTER_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(name); } public static void main(String[] args) throws InterruptedException, ExecutionException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); //同步 String result = new MyHystrixCommand("zwt").execute(); System.out.println(result); //清除缓存 flushCache("zwt"); //异步 Future<String> future = new MyHystrixCommand("zwt").queue(); System.out.println(future.get()); context.shutdown(); }
运行可得输出结果运行了两次 run 方法
Hystrix 为了节省网络开销,可将多个请求自动合并为一个请求,
public class MyHystrixCollapser extends HystrixCollapser<List<String>, String, String> { private final String name; public MyHystrixCollapser(String name){ this.name = name; } @Override public String getRequestArgument() { return name; } @Override protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) { return new BatchCommand(collapsedRequests); } @Override protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) { int count = 0; for (CollapsedRequest<String, String> request : collapsedRequests){ request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand<List<String>>{ private final Collection<CollapsedRequest<String ,String>> requests; protected BatchCommand(Collection<CollapsedRequest<String ,String>> requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey( HystrixCommandKey.Factory.asKey("GetValueForKey") )); this.requests = requests; } @Override protected List<String> run() throws Exception { System.out.println("执行请求"); ArrayList<String> response = new ArrayList<String>(); for (CollapsedRequest<String, String> request : requests){ response.add("返回结果:" + request.getArgument()); } return response; } } public static void main(String args[]) throws InterruptedException, ExecutionException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); Future<String> f1 = new MyHystrixCollapser("zwt111").queue(); Future<String> f2 = new MyHystrixCollapser("zwt222").queue(); System.out.println(f1.get()); System.out.println(f2.get()); context.shutdown(); } }
通过 MyHystrixCollapser 创建两个执行任务,按照正常的逻辑肯定是分别执行这两个任务,通过 HystrixCollapser 可以将多个任务合并到一起执行,从输出可以看出 run 方法只执行了一次
//spring-hystrix 容错机制
compile group: 'org.springframework.cloud', name: 'spring-cloud-starter-netflix-hystrix', version: '2.2.2.RELEASE'
在启动类上添加@EnableHystrix 或者@EnableCircuitBreaker, 注意,@EnableHystrix 中包含了@EnableCircuitBreaker
在上面增加一个@HystrixCommand 注解 , 用于指定 Hystrix 服务调用延迟或失败时调用的方法
@GetMapping("/callhello/{name}")
@HystrixCommand(fallbackMethod = "defaultCallHello")
public String callHello(@PathVariable("name") String name){
// String result = restTemplate.getForObject("http://hello-client/house/data/"+name, String .class);
String result = restTemplate.getForObject("http://localhost:8081/house/data/"+name, String .class);
// String result = myFeignClient.data(name);
System.out.println("调用接口"+result);
return result;
}
//注意失败时调用的方法和原方法的参数类型,个数要一致,需要统一参数列表,不然报下面的错误
//com.netflix.hystrix.contrib.javanica.exception.FallbackDefinitionException: fallback method wasn't found:
public String defaultCallHello(String name) {
return "error:"+name;
}
这里的调用http://localhost:8081/house/data/{name}
8081服务器没有启动,当调用失败触发熔断时会用 defaultCallHello 方法来回退具体的内容,这里只是简单的打印输出
在配置文件中开启 Feign 对 Hystrix 的支持:
# 开启feign对hystrix的支持
feign.hystrix.enabled=true
在 Feign 的客户端类上的 @FeignClient 注解中指定 fallback 进行回退,注意这里要将前面@HystrixCommand(fallbackMethod = "defaultCallHello")
注解去掉
@FeignClient(value = "hello-client", path = "/house", configuration = MyFeignConfiguratic.class, fallback = MyFeignClientHystrix.class)
public interface MyFeignClient {
@RequestMapping("data/{name}")
String data(@PathVariable("name") String name);
}
@GetMapping("/callhello/{name}")
// @HystrixCommand(fallbackMethod = "defaultCallHello")
public String callHello(@PathVariable("name") String name){
// String result = restTemplate.getForObject("http://hello-client/house/data/"+name, String .class);
// String result = restTemplate.getForObject("http://localhost:8081/house/data/"+name, String .class);
String result = myFeignClient.data(name);
System.out.println("调用接口"+result);
return result;
}
编写错误时 MyFeignClientHystrix 类,需要实现 MyFeignClient 类中所有的方法,返回回退时的内容
@Component
public class MyFeignClientHystrix implements MyFeignClient {
@Override
public String data(String name) {
System.err.println("=======error=========");
return "出错error";
}
}
注意不要缺少注解 @Component
,不添加会导致编译时未将 “MyFeignClientHystrix” 类自动实例化,当服务接口不可用时,进入到熔断器fallback的逻辑处理中,此时检查不到对应的实例,将无法使用,因此虽然编译器没检查到语法错误但运行时报错
在调用接口中制造异常
@GetMapping("/data/{name}")
public String getData(@PathVariable("name") String name){
try {
Thread.sleep(1000*10);
}catch (InterruptedException e){
e.printStackTrace();
}
return name + ":" + serverPort;
}
FallbackFactory 方式
@FeignClient(value = "hello-client", path = "/house", configuration = MyFeignConfiguratic.class, fallbackFactory = MyFeignClientHystrixFallbackFactory.class)
public interface MyFeignClient {
@RequestMapping("data/{name}")
String data(@PathVariable("name") String name);
}
public class MyFeignClientHystrixFallbackFactory implements FallbackFactory<MyFeignClient> {
@Override
public MyFeignClient create(Throwable cause) {
return new MyFeignClient(){
@Override
public String data(String name) {
System.err.println("=======error=========");
return "出错error";
}
};
}
}
# actuator
compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator', version: '2.3.1.RELEASE'
启动类中添加 @EnableHystrix 开启Hystrix
浏览器打开 http://localhost:8080/actuator/hystrix.stream
Dashboard
# Dashboard
compile group: 'org.springframework.cloud', name: 'spring-cloud-starter-netflix-hystrix-dashboard', version: '2.2.2.RELEASE'
在启动类上添加 @EnableHystrixDashboard 注解
浏览器访问 http://localhost:8080/hystrix
第一行是监控的 stream 地址,也就是将之前文字监控信息的地址输入到第一个文本框中 。
第二行的 Delay 是时间,表示用多少毫秒同步一次监控信息, Title 是标题,这个可以随便填写,
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。