赞
踩
springBoot 1.5.x
springCloud Dalston.SR1
euraka + zuul + ribbon + hystrix
该项目是在spring-cloud-ribbon的基础上进行扩展,以实现接口的多个版本的调用及负载均衡,支持feign方式和断路器(spring-cloud-hystrix)。
1、服务A部署了实例 serivce-a,服务B部署实例service-a spring cloud ribbon默认是轮询的方式将请求分别转到两个实例上。用于验证外部请求通过网关(zuul)转发到服务
2、服务C部署实例service-b,用于验证内部服务之间的调用请求
在spring cloud微服务体系中,服务的请求来源无外乎两个方面:
来源1:外部请求通过网关(zuul)转发而来。
来源2:内部服务之间的调用请求。
不论网关转发过来的请求,还是内部服务调用过来的请求,都需要ribbon做负载均衡,所以可以扩展ribbon的负载均衡策略从而实现不同版本的请求转发到不同的服务实例上。
网关的转发过程是:zuul > hystrix > ribbon
内部服务调用的过程有两种:
RestTemplate > hystrix > ribbon
Feign > hystrix > ribbon
而其中hystrix有一个线程池隔离的能力,会创建另一个线程去请求服务,拥有更好的控制并发访问量、以及服务降级等能力,但是会出现一个问题,就是线程变量(ThreadLocal)的传递问题,这可以通过com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableDefault对象解决。
虽然整个项目实现起来代码量不少, 但是在接口设计上, 却只有三个简单的接口负责数据传递,路由的逻辑依然是封装在实现了IRule接口的实现类中(后面分析)。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wp5EA10i-1593685115985)(…/doc/img/cd-ribbon-connection-point.png)]
public class DefaultRibbonConnectionPoint implements BambooRibbonConnectionPoint, ApplicationContextAware { ... @Override public void executeConnectPoint(ConnectPointContext connectPointContext) { ConnectPointContext.contextLocal.set(connectPointContext); BambooRequest bambooRequest = connectPointContext.getBambooRequest(); String requestVersion = versionExtractor.extractVersion(bambooRequest); BambooRequestContext.initRequestContext(bambooRequest, requestVersion); executeBeforeReuqestTrigger(); } @Override public void shutdownconnectPoint() { try { executeAfterReuqestTrigger(); } catch (Exception e) { ConnectPointContext.getContextLocal().setExcption(e); } finally { curRequestTriggers.remove(); ConnectPointContext.contextLocal.remove(); BambooRequestContext.shutdownRequestContext(); } } ... }
RequestVersionExtractor
这个接口负责获取请求需要访问的目标接口的版本。比如有些接口版本是放在路径上,如:/v1/api/test/get。也有放在uri参数中:/api/test/get?v=1。也有可能放到header中,所以在bamboo抽象出来一个接口, 具体的实现由开发者根据业务去实现。
LoadBalanceRequestTrigger
Ribbon请求的触发器,在ribbon请求发起时, 会被执行。这个接口有三个方法,分别是判断是否需要执行的方法(shouldExecute),以及请求之前执行(before)和请求完成之后执行(after),如果出现异常,after方法依然会被执行。
上面三个接口只是简单的实现了获取请求的目标版本、触发ribbon请求的触发器,以及将信息向下一步传递。在这一段中,将介绍如何与zuul、feign、RestTemplate以及ribbon和hystrix衔接起来。
/** * 用于@LoadBalance 标记的 RestTemplate,主要作用是用来获取request的相关信息,为后面的路由提供数据基础。 */ public class BambooClientHttpRequestIntercptor implements ClientHttpRequestInterceptor { @Override public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { URI uri = request.getURI(); BambooRequest bambooRequest = BambooRequest.builder() .serviceId(uri.getHost()) .uri(uri.getPath()) .ip(RequestIpKeeper.getRequestIp()) .addMultiHeaders(request.getHeaders()) .addMultiParams(WebUtils.getQueryParams(uri.getQuery())) .build(); ConnectPointContext connectPointContext = ConnectPointContext.builder().bambooRequest(bambooRequest).build(); try { BambooAppContext.getBambooRibbonConnectionPoint().executeConnectPoint(connectPointContext); return execution.execute(request, body); } finally { BambooAppContext.getBambooRibbonConnectionPoint().shutdownconnectPoint(); } } }
/** * 主要作用是用来获取request的相关信息,为后面的路由提供数据基础。 */ public class BambooFeignClient implements Client { private Client delegate; public BambooFeignClient(Client delegate) { this.delegate = delegate; } @Override public Response execute(Request request, Request.Options options) throws IOException { URI uri = URI.create(request.url()); BambooRequest.Builder builder = BambooRequest.builder() .serviceId(uri.getHost()) .uri(uri.getPath()) .ip(RequestIpKeeper.getRequestIp()) .addMultiParams(WebUtils.getQueryParams(uri.getQuery())); request.headers().entrySet().forEach(entry ->{ for (String v : entry.getValue()) { builder.addHeader(entry.getKey(), v); } }); ConnectPointContext connectPointContext = ConnectPointContext.builder().bambooRequest(builder.build()).build(); try { BambooAppContext.getBambooRibbonConnectionPoint().executeConnectPoint(connectPointContext); return delegate.execute(request, options); }finally { BambooAppContext.getBambooRibbonConnectionPoint().shutdownconnectPoint(); } } }
/** * 主要作用是用来获取request的相关信息,为后面的路由提供数据基础。 */ public class BambooPreZuulFilter extends ZuulFilter { @Override public String filterType() { return FilterConstants.PRE_TYPE; } @Override public int filterOrder() { return 10000; } @Override public boolean shouldFilter() { return true; } @Override public Object run() { RequestContext context = RequestContext.getCurrentContext(); BambooRequest.Builder builder = BambooRequest.builder() .serviceId((String)context.get(FilterConstants.SERVICE_ID_KEY)) .uri((String)context.get(FilterConstants.REQUEST_URI_KEY)) .ip(context.getZuulRequestHeaders().get(FilterConstants.X_FORWARDED_FOR_HEADER.toLowerCase())) .addMultiParams(context.getRequestQueryParams()) .addHeaders(context.getZuulRequestHeaders()) .addHeaders(context.getOriginResponseHeaders().stream().collect(Collectors.toMap(Pair::first, Pair::second))); context.getOriginResponseHeaders().forEach(pair-> builder.addHeader(pair.first(), pair.second())); ConnectPointContext connectPointContext = ConnectPointContext.builder().bambooRequest(builder.build()).build(); BambooAppContext.getBambooRibbonConnectionPoint().executeConnectPoint(connectPointContext); return null; } }
/** * 做一些善后工作。比如删除BambooRequestContext在ThreadLocal中的信息。 */ public class BambooPostZuulFilter extends ZuulFilter { @Override public String filterType() { return FilterConstants.POST_TYPE; } @Override public int filterOrder() { return 0; } @Override public boolean shouldFilter() { return true; } @Override public Object run() { // BambooRequestContext.shutdownRequestContext(); BambooAppContext.getBambooRibbonConnectionPoint().shutdownconnectPoint(); return null; } }
public class HystrixContextRunnable implements Runnable { private final Callable<Void> actual; private final HystrixRequestContext parentThreadState; //... @Override public void run() { HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); try { // set the state of this thread to that of its parent HystrixRequestContext.setContextOnCurrentThread(parentThreadState); // execute actual Callable with the state of the parent try { actual.call(); } catch (Exception e) { throw new RuntimeException(e); } } finally { // restore this thread back to its original state HystrixRequestContext.setContextOnCurrentThread(existingState); } } }
parentThreadState也是一个HystrixRequestContext对象,它是在hystrix创建线程之前的,也就是处理http请求的线程的HystrixRequestContext对象,我们一般也是维护这个对象。在使用线程池隔离时,hystrix会将parentThreadState中的信息复到到新线程中,实现跨线程的数据传递,从而在后面的逻辑中可以获取到parentThreadState中维护的信息,包括ribbon的路由信息。在bamboo中,将一步骤的逻辑放到BambooRequestContext中,将BambooRequestContext实例本身传递下去。
public class BambooRequestContext { private static final Logger log = LoggerFactory.getLogger(BambooRequestContext.class); private static final HystrixRequestVariableDefault<BambooRequestContext> CURRENT_CONTEXT = new HystrixRequestVariableDefault<BambooRequestContext>(); private final String apiVersion; private final BambooRequest bambooRequest; private Map<String, Object> params; private BambooRequestContext(BambooRequest bambooRequest, String apiVersion) { params = new HashMap<>(); this.apiVersion = apiVersion; this.bambooRequest = bambooRequest; } public static BambooRequestContext currentRequestCentxt() { return CURRENT_CONTEXT.get(); } public static void initRequestContext(BambooRequest bambooRequest, String apiVersion) { if (!HystrixRequestContext.isCurrentThreadInitialized()) { HystrixRequestContext.initializeContext(); } CURRENT_CONTEXT.set(new BambooRequestContext(bambooRequest, apiVersion)); } public static void shutdownRequestContext() { if (HystrixRequestContext.isCurrentThreadInitialized()) { HystrixRequestContext.getContextForCurrentThread().shutdown(); } } //忽略setter/getter }
public class BambooApiVersionPredicate extends AbstractServerPredicate { public BambooApiVersionPredicate(BambooZoneAvoidanceRule rule) { super(rule); } @Override public boolean apply(PredicateKey input) { BambooLoadBalancerKey loadBalancerKey = getBambooLoadBalancerKey(input); if (loadBalancerKey != null && !StringUtils.isEmpty(loadBalancerKey.getApiVersion())) { Map<String, String> serverMetadata = ((BambooZoneAvoidanceRule) this.rule) .getServerMetadata(loadBalancerKey.getServiceId(), input.getServer()); String versions = serverMetadata.get("versions"); return matchVersion(versions, loadBalancerKey.getApiVersion()); } return true; } private BambooLoadBalancerKey getBambooLoadBalancerKey(PredicateKey input) { if(BambooRequestContext.currentRequestCentxt()!=null){ BambooRequestContext bambooRequestContext = BambooRequestContext.currentRequestCentxt(); String apiVersion = bambooRequestContext.getApiVersion(); if(!StringUtils.isEmpty(apiVersion)){ return BambooLoadBalancerKey.builder().apiVersion(apiVersion) .serviceId(bambooRequestContext.getServiceId()).build(); } } return null; } //... }
(以下说明都是假设浏览者对spring-cloud-netflix有过了解)
1、在使用多版本控制时,需要修改服务提供方和服务消费方,分别是application.properties和pom.xml。具体见github代码
2、在服务消费方,只需要在pom.xml添加spring-cloud-starter-multi-version到pom.xml依赖中即可
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-feign</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.5</version> </dependency> <dependency> <groupId>cn.springcloud.gray</groupId> <artifactId>spring-cloud-starter-multi-version</artifactId> </dependency> </dependencies>
以下例子中,在一个名为spring-cloud-bamboo-service-a-samples的项目中加入第1个步骤, 启动服务。网关spring-cloud-bamboo-zuul-samples做为服务消费方,在pom.xml中加入spring-cloud-starter-multi-version, 并在application.properties中加入zuul的配置:
ribbon.eureka.enabled= true
zuul.prefix=/gateway #为zuul设置一个公共的前缀
3、验证:
1、eureka注册中心:localhost:9011
2、验证zuul转发
携带指定版本:localhost:9016/gateway/service-a/get?version=1
不携带指定版本:localhost:9016/gateway/service-a/get ribbon轮询访问service-a
3、验证内部调用
携带指定版本:localhost:9017/restTemplateGet?version=1
不携带指定版本:localhost:9017/restTemplateGet ribbon轮询访问service-a
github源码地址:https://github.com/yy19951002/test/tree/master/gray
遇见的错误:
com.netflix.client.ClientException: Load balancer does not have available server for client
解决方式:
1、zuul配置文件添加以下
ribbon.eureka.enabled=true
zuul.prefix=/gateway
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=60000
zuul.host.socket-timeout-millis=12000
zuul.host.connect-timeout-millis=12000
2、生产者注册服务名和zuul网关调用的服务名信息是否一致(本人项目所遇问题最终解决方式)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。