赞
踩
我们研究的源码主要有3类源码:
源码导入都比较简单,但维度Spring5.2.2导入是比较麻烦的,需要配Gradle而且IDEA和Gradle版本 要匹配,我这里选的环境是 IDEA2020.3 和Gradle5.4.6 ,这样的版本导入Spring源码就不费劲了。
1:Spring5.2.2源码
2:Spring-Cloud-Stream3.0.1源码
3:Spring-Cloud-Gateway2.2.1源码
4:Spring-Cloud-OpenFeign2.2.1源码
首先要下载SpringCloud-Gateway源码,下载地址https://github.com/spring-cloud/spring-cloud-gat eway。下载源码后,在 资料 目录中已经提供了对应版本的源码包,可以直接将源码放到工程对应的目 录下并导入:
1)导入
点击+号,再选择import module
项目此时和hailtaxi-parent放在了同一个目录下,如下图:
项目以maven模板导入。
2)安装
导入后,一定要记得安装到本地,安装到本地后,在 hailtaxi 中找源码包的时候会直接跳转到该工程 中,如下图:
SpringCloud-Stream下载地址:https://github.com/spring-cloud/spring-cloud-stream/tree/v3.0.1.RELEASE
SpringCloud-OpenFeign下载地址:https://github.com/spring-cloud/spring-cloud-openfeign/tree/v 2.2.1.RELEASE
关于SpringCloud-Stream和OpenFeign的源码导入是一样的操作,最主要记住放在同一个目录下,方 便管理和一同打开(不放在同一个目录也能一同打开,但管理不方便)。
通过前面的学习,我们知道SpringCloud Gateway是一个微服务网关,主要实现不同功能服务路由,关 于SpringCloud Gateway的实战使用我们就告一段落,我们接下来深入学习SpringCloud Gateway源 码。
前面我们已经学习过Gateway的工作流程,如上工作流程图,我们回顾一下工作流程:
1:所有都将由ReactorHttpHandlerAdapter.apply()方法拦截处理,此时会封装请求对象和响应对 象,并传递到HttpWebHandlerAdapter.handle()方法。
2:HttpWebHandlerAdapter.handle(),将request和response封装成上下文对象 ServerWebExchange,方法通过getDelegate()获取全局异常处理器 ExceptionHandlingWebHandler执行全局异常处理
3:ExceptionHandlingWebHandler执行完成后,调用DispatcherHandler.handle(),循环所有 handlerMappings查找处理当前请求的Handler
4:找到Handler后调用DispatcherHandler.invokeHandler()执行找到的Handler,此时会调用 FilteringWebHandler.handle()
5:DefaultGatewayFilterChain.filter()是关键流程,所有过滤器都会在这里执行,比如服务查找、 负载均衡、远程调用等,都在这一块。
我们首先来看一下Gateway拦截处理所有请求的方法handle():
/**** *处理所有请求 ****/ @Override public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { if (this.forwardedHeaderTransformer != null) { request = this.forwardedHeaderTransformer.apply(request); } //创建网关上下文对象 ServerWebExchange exchange = createExchange(request, response); LogFormatUtils.traceDebug(logger, traceOn -> exchange.getLogPrefix() + formatRequest(exchange.getRequest()) + (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : "")); //getDelegate()获取当前的Handler return getDelegate().handle(exchange) } .doOnSuccess(aVoid -> logResponse(exchange)) .onErrorResume(ex -> handleUnresolvedError(exchange, ex)) .then(Mono.defer(response::setComplete));
上面getDelegate()方法源码如下:
/**
* Return the wrapped delegate.
* 返回WebHandler:处理web请求的对象 */
public WebHandler getDelegate() {
return this.delegate;
}
我们进行Debug测试如下:
当前返回的WebHandler是 ExceptionHandlingWebHandler ,而ExceptionHandlingWebHandler 的 delegate是 FilteringWebHandler ,而FilteringWebHandler 的delegate是 delegate 是DispatcherHandler ,所有的delegate的handle() 方法都会依次执行,我们可以把断点放到 DispatcherHandler.handler() 方法上:
handler()方法会调用所有handlerMappings的 getHandler(exchange) 方法,而getHandler(exchange) 方法会调用 getHandlerInternal(exchange) 方法:
getHandlerInternal(exchange) 该方法由各个 HandlerMapping 自行实现,我们可以观察下断言处 理的 RoutePredicateHandlerMapping 的 getHandlerInternal(exchange) 方法会调用lookupRoute 方法,该方法用于返回对应的路由信息:
这里的路由匹配其实就是我们项目中对应路由配置的一个一个服务的信息,这些服务信息可以帮我们找到我们要调用的真实服务:
每个Route对象如下:
Route的DEBUG数据如下:
找到对应Route后会返回指定的FilterWebHandler,如下代码:
FilterWebHandler主要包含了所有的过滤器,过滤器按照一定顺序排序,主要是order值,越小越靠前 排,过滤器中主要将请求交给指定真实服务处理了,debug测试如下:
这里有RouteToRequestUrlFilter和ForwardRoutingFilter 以及LoadBalancerClientFilter等多个过滤器
上面FilterWebHandler中有两个过滤器,分别为RouteToRequestUrlFilter和ForwardRoutingFilter
RouteToRequestUrlFilter用来根据匹配的Route,计算请求地址得到lb://hailtaxi-order/order/list
ForwardRoutingFilter转发路由网关过滤器。其根据forward://前缀(Scheme)过滤处理,将请求转发到当前网关实例本地接口
RouteToRequestUrlFilter源码如下:
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 获取Route Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); if (route == null) { return chain.filter(exchange); } log.trace("RouteToRequestUrlFilter start"); // 获取URI URI uri = exchange.getRequest().getURI(); boolean encoded = containsEncodedParts(uri); URI routeUri = route.getUri(); if (hasAnotherScheme(routeUri)) { // this is a special url, save scheme to special attribute // replace routeUri with schemeSpecificPart exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme()); routeUri = URI.create(routeUri.getSchemeSpecificPart()); } if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) { // Load balanced URIs should always have a host. If the host is null it is // most // likely because the host name was invalid (for example included an // underscore) throw new IllegalStateException("Invalid host: " + routeUri.toString()); } // 组装真实地址 URI mergedUrl = UriComponentsBuilder.fromUri(uri) // .uri(routeUri) .scheme(routeUri.getScheme()).host(routeUri.getHost()) .port(routeUri.getPort()).build(encoded).toUri(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl); return chain.filter(exchange); }
debug调试结果如下:
从上面调试结果我们可以看到所选择的Route以及uri和routeUri和mergedUrl,该过滤器其实就是将用 户请求的地址换成服务地址,换成服务地址可以用来做负载均衡。
SpringCloud在实现对后端服务远程调用是基于Netty发送Http请求实现,核心代码在 NettyRoutingFilter.filter() 中,其中核心代码为send()方法,代码如下:
/*** * 实现远程调用 * @param exchange the current server exchange * @param chain provides a way to delegate to the next filter * @return */ @Override @SuppressWarnings("Duplicates") public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } setAlreadyRouted(exchange); ServerHttpRequest request = exchange.getRequest(); final HttpMethod method = HttpMethod.valueOf(request.getMethodValue()); final String url = requestUrl.toASCIIString(); HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); boolean preserveHost = exchange .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); Flux<HttpClientResponse> responseFlux = httpClientWithTimeoutFrom(route) // 头信息处理 .headers(headers -> { headers.add(httpHeaders); // Will either be set below, or later by Netty headers.remove(HttpHeaders.HOST); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); headers.add(HttpHeaders.HOST, host); } // 执行发送,基于HTTP协议 }).request(method).uri(url).send((req, nettyOutbound) -> { if (log.isTraceEnabled()) { nettyOutbound .withConnection(connection -> log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix())); } return nettyOutbound.send(request.getBody() .map(dataBuffer -> ((NettyDataBuffer) dataBuffer) .getNativeBuffer())); }). // 响应结果 responseConnection((res, connection) -> { // Defer committing the response until all route filters have run // Put client response as ServerWebExchange attribute and write // response later NettyWriteResponseFilter exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); // 获取响应结果 ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach( entry -> headers.add(entry.getKey(), entry.getValue())); String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } setResponseStatus(res, response); // make sure headers filters run after setting status so it is // available in response HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter( getHeadersFilters(), headers, exchange, Type.RESPONSE); if (!filteredResponseHeaders .containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders .containsKey(HttpHeaders.CONTENT_LENGTH)) { // It is not valid to have both the transfer-encoding header and // the content-length header. // Remove the transfer-encoding header in the response if the // content-length header is present. response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().putAll(filteredResponseHeaders); return Mono.just(res); }); Duration responseTimeout = getResponseTimeout(route); if (responseTimeout != null) { responseFlux = responseFlux .timeout(responseTimeout, Mono.error(new TimeoutException( "Response took longer than timeout: " + responseTimeout))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange)); }
上面send方法最终会调用 ChannelOperations>send() 方法,而该方法其实是基于了Netty实现数据发 送,核心代码如下:
public NettyOutbound send(Publisher<? extends ByteBuf> dataStream, Predicate<ByteBuf> predicate) {
if (!this.channel().isActive()) {
return this.then(Mono.error(new AbortedException("Connection has been closed")));
} else {
return dataStream instanceof Mono ? this.then(((Mono)dataStream).flatMap((m) -> {
return FutureMono.from(this.channel().writeAndFlush(m));
}).doOnDiscard(ByteBuf.class, ReferenceCounted::release)) : this.then(MonoSendMany.byteBufSource(dataStream, this.channel(), predicate));
}
}
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,他的并发性能得到了很大 提高,对比于BIO(Blocking I/O,阻塞IO),隐藏其背后的复杂性而提供一个易于使用的 API 的客户 端/服务器框架。Netty 是一个广泛使用的 Java 网络编程框架。
传输极快
Netty的传输快其实也是依赖了NIO的一个特性—— 。我们知道,Java的内存有堆内存、栈内存和 字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数 据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达 他的的终点,如果数据量大,就会造成不必要的资源浪费。 Netty针对这种情况,使用了NIO中的另一 大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到 了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。
良好的封装
Netty无论是性能还是封装性都远远超越传统Socket编程。
Channel:表示一个连接,可以理解为每一个请求,就是一个Channel。
ChannelHandler:核心处理业务就在这里,用于处理业务请求。
ChannelHandlerContext:用于传输业务数据。
ChannelPipeline:用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。
ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整 段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:
前面源码剖析主要剖析了Gateway的工作流程,我们接下来剖析Gateway的负载均衡流程。在最后的过滤器集合中有 LoadBalancerClientFilter 过滤器,该过滤器是用于实现负载均衡。
LoadBalancerClientFilter 过滤器首先会将用户请求地址转换成真实服务地址,也就是IP:端口号, 源码如下:
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //负载均衡的URL = lb://hailtaxi-order/order/list?token=123 URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } // preserve the original url addOriginalRequestUrl(exchange, url); if (log.isTraceEnabled()) { log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url); } //服务选择 return choose(exchange).doOnNext(response -> { if (!response.hasServer()) { throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost()); } //用户提交的URI = http://localhost:8001/order/list?token=123 URI uri = exchange.getRequest().getURI(); // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default, // if the loadbalancer doesn't provide one. String overrideScheme = null; if (schemePrefix != null) { overrideScheme = url.getScheme(); } //真实服务的URL =http://192.168.211.1:18182/order/list?token=123 DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance( response.getServer(), overrideScheme); URI requestUrl = LoadBalancerUriTools.reconstructURI(serviceInstance, uri); if (log.isTraceEnabled()) { log.trace("LoadBalancerClientFilter url chosen: " + requestUrl); } exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); }).then(chain.filter(exchange)); }
上面代码的关键是 choose(exchange) 的调用,该方法调用其实就是选择指定服务,这里涉及到负载均衡服务轮询调用算法等,我们可以跟踪进去查看方法执行流程。
Gateway自身已经集成Ribbon,所以看到的对象是RibbonLoadBalancerClient,我们跟踪进去接着查 看:
上面方法会依次调用到getInstance()方法,该方法会返回所有可用实例,有可能有多个实例,如果有多 个实例就涉及到负载均衡算法,方法调用如下图:
此时调用getServer()方法,再调用 BaseLoadBalancer.chooseServer() ,这里是根据指定算法获取 对应实例,代码如下:
BaseLoadBalancer 是属于Ribbon的算法,我们可以通过如下依赖包了解,并且该算法默认用的是 RoundRobinRule ,也就是随机算法,如下代码:
feign的核心功能就是通过接口去访问网络资源,里面也是用动态代理来实现的,就跟Mybatis用接口去 访问数据库一样,我们就来看下源码的处理,核心就一个包:
使用OpenFeign的时候会用到2个注解,分别是@FeignClient(value = “hailtaxi-driver”)和 @EnableFeignClients(basePackages = “com.itheima.driver.feign”),这两个注解其实就是学
习OpenFeign的入口。
@EnableFeignClients 这 个注解的作用其实就是开启了一个 FeignClient 的扫描,那么点击启动类 的 @EnableFeignClients 注解看下他是怎么开启 FeignClient 的扫描的,进去后发现里面有个 @Import(FeignClientsRegistrar.class)这个FeignClientsRegistrar跟Bean的动态装载有关。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
/***
* ImportBeanDefinitionRegistrar: 动态向容器中注入对象(Bean),可以通过该对象来实现
*/
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients
FeignClientsRegistrar类中有一个方法 registerBeanDefinitions 用于注入Bean的,源码如下:
/**** * 向Spring容器注入对应对象(代理对象) * @param metadata * @param registry */ @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { //SpringBoot启动类上检查是否有@EnableFeignClients, 有该注解, 则完成 Feign 框架相 关的配置注册 registerDefaultConfiguration(metadata, registry); //从 classpath 中, 扫描获得 @FeignClient 修饰的类, 将类的内容解析为 BeanDefifinition , // 最终通过调用 Spring 框架中的 BeanDefifinitionReaderUtils.resgisterBeanDefifinition // 将解析处理过的 FeignClientBeanDeififinition 添加到 spring 容器中. registerFeignClients(metadata, registry); } private void registerDefaultConfiguration(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { Map<String, Object> defaultAttrs = metadata .getAnnotationAttributes(EnableFeignClients.class.getName(), true); if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) { String name; if (metadata.hasEnclosingClass()) { name = "default." + metadata.getEnclosingClassName(); } else { name = "default." + metadata.getClassName(); } registerClientConfiguration(registry, name, defaultAttrs.get("defaultConfiguration")); } } /*** * 向SpringIOC容器注入指定的Bean对象 * @param metadata * @param registry */ public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { // 扫描器 ClassPathScanningCandidateComponentProvider scanner = getScanner(); // 设置资源加载器 scanner.setResourceLoader(this.resourceLoader); // 定义要扫描的包,从@EnableFeignClients(basePackages = "com.itheima.driver.feign")获取 Set<String> basePackages; // 获取相关属性 Map<String, Object> attrs = metadata .getAnnotationAttributes(EnableFeignClients.class.getName()); // 注解过滤器,设置只过滤出FeignClient注解标识的Bean AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter( FeignClient.class); final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients"); if (clients == null || clients.length == 0) { // 扫描器设置过滤器 scanner.addIncludeFilter(annotationTypeFilter); // 获取注解的扫描包路径 basePackages = getBasePackages(metadata); } else { final Set<String> clientClasses = new HashSet<>(); basePackages = new HashSet<>(); for (Class<?> clazz : clients) { basePackages.add(ClassUtils.getPackageName(clazz)); clientClasses.add(clazz.getCanonicalName()); } AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() { @Override protected boolean match(ClassMetadata metadata) { // 将类名上的[$]替换成[.] String cleaned = metadata.getClassName().replaceAll("\\$", "."); return clientClasses.contains(cleaned); } }; scanner.addIncludeFilter( new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter))); } // 循环所有包,进行扫描 for (String basePackage : basePackages) { // 具有@FeignClient注解的接口信息集合 Set<BeanDefinition> candidateComponents = scanner .findCandidateComponents(basePackage); // 循环每个接口 for (BeanDefinition candidateComponent : candidateComponents) { if (candidateComponent instanceof AnnotatedBeanDefinition) { // verify annotated class is an interface AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent; // 获取beanDefinition的元数据,你想要的他基本都有 AnnotationMetadata annotationMetadata = beanDefinition.getMetadata(); // 验证@FeignClient修饰的必须是接口 Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface"); // 获取@FeignClient注解的属性 Map<String, Object> attributes = annotationMetadata .getAnnotationAttributes( FeignClient.class.getCanonicalName()); // 获取客户端名称 String name = getClientName(attributes); // 为FeignClient指定配置类 registerClientConfiguration(registry, name, attributes.get("configuration")); // 向SpringIOC容器注册一个新的Bean对象 // 注册客户端 registerFeignClient(registry, annotationMetadata, attributes); } } } }
上面注解解析后,会调用 registerFeignClient() 注册客户端,我们来看下registerFeignClient() 方法具体实现流程,代码如下:
/** * 向SpringIOC容器注册一个新的Bean对象 * @param registry * @param annotationMetadata * @param attributes */ private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) { // 创建一个JavaBean->SpringIOC->BeanDefinitionBuilder // 服务名字 // 被@FeignClient修饰的类名,比如 com.itheima.DriverFeign,是自己定义的接口 String className = annotationMetadata.getClassName(); // BeanDefinitionBuilder通过FeignClientFactoryBean这个类来生成BeanDefinition BeanDefinitionBuilder definition = // 重点:FeignClientFactoryBean->用于创建对象 BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class); // 验证fallback和fallbackFactory是不是接口 validate(attributes); // 通过BeanDefinitionBuilder给beanDefinition增加属性 definition.addPropertyValue("url", getUrl(attributes)); definition.addPropertyValue("path", getPath(attributes)); String name = getName(attributes); definition.addPropertyValue("name", name); String contextId = getContextId(attributes); definition.addPropertyValue("contextId", contextId); definition.addPropertyValue("type", className); definition.addPropertyValue("decode404", attributes.get("decode404")); definition.addPropertyValue("fallback", attributes.get("fallback")); definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory")); definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); String alias = contextId + "FeignClient"; // 用Builder获取实际的BeanDefinition AbstractBeanDefinition beanDefinition = definition.getBeanDefinition(); boolean primary = (Boolean) attributes.get("primary"); // has a default, won't be // null beanDefinition.setPrimary(primary); String qualifier = getQualifier(attributes); if (StringUtils.hasText(qualifier)) { alias = qualifier; } // 创建对象 BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias }); // 注册到SpringIOC容器 BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); }
上面方法中创建 BeanDefinitionBuilder 的时候传入了一个参数FeignClientFactoryBean.class ,注册的Bean就是参数中自己传进来的beanClass是工厂Bean,可以用来创建Feign的代理对象,我们来看一下 FeignClientFactoryBean 源码,可以发现它实现了 FactoryBean,所以它可以获取对象实例,同时也能创建对象的代理对象,部分源码如下:
它里面有一个方法 getObject() ,该方法就是用于返回一个对象实例,而对象其实是代理对象,源码 如下:
/***
* 核心方法入口
* @return
* @throws Exception
*/
@Override
public Object getObject() throws Exception {
return getTarget();
}
/** * @param <T> the target type of the Feign client * @return a {@link Feign} client created with the specified data and the context * information */ <T> T getTarget() { // 上下文对象(对应的容器) //FeignContext注册到容器是在FeignAutoConfiguration上完成的 //在初始化FeignContext时,会把configurations在容器中放入FeignContext中。configurations的 //来源就是在前面registerFeignClients方法中将@FeignClient的配置configuration。 FeignContext context = this.applicationContext.getBean(FeignContext.class); //构建Builder对象 Feign.Builder builder = feign(context); //如果url为空,则走负载均衡,生成有负载均衡功能的代理类 if (!StringUtils.hasText(this.url)) { if (!this.name.startsWith("http")) { this.url = "http://" + this.name; } else { this.url = this.name; } this.url += cleanPath(); /*** * 负载均衡调用 1:从容器中获取一个实例 2:创建一个代理对象 */ return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type, this.name, this.url)); } //如果指定了url,则生成默认的代理类 if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) { this.url = "http://" + this.url; } String url = this.url + cleanPath(); Client client = getOptional(context, Client.class); if (client != null) { if (client instanceof LoadBalancerFeignClient) { // not load balancing because we have a url, // but ribbon is on the classpath, so unwrap client = ((LoadBalancerFeignClient) client).getDelegate(); } if (client instanceof FeignBlockingLoadBalancerClient) { // not load balancing because we have a url, // but Spring Cloud LoadBalancer is on the classpath, so unwrap client = ((FeignBlockingLoadBalancerClient) client).getDelegate(); } builder.client(client); } //生成默认代理类 Targeter targeter = get(context, Targeter.class); return (T) targeter.target(this, builder, context, new HardCodedTarget<>(this.type, this.name, url)); }
上面片段代码中Feign.Builder builder = feign(context)是用于构建Builder,关于Builder源码 属性我们进行详细讲解,源码如下:
public static class Builder { //这个就是拦截器,可以在请求之前设置请求头、设置请求体、设置参数、设置url等等,类型是: RequestInterceptor: private final List<RequestInterceptor> requestInterceptors = new ArrayList<RequestInterceptor>(); //日志等级 private Logger.Level logLevel = Logger.Level.NONE; //默认是Contract.Default(),它主要是用来解析feign接口上的那些注解,比如: @QueryMap、@Param、@RequestLine、@Header、@Body、@HeaderMap等,比如@Header操作,可以 把@Header(“name=value”)这里面的name=value取出来,重新设置到RequestTemplate里面。 private Contract contract = new Contract.Default(); //client是真正去执行request,得到response的客户端,它的入参是一个Request,这个 Request是用RequestTemplate构造出来的。 private Client client = new Client.Default(null, null); private Retryer retryer = new Retryer.Default(); private Logger logger = new NoOpLogger(); //encoder是用来编码请求的,默认能处理String和byte[]数组类型的参数,最终是存放在 RequestTemplate里面。 private Encoder encoder = new Encoder.Default(); //decoder用来解码响应,默认可以返回字节数组和字符串。 private Decoder decoder = new Decoder.Default(); private QueryMapEncoder queryMapEncoder = new QueryMapEncoder.Default(); //异常处理 private ErrorDecoder errorDecoder = new ErrorDecoder.Default(); private Options options = new Options(); //就是在这里面创建的动态代理类。当客户端调用Feign.builder()的时候,其实就是去设置 builder里面的这些参数的值。 private InvocationHandlerFactory invocationHandlerFactory = new InvocationHandlerFactory.Default(); private boolean decode404; private boolean closeAfterDecode = true; private ExceptionPropagationPolicy propagationPolicy = NONE;
protected Feign.Builder feign(FeignContext context) { //这个就是拦截器,可以在请求之前设置请求头、设置请求体、设置参数、设置url等等,类型是:RequestInterceptor FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class); Logger logger = loggerFactory.create(this.type); // @formatter:off //默认是Contract.Default(),它主要是用来解析feign接口上的那些注解,比如: @QueryMap、@Param、@RequestLine、@Header、@Body、@HeaderMap等,比如@Header操作,可以 把@Header(“name=value”)这里面的name=value取出来,重新设置到RequestTemplate里面。 Feign.Builder builder = get(context, Feign.Builder.class) // required values .logger(logger) .encoder(get(context, Encoder.class)) .decoder(get(context, Decoder.class)) .contract(get(context, Contract.class)); // @formatter:on configureFeign(context, builder); return builder; }
上面的builder构造完后继续向下走,配置完Feign.Builder之后,再判断是否需LoadBalance,如果 需要,则通过loadBalance(builder, context,new HardCodedTarget<>(this.type,his.name, this.url)); 的方法来设置。实际上他们最终调用的是Target.target()方法。
/*** * 负载均衡调用获取一个对象实例 * @param builder * @param context * @param target * @param <T> * @return */ protected <T> T loadBalance(Feign.Builder builder, FeignContext context, HardCodedTarget<T> target) { // 获取实例对象 Client client = getOptional(context, Client.class); if (client != null) { //将client设置进去相当于增加了客户端负载均衡解析的机制 builder.client(client); Targeter targeter = get(context, Targeter.class); // 创建了一个代理对象HystrixTargeter.target //实例创建(开启熔断后具有熔断降级效果) return targeter.target(this, builder, context, target); } throw new IllegalStateException( "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?"); }
上面方法会调用targeter.target(this, builder, context, target);,它支持服务熔断降级,我 们直接看默认的 DefaultTrageter 就可以了。
DefaultTargeter 的 target() 方法是一个非常简单的调用,但开启了Feign代理对象创建的开始:
target 方法调用了build().newInstance(),这个方法信息量比较大,我们要拆分这看 build() 和 newInstance(target) :
public <T> T target(Target<T> target) {
return build().newInstance(target);
}
build()方法是创建客户端对象 ReflectiveFeign ,看着名字就像代理的意思,源码如下:
public Feign build() {
Factory synchronousMethodHandlerFactory = new Factory(this.client, this.retryer, this.requestInterceptors, this.logger, this.logLevel, this.decode404, this.closeAfterDecode, this.propagationPolicy);
ParseHandlersByName handlersByName = new ParseHandlersByName(this.contract, this.options, this.encoder, this.decoder, this.queryMapEncoder, this.errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, this.invocationHandlerFactory, this.queryMapEncoder);
}
我们再来看 ReflectiveFeign ,它继承了 Feign 同时也有一个属性 InvocationHandlerFactory,该 对象其实就是代理工厂对象,源码如下:
ReflectiveFeign 源码:
public class ReflectiveFeign extends Feign {
private final ReflectiveFeign.ParseHandlersByName targetToHandlersByName;
private final InvocationHandlerFactory factory;
private final QueryMapEncoder queryMapEncoder;
ReflectiveFeign(ReflectiveFeign.ParseHandlersByName targetToHandlersByName, InvocationHandlerFactory factory, QueryMapEncoder queryMapEncoder) {
this.targetToHandlersByName = targetToHandlersByName;
this.factory = factory;
this.queryMapEncoder = queryMapEncoder;
}
InvocationHandlerFactory 源码
我们再来看newInstance(Target target)方法,该方法就是用来创建Feign的代理对象,源码如 下:
/*** * 创建代理 * @param target * @param <T> * @return */ public <T> T newInstance(Target<T> target) { //根据接口类和Contract协议解析方式,解析接口类上的方法和注解,转换成内部的MethodHandler处理方式 Map<String, InvocationHandlerFactory.MethodHandler> nameToHandler =targetToHandlersByName.apply(target); Map<Method, InvocationHandlerFactory.MethodHandler> methodToHandler = new LinkedHashMap<Method, InvocationHandlerFactory.MethodHandler>(); List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>(); for (Method method : target.type().getMethods()) { if (method.getDeclaringClass() == Object.class) { continue; } else if (Util.isDefault(method)) { DefaultMethodHandler handler = new DefaultMethodHandler(method); defaultMethodHandlers.add(handler); methodToHandler.put(method, handler); } else { methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method))); } } // 基于Proxy.newProxyInstance 为接口类创建动态实现,将所有的请求转换给 InvocationHandler 处理。 InvocationHandler handler = factory.create(target, methodToHandler); T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[] {target.type()}, handler); for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) { defaultMethodHandler.bindTo(proxy); } return proxy; }
远程请求一定是要有IP和端口的,OpenFeign将IP和端口封装到RequestTemplate中了,我们来看一下
RequestTemplate源码:
在 SynchronousMethodHandler 类中执行远程调用,源码如下:
/** * 远程调用 * @param argv * @return * @throws Throwable */ @Override public Object invoke(Object[] argv) throws Throwable { //封装成RequestTemplate RequestTemplate template = buildTemplateFromArgs.create(argv); Request.Options options = findOptions(argv); Retryer retryer = this.retryer.clone(); while (true) { try { //执行远程调用 return executeAndDecode(template, options); } catch (RetryableException e) { try { retryer.continueOrPropagate(e); } catch (RetryableException th) { Throwable cause = th.getCause(); if (propagationPolicy == UNWRAP && cause != null) { throw cause; } else { throw th; } } if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } }
上面调用会调用 executeAndDecode() 方法,该方法是执行远程请求,同时解析响应数据,源码如下:
/**** * 发起远程请求 * @param template * @param options * @return * @throws Throwable */ Object executeAndDecode(RequestTemplate template, Request.Options options) throws Throwable { //转换为HTTP请求报文 Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { //发起远程通信 response = client.execute(request, options); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); boolean shouldClose = true; try { if (logLevel != Logger.Level.NONE) { response = logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime); } if (Response.class == metadata.returnType()) { if (response.body() == null) { return response; } if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) { shouldClose = false; return response; } // Ensure the response body is disconnected //获取返回结果 byte[] bodyData = Util.toByteArray(response.body().asInputStream()); return response.toBuilder().body(bodyData).build(); } if (response.status() >= 200 && response.status() < 300) { if (void.class == metadata.returnType()) { return null; } else { Object result = decode(response); shouldClose = closeAfterDecode; return result; } } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) { Object result = decode(response); shouldClose = closeAfterDecode; return result; } else { throw errorDecoder.decode(metadata.configKey(), response); } } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime); } throw errorReading(request, response, e); } finally { if (shouldClose) { ensureClosed(response.body()); } } }
前面我们已经学过,Spring Cloud Stream 是一个消息驱动微服务的框架。应用程序通过inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而Spring Cloud Stream 的 binder 负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream 交互 就可以方便使用消息驱动的方式。
为了更深层次的学习SpringCloud Stream,我们展开对它的源码学习。
在Stream中,要想实现发消息,首先得注册绑定通信管道,注册绑定通信管道我们需要用到BindingBeansRegistrar 类,例如我们写了 @EnableBinding(Source.class) ,此时该类就会解析
这个注解,源码如下:
public class BindingBeansRegistrar implements ImportBeanDefinitionRegistrar { /*** * 向Spring容器注册Bean * @param metadata * @param registry */ @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { //获取所有@EnableBinding注解的类信息 AnnotationAttributes attrs = AnnotatedElementUtils.getMergedAnnotationAttributes( ClassUtils.resolveClassName(metadata.getClassName(), null), EnableBinding.class); //循环所有有@EnableBinding注解的类信息 for (Class<?> type : collectClasses(attrs, metadata.getClassName())) { //检查是否包含该名字的实例,不包含,则实现注册 if (!registry.containsBeanDefinition(type.getName())) { BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(type, type.getName(), registry); //执行注册. //此时如果创建的是生产者,则创建Source的实例,如果是消费者,则创建Sink的实例 BindingBeanDefinitionRegistryUtils .registerBindingTargetsQualifiedBeanDefinitions(ClassUtils .resolveClassName(metadata.getClassName(), null), type,registry); } } } private Class<?>[] collectClasses(AnnotationAttributes attrs, String className) { EnableBinding enableBinding = AnnotationUtils.synthesizeAnnotation(attrs, EnableBinding.class, ClassUtils.resolveClassName(className, null)); return enableBinding.value(); } }
上面调用的实例化通信管道并注册通信管道对象的方法是 registerBindingTargetBeanDefinitions() ,源码如下
/*** * 绑定当前操作 * @param type * @param bindingTargetInterfaceBeanName * @param registry */ public static void registerBindingTargetBeanDefinitions(Class<?> type, final String bindingTargetInterfaceBeanName, final BeanDefinitionRegistry registry) { ReflectionUtils.doWithMethods(type, method -> { Input input = AnnotationUtils.findAnnotation(method, Input.class); //input类型 if (input != null) { //获取注解后面的值:@EnableBinding(Sink.class) String name = getBindingTargetName(input, method); if (!registry.containsBeanDefinition(name)) { //创建Sink的实例,并注入到SpringIOC容器中 registerInputBindingTargetBeanDefinition(input.value(), name, bindingTargetInterfaceBeanName, method.getName(), registry); } } //output类型 Output output = AnnotationUtils.findAnnotation(method, Output.class); if (output != null) { //获取注解后面的值:@EnableBinding(Source.class) String name = getBindingTargetName(output, method); if (!registry.containsBeanDefinition(name)) { //生产者注册 //创建Source的实例,并注入到SpringIOC容器中 registerOutputBindingTargetBeanDefinition(output.value(), name, bindingTargetInterfaceBeanName, method.getName(), registry); } } }); }
此时运行时,我们可以发现消息发送绑定对象是 DirectWithAttributesChannel 。
消息发送比较抽象,需要根据引入不同MQ中间件依赖包决定,但主题流程保持一致,其中消息检查和 消息发送会和引入的包不同有差异,发送消息前会适配不同MQ的Binder,如果是RabbitMQ,Binder 是 RabbitMessageChannelBinder ,消息发送的源码在AbstractMessageChannel#send() 如下:
public boolean send(Message<?> messageArg, long timeout) { Assert.notNull(messageArg, "message must not be null"); Assert.notNull(messageArg.getPayload(), "message payload must not be null"); Message<?> message = messageArg; if (this.shouldTrack) { message = MessageHistory.write(messageArg, this, this.getMessageBuilderFactory()); } Deque<ChannelInterceptor> interceptorStack = null; boolean sent = false; boolean metricsProcessed = false; MetricsContext metricsContext = null; boolean countsAreEnabled = this.countsEnabled; AbstractMessageChannel.ChannelInterceptorList interceptorList = this.interceptors; AbstractMessageChannelMetrics metrics = this.channelMetrics; SampleFacade sample = null; try { //拦截器拦截,处理信息 message = this.convertPayloadIfNecessary(message); boolean debugEnabled = this.loggingEnabled && this.logger.isDebugEnabled(); if (debugEnabled) { this.logger.debug("preSend on channel '" + this + "', message: " + message); } if (interceptorList.getSize() > 0) { interceptorStack = new ArrayDeque(); message = interceptorList.preSend(message, this, interceptorStack); if (message == null) { return false; } } //是否限制发送消息数量,如果限制,需要拦截器检查 if (countsAreEnabled) { metricsContext = metrics.beforeSend(); if (this.metricsCaptor != null) { sample = this.metricsCaptor.start(); } //执行消息发送 sent = this.doSend(message, timeout); if (sample != null) { sample.stop(this.sendTimer(sent)); } metrics.afterSend(metricsContext, sent); metricsProcessed = true; } else { //执行消息发送 sent = this.doSend(message, timeout); } if (debugEnabled) { this.logger.debug("postSend (sent=" + sent + ") on channel '" + this + "', message: " + message); } if (interceptorStack != null) { interceptorList.postSend(message, this, sent); interceptorList.afterSendCompletion(message, this, sent, (Exception)null, interceptorStack); } return sent; } catch (Exception var14) { if (countsAreEnabled && !metricsProcessed) { if (sample != null) { sample.stop(this.buildSendTimer(false, var14.getClass().getSimpleName())); } metrics.afterSend(metricsContext, false); } if (interceptorStack != null) { interceptorList.afterSendCompletion(message, this, sent, var14, interceptorStack); } throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> { return "failed to send Message to channel '" + this.getComponentName() + "'"; }, var14); } }
在执行消息发送的时候,获取消息发送对象前,会获取Binder,如果我们用的是RabbitMQ,此时通信 信道是RabbitMQ的Binder,源码如下:
消息的监听在 StreamListenerAnnotationBeanPostProcessor 类中注册,每次监听到消息后,会调 用mappedListenerMethods中指定队列的方法,源码如下:
/*** * 注册监听 * @param method * @param streamListener * @param bean */ private void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener, Object bean) { Assert.hasText(streamListener.value(), "The binding name cannot be null"); if (!StringUtils.hasText(streamListener.value())) { throw new BeanInitializationException( "A bound component name must be specified"); } final String defaultOutputChannel = StreamListenerMethodUtils .getOutboundBindingTargetName(method); if (Void.TYPE.equals(method.getReturnType())) { Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel), "An output channel cannot be specified for a method that does not return a value"); } else { Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel), "An output channel must be specified for a method that can return a value"); } //对每个方法进行校验 StreamListenerMethodUtils.validateStreamListenerMessageHandler(method); //将方法添加到mappedListenerMethods中,它是这样的结构:<String,StreamListenerHandlerMethodMapping> //信道的input值是key(可以理解成队列,但不同MQ意义不一样),每次可以用监听的input作为key去取出方法,进行调用 StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add( streamListener.value(), new StreamListenerHandlerMethodMapping(bean, method, streamListener.condition(), defaultOutputChannel, streamListener.copyHeaders())); }
我们调试后,可以发现此时会注册对应的监听方法,测试效果如下:
ReactorHttpHandlerAdapter.apply() 封装Request 封装Response HttpWebHandlerAdapter.handle()->handle()返回处理结果集 发生异常->doOnError() 处理成功->doOnSuccess() Reactor->非阻塞网络模型 Spring5.x基于WebFlux->异步通信 HttpWebHandlerAdapter.handle() ExceptionHandlingWebHandler.handle()->全局异常处理器配置 DispatcherHandler.handle()->所有流程执行 处理当前请求->需要处理器->加载所有的映射处理器->handlerMapping 循环所有handlerMappings->路径处理映射器(比如断言)->找到Handler对象 DispatcherHandler.invokeHandler()->执行所有Handler对象(过滤器) FilteringWebHandler.handle()->执行匹配的Handler(每个Handler都是一个过滤器) FilteringWebHandler.DefaultGatewayFilterChain.filter()执行所有过滤器-有顺序 RouteToRequestUrlFilter.filter()->找到当前请求要处理的服务 LoadBalancerClientFilter.filter()->负载均衡->找到处理请求的唯一服务 WebsocketRoutingFilter.filter() NettyRoutingFilter.filter()->执行远程调用 Netty 执行Http请求 ForwardRoutingFilter.filter()->执行响应结果处理 成功->doOnSuccess() 失败->onErrorResume() then(Mono.defer(response::setComplete))完成当前操作 负载均衡 RouteToRequestUrlFilter.filter()->服务确定 确定要处理当前请求的服务对象(Route)->Route->FIlter组 LoadBalancerClientFilter LoadBalancerClientFilter.filter()->负载均衡->找到处理请求的唯一服务
OpenFeign如何被创建? 接口->Proxy->invoke()实现网络请求 接口->Proxy->SpringIOC OpenFeign如何发起网络请求? 核心组件 FeignClientFactoryBean:创建@FeignClient 修饰的接口类Bean 实例的工厂类 FeignContext:配置组件的上下文环境,保存着相关组件的不同实例,这些实例由不同的FeignConfiguration 配置类构造出来 SynchronousMethodHandler:MethodHandler 的子类,可以在FeignClient 相应方法被调用时发送网络请求,然后再将请求响应转化为函数返回值进行输出 OpenFeign源码剖析 @EnableFeignClients作用 引入FeignClientsRegistrar 指定扫描@FeignClient的包信息(扫描FeignClient接口) 指定FeignClient接口的自定义配置类 Bean注册 添加@EnableFeignClients注解开启Feign Spring通过ImportBeanDefinitionRegistrar实现动态注册 FeignClientsRegistrar实现了ImportBeanDefinitionRegistrar FeignClientsRegistrar.registerBeanDefinitions()实现注册 关键代码: BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class) 对象创建(初始化):FeignClientFactoryBean getObject()->getTarget() getTarget() NamedContextFactory.getInstance()从容器中获取实例 ReflectiveFeign.newInstance()为实例对象创建代理对象 newInstance() ParseHandlersByName 属性:SynchronousMethodHandler.Factory,远程调用过程 SynchronousMethodHandler.create()->InvocationHandler 远程调用 SynchronousMethodHandler.invoke() 使用RequestTemplate实现远程调用->Http协议
生产者/消费者注册
@EnableBinding注解->BindingBeansRegistrar
BindingBeansRegistrar
registerBeanDefinitions注册
@Input注册为消费者
@Output注册为生产者
消息监听:StreamListenerAnnotationBeanPostProcessor
postProcessAfterInitialization()->初始化解析
注册监听:orchestrateStreamListenerSetupMethod()->registerHandlerMethodOnListenedChannel()
registerHandlerMethodOnListenedChannel()->将需要监听的方法添加到mappedListenerMethods
afterSingletonsInstantiated()->触发监听执行->subscribe(handler)
StreamListenerMessageHandler.handleRequestMessage()处理消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。