赞
踩
场景:公司构建paas层(能力层),包括:表单服务、im服务、文件服务等;
目标:开发能力给第三方企业使用,同时允许第三方企业已供应商的身份将自己的服务融入到paas层中;
实现:
平台层面:
用户服务做鉴权;
网关服务做控制;
管理服务做配置;
云市场做购买;
第三方供应商A申请文件服务入驻到开放平台,提供服务域名(https://www.wenjian.com),并实现购买通知相应逻辑(支付成功之后平台会使用域名+固定url同步消息到第三方供应商),应用开通逻辑(开通应用的时候,会推送相应的消息给供应商);
公司运维人员登录开放平台管理后台,创建开发者账号A,生成相应的appid、appsecret,提供给第三方供应商A;创建文件服务应用(供应商类),配置路由规则(/third/party/wj/**->https://www.wenjian.com/**),并将账号信息推送至开放平台云市场;
企业B想使用文件服务能力,访问开放平台云市场,搜索文件服务,找到两个商品(平台自研的,第三方供应商A提供的),首先注册账号(企业、个人),并进行认证,将企业B信息同步至开放平台审核列表中;
公司运营人员登录开放平台运营后台,打开审核列表,审核企业B,通过之后,自动创建一个开发者账号B;
于此同时企业B购买了文件服务商品(第三方文件服务能力),支付成功之后将消息同步至开放平台;企业B点击文件服务,进入到文件服务管理(商品管理)中,创建应用,获取到平台的appid、appsecret,同时平台将此消息同步至第三方供应商A;
企业B使用appid、appsecret秘钥经过认证之后,调取文件服务相应接口。接口请求到达网关的时候,网关根据第三方应用配置,进行并发控制,加密验签设置,将请求转发至相应的域名,并在header中写入平台相应的认证信息;
流程图
基于oauth2.0协议,网关层引入spring-security-oauth2进行控制;
自定义SecurityWebFilterChain,重写各种类与接口,配置各种过滤器...
- @Configuration
- public class ResourceServerConfiguration {
-
- private static final String MAX_AGE = "18000L";
-
- @Autowired
- private RedisConnectionFactory redisConnectionFactory;
- @Autowired
- private ResourceLocator apiresourceLocator;
- @Autowired
- private ApiProperties apiProperties;
- @Autowired
- private AccessLogService accessLogService;
- @Autowired
- private BaseAppServiceClient baseAppServiceClient;
-
- /**
- * 跨域配置
- *
- * @return
- */
- public WebFilter corsFilter() {
- return (ServerWebExchange ctx, WebFilterChain chain) -> {
- ServerHttpRequest request = ctx.getRequest();
- if (CorsUtils.isCorsRequest(request)) {
- HttpHeaders requestHeaders = request.getHeaders();
- ServerHttpResponse response = ctx.getResponse();
- HttpMethod requestMethod = requestHeaders.getAccessControlRequestMethod();
- HttpHeaders headers = response.getHeaders();
- headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, requestHeaders.getOrigin());
- headers.addAll(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, requestHeaders.getAccessControlRequestHeaders());
- if (requestMethod != null) {
- headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, requestMethod.name());
- }
- headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
- headers.add(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, "*");
- headers.add(HttpHeaders.ACCESS_CONTROL_MAX_AGE, MAX_AGE);
- if (request.getMethod() == HttpMethod.OPTIONS) {
- response.setStatusCode(HttpStatus.OK);
- return Mono.empty();
- }
- }
- return chain.filter(ctx);
- };
- }
-
- @Bean
- SecurityWebFilterChain springWebFilterChain(ServerHttpSecurity http) throws Exception {
- // 自定义oauth2 认证, 使用redis读取token,而非jwt方式
- // 配置应用程序请求身份验证时要执行的操作
- // 要使用的入口点
- // JsonAuthenticationEntryPoint implements ServerAuthenticationEntryPoint(重写commence方法)
- JsonAuthenticationEntryPoint entryPoint = new JsonAuthenticationEntryPoint(accessLogService);
- // 配置当经过身份验证的用户未持有所需权限时要执行的操作
- // 要使用的拒绝访问处理程序
- // JsonAccessDeniedHandler implements ServerAccessDeniedHandler(重写handle方法)
- JsonAccessDeniedHandler accessDeniedHandler = new JsonAccessDeniedHandler(accessLogService);
- // 自定义授权校验,在此可实现权限控制
- // AccessManager implements ReactiveAuthorizationManager<AuthorizationContext>(Authorization,授权管理,重写check方法)
- AccessManager accessManager = new AccessManager(apiresourceLocator, apiProperties);
- // 自定义认证
- // RedisAuthenticationManager implements ReactiveAuthenticationManager(Authentication,认证管理,重写authenticate方法)
- AuthenticationWebFilter oauth2 = new AuthenticationWebFilter(new RedisAuthenticationManager(new RedisTokenStore(redisConnectionFactory)));
- oauth2.setServerAuthenticationConverter(new ServerBearerTokenAuthenticationConverter());
- oauth2.setAuthenticationFailureHandler(new ServerAuthenticationEntryPointFailureHandler(entryPoint));
- oauth2.setAuthenticationSuccessHandler(new ServerAuthenticationSuccessHandler() {
- @Override
- public Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
- ServerWebExchange exchange = webFilterExchange.getExchange();
- SecurityContextServerWebExchange securityContextServerWebExchange = new SecurityContextServerWebExchange(exchange, ReactiveSecurityContextHolder.getContext().subscriberContext(
- ReactiveSecurityContextHolder.withAuthentication(authentication)
- ));
- return webFilterExchange.getChain().filter(securityContextServerWebExchange);
- }
- });
- http
- .httpBasic().disable()
- .csrf().disable()
- .authorizeExchange()
- .pathMatchers("/").permitAll()
- // 动态权限验证
- .anyExchange().access(accessManager)
- .and().exceptionHandling()
- .accessDeniedHandler(accessDeniedHandler)
- .authenticationEntryPoint(entryPoint).and()
- // 日志前置过滤器
- .addFilterAt(new PreRequestFilter(), SecurityWebFiltersOrder.FIRST)
- // 跨域过滤器
- .addFilterAt(corsFilter(), SecurityWebFiltersOrder.CORS)
- // 签名验证过滤器
- .addFilterAt(new PreSignatureFilter(baseAppServiceClient, apiProperties, new JsonSignatureDeniedHandler(accessLogService)), SecurityWebFiltersOrder.CSRF)
- // 访问验证前置过滤器
- .addFilterAt(new PreCheckFilter(accessManager, accessDeniedHandler), SecurityWebFiltersOrder.CSRF)
- // oauth2认证过滤器
- .addFilterAt(oauth2, SecurityWebFiltersOrder.AUTHENTICATION)
- // 日志过滤器
- .addFilterAt(new AccessLogFilter(accessLogService), SecurityWebFiltersOrder.SECURITY_CONTEXT_SERVER_WEB_EXCHANGE);
- return http.build();
- }
- }
扩展:oauth2.0
业务场景:
oauth2的四种模式
授权码模式示例:
InMemoryRouteDefinitionRepository该接口继承了RouteDefinitionWriter,RouteDefinitionWriter中定义了save、delete方法,通过方法名称可以知道是用来保存/添加/删除路由信息;使用 InMemoryRouteDefinitionRepository 来维护 RouteDefinition 信息,在网关实例重启或者崩溃后,RouteDefinition 就会丢失。 实现方案: 1.我们可以实现 RouteDefinitionRepository 接口,以实现例如 MySQLRouteDefinitionRepository; 2.或者基于InMemoryRouteDefinitionRepository 结合MySQL实现路由信息的数据库存储; 本文采用方案2实现,InMemoryRouteDefinitionRepository+MySQL+ApplicationEvent:
定义一个监听事件RefreshRouteEvent:
- /**
- * 自定义网关刷新事件
- */
- public class RefreshRouteEvent extends RemoteApplicationEvent {
-
- private RefreshRouteEvent() {
- }
-
- public RefreshRouteEvent(Object source, String originService, String destinationService) {
- super(source, originService, destinationService);
- }
- }
定义一个监听RefreshRouteEventListener:
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
- import org.springframework.cloud.gateway.filter.FilterDefinition;
- import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition;
- import org.springframework.cloud.gateway.route.InMemoryRouteDefinitionRepository;
- import org.springframework.cloud.gateway.route.RouteDefinition;
- import org.springframework.cloud.gateway.support.NameUtils;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.context.ApplicationEventPublisherAware;
- import org.springframework.context.ApplicationListener;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.jdbc.core.RowMapper;
- import org.springframework.web.util.UriComponentsBuilder;
- import reactor.core.publisher.Mono;
-
- import java.net.URI;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- /**
- * 动态路由监听器
- */
- @Slf4j
- public class RefreshRouteEventListener implements ApplicationListener<RefreshRouteEvent>, ApplicationEventPublisherAware {
- private JdbcTemplate jdbcTemplate;
- private ApplicationEventPublisher publisher;
- private InMemoryRouteDefinitionRepository repository;
-
- //查询路由的sql
- private final static String SELECT_ROUTES = "SELECT * FROM gateway_route WHERE status = 1";
-
- //初始化监听器
- public RefreshRouteEventListener(JdbcTemplate jdbcTemplate, InMemoryRouteDefinitionRepository repository) {
- this.jdbcTemplate = jdbcTemplate;
- this.repository = repository;
- }
-
- /**
- * 刷新路由
- *
- * @return
- */
- public Mono<Void> refresh() {
- //从数据库中加载路由到InMemoryRouteDefinitionRepository中
- this.loadRoutes();
- //触发默认路由刷新事件,刷新缓存路由
- this.publisher.publishEvent(new RefreshRoutesEvent(this));
- return Mono.empty();
- }
-
- /**
- * 监听器监听到之间之后执行的业务
- *
- * @param event
- */
- @Override
- public void onApplicationEvent(RefreshRouteEvent event) {
- refresh();
- }
-
- /**
- * 加载路由
- * @return
- */
- private Mono<Void> loadRoutes() {
- //从数据库拿到路由配置
- try {
- //查询数据库获得路由列表
- List<GatewayRoute> routeList = jdbcTemplate.query(SELECT_ROUTES, new RowMapper<GatewayRoute>() {
- @Override
- public GatewayRoute mapRow(ResultSet rs, int i) throws SQLException {
- GatewayRoute result = new GatewayRoute();
- result.setRouteId(rs.getLong("route_id"));
- result.setPath(rs.getString("path"));
- result.setServiceId(rs.getString("service_id"));
- result.setUrl(rs.getString("url"));
- result.setStatus(rs.getInt("status"));
- result.setRetryable(rs.getInt("retryable"));
- result.setStripPrefix(rs.getInt("strip_prefix"));
- result.setIsPersist(rs.getInt("is_persist"));
- result.setRouteName(rs.getString("route_name"));
- return result;
- }
- });
- if (routeList != null) {
- //加载路由
- routeList.forEach(gatewayRoute -> {
- RouteDefinition definition = new RouteDefinition();
- List<PredicateDefinition> predicates = Lists.newArrayList();
- List<FilterDefinition> filters = Lists.newArrayList();
- definition.setId(gatewayRoute.getRouteName());
- //路由地址
- PredicateDefinition predicatePath = new PredicateDefinition();
- Map<String, String> predicatePathParams = new HashMap<>(8);
- predicatePath.setName("Path");
- //predicates.name
- predicatePathParams.put("name", StringUtils.isBlank(gatewayRoute.getRouteName()) ? gatewayRoute.getRouteId().toString() : gatewayRoute.getRouteName());
- //predicates.args.pattern
- predicatePathParams.put("pattern", gatewayRoute.getPath());
- //predicates.args.pathPattern
- predicatePathParams.put("pathPattern", gatewayRoute.getPath());
- //predicates.args
- predicatePath.setArgs(predicatePathParams);
- predicates.add(predicatePath);
- //服务地址,url:完整地址,serviceId:服务ID;
- //配置完整地址的使用完整地址,否则使用服务ID;
- URI uri = UriComponentsBuilder.fromUriString(StringUtils.isNotBlank(gatewayRoute.getUrl()) ? gatewayRoute.getUrl() : "lb://" + gatewayRoute.getServiceId()).build().toUri();
- FilterDefinition stripPrefixDefinition = new FilterDefinition();
- Map<String, String> stripPrefixParams = new HashMap<>(8);
- //StripPrefix参数表示在将请求发送到下游之前从请求中剥离的路径个数
- stripPrefixDefinition.setName("StripPrefix");
- stripPrefixParams.put(NameUtils.GENERATED_NAME_PREFIX + "0", "1");
- stripPrefixDefinition.setArgs(stripPrefixParams);
- filters.add(stripPrefixDefinition);
- //yml配置中的predicates
- definition.setPredicates(predicates);
- //yml配置中的filters
- definition.setFilters(filters);
- definition.setUri(uri);
- this.repository.save(Mono.just(definition)).subscribe();
- });
- }
- log.info("=============加载动态路由:{}==============", routeList.size());
- } catch (Exception e) {
- log.error("加载动态路由错误:{}", e);
- }
- return Mono.empty();
- }
-
- @Override
- public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
- this.publisher = applicationEventPublisher;
- }
- }
- import com.baomidou.mybatisplus.annotation.IdType;
- import com.baomidou.mybatisplus.annotation.TableId;
- import com.baomidou.mybatisplus.annotation.TableName;
- import lombok.Data;
- import lombok.EqualsAndHashCode;
- import lombok.NoArgsConstructor;
-
- /**
- * 网关动态路由
- */
- @Data
- @EqualsAndHashCode(callSuper = true)
- @NoArgsConstructor
- @TableName("gateway_route")
- public class GatewayRoute extends AbstractEntity {
- private static final long serialVersionUID = -2952097064941740301L;
-
- /**
- * 路由ID
- */
- @TableId(type = IdType.ID_WORKER)
- private Long routeId;
-
- /**
- * 路由名称
- */
- private String routeName;
-
- /**
- * 路径
- */
- private String path;
-
- /**
- * 服务ID
- */
- private String serviceId;
-
- /**
- * 完整地址
- */
- private String url;
-
- /**
- * 忽略前缀
- */
- private Integer stripPrefix;
-
- /**
- * 0-不重试 1-重试
- */
- private Integer retryable;
-
- /**
- * 状态:0-无效 1-有效
- */
- private Integer status;
-
- /**
- * 保留数据0-否 1-是 不允许删除
- */
- private Integer isPersist;
-
- /**
- * 路由说明
- */
- private String routeDesc;
- }
动态刷新1:
- import org.springframework.boot.actuate.endpoint.web.annotation.RestControllerEndpoint;
- import org.springframework.cloud.bus.endpoint.AbstractBusEndpoint;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestParam;
-
- /**
- * 自定义网关监控端点
- */
- @RestControllerEndpoint(
- id = "open"
- )
- public class ApiEndpoint extends AbstractBusEndpoint {
-
- public ApiEndpoint(ApplicationEventPublisher context, String id) {
- super(context, id);
- }
-
- /**
- * 动态刷新
- */
- @PostMapping("/refresh")
- public ResultBody busRefreshWithDestination(@RequestParam(required = false) String destination) {
- this.publish(new RefreshRouteEvent(this, this.getInstanceId(), destination));
- return ResultBody.ok();
- }
- }
动态刷新2:
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.cloud.bus.BusProperties;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.web.client.RestTemplate;
-
- /**
- * 自定义请求工具类
- */
- @Slf4j
- public class OpenRestTemplate extends RestTemplate {
-
- private ApplicationEventPublisher publisher;
- private BusProperties busProperties;
-
- public OpenRestTemplate(BusProperties busProperties, ApplicationEventPublisher publisher) {
- this.publisher = publisher;
- this.busProperties = busProperties;
- }
-
-
- /**
- * 刷新网关
- */
- public void refreshGateway() {
- try {
- publisher.publishEvent(new RefreshRouteEvent(this, busProperties.getId(), null));
- log.info("refreshGateway:success");
- } catch (Exception e) {
- log.error("refreshGateway error:{}", e.getMessage());
- }
- }
- }
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- /**
- * 系统接口资源管理
- */
- @Api(tags = "系统接口资源管理")
- @RestController
- public class BaseApiController {
- @Autowired
- private OpenRestTemplate openRestTemplate;
-
- /**
- * 刷新网关
- */
- @ApiOperation(value = "刷新网关", notes = "刷新网关")
- @PostMapping("/refreshGateway")
- public ResultBody refreshGateway() {
- //刷新网关
- openRestTemplate.refreshGateway();
- return ResultBody.ok();
- }
- }
实现方案与动态路由一致,修改动态路由中的loadRoutes()方法(从数据库中获取限流规则,与路由定义绑定,满足每个路由都有自己的限流规则): 底层:基于RequestRateLimiter,spring cloud gateway 的 RequestRateLimiter 使用令牌桶算法来控制请求速率; 限流算法:令牌桶;系统会以一定的速度生成令牌,并将其放置到令牌桶中,可以将令牌桶想象成一个缓冲区(可以用队列这种数据结构来实现),当缓冲区填满的时候,新生成的令牌会被扔掉(除了要求能够限制数据的平均传输速率外,还允许某种程度的突发传输): 第一个是生成令牌的速度,一般称为 rate 。比如,我们设定 rate = 2 ,即每秒钟生成 2 个令牌,也就是每 1/2 秒生成一个令牌; 第二个是令牌桶的大小,一般称为 burst 。比如,我们设定 burst = 10 ,即令牌桶最大只能容纳 10 个令牌。 扩展:漏桶算法,漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。 replenishRate:你允许用户每秒执行多少请求,而不丢弃任何请求,这是令牌桶的填充速率; burstCapacity:允许用户在一秒钟内执行的最大请求数。这是令牌桶可以保存的令牌数,将此值设置为零将阻止所有请求;
- private Mono<Void> loadRoutes() {
- //从数据库拿到路由配置
- try {
- List<GatewayRoute> routeList = jdbcTemplate.query(SELECT_ROUTES, new RowMapper<GatewayRoute>() {
- @Override
- public GatewayRoute mapRow(ResultSet rs, int i) throws SQLException {
- GatewayRoute result = new GatewayRoute();
- result.setRouteId(rs.getLong("route_id"));
- result.setPath(rs.getString("path"));
- result.setServiceId(rs.getString("service_id"));
- result.setUrl(rs.getString("url"));
- result.setStatus(rs.getInt("status"));
- result.setRetryable(rs.getInt("retryable"));
- result.setStripPrefix(rs.getInt("strip_prefix"));
- result.setIsPersist(rs.getInt("is_persist"));
- result.setRouteName(rs.getString("route_name"));
- return result;
- }
- });
- List<RateLimitApi> limitApiList = jdbcTemplate.query(SELECT_LIMIT_PATH, new RowMapper<RateLimitApi>() {
- @Override
- public RateLimitApi mapRow(ResultSet rs, int i) throws SQLException {
- RateLimitApi result = new RateLimitApi();
- result.setPolicyId(rs.getLong("policy_id"));
- result.setPolicyName(rs.getString("policy_name"));
- result.setServiceId(rs.getString("service_id"));
- result.setPath(rs.getString("path"));
- result.setApiId(rs.getLong("api_id"));
- result.setApiCode(rs.getString("api_code"));
- result.setApiName(rs.getString("api_name"));
- result.setApiCategory(rs.getString("api_category"));
- result.setLimitQuota(rs.getLong("limit_quota"));
- result.setIntervalUnit(rs.getString("interval_unit"));
- result.setUrl(rs.getString("url"));
- return result;
- }
- });
- if (limitApiList != null) {
- // 加载限流
- limitApiList.forEach(item -> {
- long[] arry = ResourceLocator.getIntervalAndQuota(item.getIntervalUnit());
- Long refreshInterval = arry[0];
- Long quota = arry[1];
- // 允许用户每秒处理多少个请求
- long replenishRate = item.getLimitQuota() / refreshInterval;
- replenishRate = replenishRate < 1 ? 1 : refreshInterval;
- // 令牌桶的容量,允许在一秒钟内完成的最大请求数
- long burstCapacity = replenishRate * 2;
- RouteDefinition definition = new RouteDefinition();
- List<PredicateDefinition> predicates = Lists.newArrayList();
- List<FilterDefinition> filters = Lists.newArrayList();
- definition.setId(item.getApiId().toString());
- PredicateDefinition predicatePath = new PredicateDefinition();
- String fullPath = getFullPath(routeList, item.getServiceId(), item.getPath());
- Map<String, String> predicatePathParams = new HashMap<>(8);
- predicatePath.setName("Path");
- predicatePathParams.put("pattern", fullPath);
- predicatePathParams.put("pathPattern", fullPath);
- predicatePathParams.put("_rateLimit", "1");
- predicatePath.setArgs(predicatePathParams);
- predicates.add(predicatePath);
- // 服务地址
- URI uri = UriComponentsBuilder.fromUriString(StringUtils.isNotBlank(item.getUrl()) ? item.getUrl() : "lb://" + item.getServiceId()).build().toUri();
- // 路径去前缀
- FilterDefinition stripPrefixDefinition = new FilterDefinition();
- Map<String, String> stripPrefixParams = new HashMap<>(8);
- stripPrefixDefinition.setName("StripPrefix");
- stripPrefixParams.put(NameUtils.GENERATED_NAME_PREFIX + "0", "1");
- stripPrefixDefinition.setArgs(stripPrefixParams);
- filters.add(stripPrefixDefinition);
- // 限流
- FilterDefinition rateLimiterDefinition = new FilterDefinition();
- Map<String, String> rateLimiterParams = new HashMap<>(8);
- rateLimiterDefinition.setName("RequestRateLimiter");
- //令牌桶流速
- rateLimiterParams.put("redis-rate-limiter.replenishRate", String.valueOf(replenishRate));
- //令牌桶容量
- rateLimiterParams.put("redis-rate-limiter.burstCapacity", String.valueOf(burstCapacity));
- //限流策略(#{@BeanName})
- rateLimiterParams.put("key-resolver", "#{@pathKeyResolver}");
- rateLimiterDefinition.setArgs(rateLimiterParams);
- //限流策略与filters绑定
- filters.add(rateLimiterDefinition);
- //指定限流prodicates
- definition.setPredicates(predicates);
- //filters与路由定义绑定
- definition.setFilters(filters);
- definition.setUri(uri);
- this.repository.save(Mono.just(definition)).subscribe();
- });
- }
-
- log.info("=============加载动态路由:{}==============", routeList.size());
- log.info("=============加载动态限流:{}==============", limitApiList.size());
- } catch (Exception e) {
- log.error("加载动态路由错误:{}", e);
- }
- return Mono.empty();
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。