赞
踩
Spring Cloud Gateway 是Spring Cloud 生态全新项目,基于Spring 5、Spring Boot 2.X、Project Reactor实现的API网关,旨在为微服务提供简单高效的API路由管理方法。
Spring Cloud Gateway 作为Spring Cloud 生态中的网关,目标是代替Zuul 1.X。Spring Cloud 2.X版本目前仍未对Zuul 2.X高性能版本进行集成,仍使用的是非Reactor的老版本Zuul网关。
- 目前Spring Cloud dependencies 最新版本Hoxton.SR8 仍使用的是Zuul 1.3.1
- Zuul 2.x 高性能Reactor版本本身与18年5月开源,目前最新版本2.1.9
为了提高网关性能,Spring Cloud Gateway基于WebFlux框架实现,而WebFlux框架底层则使用了高性能的Reactor模式通信框架Netty。
ServerWebExchange
,可以匹配HTTP请求的所有内容,比如标头或参数。GatewayFilter
实例,可以在发送下游请求之前或之后修改请求或响应。Springcloud 2.x 版本到目前为止中所集成的Zuul版本(1.x),采用的是Tomcat容器,使用的是传统的Servlet IO处理模型。
servlet由servlet container进行生命周期管理。container启动时构造servlet对象并调用servlet init()进行初始化;container关闭时调用servlet destory()销毁servlet;container运行时接受请求,并为每个请求分配一个线程(一般从线程池中获取空闲线程)然后调用service()。
弊端:servlet是一个简单的网络IO模型,当请求进入servlet container时,servlet container就会为其绑定一个线程,在并发不高的场景下这种模型是适用的,但是一旦并发上升,线程数量就会上涨,而线程资源代价是昂贵的(上线文切换,内存消耗大)严重影响请求的处理时间。在一些简单的业务场景下,不希望为每个request分配一个线程,只需要1个或几个线程就能应对极大并发的请求,这种业务场景下servlet模型没有优势。
所以Springcloud Zuul 是基于servlet之上的一个阻塞式处理模型,即spring实现了处理所有request请求的一个servlet(DispatcherServlet),并由该servlet阻塞式处理处理。所以Springcloud Zuul无法摆脱servlet模型的弊端。
Webflux模式替换了旧的Servlet线程模型。用少量的线程处理request和response io操作,这些线程称为Loop线程,而业务交给响应式编程框架处理,响应式编程是非常灵活的,用户可以将业务中阻塞的操作提交到响应式框架的work线程中执行,而不阻塞的操作依然可以在Loop线程中进行处理,大大提高了Loop线程的利用率。官方结构图:
Webflux虽然可以兼容多个底层的通信框架,但是一般情况下,底层使用的还是Netty,毕竟,Netty是目前业界认可的最高性能的通信框架。而Webflux的Loop线程,正好就是著名的Reactor 模式IO处理模型的Reactor线程,如果使用的是高性能的通信框架Netty,这就是Netty的EventLoop线程。
使用Gateway只需要简单引入依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
处理流程:
注意,在没有端口的路由中定义的URI,HTTP和HTTPS URI的默认端口值分别为80和443。
public class DispatcherHandler implements WebHandler, ApplicationContextAware { @Nullable private List<HandlerMapping> handlerMappings; @Nullable private List<HandlerAdapter> handlerAdapters; @Nullable private List<HandlerResultHandler> resultHandlers; public DispatcherHandler() { } public DispatcherHandler(ApplicationContext applicationContext) { this.initStrategies(applicationContext); } @Nullable public final List<HandlerMapping> getHandlerMappings() { return this.handlerMappings; } public void setApplicationContext(ApplicationContext applicationContext) { this.initStrategies(applicationContext); } # 初始、校验HandlerMapping并按order排序 protected void initStrategies(ApplicationContext context) { Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerMapping.class, true, false); ArrayList<HandlerMapping> mappings = new ArrayList(mappingBeans.values()); AnnotationAwareOrderComparator.sort(mappings); this.handlerMappings = Collections.unmodifiableList(mappings); Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerAdapter.class, true, false); this.handlerAdapters = new ArrayList(adapterBeans.values()); AnnotationAwareOrderComparator.sort(this.handlerAdapters); Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerResultHandler.class, true, false); this.resultHandlers = new ArrayList(beans.values()); AnnotationAwareOrderComparator.sort(this.resultHandlers); } //遍历handlerMappings ,根据exchange找到对应的handler // 对于Gateway 会找到对应的RoutePredicateHandlerMapping public Mono<Void> handle(ServerWebExchange exchange) { return this.handlerMappings == null ? this.createNotFoundError() : Flux.fromIterable(this.handlerMappings).concatMap((mapping) -> { return mapping.getHandler(exchange); }).next().switchIfEmpty(this.createNotFoundError())如果遍历不到结果,则切换到错误处理 .flatMap((handler) -> { //通过HandlerAdapter调用handler, //gateway使用的 SimpleHandlerAdapter return this.invokeHandler(exchange, handler); }).flatMap((result) -> {//对响应进行处理 return this.handleResult(exchange, result); }); } private <R> Mono<R> createNotFoundError() { return Mono.defer(() -> { Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND, "No matching handler"); return Mono.error(ex); }); } private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) { if (this.handlerAdapters != null) { Iterator var3 = this.handlerAdapters.iterator(); while(var3.hasNext()) { HandlerAdapter handlerAdapter = (HandlerAdapter)var3.next(); if (handlerAdapter.supports(handler)) { //调用handler的handle方法处理请求 return handlerAdapter.handle(exchange, handler); } } } return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler)); } //根据result获取对应的结果处理handler并处理结果 private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) { return this.getResultHandler(result).handleResult(exchange, result).checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]").onErrorResume((ex) -> { return result.applyExceptionHandler(ex).flatMap((exResult) -> { String text = "Exception handler " + exResult.getHandler() + ", error=\"" + ex.getMessage() + "\" [DispatcherHandler]"; return this.getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text); }); }); } private HandlerResultHandler getResultHandler(HandlerResult handlerResult) { if (this.resultHandlers != null) { Iterator var2 = this.resultHandlers.iterator(); while(var2.hasNext()) { HandlerResultHandler resultHandler = (HandlerResultHandler)var2.next(); if (resultHandler.supports(handlerResult)) { return resultHandler; } } } throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue()); } }
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping { private final FilteringWebHandler webHandler; private final RouteLocator routeLocator; private final Integer managementPort; private final RoutePredicateHandlerMapping.ManagementPortType managementPortType; public RoutePredicateHandlerMapping(FilteringWebHandler webHandler, RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) { this.webHandler = webHandler; this.routeLocator = routeLocator; this.managementPort = getPortProperty(environment, "management.server."); this.managementPortType = this.getManagementPortType(environment); //设置排序字段1,此处的目的是Spring Cloud Gateway 的 GatewayWebfluxEndpoint 提供 HTTP API ,不需要经过网关 //它通过 RequestMappingHandlerMapping 进行请求匹配处理。RequestMappingHandlerMapping 的 order = 0 ,需要排在 RoutePredicateHandlerMapping 前面。所有,RoutePredicateHandlerMapping 设置 order = 1 。 this.setOrder(1); this.setCorsConfigurations(globalCorsProperties.getCorsConfigurations()); } private RoutePredicateHandlerMapping.ManagementPortType getManagementPortType(Environment environment) { Integer serverPort = getPortProperty(environment, "server."); if (this.managementPort != null && this.managementPort < 0) { return RoutePredicateHandlerMapping.ManagementPortType.DISABLED; } else { return this.managementPort != null && (serverPort != null || !this.managementPort.equals(8080)) && (this.managementPort == 0 || !this.managementPort.equals(serverPort)) ? RoutePredicateHandlerMapping.ManagementPortType.DIFFERENT : RoutePredicateHandlerMapping.ManagementPortType.SAME; } } private static Integer getPortProperty(Environment environment, String prefix) { return (Integer)environment.getProperty(prefix + "port", Integer.class); } //设置mapping到上下文环境 protected Mono<?> getHandlerInternal(ServerWebExchange exchange) { if (this.managementPortType == RoutePredicateHandlerMapping.ManagementPortType.DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) { return Mono.empty(); } else { exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_HANDLER_MAPPER_ATTR, this.getSimpleName()); // 查找路由 return this.lookupRoute(exchange).flatMap((r) -> { exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR); if (this.logger.isDebugEnabled()) { this.logger.debug("Mapping [" + this.getExchangeDesc(exchange) + "] to " + r); } //将查找到的路由设置到上下文环境 exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR, r); //返回mapping对应的WebHandler即FilteringWebHandler return Mono.just(this.webHandler); }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { //当前未找到路由时返回空,并移除GATEWAY_PREDICATE_ROUTE_ATTR exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR); if (this.logger.isTraceEnabled()) { this.logger.trace("No RouteDefinition found for [" + this.getExchangeDesc(exchange) + "]"); } }))); } } protected CorsConfiguration getCorsConfiguration(Object handler, ServerWebExchange exchange) { return super.getCorsConfiguration(handler, exchange); } private String getExchangeDesc(ServerWebExchange exchange) { StringBuilder out = new StringBuilder(); out.append("Exchange: "); out.append(exchange.getRequest().getMethod()); out.append(" "); out.append(exchange.getRequest().getURI()); return out.toString(); } //通过路由定位器获取路由信息 protected Mono<Route> lookupRoute(ServerWebExchange exchange) { return this.routeLocator.getRoutes().concatMap((route) -> { return Mono.just(route).filterWhen((r) -> { exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); return (Publisher)r.getPredicate().apply(exchange);//通过谓词过滤路由 }).doOnError((e) -> { this.logger.error("Error applying predicate for route: " + route.getId(), e); }).onErrorResume((e) -> { return Mono.empty(); }); }).next().map((route) -> { if (this.logger.isDebugEnabled()) { this.logger.debug("Route matched: " + route.getId()); } this.validateRoute(route, exchange); return route; }); } protected void validateRoute(Route route, ServerWebExchange exchange) { } protected String getSimpleName() { return "RoutePredicateHandlerMapping"; } public static enum ManagementPortType { DISABLED, SAME, DIFFERENT; private ManagementPortType() { } } }
# 通过过滤器处理web请求的处理器 public class FilteringWebHandler implements WebHandler { protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class); # 全局过滤器 private final List<GatewayFilter> globalFilters; public FilteringWebHandler(List<GlobalFilter> globalFilters) { this.globalFilters = loadFilters(globalFilters); } private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) { return (List)filters.stream().map((filter) -> { FilteringWebHandler.GatewayFilterAdapter gatewayFilter = new FilteringWebHandler.GatewayFilterAdapter(filter); if (filter instanceof Ordered) { int order = ((Ordered)filter).getOrder(); return new OrderedGatewayFilter(gatewayFilter, order); } else { return gatewayFilter; } }).collect(Collectors.toList()); } public Mono<Void> handle(ServerWebExchange exchange) { #获取请求上下文设置的路由实例 Route route = (Route)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); # 获取网关路由定义下的网关过滤器集合 List<GatewayFilter> gatewayFilters = route.getFilters(); # 组合全局的过滤器与路由配置的过滤器,并将路由器定义的过滤器添加集合尾部 List<GatewayFilter> combined = new ArrayList(this.globalFilters); combined.addAll(gatewayFilters); AnnotationAwareOrderComparator.sort(combined); if (logger.isDebugEnabled()) { logger.debug("Sorted gatewayFilterFactories: " + combined); } # 创建过滤器链表对其进行链式调用 return (new FilteringWebHandler.DefaultGatewayFilterChain(combined)).filter(exchange); } private static class GatewayFilterAdapter implements GatewayFilter { private final GlobalFilter delegate; GatewayFilterAdapter(GlobalFilter delegate) { this.delegate = delegate; } public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return this.delegate.filter(exchange, chain); } public String toString() { StringBuilder sb = new StringBuilder("GatewayFilterAdapter{"); sb.append("delegate=").append(this.delegate); sb.append('}'); return sb.toString(); } } private static class DefaultGatewayFilterChain implements GatewayFilterChain { private final int index; private final List<GatewayFilter> filters; DefaultGatewayFilterChain(List<GatewayFilter> filters) { this.filters = filters; this.index = 0; } private DefaultGatewayFilterChain(FilteringWebHandler.DefaultGatewayFilterChain parent, int index) { this.filters = parent.getFilters(); this.index = index; } public List<GatewayFilter> getFilters() { return this.filters; } public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < this.filters.size()) { GatewayFilter filter = (GatewayFilter)this.filters.get(this.index); FilteringWebHandler.DefaultGatewayFilterChain chain = new FilteringWebHandler.DefaultGatewayFilterChain(this, this.index + 1); return filter.filter(exchange, chain); } else { return Mono.empty(); } }); } } }
根据DispatcherHandler入口整理的Gateway类图
Spring Cloud Gateway的配置由一系列RouteDefinitionLocator实例驱动。以下清单显示了RouteDefinitionLocator接口的定义:
RouteDefinitionLocator.java
public interface RouteDefinitionLocator {
Flux<RouteDefinition> getRouteDefinitions();
}
默认情况下,PropertiesRouteDefinitionLocator使用Spring Boot的@ConfigurationProperties机制来加载属性。
Gateway 提供了两种不同的方式用于配置路由,一种是通过yml文件来配置,另一种是通过Java Bean来配置。
service-url:
user-service: http://localhost:8201
spring:
cloud:
gateway:
routes:
- id: path_route #路由的ID
uri: ${service-url.user-service}/user/{id} #匹配后路由地址
predicates: # 断言,路径相匹配的进行路由
- Path=/user/{id}
@Configuration
public class GatewayConfig {
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
.route("path_route2", r -> r.path("/user/getByUsername")
.uri("http://localhost:8201/user/getByUsername"))
.build();
}
}
Spring Cloud Gateway将路由匹配作为Spring WebFluxHandlerMapping基础架构的一部分。Spring Cloud Gateway包括许多内置的路由谓词工厂。所有这些谓词都与HTTP请求的不同属性匹配。可以将多个路由谓词工厂与逻辑and语句结合使用。
Predicate 来源于 Java 8,是 Java 8 中引入的一个函数,Predicate 接受一个输入参数,返回一个布尔值结果。该接口包含多种默认方法来将 Predicate 组合成其他复杂的逻辑(比如:与,或,非)。可以用于接口请求参数校验、判断新老数据是否有变化需要进行更新操作。
在 Spring Cloud Gateway 中 Spring 利用 Predicate 的特性实现了各种路由匹配规则,有通过 Header、请求参数等不同的条件来进行作为条件匹配到对应的路由。
下图为 Spring Cloud Gateway内置的几种常见谓词路由器:
After Route Predicate
在指定时间之后的请求会匹配该路由。
spring:
cloud:
gateway:
routes:
- id: after_route
uri: ${service-url.user-service}
predicates:
- After=2019-09-24T16:30:00+08:00[Asia/Shanghai]
Before Route Predicate
在指定时间之前的请求会匹配该路由。
spring:
cloud:
gateway:
routes:
- id: before_route
uri: ${service-url.user-service}
predicates:
- Before=2019-09-24T16:30:00+08:00[Asia/Shanghai]
Between Route Predicate
在指定时间区间内的请求会匹配该路由。
spring:
cloud:
gateway:
routes:
- id: before_route
uri: ${service-url.user-service}
predicates:
- Between=2019-09-24T16:30:00+08:00[Asia/Shanghai], 2019-09-25T16:30:00+08:00[Asia/Shanghai]
带有指定Cookie的请求会匹配该路由。
spring:
cloud:
gateway:
routes:
- id: cookie_route
uri: ${service-url.user-service}
predicates:
- Cookie=username,macro
带有指定请求头的请求会匹配该路由。
spring:
cloud:
gateway:
routes:
- id: header_route
uri: ${service-url.user-service}
predicates:
- Header=X-Request-Id, \d+
带有指定Host的请求会匹配该路由。
spring:
cloud:
gateway:
routes:
- id: host_route
uri: https://example.org
predicates:
- Host=**.qt.com
发送指定方法的请求会匹配该路由。
spring:
cloud:
gateway:
routes:
- id: method_route
uri: ${service-url.user-service}
predicates:
- Method=GET
发送指定路径的请求会匹配该路由。
spring:
cloud:
gateway:
routes:
- id: path_route
uri: ${service-url.user-service}/user/{id}
predicates:
- Path=/user/{id}
带指定查询参数的请求可以匹配该路由。
spring:
cloud:
gateway:
routes:
- id: query_route
uri: ${service-url.user-service}/user/getByUsername
predicates:
- Query=username
从指定远程地址发起的请求可以匹配该路由。
spring:
cloud:
gateway:
routes:
- id: remoteaddr_route
uri: ${service-url.user-service}
predicates:
- RemoteAddr=192.168.1.1/24
根据Gateway工作原理,我们知道Gateway实际是由路由匹配到的一系列Filter过滤链来处理请求的,Spring Cloud Gateway包括许多内置的GatewayFilter工厂。具体详情参考官网:
https://docs.spring.io/spring-cloud-gateway/docs/2.2.5.RELEASE/reference/html/#gatewayfilter-factories
当请求与路由匹配时,过滤Web处理程序会将的所有实例GlobalFilter和所有特定GatewayFilter于路由的实例添加到过滤器链中。该组合的过滤器链按org.springframework.core.Ordered接口排序,可以通过实现该getOrder()方法进行设置。
Spring Cloud Gateway区分了执行过滤器逻辑的“前”和“后”阶段,因此优先级最高的过滤器是“前”阶段的第一个,而“后”阶段的最后一个是优先级最低的一个。
例如,下面程序配置了一个过滤器链:
@Bean public GlobalFilter customFilter() { return new CustomGlobalFilter(); } public class CustomGlobalFilter implements GlobalFilter, Ordered { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("custom global filter"); return chain.filter(exchange); } @Override public int getOrder() { return -1; } }
Gateway会根据注册中心注册的服务列表,以服务名为路径创建动态路由。这里主要使用Nacos作为注册中心和配置中心
引入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
启用DiscoveryClient网关集成
# spring.cloud.gateway.discovery.locator.enabled=true
spring:
cloud:
gateway:
discovery:
locator:
enabled: true #开启从注册中心动态创建路由的功能
lower-case-service-id: true #使用小写服务名,默认是大写
使用网关访问服务:
C:\Users\liangbodlz\.ssh>curl 192.168.132.49:1500/nacos-provider/index
Hello!
在实际生产环境中,我们往往不会通过服务的application-name来访问服务,而是通过某个固定的url path来访问,比如xx.xxx/user/login,来访问用户服务的接口
通过Spring Cloud Gateway 内置 Path Route Predicate Factory
可以实现该目标:
spring:
cloud:
gateway:
discovery:
locator:
enabled: true #开启从注册中心动态创建路由的功能
lower-case-service-id: true #使用小写服务名,默认是大写
routes:
- id: nacos-provider
uri: lb://nacos-provider
predicates:
- Path=/nprovider/**
filters:
- StripPrefix=1
使用指定path访问服务
C:\Users\liangbodlz\.ssh>curl 192.168.132.49:1500/nprovider/index
Hello!
C:\Users\liangbodlz\.ssh>
通常我们将微服务的Route Predicate Path和Gateway应用本身的配置放在一起,但是随着微服务的扩展,Route Predicate Path会逐渐增加导致Gateway 服务配置会变得臃肿,且Route Predicate Path配置会随着服务的增减进行变更,而更新的路由配置生效需要重启Gateway,这都是实际线上环境不可忍受的。因此独立管理Route Predicate Path配置且支持动态刷新配置变得必要起来。
基于上述需求,我们可以考虑将Gateway 路由配置存储到内存或者其他介质中。
从源码分析中可以知道Gateway路由配置信息由RouteDefinitionLocator 接口完成。
RouteDefinitionLocator 是Gateway路由配置读取的顶级接口,提供从缓存、配置文件、服务注册中心、组合等不同方式读取配置,以及提供RouteDefinitionRepository 接口方式对RouteDefinition进行增、删、查操作。要自定义路由配置实现可以考虑从上述接口着手实现。
这里主要基于Nacos配置中心+RouteDefinitionRepository 自定义路由配置加载,并参考,CachingRouteLocator实现路由配置的动态刷新
核心源码清单
//自定义路由配置加载核心接口 public interface RouteDefinitionRepository extends RouteDefinitionLocator, RouteDefinitionWriter { } //查询路由 public interface RouteDefinitionLocator { //返回自定义路由配置加载 Flux<RouteDefinition> getRouteDefinitions(); } //路由增、删 public interface RouteDefinitionWriter { Mono<Void> save(Mono<RouteDefinition> route); Mono<Void> delete(Mono<String> routeId); } //动态路由刷新实现 public class CachingRouteLocator implements Ordered, RouteLocator, ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware { ....//省略 private ApplicationEventPublisher applicationEventPublisher; .....//省略 public void onApplicationEvent(RefreshRoutesEvent event) { try { this.fetch().collect(Collectors.toList()).subscribe((list) -> { Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe((signals) -> { this.applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this)); this.cache.put("routes", signals); }, (throwable) -> { this.handleRefreshError(throwable); }); }); } catch (Throwable var3) { this.handleRefreshError(var3); } } private void handleRefreshError(Throwable throwable) { if (log.isErrorEnabled()) { log.error("Refresh routes error !!!", throwable); } this.applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this, throwable)); }
代码实现
//实现RouteDefinitionRepository接口 package com.easy.mall.route; import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.api.exception.NacosException; import com.easy.mall.config.GatewayConfig; import com.easy.mall.operation.NacosConfigOperation; import com.easy.mall.operation.NacosSubscribeCallback; import com.google.common.collect.Lists; import java.util.List; import java.util.Optional; import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.gateway.event.RefreshRoutesEvent; import org.springframework.cloud.gateway.route.RouteDefinition; import org.springframework.cloud.gateway.route.RouteDefinitionRepository; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** * @description: 基于Nacos配置中心实现Gateway 动态路由配置 * @author: liangbo * @create 2020-12-15 19:29 * @Version 1.0 **/ @Slf4j @DependsOn(value= {"gatewayConfig","nacosAutoConfiguration"}) @Configuration @ConditionalOnProperty(prefix = "global.gateway.dynamicRoute", name = "enabled", havingValue = "true") public class NacosDynamicRouteDefinitionRepository implements RouteDefinitionRepository { @Autowired private NacosConfigOperation nacosConfigOperation; @Autowired private ApplicationEventPublisher publisher; @Override public Flux<RouteDefinition> getRouteDefinitions() { //从Nacos配置中心读取路由配置 try { String dynamicRouteStr = nacosConfigOperation.getConfig(GatewayConfig.NACOS_ROUTE_GROUP, GatewayConfig.NACOS_ROUTE_DATA_ID); log.info("init dynamicRoute success.:{}", dynamicRouteStr); List<RouteDefinition> routeDefinitions = Optional.ofNullable(dynamicRouteStr) .map(str -> JSONObject.parseArray(str, RouteDefinition.class)) .orElse(Lists.newArrayList()); return Flux.fromIterable(routeDefinitions); } catch (NacosException e) { log.error("load gateway dynamicRoute config error:{}", e); } return Flux.fromIterable(Lists.newArrayList()); } @Override public Mono<Void> save(Mono<RouteDefinition> route) { return null; } @Override public Mono<Void> delete(Mono<String> routeId) { return null; } /** * 侦听nacos config 实时刷新路由配置 */ @PostConstruct public void subscribeConfigRefresh() { try { nacosConfigOperation.subscribeConfig(GatewayConfig.NACOS_ROUTE_GROUP, GatewayConfig.NACOS_ROUTE_DATA_ID, null, new NacosSubscribeCallback () { @Override public void callback(String config) { publisher.publishEvent(new RefreshRoutesEvent(this)); } }); } catch (NacosException e) { log.error("nacos-addListener-error", e); } } }
动态路由配置清单
[ { "id": "easy-mall-auth", "predicates": [{ "name": "Path", "args": { "pattern": "/emallauth/**" } }], "uri": "lb://easy-mall-auth", "filters": [{ "name": "StripPrefix", "args": { "parts": "1" } }] } ]
实现思路见Nacos安装及Spring Cloud 集成 3.4
@Configuration
@AutoConfigureBefore(RibbonClientConfiguration.class)
//通过注解@RibbonClient声明附加配置,此处声明的配置会覆盖配置文件中的配置
@RibbonClients(defaultConfiguration = { GatewayStrategyLoadBalanceConfiguration.class })
@ConditionalOnProperty(value = StrategyConstant.SPRING_APPLICATION_STRATEGY_CONTROL_ENABLED, matchIfMissing = true)
public class GatewayStrategyAutoConfiguration {
//省略......
通过入口类,加载自定义全局过滤器、Ribbon自定义负载均衡配置、元数据处理适配器等。
自定义Ribbon 负载均衡实现
自定义Ribbon 负载均衡实现分别对PredicateBasedRule和ZoneAvoidanceRule进行了扩展
//通过注解@RibbonClient声明附加配置,此处声明的配置会覆盖配置文件中的配置 @RibbonClients(defaultConfiguration = { GatewayStrategyLoadBalanceConfiguration.class }) @Bean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, serviceId)) { return this.propertiesFactory.get(IRule.class, config, serviceId); } //开启和关闭Ribbon默认的ZoneAvoidanceRule负载均衡策略。一旦关闭,则使用RoundRobin简单轮询负载均衡策略。缺失则默认为true boolean zoneAvoidanceRuleEnabled = environment.getProperty(StrategyConstant.SPRING_APPLICATION_STRATEGY_ZONE_AVOIDANCE_RULE_ENABLED, Boolean.class, Boolean.TRUE); if (zoneAvoidanceRuleEnabled) { DiscoveryEnabledZoneAvoidanceRule discoveryEnabledRule = new DiscoveryEnabledZoneAvoidanceRule(); discoveryEnabledRule.initWithNiwsConfig(config); DiscoveryEnabledZoneAvoidancePredicate discoveryEnabledPredicate = discoveryEnabledRule.getDiscoveryEnabledPredicate(); discoveryEnabledPredicate.setPluginAdapter(pluginAdapter); discoveryEnabledPredicate.setDiscoveryEnabledAdapter(discoveryEnabledAdapter); return discoveryEnabledRule; } else { DiscoveryEnabledBaseRule discoveryEnabledRule = new DiscoveryEnabledBaseRule(); DiscoveryEnabledBasePredicate discoveryEnabledPredicate = discoveryEnabledRule.getDiscoveryEnabledPredicate(); discoveryEnabledPredicate.setPluginAdapter(pluginAdapter); discoveryEnabledPredicate.setDiscoveryEnabledAdapter(discoveryEnabledAdapter); return discoveryEnabledRule; } }
DiscoveryEnabledZoneAvoidanceRule:
DiscoveryEnabledBaseRule
自定义全局过滤器 实现将网关路由配置以及Http Header加载到请求ServerWebExchange中
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 把ServerWebExchange放入ThreadLocal中 GatewayStrategyContext.getCurrentContext().setExchange(exchange); // 通过过滤器设置路由Header头部信息,并全链路传递到服务端 ServerHttpRequest.Builder requestBuilder = exchange.getRequest().mutate(); if (gatewayCoreHeaderTransmissionEnabled) { // 内置Header预先塞入 Map<String, String> headerMap = strategyWrapper.getHeaderMap(); if (MapUtils.isNotEmpty(headerMap)) { for (Map.Entry<String, String> entry : headerMap.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); GatewayStrategyFilterResolver.setHeader(requestBuilder, key, value, gatewayHeaderPriority); } } //获取网关配置的路由规则 String routeVersion = getRouteVersion(); String routeVersionWeight = getRouteVersionWeight(); String routeIdBlacklist = getRouteIdBlacklist(); String routeAddressBlacklist = getRouteAddressBlacklist(); if (StringUtils.isNotEmpty(routeVersion)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_VERSION, routeVersion, gatewayHeaderPriority); } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_VERSION, gatewayHeaderPriority, gatewayOriginalHeaderIgnored); } if (StringUtils.isNotEmpty(routeVersionWeight)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_VERSION_WEIGHT, routeVersionWeight, gatewayHeaderPriority); } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_VERSION_WEIGHT, gatewayHeaderPriority, gatewayOriginalHeaderIgnored); } if (StringUtils.isNotEmpty(routeIdBlacklist)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_ID_BLACKLIST, routeIdBlacklist, gatewayHeaderPriority); } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_ID_BLACKLIST, gatewayHeaderPriority, gatewayOriginalHeaderIgnored); } if (StringUtils.isNotEmpty(routeAddressBlacklist)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_ADDRESS_BLACKLIST, routeAddressBlacklist, gatewayHeaderPriority); } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_ADDRESS_BLACKLIST, gatewayHeaderPriority, gatewayOriginalHeaderIgnored); } } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_VERSION); GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_VERSION_WEIGHT); GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_ID_BLACKLIST); GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_ADDRESS_BLACKLIST); } // 对于服务A -> 网关 -> 服务B调用链 // 域网关下(zuulHeaderPriority=true),只传递网关自身的group,不传递服务A的group,起到基于组的网关端服务调用隔离 // 非域网关下(zuulHeaderPriority=false),优先传递服务A的group,基于组的网关端服务调用隔离不生效,但可以实现基于相关参数的熔断限流等功能 GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_GROUP, pluginAdapter.getGroup(), gatewayHeaderPriority); // 网关只负责传递服务A的相关参数(例如:serviceId),不传递自身的参数,实现基于相关参数的熔断限流等功能 GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_TYPE, pluginAdapter.getServiceType(), false); String serviceAppId = pluginAdapter.getServiceAppId(); if (StringUtils.isNotEmpty(serviceAppId)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_APP_ID, serviceAppId, false); } GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_ID, pluginAdapter.getServiceId(), false); GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_VERSION, pluginAdapter.getVersion(), false); GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_ENVIRONMENT, pluginAdapter.getEnvironment(), false); ServerHttpRequest newRequest = requestBuilder.build(); ServerWebExchange newExchange = exchange.mutate().request(newRequest).build(); ServerWebExchange extensionExchange = extendFilter(newExchange, chain); ServerWebExchange finalExchange = extensionExchange != null ? extensionExchange : newExchange; // 把新的ServerWebExchange放入ThreadLocal中 GatewayStrategyContext.getCurrentContext().setExchange(newExchange); String path = finalExchange.getRequest().getPath().toString(); if (path.contains(DiscoveryConstant.INSPECTOR_ENDPOINT_URL)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.INSPECTOR_ENDPOINT_HEADER, pluginAdapter.getPluginInfo(null), true); } return chain.filter(finalExchange); }
自定义DefaultDiscoveryEnabledAdapter封装实例过滤规则
Ribbon负载均衡器执行默认过滤后会执行该规则
protected boolean apply(Server server) { if (discoveryEnabledAdapter == null) { return true; } return discoveryEnabledAdapter.apply(server); } //自定义过滤规则 @Override public boolean apply(Server server) { boolean enabled = applyEnvironment(server); if (!enabled) { return false; } enabled = applyVersion(server); if (!enabled) { return false; } enabled = applyIdBlacklist(server); if (!enabled) { return false; } enabled = applyAddressBlacklist(server); if (!enabled) { return false; } return applyStrategy(server); }
基于nacos配置实现网关策略动态发布,根据网关元数据组以及serviceId创建路由策略配置:
配置通过网关的请求都走版本xx
<?xml version="1.0" encoding="UTF-8"?>
<rule>
<strategy>
<version>1.0</version>
</strategy>
</rule>
step1 启动网关以及2个服务实例
mvn spring-boot:run -Dspring-boot.run.arguments="--server.port=1100 --spring.cloud.nacos.discovery.metadata.version=1.0"
mvn spring-boot:run -Dspring-boot.run.arguments="--server.port=1101 --spring.cloud.nacos.discovery.metadata.version=1.1"
step2 通过网关调用服务,可以验证到请求始终访问到version为1.0 的服务实例
192.168.132.49:1500/nacos-provider/index
配置网关路由权重
<?xml version="1.0" encoding="UTF-8"?>
<rule>
<strategy>
<version>1.0;1.1</version>
<version-weight>1.0=90;1.1=10</version-weight>
</strategy>
</rule>
灰度策略信息基于Nacos Client以及异步事件处理,动态更新,无需重启网关。
通过网关访问多次服务,请求基本按照9:1的比例命中服务。
配置IP地址和端口屏蔽策略,实现服务流量无损策略下线
服务下线场景中,由于Ribbon负载均衡组件存在着缓存机制,当被调用的服务实例已经下线,而调用的服务实例还暂时缓存着它,直到下个心跳周期才会把已下线的服务实例剔除,在此期间,会造成流量有损
框架提供流量的实时性的绝对无损。采用下线之前,把服务实例添加到屏蔽名单中,负载均衡不会去寻址该服务实例。
代码清单:
//省略 enabled = applyIdBlacklist(server); if (!enabled) { return false; } //省略 //过滤黑名单IP,框架会将黑名单中IP从Ribbon负载实例中移除 public boolean applyIdBlacklist(Server server) { String ids = pluginContextHolder.getContextRouteIdBlacklist(); if (StringUtils.isEmpty(ids)) { return true; } String serviceUUId = pluginAdapter.getServerServiceUUId(server); List<String> idList = StringUtil.splitToList(ids, DiscoveryConstant.SEPARATE); if (idList.contains(serviceUUId)) { return false; } return true; }
配置Ip黑名单
<?xml version="1.0" encoding="UTF-8"?>
<rule>
//此处省略
<strategy-blacklist>
<!-- 单个Address形式。如果多个用“;”分隔,不允许出现空格 -->
<address value="192.168.132.49:1100"/>
</strategy-blacklist>
</rule>
发布配置后,访问服务可以发现请求屏蔽了端口为1100的服务,确保1100服务下线,请求不会命中到1100服务。
源码地址
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。