赞
踩
spring-cloud-commons 包括了整个 SpringCloud 对各种微服务化组件的抽象,比如:
actuator:
circuitbreaker:断路器功能
在spring.factories中与LoadBalancer有关的自动化配置类有如下
- org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration,\
- org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,\
- org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerBeanPostProcessorAutoConfiguration,\
- org.springframework.cloud.client.loadbalancer.reactive.ReactorLoadBalancerClientAutoConfiguration,\
- org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancerAutoConfiguration,\
load balancer会收集标记@LoadBalanced注解的RestTemplate进行定制化,实现客户端负载均衡
- // Qualifier annotation usable on fields, parameters and methods, and retained at runtime.
- // In this article it marks the RestTemplate instances that the load balancer
- // auto-configuration collects and customizes.
- @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- @Inherited
- @Qualifier
- public @interface LoadBalanced {
-
- }
服务实例选择器,使用load balancer根据serviceId获取具体的实例
- // Chooses a concrete service instance for a logical service id.
- public interface ServiceInstanceChooser {
- // Returns the concrete service instance selected for the given service id.
- ServiceInstance choose(String serviceId);
- }
负载均衡客户端,继承ServiceInstanceChooser,对外提供API以供第三方实现自己的负载均衡客户端
- // Client-side load balancer API: instance selection (inherited), request execution and URI reconstruction.
- public interface LoadBalancerClient extends ServiceInstanceChooser {
- // Executes the request against a ServiceInstance resolved from the serviceId.
- <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
- // Overload of the above that takes the target ServiceInstance explicitly.
- <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
- // Rebuilds the original URI into a new URI carrying the instance's real host and port.
- // Example: in http://myservice/path/to/service the service name "myservice" is replaced
- // with something like 192.168.1.122:8080.
- URI reconstructURI(ServiceInstance instance, URI original);
- }
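那么该LoadBalancerClient接口就有了三个功能:根据serviceId选择服务实例(choose)、使用选中的服务实例执行请求(execute)、根据服务实例重构URI(reconstructURI)。下面的ServiceRequestWrapper就用到了其中的reconstructURI: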
- /**
-  * Request wrapper that reports a load-balanced URI: the wrapped request's URI is
-  * passed through {@link LoadBalancerClient#reconstructURI} for the chosen instance.
-  */
- public class ServiceRequestWrapper extends HttpRequestWrapper {
-
-     private final ServiceInstance instance;
-     private final LoadBalancerClient loadBalancer;
-
-     /**
-      * @param request      the original request being wrapped
-      * @param instance     the service instance the request is routed to
-      * @param loadBalancer the client used to rebuild the URI
-      */
-     public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) {
-         super(request);
-         this.loadBalancer = loadBalancer;
-         this.instance = instance;
-     }
-
-     /**
-      * Overrides the wrapper's URI accessor so callers see the URI rebuilt by the
-      * load balancer instead of the original one.
-      */
-     @Override
-     public URI getURI() {
-         return this.loadBalancer.reconstructURI(this.instance, this.getRequest().getURI());
-     }
- }
- // Auto-configuration that applies RestTemplateCustomizer beans to every @LoadBalanced RestTemplate.
- // Only part of the class is listed here (see the "..." marker near the end).
- @Configuration(proxyBeanMethods = false)
- @ConditionalOnClass(RestTemplate.class)
- @ConditionalOnBean(LoadBalancerClient.class)
- @EnableConfigurationProperties(LoadBalancerProperties.class)
- public class LoadBalancerAutoConfiguration {
-
- // Collects every RestTemplate annotated with @LoadBalanced (an empty list when there are none).
- @LoadBalanced
- @Autowired(required = false)
- private List<RestTemplate> restTemplates = Collections.emptyList();
-
- // Once singletons are initialized, runs each available RestTemplateCustomizer
- // over each collected @LoadBalanced RestTemplate.
- @Bean
- public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
- final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
- return () -> restTemplateCustomizers.ifAvailable(customizers -> {
- for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
- for (RestTemplateCustomizer customizer : customizers) {
- customizer.customize(restTemplate);
- }
- }
- });
- }
-
- // Request factory: produces LoadBalancerRequest objects and wraps the request in a ServiceRequestWrapper.
- // NOTE(review): the transformers field used below is not part of this excerpt — presumably declared
- // in the elided portion of the class; confirm against the full source.
- @Bean
- @ConditionalOnMissingBean
- public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
- return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
- }
-
- /** Non-retry configuration. Spring Cloud also offers a retryTemplate-based variant that adds retry support. */
-
- // Used when the spring-retry module is absent or disabled. When spring-retry is present, a different
- // configuration class, RetryInterceptorAutoConfiguration, is built instead. It likewise registers one
- // interceptor (RetryLoadBalancerInterceptor) and one customizer, and works on the same principle.
- // NOTE(review): the original note named RetryLoadBalancerInterceptor for both beans, which looks like
- // a typo for the customizer — confirm.
- @Configuration(proxyBeanMethods = false)
- @Conditional(RetryMissingOrDisabledCondition.class)
- static class LoadBalancerInterceptorConfig {
-
- // Creates the load balancer interceptor.
- @Bean
- public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
- LoadBalancerRequestFactory requestFactory) {
- return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
- }
-
-
- // Builds the RestTemplateCustomizer that appends the LoadBalancerInterceptor to a RestTemplate's
- // interceptor list (a copy of the current list is extended and set back).
- // Written as a Java 8 lambda for the functional interface.
- @Bean
- @ConditionalOnMissingBean
- public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
- return restTemplate -> {
- List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
- list.add(loadBalancerInterceptor);
- restTemplate.setInterceptors(list);
- };
- }
-
- }
-
- ...
-
- }
上述列出的是核心自动装配类的部分代码。该类由 spring-cloud-commons 提供(而非 spring-cloud-loadbalancer),因此只提供了最基本的配置:
LoadBalancerInterceptor拦截器拦截住RestTemplate的request请求,通过请求的URI得到负载均衡的serviceId,负载均衡器根据serviceId得到ServiceInstance,通过重构得到最终的URI,然后通过LoadBalancer的execute方法进行真实的服务请求
- /**
-  * RestTemplate interceptor that treats the request URI's host as a service id and
-  * delegates the actual call to the {@link LoadBalancerClient}.
-  */
- public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
-
-     private LoadBalancerClient loadBalancer;
-     private LoadBalancerRequestFactory requestFactory;
-
-     /** Creates an interceptor with a default request factory built on the same client. */
-     public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
-         this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
-     }
-
-     public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
-         this.loadBalancer = loadBalancer;
-         this.requestFactory = requestFactory;
-     }
-
-     /**
-      * Intercepts an outgoing request and routes it through the load balancer.
-      *
-      * @throws IllegalStateException if the request URI has no host
-      * @throws IOException           propagated from the load-balanced execution
-      */
-     public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
-         // The host part of the original URI is the logical service id, not a real host.
-         final URI originalUri = request.getURI();
-         final String serviceName = originalUri.getHost();
-         Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
-
-         // Build the deferred request, then let the load balancer execute it.
-         LoadBalancerRequest<ClientHttpResponse> lbRequest = this.requestFactory.createRequest(request, body, execution);
-         return this.loadBalancer.execute(serviceName, lbRequest);
-     }
- }
LoadBalancerInterceptor拦截器内部会对request请求进行拦截。拦截器内部使用LoadBalancerClient完成请求的调用,这里调用的时候需要的LoadBalancerRequest由LoadBalancerRequestFactory构造,LoadBalancerRequestFactory内部使用LoadBalancerRequestTransformer对request进行转换。
- // Reactive load balancer abstraction: publishes a Response<T> chosen for a request.
- public interface ReactiveLoadBalancer<T> {
- // Default request passed by the no-argument choose().
- Request<DefaultRequestContext> REQUEST = new DefaultRequest();
-
- // Chooses a response for the given request; the result is delivered through a Publisher.
- Publisher<Response<T>> choose(Request request);
-
- // Convenience overload that delegates to choose(REQUEST).
- default Publisher<Response<T>> choose() {
- return this.choose(REQUEST);
- }
-
- // Factory that returns the load balancer for a given service id.
- @FunctionalInterface
- public interface Factory<T> {
- ReactiveLoadBalancer<T> getInstance(String serviceId);
- }
- }
目前在spring-cloud-loadbalancer中,只提供了RoundRobinLoadBalancer一种负载均衡策略
- /**
-  * Builds the {@link LoadBalancerRequest} that the load balancer client later invokes
-  * with the chosen service instance.
-  */
- public class LoadBalancerRequestFactory {
-
-     private LoadBalancerClient loadBalancer;
-     private List<LoadBalancerRequestTransformer> transformers;
-
-     /** Creates a factory without transformers (the transformer list stays null). */
-     public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
-         this(loadBalancer, null);
-     }
-
-     public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers) {
-         this.loadBalancer = loadBalancer;
-         this.transformers = transformers;
-     }
-
-     /**
-      * Creates the deferred request as a lambda implementing {@code apply(ServiceInstance)}.
-      * Per the surrounding article, the ClientHttpRequestExecution passed in is
-      * InterceptingClientHttpRequest's inner execution class.
-      */
-     public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
-         return instance -> {
-             // Called back from LoadBalancerClient.execute once an instance has been chosen.
-             HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
-             if (this.transformers != null) {
-                 // Each transformer receives the output of the previous one.
-                 for (LoadBalancerRequestTransformer transformer : this.transformers) {
-                     serviceRequest = transformer.transformRequest(serviceRequest, instance);
-                 }
-             }
-             // Continue the interceptor chain / perform the remote call.
-             return execution.execute(serviceRequest, body);
-         };
-     }
- }
- // Deferred request: given the chosen service instance, performs the call and returns its result.
- public interface LoadBalancerRequest<T> {
- // Applies the request to the given instance; may throw any Exception.
- T apply(ServiceInstance instance) throws Exception;
- }
客户端负载均衡是通过对RestTemplate添加拦截器实现的。
RestTemplate提供了一个方法setInterceptors,用于设置拦截器,拦截器需要实现ClientHttpRequestInterceptor接口即可,在实际远程去请求服务端接口之前会先调用拦截器的intercept方法逻辑。
在下面的代码中ClientHttpRequest的实现类是InterceptingClientHttpRequest
- // Excerpt of RestTemplate; the parts before and after doExecute are elided in this listing.
- public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
- ......省略......
- // Performs the remote request: creates the request, lets the callback prepare it, executes it,
- // handles the response and extracts the result. The response is always closed in the finally block.
- protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
- Assert.notNull(url, "URI is required");
- Assert.notNull(method, "HttpMethod is required");
- ClientHttpResponse response = null;
-
- Object var14;
- try {
- // Per the surrounding article, the ClientHttpRequest created here is an InterceptingClientHttpRequest.
- ClientHttpRequest request = this.createRequest(url, method);
- if (requestCallback != null) {
- requestCallback.doWithRequest(request);
- }
-
- response = request.execute();
- this.handleResponse(url, method, response);
- var14 = responseExtractor != null ? responseExtractor.extractData(response) : null;
- } catch (IOException var12) {
- String resource = url.toString();
- String query = url.getRawQuery();
- // 63 is the character code of '?': when a query is present, everything from '?' on is
- // cut off so the query string is not included in the error message.
- resource = query != null ? resource.substring(0, resource.indexOf(63)) : resource;
- throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + var12.getMessage(), var12);
- } finally {
- if (response != null) {
- response.close();
- }
-
- }
-
- return var14;
- }
- ......省略......
- }
在InterceptingClientHttpRequest类中有一个内部类InterceptingRequestExecution,执行拦截器里的方法并进行最终的远程请求
- /**
-  * Buffering client request that runs the configured interceptors before delegating
-  * to a request created by the underlying {@link ClientHttpRequestFactory}.
-  */
- class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
-
-     private final ClientHttpRequestFactory requestFactory;
-     private final List<ClientHttpRequestInterceptor> interceptors;
-     private HttpMethod method;
-     private URI uri;
-
-     protected InterceptingClientHttpRequest(ClientHttpRequestFactory requestFactory, List<ClientHttpRequestInterceptor> interceptors, URI uri, HttpMethod method) {
-         this.requestFactory = requestFactory;
-         this.interceptors = interceptors;
-         this.uri = uri;
-         this.method = method;
-     }
-
-     public HttpMethod getMethod() {
-         return this.method;
-     }
-
-     public String getMethodValue() {
-         return this.method.name();
-     }
-
-     public URI getURI() {
-         return this.uri;
-     }
-
-     /** Starts a fresh interceptor chain for this request and the buffered body. */
-     protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
-         return new InterceptingRequestExecution().execute(this, bufferedOutput);
-     }
-
-     /**
-      * ClientHttpRequestExecution implementation that walks the interceptor list and,
-      * once it is exhausted, performs the actual remote request.
-      */
-     private class InterceptingRequestExecution implements ClientHttpRequestExecution {
-
-         private final Iterator<ClientHttpRequestInterceptor> iterator;
-
-         public InterceptingRequestExecution() {
-             this.iterator = InterceptingClientHttpRequest.this.interceptors.iterator();
-         }
-
-         public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
-             // While interceptors remain, hand the request to the next one; it calls back into
-             // this execution to continue the chain.
-             if (this.iterator.hasNext()) {
-                 return this.iterator.next().intercept(request, body, this);
-             }
-
-             // Chain exhausted: build the real request and send it.
-             HttpMethod method = request.getMethod();
-             Assert.state(method != null, "No standard HTTP method");
-             ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
-             request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
-
-             if (body.length > 0) {
-                 if (delegate instanceof StreamingHttpOutputMessage) {
-                     // Streaming delegates receive the body through a callback.
-                     ((StreamingHttpOutputMessage) delegate).setBody(outputStream -> StreamUtils.copy(body, outputStream));
-                 } else {
-                     StreamUtils.copy(body, delegate.getBody());
-                 }
-             }
-             return delegate.execute();
-         }
-     }
- }
- // Configuration properties bound to the "spring.cloud.loadbalancer" prefix.
- // Only the health-check part is listed; the rest is elided in this excerpt.
- @ConfigurationProperties("spring.cloud.loadbalancer")
- public class LoadBalancerProperties {
- private LoadBalancerProperties.HealthCheck healthCheck = new LoadBalancerProperties.HealthCheck();
-
- ...省略...
-
- // Health-check settings with their defaults.
- public static class HealthCheck {
- // Defaults to 0 (unit not visible in this excerpt).
- private int initialDelay = 0;
- // Defaults to 25 seconds.
- private Duration interval = Duration.ofSeconds(25L);
- // Case-insensitive map of paths; presumably keyed by service id — TODO confirm.
- private Map<String, String> path = new LinkedCaseInsensitiveMap();
-
- ...省略...
- }
- }
spring-cloud-commons 定义好了整个 loadbalance 的基调及相关接口,实现方只需要实现相应的接口即可,同时 spring-cloud-commons 也给出了自己的一方实现:spring-cloud-loadbalancer
- /**
-  * Blocking {@link LoadBalancerClient} that resolves instances through the reactive
-  * load balancer obtained from a {@link LoadBalancerClientFactory}.
-  */
- public class BlockingLoadBalancerClient implements LoadBalancerClient {
-
-     private final LoadBalancerClientFactory loadBalancerClientFactory;
-
-     public BlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory) {
-         this.loadBalancerClientFactory = loadBalancerClientFactory;
-     }
-
-     /**
-      * Chooses a backend instance for the service id and executes the request on it.
-      *
-      * @throws IllegalStateException if no instance could be chosen
-      */
-     public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
-         ServiceInstance chosen = this.choose(serviceId);
-         if (chosen == null) {
-             throw new IllegalStateException("No instances available for " + serviceId);
-         }
-         return this.execute(serviceId, chosen, request);
-     }
-
-     /**
-      * Applies the request to the given instance. IOExceptions propagate unchanged;
-      * any other exception is rethrown via ReflectionUtils.rethrowRuntimeException.
-      */
-     public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
-         try {
-             return request.apply(serviceInstance);
-         } catch (IOException ioFailure) {
-             throw ioFailure;
-         } catch (Exception otherFailure) {
-             ReflectionUtils.rethrowRuntimeException(otherFailure);
-             return null;
-         }
-     }
-
-     /** Rebuilds the URI for the given instance. */
-     public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
-         return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
-     }
-
-     /**
-      * Chooses a backend instance for the service id, blocking on the reactive result.
-      * Returns null when no load balancer exists for the id or it yields no response.
-      */
-     public ServiceInstance choose(String serviceId) {
-         ReactiveLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerClientFactory.getInstance(serviceId);
-         if (loadBalancer == null) {
-             return null;
-         }
-         Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose()).block();
-         if (loadBalancerResponse == null) {
-             return null;
-         }
-         return loadBalancerResponse.getServer();
-     }
- }
- // Ribbon-backed LoadBalancerClient. Servers are picked through Ribbon's ILoadBalancer and wrapped
- // as RibbonServer instances. Part of the class (e.g. isSecure, serverIntrospector) is not listed here.
- public class RibbonLoadBalancerClient implements LoadBalancerClient {
- private SpringClientFactory clientFactory;
-
- public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
- this.clientFactory = clientFactory;
- }
-
- // Rebuilds the URI for the given instance: first lets RibbonUtils switch to a secure connection
- // if needed, then has the load balancer context substitute the server into the URI.
- public URI reconstructURI(ServiceInstance instance, URI original) {
- Assert.notNull(instance, "instance can not be null");
- String serviceId = instance.getServiceId();
- RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
- URI uri;
- Server server;
- if (instance instanceof RibbonLoadBalancerClient.RibbonServer) {
- // A RibbonServer already carries its Ribbon Server.
- RibbonLoadBalancerClient.RibbonServer ribbonServer = (RibbonLoadBalancerClient.RibbonServer)instance;
- server = ribbonServer.getServer();
- uri = RibbonUtils.updateToSecureConnectionIfNeeded(original, ribbonServer);
- } else {
- // Any other ServiceInstance: build a Server from its scheme, host and port.
- server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
- IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
- ServerIntrospector serverIntrospector = this.serverIntrospector(serviceId);
- uri = RibbonUtils.updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server);
- }
-
- return context.reconstructURIWithServer(server, uri);
- }
-
- // Chooses an instance for the service id without a hint.
- public ServiceInstance choose(String serviceId) {
- return this.choose(serviceId, (Object)null);
- }
-
- // Chooses a server (optionally guided by a hint) and wraps it as a RibbonServer;
- // returns null when no server was chosen.
- public ServiceInstance choose(String serviceId, Object hint) {
- Server server = this.getServer(this.getLoadBalancer(serviceId), hint);
- return server == null ? null : new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
- }
-
- // Executes the request for the service id without a hint.
- public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
- return this.execute(serviceId, (LoadBalancerRequest)request, (Object)null);
- }
-
- // Picks a server for the service id, wraps it as a RibbonServer and executes the request on it.
- // Throws IllegalStateException when no server is available.
- public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
- ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
- Server server = this.getServer(loadBalancer, hint);
- if (server == null) {
- throw new IllegalStateException("No instances available for " + serviceId);
- } else {
- RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
- return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
- }
- }
-
- // Applies the request to the given instance and records the outcome with a RibbonStatsRecorder.
- // Only RibbonServer instances yield a server; any other instance type leaves server null and
- // results in the IllegalStateException below.
- public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
- Server server = null;
- if (serviceInstance instanceof RibbonLoadBalancerClient.RibbonServer) {
- server = ((RibbonLoadBalancerClient.RibbonServer)serviceInstance).getServer();
- }
-
- if (server == null) {
- throw new IllegalStateException("No instances available for " + serviceId);
- } else {
- RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
- RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
-
- try {
- // Invoke the LoadBalancerRequest's apply method with the chosen instance.
- T returnVal = request.apply(serviceInstance);
- statsRecorder.recordStats(returnVal);
- return returnVal;
- } catch (IOException var8) {
- // Record the failure, then propagate the IOException unchanged.
- statsRecorder.recordStats(var8);
- throw var8;
- } catch (Exception var9) {
- // Record the failure, then rethrow via ReflectionUtils.rethrowRuntimeException.
- statsRecorder.recordStats(var9);
- ReflectionUtils.rethrowRuntimeException(var9);
- return null;
- }
- }
- }
-
- // Convenience overloads that resolve the load balancer and/or pass a null hint.
- protected Server getServer(String serviceId) {
- return this.getServer(this.getLoadBalancer(serviceId), (Object)null);
- }
-
- protected Server getServer(ILoadBalancer loadBalancer) {
- return this.getServer(loadBalancer, (Object)null);
- }
-
- // Returns null when there is no load balancer; otherwise asks it to choose a server,
- // using the key "default" when no hint is given.
- protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
- return loadBalancer == null ? null : loadBalancer.chooseServer(hint != null ? hint : "default");
- }
-
- // Looks up the Ribbon load balancer for the service id from the client factory.
- protected ILoadBalancer getLoadBalancer(String serviceId) {
- return this.clientFactory.getLoadBalancer(serviceId);
- }
-
- // ServiceInstance view of a Ribbon Server; the remainder of the class is elided in this listing.
- public static class RibbonServer implements ServiceInstance {
- private final String serviceId;
- private final Server server;
- private final boolean secure;
- private Map<String, String> metadata;
-
- // Creates a non-secure RibbonServer with empty metadata.
- public RibbonServer(String serviceId, Server server) {
- this(serviceId, server, false, Collections.emptyMap());
- }
-
- public RibbonServer(String serviceId, Server server, boolean secure, Map<String, String> metadata) {
- this.serviceId = serviceId;
- this.server = server;
- this.secure = secure;
- this.metadata = metadata;
- }
-
- ......省略......
- }
- }
参考
https://blog.csdn.net/weixin_42189048/article/details/117781378
http://fangjian0423.github.io/2018/10/02/spring-cloud-commons-analysis/
https://blog.csdn.net/weixin_50518271/article/details/111449560
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。