当前位置:   article > 正文

Dubbo服务引用解析_dubboreference

dubboreference

上一次我们讲解了Dubbo的服务暴露, 这次我们来看一下Dubbo是如何调用服务的


本文会根据dubbo的架构图进行解析

目录

客户端启动流程

Dubbo服务调用


客户端启动流程

大家都知道,客户端在调用服务时只有接口没有相对应的实现类,所以呢,我们需要在调用服务时需要先生成一个代理,再通过代理去执行服务。

        

 可以看到我们通过从ioc中获取DemoService的Bean,那其实本身客户端中并没有DemoService的bean,这个时候我们可以看到配置文件中进行了这样的一个配置使用了reference标签,在dubbo中reference标签配置的服务会被另一个Bean生成代理,那就是ReferenceBean,它会通过getObject()方法将生成的代理对象放入ioc中,所以点进去

<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>

 这里会对接口的代理对象进行检查,如果没有则会初始化一个

  1. public synchronized T get() {
  2. checkAndUpdateSubConfigs();
  3. if (destroyed) {
  4. throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
  5. }
  6. // 检测 ref 是否为空,为空则通过 init 方法创建
  7. if (ref == null) {
  8. // init 方法主要用于处理配置,以及调用 createProxy 生成代理类
  9. init();
  10. }
  11. return ref;
  12. }

点进init()方法中,可以看到核心的方法就是创建代理对象的(能看到我们本身配置的一些信息,接口路径等等)

接着就是对提供者的数量的一个判断,是有多个还是单个,然后生成invoker,最后会根据invoker返回生成的代理,重点则在于如何生成invoker,这里的protocol是自适应的,所以会根据url中的信息选择对应的实现类,可以看到协议中使用的是registry,于是来到RegistryProtocol

  1. // 单个注册中心或服务提供者(服务直连,下同)
  2. if (urls.size() == 1) {
  3. // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
  4. invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); // registry://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=14276&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D14276%26qos.port%3D33333%26register.ip%3D192.168.200.10%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1622603489641&registry=zookeeper&timestamp=1622603494381
  5. } else {
  6. // 多个注册中心或多个服务提供者,或者两者混合
  7. List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
  8. URL registryURL = null;
  9. // 获取所有的 Invoker 到 invokers中
  10. for (URL url : urls) {
  11. // 通过 ref_protocol 调用 refer 构建 Invoker,refprotocol 会在运行时
  12. // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
  13. invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
  14. if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  15. registryURL = url; // use last registry url
  16. }
  17. }
  18. if (registryURL != null) { // registry url is available
  19. // use RegistryAwareCluster only when register's CLUSTER is available
  20. // 如果注册中心链接不为空,则将使用 AvailableCluster
  21. URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
  22. // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
  23. // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并 Cluster扩展点默认找的是FailoverCluster
  24. invoker = CLUSTER.join(new StaticDirectory(u, invokers));
  25. } else { // not a registry url, must be direct invoke.
  26. invoker = CLUSTER.join(new StaticDirectory(invokers));
  27. }
  28. }
  29. // invoker 可用性检查
  30. if (shouldCheck() && !invoker.isAvailable()) {
  31. throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
  32. }
  33. if (logger.isInfoEnabled()) {
  34. logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
  35. }
  36. /**
  37. * @since 2.7.0
  38. * ServiceData Store
  39. */
  40. MetadataReportService metadataReportService = null;
  41. if ((metadataReportService = getMetadataReportService()) != null) {
  42. URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
  43. metadataReportService.publishConsumer(consumerURL);
  44. }
  45. // create service proxy 根据Invoker 真正生成代理 PROXY_FACTORY=ProxyFactory$Adaptive@xxxx
  46. return (T) PROXY_FACTORY.getProxy(invoker);

来到RegistryProtocol中的refer(),这里根据配置的信息获取了注册中心并连接上了zk,随后将包含集群容错以及负载均衡的cluster、注册中心,接口类型以及url传入doRefer()

在doRefer中,先是将自己注册到注册中心去,随后进行了服务的订阅(重点)在里面为每个提供者生成一个invoker,最后将多个invoker包装成一个

  1. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  2. url = URLBuilder.from(url)
  3. .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
  4. .removeParameter(REGISTRY_KEY)
  5. .build();
  6. //获取注册中心实例 ZookeeperRegistry url=zookeeper://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=14276&qos.port=33333&timestamp=1622603494381
  7. Registry registry = registryFactory.getRegistry(url);// 获取实例并使用curator连接上了zk
  8. if (RegistryService.class.equals(type)) {
  9. return proxyFactory.getInvoker((T) registry, type, url);
  10. }
  11. // group="a,b" or group="*" 将 url 查询字符串转为 Map
  12. Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
  13. String group = qs.get(GROUP_KEY); // // 获取 group 配置
  14. if (group != null && group.length() > 0) {
  15. if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
  16. // 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
  17. return doRefer(getMergeableCluster(), registry, type, url);
  18. }
  19. }
  20. return doRefer(cluster, registry, type, url);
  21. }
  22. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  23. // 创建 RegistryDirectory 实例
  24. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
  25. // 设置注册中心和协议
  26. directory.setRegistry(registry);
  27. directory.setProtocol(protocol);
  28. // all attributes of REFER_KEY
  29. Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
  30. // 生成服务消费者链接 consumer://192.168.200.10/org.apache.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=12468&qos.port=33333&side=consumer&sticky=false&timestamp=1622604057385
  31. URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
  32. // 注册服务消费者,在 consumers 目录下创建新节点
  33. if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
  34. directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
  35. registry.register(directory.getRegisteredConsumerUrl());
  36. }
  37. directory.buildRouterChain(subscribeUrl);
  38. /**
  39. * 订阅 providers、configurators、routers 等节点数据
  40. * 拿到 providers 下的提供者信息后,会创建客户端(nettyclient),并连接服务端,重点 DubboProtocol 中的 protocolBindingRefer
  41. * ********所以重点跟这个方法的调用****
  42. */
  43. directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
  44. PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
  45. Invoker invoker = cluster.join(directory); // 一个注册中心下可能有多个服务提供者,每个提供者都对应一个invoker,这里将多个invoker合并为一个
  46. ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
  47. return invoker;
  48. }

进入到订阅这个部分,订阅本身就是一个拓展点,由于我们使用的是zk,所以最终会到ZookeeperRegistry中的doSubscribe,在进行了一系列的zk操作之后,会通知所有的客户端拉取的提供者节点将它们封装成invoker

  1. public synchronized void notify(List<URL> urls) {
  2. Map<String, List<URL>> categoryUrls = urls.stream()
  3. .filter(Objects::nonNull)
  4. .filter(this::isValidCategory)
  5. .filter(this::isNotCompatibleFor26x)
  6. .collect(Collectors.groupingBy(url -> {
  7. if (UrlUtils.isConfigurator(url)) {
  8. return CONFIGURATORS_CATEGORY;
  9. } else if (UrlUtils.isRoute(url)) {
  10. return ROUTERS_CATEGORY;
  11. } else if (UrlUtils.isProvider(url)) {
  12. return PROVIDERS_CATEGORY;
  13. }
  14. return "";
  15. }));
  16. List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
  17. this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
  18. List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
  19. toRouters(routerURLs).ifPresent(this::addRouters);
  20. // providers 拿到 providers 对应的URL
  21. List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
  22. /**
  23. * 将 /providers 下每个 provider url 封装成 invoker ******重点*****
  24. */
  25. refreshOverrideAndInvoker(providerURLs);
  26. }

这里会将提供者封装成invoker并与其建立连接

这里根据url进行创建,协议本身是拓展点,默认使用的是DubboProtocol去生辰invoker,在这个invoker的外层套了一层InvokerDelegate,我们重点看DubboProtocol中的内容

 在创建DubboInvoker时,可以看到构造函数中还需要一个客户端,这个会先去看看这个提供者本地是否已有连接,有的话直接拿来用,没有则需要去创建一个

  1. @Override
  2. public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
  3. optimizeSerialization(url);
  4. /**
  5. * create rpc invoker. 创建 DubboInvoker
  6. * 重点看 getClients(url) 根据provider url 获取客户端,
  7. *
  8. * url=dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&bean.name=org.apache.dubbo.demo.DemoService&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=20192&qos.port=33333&register=true&register.ip=192.168.200.10&release=&remote.application=demo-provider&side=consumer&sticky=false&timestamp=1622964228064
  9. */
  10. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  11. invokers.add(invoker);
  12. return invoker;
  13. }
  14. private ExchangeClient[] getClients(URL url) {
  15. // whether to share connection 是否共享连接
  16. boolean useShareConnect = false;
  17. int connections = url.getParameter(CONNECTIONS_KEY, 0);// 获取连接数,默认为0,表示未配置
  18. List<ReferenceCountExchangeClient> shareClients = null;
  19. // if not configured, connection is shared, otherwise, one connection for one service
  20. if (connections == 0) {
  21. useShareConnect = true;
  22. /**
  23. * The xml configuration should have a higher priority than properties. xml配置比properties优先级更高
  24. * 获取<dubbo:consumer/>中的shareconnections属性值,表示共享连接的数量
  25. */
  26. String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
  27. connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
  28. DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
  29. shareClients = getSharedClient(url, connections); // 获取指定数量的共享连接(第一次获取还没有会创建)
  30. }
  31. // 此时的connections代表的是连接数,已经不区分是共享还是新创建的连接了
  32. ExchangeClient[] clients = new ExchangeClient[connections];
  33. for (int i = 0; i < clients.length; i++) {
  34. // 若是共享的,直接从shareClients 里取
  35. if (useShareConnect) {
  36. // 获取共享客户端
  37. clients[i] = shareClients.get(i);
  38. } else {
  39. // 若不是共享的,则新建连接 初始化新的客户端 看这块的实现即可 重点关注的地方
  40. clients[i] = initClient(url);
  41. }
  42. }
  43. return clients;
  44. }

简单看一下先判断是否有配置懒加载,没有则创建普通的,看到Exchanger可能有朋友会记得,在服务端暴露时也是有个Exchanger,这个本身就是一个拓展点,默认会使用HeaderExchanger

  1. private ExchangeClient initClient(URL url) {
  2. // client type setting. 获取客户端类型,默认为 netty
  3. String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
  4. // 添加编解码和到 url 中
  5. url = url.addParameter(CODEC_KEY, DubboCodec.NAME); // dubbo编解码器,重要
  6. // enable heartbeat by default 添加心跳包参数到 url 中
  7. url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
  8. // BIO is not allowed since it has severe performance issue.
  9. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
  10. throw new RpcException("Unsupported client type: " + str + "," +
  11. " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
  12. }
  13. ExchangeClient client;
  14. try {
  15. // connection should be lazy 获取 lazy 配置,并根据配置值决定创建的客户端类型
  16. if (url.getParameter(LAZY_CONNECT_KEY, false)) {
  17. // 创建懒加载 ExchangeClient 实例
  18. client = new LazyConnectExchangeClient(url, requestHandler);
  19. } else {
  20. /**
  21. * 创建普通 ExchangeClient 实例 分析这块 创建exchangeClient,会绑定一个 Netty Client
  22. *
  23. * ExchangeHandler requestHandler = new ExchangeHandlerAdapter 在 DubboProtocol 中作了内部实现
  24. */
  25. client = Exchangers.connect(url, requestHandler);
  26. }
  27. } catch (RemotingException e) {
  28. throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
  29. }
  30. return client;
  31. }

在HeaderExchanger中会进行连接,默认会采用netty进行连接,后面netty之前讲过,就不在重复了

  1. public class HeaderExchanger implements Exchanger {
  2. public static final String NAME = "header";
  3. @Override
  4. public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
  5. /**
  6. * 这里包含了多个调用,分别如下:
  7. * 1. 创建 HeaderExchangeHandler 对象 参数handler= DubboProtocol.requestHandler 很重要,涉及到发送请求时的处理
  8. * 2. 创建 DecodeHandler 对象
  9. * 3. 通过 Transporters 构建 Client 实例
  10. * 4. 创建 HeaderExchangeClient 对象
  11. *
  12. * 重点看:Transporters.connect
  13. */
  14. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
  15. }
  16. @Override
  17. public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
  18. // 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
  19. // 1. new HeaderExchangeHandler(handler) 负载处理请求回写结果
  20. // 2. new DecodeHandler(new HeaderExchangeHandler(handler)) 对请求数据和响应结果进行解码操作,如何交由后续流程继续处理
  21. // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) 重点看这个bind方法,
  22. return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  23. }
  24. }

到这里呢,我们就能拿到封装完的invoker,那么再回到根据invoker生成代理的地方(上面是生成invoker,下面是根据invoker生成代理),proxy_factory本身是拓展点,默认会使用JavassistProxyFactory

 这里值得注意点的是new 了一个InvokerInvocationHandler并将invoker注入进去,这个InvokerInvocationHandler又实现了jdk的InvocationHandler,所以在执行相对应的方法时会进行拦截

 

 

到这里服务端的连接以及注册就讲完了,接下来就是服务的调用了


Dubbo服务调用

上面刚刚讲了客户端的代理的生成,那么接下来生成的代理的方法进行调用时,不就是Dubbo的调用链嘛,我们直接来看代码

 可以看到demoService就是刚刚生成的InvokerInvocationHandler,当执行方法时,就是调用了其中的invoke方法

先是排除了Object类中的方法,再是派出了toString等方法,然后将我们的方法和参数封装到RpcInvocation中

在MockClusterInvoker判断了是否为mock调用,不是的话会直接传到AbstractClusterInvoker中,这里主要进行了三个操作,先是从注册目录中获取服务端的Invoker的list进行路由,然后初始化了负载均衡策略,随后传入了子类的FailoverClusterInvoker中

 FailoverClusterInvoker中会有一个容错机制,它的重试次数会是默认次数(默认为2)+1,随后开始循环,先是通过负载均衡选择出相应的Invoker,然后开始调用

 接着会走到DubboInvoker中,这里值得注意的是会创建一个Future去订阅调用的结果

 然后会在HeaderExchangeChannel中调用客户端的netty进行发送,其实看到HeaderExchangeChannel这个名字就会觉得很眼熟了,因为无论是在服务端还是客户端都会有HeaderExchange

 随后就会在dubbo封装的NettyChannel中将消息发送出去,不过在真正发送消息前,还得进行二次编码以及一次编码 ,编码完成之后请求就会通过channel传输到服务端

接着我们来看服务端的代码,服务端接收到请求的第一件事就是解码,所以我们直接看到NettyServer中的解码,关于解码先放一张Dubbo协议的设计,协议头会有16个字节的位置,存放的信息就不一一讲解了

 

可以看到采用了一个do while的形式,先会读取协议头的16个字节,如果没有读到16个,会设置一个NEED_MORE_INPUT的状态并抛出,等待下一次读取

 在对协议头读取完成后会对其进行一些解析以及判断,如拿到请求id,判断是否为心跳

 最后会判断为请求,并返回给业务处理器

 在业务处理中能看到我们请求中的一些信息已经被解析成对象,比如我们的方法名

 在调用会经过HeartbeatHandler中判断是否为心跳

 接着就到了重点,dubbo是可以设置请求派发的,什么意思呢,就是在处理任务时可以根据任务类型选择在netty的io线程中执行还是在工作线程池中执行,我这里没有配置,所以默认是在工作线程中执行,所以来到了AllChannelHandler

 

 可以看到在异步线程中各种事件相对应的处理

        

 看到received方法中又进行了解码,因为我们之前没有对传入的参数进行解码,所以这里得解码一下 

 在这里先获取了请求id,毕竟得一一对应,调用之后会得到一个future,当future有结果时,就会通过信道将响应结果写回客户端

 具体的调用流程先是根据信息获取invoker,然后通过invoker进行调用,由于invoker就是代理,所以在调用时会执行服务端的实现类,随后将结果返回


以上就是对dubbo的调用流程的简介

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/165839
推荐阅读
相关标签
  

闽ICP备14008679号