赞
踩
上一次我们讲解了Dubbo的服务暴露, 这次我们来看一下Dubbo是如何调用服务的
本文会根据dubbo的架构图进行解析
目录
大家都知道,客户端在调用服务时只有接口没有相对应的实现类,所以呢,我们需要在调用服务时需要先生成一个代理,再通过代理去执行服务。
可以看到我们通过从ioc中获取DemoService的Bean,那其实本身客户端中并没有DemoService的bean,这个时候我们可以看到配置文件中进行了这样的一个配置使用了reference标签,在dubbo中reference标签配置的服务会被另一个Bean生成代理,那就是ReferenceBean,它会通过getObject()方法将生成的代理对象放入ioc中,所以点进去
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>
这里会对接口的代理对象进行检查,如果没有则会初始化一个
- public synchronized T get() {
- checkAndUpdateSubConfigs();
-
- if (destroyed) {
- throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
- }
- // 检测 ref 是否为空,为空则通过 init 方法创建
- if (ref == null) {
- // init 方法主要用于处理配置,以及调用 createProxy 生成代理类
- init();
- }
- return ref;
- }
点进init()方法中,可以看到核心的方法就是创建代理对象的(能看到我们本身配置的一些信息,接口路径等等)
接着就是对提供者的数量的一个判断,是有多个还是单个,然后生成invoker,最后会根据invoker返回生成的代理,重点则在于如何生成invoker,这里的protocol是自适应的,所以会根据url中的信息选择对应的实现类,可以看到协议中使用的是registry,于是来到RegistryProtocol
- // 单个注册中心或服务提供者(服务直连,下同)
- if (urls.size() == 1) {
- // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
- invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); // registry://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=14276&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D14276%26qos.port%3D33333%26register.ip%3D192.168.200.10%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1622603489641®istry=zookeeper×tamp=1622603494381
- } else {
- // 多个注册中心或多个服务提供者,或者两者混合
- List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
- URL registryURL = null;
- // 获取所有的 Invoker 到 invokers中
- for (URL url : urls) {
- // 通过 ref_protocol 调用 refer 构建 Invoker,refprotocol 会在运行时
- // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
- invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
- if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
- registryURL = url; // use last registry url
- }
- }
- if (registryURL != null) { // registry url is available
- // use RegistryAwareCluster only when register's CLUSTER is available
- // 如果注册中心链接不为空,则将使用 AvailableCluster
- URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
- // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
- // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并 Cluster扩展点默认找的是FailoverCluster
- invoker = CLUSTER.join(new StaticDirectory(u, invokers));
- } else { // not a registry url, must be direct invoke.
- invoker = CLUSTER.join(new StaticDirectory(invokers));
- }
- }
-
- // invoker 可用性检查
- if (shouldCheck() && !invoker.isAvailable()) {
- throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
- }
- if (logger.isInfoEnabled()) {
- logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
- }
- /**
- * @since 2.7.0
- * ServiceData Store
- */
- MetadataReportService metadataReportService = null;
- if ((metadataReportService = getMetadataReportService()) != null) {
- URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
- metadataReportService.publishConsumer(consumerURL);
- }
- // create service proxy 根据Invoker 真正生成代理 PROXY_FACTORY=ProxyFactory$Adaptive@xxxx
- return (T) PROXY_FACTORY.getProxy(invoker);
来到RegistryProtocol中的refer(),这里根据配置的信息获取了注册中心并连接上了zk,随后将包含集群容错以及负载均衡的cluster、注册中心,接口类型以及url传入doRefer()
在doRefer中,先是将自己注册到注册中心去,随后进行了服务的订阅(重点)在里面为每个提供者生成一个invoker,最后将多个invoker包装成一个
- public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
- url = URLBuilder.from(url)
- .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
- .removeParameter(REGISTRY_KEY)
- .build();
- //获取注册中心实例 ZookeeperRegistry url=zookeeper://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=14276&qos.port=33333×tamp=1622603494381
- Registry registry = registryFactory.getRegistry(url);// 获取实例并使用curator连接上了zk
- if (RegistryService.class.equals(type)) {
- return proxyFactory.getInvoker((T) registry, type, url);
- }
-
- // group="a,b" or group="*" 将 url 查询字符串转为 Map
- Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
- String group = qs.get(GROUP_KEY); // // 获取 group 配置
- if (group != null && group.length() > 0) {
- if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
- // 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
- return doRefer(getMergeableCluster(), registry, type, url);
- }
- }
- return doRefer(cluster, registry, type, url);
- }
-
-
- private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
- // 创建 RegistryDirectory 实例
- RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
- // 设置注册中心和协议
- directory.setRegistry(registry);
- directory.setProtocol(protocol);
- // all attributes of REFER_KEY
- Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
- // 生成服务消费者链接 consumer://192.168.200.10/org.apache.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=12468&qos.port=33333&side=consumer&sticky=false×tamp=1622604057385
- URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
- // 注册服务消费者,在 consumers 目录下创建新节点
- if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
- directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
- registry.register(directory.getRegisteredConsumerUrl());
- }
- directory.buildRouterChain(subscribeUrl);
- /**
- * 订阅 providers、configurators、routers 等节点数据
- * 拿到 providers 下的提供者信息后,会创建客户端(nettyclient),并连接服务端,重点 DubboProtocol 中的 protocolBindingRefer
- * ********所以重点跟这个方法的调用****
- */
- directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
- PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
-
- Invoker invoker = cluster.join(directory); // 一个注册中心下可能有多个服务提供者,每个提供者都对应一个invoker,这里将多个invoker合并为一个
- ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
- return invoker;
- }
进入到订阅这个部分,订阅本身就是一个拓展点,由于我们使用的是zk,所以最终会到ZookeeperRegistry中的doSubscribe,在进行了一系列的zk操作之后,会通知所有的客户端拉取的提供者节点将它们封装成invoker
- public synchronized void notify(List<URL> urls) {
- Map<String, List<URL>> categoryUrls = urls.stream()
- .filter(Objects::nonNull)
- .filter(this::isValidCategory)
- .filter(this::isNotCompatibleFor26x)
- .collect(Collectors.groupingBy(url -> {
- if (UrlUtils.isConfigurator(url)) {
- return CONFIGURATORS_CATEGORY;
- } else if (UrlUtils.isRoute(url)) {
- return ROUTERS_CATEGORY;
- } else if (UrlUtils.isProvider(url)) {
- return PROVIDERS_CATEGORY;
- }
- return "";
- }));
-
- List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
- this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
-
- List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
- toRouters(routerURLs).ifPresent(this::addRouters);
-
- // providers 拿到 providers 对应的URL
- List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
- /**
- * 将 /providers 下每个 provider url 封装成 invoker ******重点*****
- */
- refreshOverrideAndInvoker(providerURLs);
- }
这里会将提供者封装成invoker并与其建立连接
这里根据url进行创建,协议本身是拓展点,默认使用的是DubboProtocol去生辰invoker,在这个invoker的外层套了一层InvokerDelegate,我们重点看DubboProtocol中的内容
在创建DubboInvoker时,可以看到构造函数中还需要一个客户端,这个会先去看看这个提供者本地是否已有连接,有的话直接拿来用,没有则需要去创建一个
- @Override
- public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
- optimizeSerialization(url);
-
- /**
- * create rpc invoker. 创建 DubboInvoker
- * 重点看 getClients(url) 根据provider url 获取客户端,
- *
- * url=dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&bean.name=org.apache.dubbo.demo.DemoService&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=20192&qos.port=33333®ister=true®ister.ip=192.168.200.10&release=&remote.application=demo-provider&side=consumer&sticky=false×tamp=1622964228064
- */
- DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
- invokers.add(invoker);
-
- return invoker;
- }
-
- private ExchangeClient[] getClients(URL url) {
- // whether to share connection 是否共享连接
- boolean useShareConnect = false;
-
- int connections = url.getParameter(CONNECTIONS_KEY, 0);// 获取连接数,默认为0,表示未配置
- List<ReferenceCountExchangeClient> shareClients = null;
- // if not configured, connection is shared, otherwise, one connection for one service
- if (connections == 0) {
- useShareConnect = true;
-
- /**
- * The xml configuration should have a higher priority than properties. xml配置比properties优先级更高
- * 获取<dubbo:consumer/>中的shareconnections属性值,表示共享连接的数量
- */
- String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
- connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
- DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
-
- shareClients = getSharedClient(url, connections); // 获取指定数量的共享连接(第一次获取还没有会创建)
- }
- // 此时的connections代表的是连接数,已经不区分是共享还是新创建的连接了
- ExchangeClient[] clients = new ExchangeClient[connections];
- for (int i = 0; i < clients.length; i++) {
- // 若是共享的,直接从shareClients 里取
- if (useShareConnect) {
- // 获取共享客户端
- clients[i] = shareClients.get(i);
- } else {
- // 若不是共享的,则新建连接 初始化新的客户端 看这块的实现即可 重点关注的地方
- clients[i] = initClient(url);
- }
- }
-
- return clients;
- }
简单看一下先判断是否有配置懒加载,没有则创建普通的,看到Exchanger可能有朋友会记得,在服务端暴露时也是有个Exchanger,这个本身就是一个拓展点,默认会使用HeaderExchanger
- private ExchangeClient initClient(URL url) {
-
- // client type setting. 获取客户端类型,默认为 netty
- String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
-
- // 添加编解码和到 url 中
- url = url.addParameter(CODEC_KEY, DubboCodec.NAME); // dubbo编解码器,重要
- // enable heartbeat by default 添加心跳包参数到 url 中
- url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
-
- // BIO is not allowed since it has severe performance issue.
- if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
- throw new RpcException("Unsupported client type: " + str + "," +
- " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
- }
-
- ExchangeClient client;
- try {
- // connection should be lazy 获取 lazy 配置,并根据配置值决定创建的客户端类型
- if (url.getParameter(LAZY_CONNECT_KEY, false)) {
- // 创建懒加载 ExchangeClient 实例
- client = new LazyConnectExchangeClient(url, requestHandler);
- } else {
- /**
- * 创建普通 ExchangeClient 实例 分析这块 创建exchangeClient,会绑定一个 Netty Client
- *
- * ExchangeHandler requestHandler = new ExchangeHandlerAdapter 在 DubboProtocol 中作了内部实现
- */
- client = Exchangers.connect(url, requestHandler);
- }
-
- } catch (RemotingException e) {
- throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
- }
-
- return client;
- }
在HeaderExchanger中会进行连接,默认会采用netty进行连接,后面netty之前讲过,就不在重复了
- public class HeaderExchanger implements Exchanger {
-
- public static final String NAME = "header";
-
- @Override
- public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
- /**
- * 这里包含了多个调用,分别如下:
- * 1. 创建 HeaderExchangeHandler 对象 参数handler= DubboProtocol.requestHandler 很重要,涉及到发送请求时的处理
- * 2. 创建 DecodeHandler 对象
- * 3. 通过 Transporters 构建 Client 实例
- * 4. 创建 HeaderExchangeClient 对象
- *
- * 重点看:Transporters.connect
- */
- return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
- }
-
- @Override
- public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
- // 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
- // 1. new HeaderExchangeHandler(handler) 负载处理请求回写结果
- // 2. new DecodeHandler(new HeaderExchangeHandler(handler)) 对请求数据和响应结果进行解码操作,如何交由后续流程继续处理
- // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) 重点看这个bind方法,
- return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
- }
-
- }
到这里呢,我们就能拿到封装完的invoker,那么再回到根据invoker生成代理的地方(上面是生成invoker,下面是根据invoker生成代理),proxy_factory本身是拓展点,默认会使用JavassistProxyFactory
这里值得注意点的是new 了一个InvokerInvocationHandler并将invoker注入进去,这个InvokerInvocationHandler又实现了jdk的InvocationHandler,所以在执行相对应的方法时会进行拦截
到这里服务端的连接以及注册就讲完了,接下来就是服务的调用了
上面刚刚讲了客户端的代理的生成,那么接下来生成的代理的方法进行调用时,不就是Dubbo的调用链嘛,我们直接来看代码
可以看到demoService就是刚刚生成的InvokerInvocationHandler,当执行方法时,就是调用了其中的invoke方法
先是排除了Object类中的方法,再是派出了toString等方法,然后将我们的方法和参数封装到RpcInvocation中
在MockClusterInvoker判断了是否为mock调用,不是的话会直接传到AbstractClusterInvoker中,这里主要进行了三个操作,先是从注册目录中获取服务端的Invoker的list进行路由,然后初始化了负载均衡策略,随后传入了子类的FailoverClusterInvoker中
FailoverClusterInvoker中会有一个容错机制,它的重试次数会是默认次数(默认为2)+1,随后开始循环,先是通过负载均衡选择出相应的Invoker,然后开始调用
接着会走到DubboInvoker中,这里值得注意的是会创建一个Future去订阅调用的结果
然后会在HeaderExchangeChannel中调用客户端的netty进行发送,其实看到HeaderExchangeChannel这个名字就会觉得很眼熟了,因为无论是在服务端还是客户端都会有HeaderExchange
随后就会在dubbo封装的NettyChannel中将消息发送出去,不过在真正发送消息前,还得进行二次编码以及一次编码 ,编码完成之后请求就会通过channel传输到服务端
接着我们来看服务端的代码,服务端接收到请求的第一件事就是解码,所以我们直接看到NettyServer中的解码,关于解码先放一张Dubbo协议的设计,协议头会有16个字节的位置,存放的信息就不一一讲解了
可以看到采用了一个do while的形式,先会读取协议头的16个字节,如果没有读到16个,会设置一个NEED_MORE_INPUT的状态并抛出,等待下一次读取
在对协议头读取完成后会对其进行一些解析以及判断,如拿到请求id,判断是否为心跳
最后会判断为请求,并返回给业务处理器
在业务处理中能看到我们请求中的一些信息已经被解析成对象,比如我们的方法名
在调用会经过HeartbeatHandler中判断是否为心跳
接着就到了重点,dubbo是可以设置请求派发的,什么意思呢,就是在处理任务时可以根据任务类型选择在netty的io线程中执行还是在工作线程池中执行,我这里没有配置,所以默认是在工作线程中执行,所以来到了AllChannelHandler
可以看到在异步线程中各种事件相对应的处理
看到received方法中又进行了解码,因为我们之前没有对传入的参数进行解码,所以这里得解码一下
在这里先获取了请求id,毕竟得一一对应,调用之后会得到一个future,当future有结果时,就会通过信道将响应结果写回客户端
具体的调用流程先是根据信息获取invoker,然后通过invoker进行调用,由于invoker就是代理,所以在调用时会执行服务端的实现类,随后将结果返回
以上就是对dubbo的调用流程的简介
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。