当前位置:   article > 正文

Dubbo框架详解(附源码)

dubbo

写在前面:借鉴@加点代码调调味相关内容,详见大佬思否主页 https://segmentfault.com/u/crazyhzm

Dubbo是阿里巴巴公司开源的一个高性能优秀的开源分布式服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成。

1. 总体架构

图片: https://uploader.shimo.im/f/krfb6VUnSo3cGqnu.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

节点角色说明
Provider 暴露服务的服务提供方

Consumer 调用远程服务的服务消费方。

Registry 服务注册与发现的注册中心

Monitor 统计服务的调用次数和调用时间的监控中心

Container 服务运行容器

注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

2. 负载均衡机制

目前 Dubbo 内置了如下负载均衡算法,用户可直接配置使用:

Random
随机,按权重设置随机概率。
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
缺点:存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

RoundRobin
轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

LeastActive
加权最少活跃调用优先,活跃数越低,越优先调用,相同活跃数的进行加权随机。活跃数指调用前后计数差(针对特定提供者:请求发送数 - 响应返回数),表示特定提供者的任务堆积量,活跃数越低,代表该提供者处理能力越强。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大;相对的,处理能力越强的节点,处理更多的请求。

ShortestResponse
加权最短响应优先,在最近一个滑动窗口中,响应时间越短,越优先调用。相同响应时间的进行加权随机。
使得响应时间越快的提供者,处理更多的请求。
缺点:可能会造成流量过于集中于高性能节点的问题。
这里的响应时间 = 某个提供者在窗口时间内的平均响应时间,窗口时间默认是 30s。

ConsistentHash
一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

3. Dubbo如何实现服务的注册与发现

Dubbo 注册与发现的对象是 Dubbo 服务(Java 接口),而其载体为注册元信息,即 Dubbo URL如:dubbo://192.168.1.2:20880/com.foo.BarService?version=1.0.0&group=default,通常包含必须信息,如服务提供方 IP 和端口、 Java 接口,可选包含版本(version)和分组(group)等。服务 URL 所包含的信息能够唯一界别服务提供方的进程。

Dubbo 服务的 Java 接口(interface)允许不同的版本(version)或分组(group),所以仅凭 Java 接口无法唯一标识某个 Dubbo 服务,还需要增加通讯协议(protocol)方可Dubbo 服务 ID 字符表达模式为: p r o t o c o l : {protocol}: protocol:{interface}: v e r s i o n : {version}: version:{group} , 其中,版本(version)或分组(group)是可选的。当 Dubbo Consumer 订阅 Dubbo 服务时,构建对应 ID,通过这个 ID 来查询 Dubbo Provider 的 Service 名称列表。

由于 Dubbo 服务与 Service 的映射关系取决于业务场景,架构层面无从预判。因此,这种映射关系只能在 Dubbo 服务暴露时(运行时)才能确定,否则,Dubbo 服务能被多个 Consumer 应用订阅时,Consumer 无法定位 Provider Service 名称,进而无法完成服务发现。

元数据服务 Metadata
元数据服务 Metadata,称之为“元数据服务的元数据”,主要包括:
· inteface:Dubbo 元数据服务所暴露的接口,即 MetadataService
· serviceName : 当前 MetadataService 所部署的 Dubbo Service 名称,作为 MetadataService 分组信息
· group:当前 MetadataService 分组,数据使用 serviceName
· version:当前 MetadataService 的版本,版本号通常在接口层面声明,不同的 Dubbo 发行版本 version 可能相同,比如 Dubbo 2.7.5 和 2.7.6 中的 version 均为 1.0.0。理论上,version 版本越高,支持元信息类型更丰富
· protocol: MetadataService 所暴露协议,为了确保 Provider 和 Consumer 通讯兼容性,默认协议为:“dubbo”,也可以支持其他协议。
· port:协议所使用的网络端口
· host:当前 MetadataService 所在的服务实例主机或 IP
· params:当前 MetadataService 暴露后 URL 中的参数信息
不难得出,凭借以上元数据服务的 Metadata,可将元数据服务的 Dubbo 服务 ID 确定,辅助 Provider 服务暴露和 Consumer 服务订阅 MetadataService 。不过对于 Provider,这些元信息都是已知的,而对 Consumer 而言,它们直接能获取的元信息仅有:
· serviceName:通过“Dubbo 接口与 Service 映射”关系,可得到 Provider Service 名称
· interface:即 MetadataService ,因为 Provider 和 Consumer 公用 MetadataService 接口
· group:即 serviceName
不过 Consumer 合成 MetadataService Dubbo URL 还需获取 version、host、port、protocol 以及 params:
· version:尽管 MetadataService 接口是统一接口,然而 Provider 和 Consumer 可能引入的 Dubbo 版本不同,从而它们使用的 MetadataService version 也会不同,所以这个信息需要 Provider 在暴露MetadataService 时,同步到服务实例的 Metadata 中,方便 Consumer 从 Metadata 中获取
· host:由于 Consumer 已得到 serviceName,可通过服务发现 API 获取服务实例对象,该对象包含 host 属性,直接被 Consumer 获取即可。
· port:与 version 类似,从 Provider 服务实例中的 Metadata 中获取
· params:同上
通过元数据服务 Metadata 的描述,解释了不同 Dubbo Services 是怎样体现差异性的,并且说明了 MetadataService 元信息的存储介质,这也就是服务自省架构为什么强依赖支持 Metadata 的注册中心的原因。下个小节将讨论 MetadataService 所存储 Dubbo 应用服务 URL 存放在何处。

4. Dubbo 暴露的服务 URL 列表

当前 Dubbo Service 暴露或发布 Dubbo 服务 URL 集合,如:[ dubbo://192.168.1.2:20880/com.acme.Interface1?group=default&version=v1 , thirft://192.168.1.2:20881/com.acme.InterfaceX , rest://192.168.1.2:20882/com.acme.interfaceN ]

demo演示
总体架构
在这里插入图片描述
接口dubbo-interface
实体类UserAddress
图片: https://uploader.shimo.im/f/A0NrjX0S75pOMd1T.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g
订单接口OrderService**
图片: https://uploader.shimo.im/f/b5zQTBukDZls2qdZ.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

用户服务UserService
图片: https://uploader.shimo.im/f/Crgxws6p4TyeILzE.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

服务提供者dubbo-priveder
UserServiceImpl
图片: https://uploader.shimo.im/f/Fl340yWvzOHHvYOG.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g
启动类
在这里插入图片描述

application.properties
在这里插入图片描述

服务消费者dubbo-consumer
OrderServiceImpl
图片: https://uploader.shimo.im/f/oiTakD789YNePyuE.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

控制类
图片: https://uploader.shimo.im/f/7UQi1Ov9Qtob1EUH.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

启动类
图片: https://uploader.shimo.im/f/zt1fWP7eavZnqkwK.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

application.properties
图片: https://uploader.shimo.im/f/8CVgUHcsurIlrcnX.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

访问结果
图片: https://uploader.shimo.im/f/wfe1kWCezAkah2I3.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

5. 注册中心源码解析

图片: https://uploader.shimo.im/f/guN8eJWt21p2pNZU.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

RegistryService

 public interface RegistryService {

    void register(URL url);

    void unregister(URL url);

    void subscribe(URL url, NotifyListener listener);

    void unsubscribe(URL url, NotifyListener listener);

    List<URL> lookup(URL url);

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 注册,url值必须唯一,不能有一模一样。
    void register(URL url)
  2. 取消注册
    void unregister(URL url);
  3. 订阅,不是根据全URL匹配订阅,而是根据条件去订阅,也就是说可以订阅多个服务。listener是用来监听处理注册数据变更的事件。
    void subscribe(URL url, NotifyListener listener);
  4. 取消订阅,按照全URL匹配去取消订阅
    void unsubscribe(URL url, NotifyListener listener);
  5. 查询注册列表,通过url进行条件查询所匹配的所有URL集合
    List lookup(URL url);

register && unregister
这两个方法实现了RegistryService接口的方法。其中注册的逻辑是把url加入到属性registered,而取消注册的逻辑就是把url从该属性中移除。真正的实现是FailbackRegistry类中。

@Override
public void register(URL url) {
    this.register(new URL(url));
}

@Override
public void unregister(URL url) {
    this.unregister(new URL(url));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

subscribe && unsubscribe
这两个方法实现了RegistryService接口的方法,具体的实现也是在FailbackRegistry类中,分别是订阅和取消订阅,订阅代码。

  @Override
    public void subscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("subscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("subscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Subscribe: " + url);
        }
        // 获得该消费者url 已经订阅的服务 的监听器集合
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners == null) {
            subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
            listeners = subscribed.get(url);
        }
        // 添加某个服务的监听器
        listeners.add(listener);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

lookUp

该方法是实现了RegistryService接口的方法,作用是获得消费者url订阅的服务URL列表

 @Override
    public List<URL> lookup(URL url) {
        List<URL> result = new ArrayList<URL>();
        // 获得该消费者url订阅的 所有被通知的 服务URL集合
        Map<String, List<URL>> notifiedUrls = getNotified().get(url);
        // 判断该消费者是否订阅服务
        if (notifiedUrls != null && notifiedUrls.size() > 0) {
            for (List<URL> urls : notifiedUrls.values()) {
                for (URL u : urls) {
                    // 判断协议是否为空
                    if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                        // 添加 该消费者订阅的服务URL
                        result.add(u);
                    }
                }
            }
        } else {
            // 原子类 避免在获取注册在注册中心的服务url时能够保证是最新的url集合
            final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
            // 通知监听器。当收到服务变更通知时触发
            NotifyListener listener = new NotifyListener() {
                @Override
                public void notify(List<URL> urls) {
                    reference.set(urls);
                }
            };
            // 订阅服务,就是消费者url订阅已经 注册在注册中心的服务(也就是添加该服务的监听器)
            subscribe(url, listener); // Subscribe logic guarantees the first notify to return
            List<URL> urls = reference.get();
            if (urls != null && !urls.isEmpty()) {
                for (URL u : urls) {
                    if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                        result.add(u);
                    }
                }
            }
        }
        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

recover
在注册中心断开,重连成功的时候,会恢复注册和订阅

  protected void recover() throws Exception {
        // register
        //把内存缓存中的registered取出来遍历进行注册
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                register(url);
            }
        }
        // subscribe
        //把内存缓存中的subscribed取出来遍历进行订阅
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    subscribe(url, listener);
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

notify
notify方法是通知监听器,url的变化结果
发起订阅后,会获取全量数据,此时会调用notify方法。即Registry 获取到了全量数据
每次注册中心发生变更时会调用notify方法虽然变化是增量,调用这个方法的调用方,已经进行处理,传入的urls依然是全量的。
listener.notify,通知监听器。

protected void notify(List<URL> urls) {
    if (urls == null || urls.isEmpty()) return;
    // 遍历订阅URL的监听器集合,通知他们
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL url = entry.getKey();

        if (!UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }
        // 遍历监听器集合,通知他们
        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    notify(url, listener, filterEmpty(url, urls));
                } catch (Throwable t) {
                    logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                }
            }
        }
    }
}
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((urls == null || urls.isEmpty())
            && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    // 将urls进行分类
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            // 按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                // 分类结果放入result
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    // 获得某一个消费者被通知的url集合(通知的 URL 变化结果)
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        // 添加该消费者对应的url
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    // 处理通知监听器URL 变化结果
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        // 把分类标实和分类后的列表放入notified的value中
        // 覆盖到 `notified`
        // 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。
        categoryNotified.put(category, categoryList);
        // 保存到文件
        saveProperties(url);
        //通知监听器
        listener.notify(categoryList);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

FailbackRegistry
属性

// Scheduled executor service
// 定时任务执行器
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
// 失败重试定时器,定时去检查是否有请求失败的,如有,无限次重试。
private final ScheduledFuture<?> retryFuture;

// 注册失败的URL集合
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

// 取消注册失败的URL集合
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

// 订阅失败的监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

// 取消订阅失败的监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

// 通知失败的URL集合
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

构造函数

public FailbackRegistry(URL url) {
    super(url);
    // 从url中读取重试频率,如果为空,则默认5000ms
    this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    // 创建失败重试定时器
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            // Check and connect to the registry
            try {
                //重试
                retry();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

register && unregister && subscribe && unsubscribe
注册、取消注册、订阅、取消订阅具体实现代码逻辑相似,下面为注册的源码。
就是做了一个doRegister的操作,如果失败抛出异常,则加入到失败的缓存中进行重试。

public void register(URL url) {
    super.register(url);
    //首先从失败的缓存中删除该url
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // Sending a registration request to the server side
        // 向注册中心发送一个注册请求
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        // 如果开启了启动时检测,则直接抛出异常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        // 把这个注册失败的url放入缓存,并且定时重试。
        failedRegistered.add(url);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

notify

@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    try {
        // 通知 url 数据变化
        doNotify(url, listener, urls);
    } catch (Exception t) {
        // Record a failed registration request to a failed list, retry regularly
        // 放入失败的缓存中,重试
        Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
        if (listeners == null) {
            failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
            listeners = failedNotified.get(url);
        }
        listeners.put(listener, urls);
        logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
}

protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
    super.notify(url, listener, urls);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

revocer

@Override
protected void recover() throws Exception {
    // register
    // register 恢复注册,添加到 `failedRegistered` ,定时重试
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        for (URL url : recoverRegistered) {
            failedRegistered.add(url);
        }
    }
    // subscribe
    // subscribe 恢复订阅,添加到 `failedSubscribed` ,定时重试
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                addFailedSubscribed(url, listener);
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

6. zookeeper注册中心

图片: https://uploader.shimo.im/f/HpOqwU5C0NvZQpsn.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

  1. dubbo的Root层是根目录,通过<dubbo:registry group=“dubbo” />的“group”来设置zookeeper的根节点,缺省值是“dubbo”。

  2. Service层是服务接口的全名。

  3. Type层是分类,一共有四种分类,分别是providers(服务提供者列表)、consumers(服务消费者列表)、routes(路由规则列表)、configurations(配置规则列表)。

  4. URL层:根据不同的Type目录:可以有服务提供者 URL 、服务消费者 URL 、路由规则 URL 、配置规则 URL 。不同的Type关注的URL不同。

    zookeeper以每个斜杠来分割每一层的znode,比如第一层根节点dubbo就是“/dubbo”,而第二层的Service层就是/com.foo.Barservice,zookeeper的每个节点通过路径来表示以及访问,例如服务提供者启动时,向/dubbo/com.foo.Barservice/providers目录下写入自己的URL地址。

调用流程:
服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址。
服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。

图片: https://uploader.shimo.im/f/N1svBrftZmj5A1qE.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

ZookeeperRegistry
该类继承了FailbackRegistry类,该类就是针对注册中心核心的功能注册、订阅、取消注册、取消订阅,查询注册列表进行展开,基于zookeeper来实现。

属性

// 日志记录
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);

// 默认的zookeeper端口
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;

// 默认zookeeper根节点
private final static String DEFAULT_ROOT = "dubbo";

// zookeeper根节点
private final String root;

// 服务接口集合
private final Set<String> anyServices = new ConcurrentHashSet<String>();

// 监听器集合
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();

// zookeeper客户端
private final ZookeeperClient zkClient;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

构造方法
参数中ZookeeperTransporter是一个接口,并且在dubbo中有ZkclientZookeeperTransporter和CuratorZookeeperTransporter两个实现类。
dubbo在zookeeper节点层级有一层是root层,该层是通过group属性来设置的。
给客户端添加一个监听器,当状态为重连的时候调用FailbackRegistry的恢复方法

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 获得url携带的分组配置,并且作为zookeeper的根节点
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // 创建zookeeper client
    zkClient = zookeeperTransporter.connect(url);
    // 添加状态监听器,当状态为重连的时候调用恢复方法
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    // 恢复
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

isAvailable && destroy
这两个方法分别是检测zookeeper是否连接以及销毁连接

@Override
public boolean isAvailable() {
    return zkClient != null && zkClient.isConnected();
}

@Override
public void destroy() {
    super.destroy();
    // Just release zkClient reference, but can not close zk client here for zk client is shared somewhere else.
    // See org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
    zkClient = null;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

doRegister && doUnregister
这两个方法分别是注册和取消注册,调用客户端create和delete方法,创建节点和删除节点。

@Override
protected void doRegister(URL url) {
    try {
        // 创建URL节点,也就是URL层的节点
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

@Override
protected void doUnregister(URL url) {
    try {
        // 删除节点
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {
        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

doSubscribe
这里的实现把所有Service层发起的订阅以及指定的Service层发起的订阅分开处理。所有Service层类似于监控中心发起的订阅。指定的Service层发起的订阅可以看作是服务消费者的订阅。订阅的大致逻辑类似,有几个区别:
所有Service层发起的订阅中的ChildListener是在在 Service 层发生变更时,才会做出解码,用anyServices属性判断是否是新增的服务,最后调用父类的subscribe订阅。而指定的Service层发起的订阅是在URL层发生变更的时候,调用notify,回调回调NotifyListener的逻辑,做到通知服务变更。
所有Service层发起的订阅中客户端创建的节点是Service节点,该节点为持久节点,而指定的Service层发起的订阅中创偶建的节点是Type节点,该节点也是持久节点。
指定的Service层发起的订阅中调用了两次notify,第一次是增量的通知,只是通知这次增加的服务节点,而第二个是全量的通知。

@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 处理所有Service层发起的订阅,例如监控中心的订阅
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            // 获得根目录
            String root = toRootPath();
            // 获得url对应的监听器集合
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            // 不存在就创建监听器集合
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            // 获得节点监听器
            ChildListener zkListener = listeners.get(listener);
            // 如果该节点监听器为空,则创建
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    @Override
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        // 遍历现有的节点,如果现有的服务集合中没有该节点,则加入该节点,然后订阅该节点
                        for (String child : currentChilds) {
                            // 解码
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    }
                });
                // 重新获取,为了保证一致性
                zkListener = listeners.get(listener);
            }
            // 创建service节点,该节点为持久节点
            zkClient.create(root, false);
            // 向zookeeper的service节点发起订阅,获得Service接口全名数组
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (services != null && !services.isEmpty()) {
                // 遍历Service接口全名数组
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // 发起该service层的订阅
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            // 处理指定 Service 层的发起订阅,例如服务消费者的订阅
            List<URL> urls = new ArrayList<URL>();
            // 遍历分类数组
            for (String path : toCategoriesPath(url)) {
                // 获得监听器集合
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                // 如果没有则创建
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                // 获得节点监听器
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            // 通知服务变化 回调NotifyListener
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    // 重新获取节点监听器,保证一致性
                    zkListener = listeners.get(listener);
                }
                // 创建type节点,该节点为持久节点
                zkClient.create(path, false);
                // 向zookeeper的type节点发起订阅
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    // 加入到自子节点数据数组
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 通知数据变化
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

doUnsubscribe
取消订阅,也是分为两种情况,所有的Service发起的取消订阅和指定的Service发起的取消订阅。
所有的Service发起的取消订阅就直接移除了根目录下所有的监听器,而指定的Service发起的取消订阅是移除了该Service层下面的所有Type节点监听器

@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
    // 获得监听器集合
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners != null) {
        // 获得子节点的监听器
        ChildListener zkListener = listeners.get(listener);
        if (zkListener != null) {
            // 如果为全部的服务接口,例如监控中心
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                // 获得根目录
                String root = toRootPath();
                // 移除监听器
                zkClient.removeChildListener(root, zkListener);
            } else {
                // 遍历分类数组进行移除监听器
                for (String path : toCategoriesPath(url)) {
                    zkClient.removeChildListener(path, zkListener);
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

lookup
查询符合条件的已经注册的服务

@Override
public List<URL> lookup(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("lookup url == null");
    }
    try {
        List<String> providers = new ArrayList<String>();
        // 遍历分组类别
        for (String path : toCategoriesPath(url)) {
            // 获得子节点
            List<String> children = zkClient.getChildren(path);
            if (children != null) {
                providers.addAll(children);
            }
        }
        // 获得 providers 中,和 consumer 匹配的 URL 数组
        return toUrlsWithoutEmpty(url, providers);
    } catch (Throwable e) {
        throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

toServicePath
获得服务路径,拼接规则:Root + Type

private String toServicePath(URL url) {
    String name = url.getServiceInterface();
    // 如果是包括所有服务,则返回根节点
    if (Constants.ANY_VALUE.equals(name)) {
        return toRootPath();
    }
    return toRootDir() + URL.encode(name);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

toCategoriesPath
第一个方法是获得分类数组,也就是url携带的服务下的所有Type节点数组。第二个是获得分类路径,分类路径拼接规则:Root + Service + Type

private String[] toCategoriesPath(URL url) {
    String[] categories;
    // 如果url携带的分类配置为*,则创建包括所有分类的数组
    if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
        categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
                Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
    } else {
        // 返回url携带的分类配置
        categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
    }
    String[] paths = new String[categories.length];
    for (int i = 0; i < categories.length; i++) {
        // 加上服务路径
        paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
    }
    return paths;
}

private String toCategoryPath(URL url) {
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

toUrlPath
获得URL路径,拼接规则是Root + Service + Type + URL

private String toUrlPath(URL url) {
    return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
  • 1
  • 2
  • 3

toUrlsWithoutEmpty && toUrlsWithEmpty
第一个toUrlsWithoutEmpty方法是获得 providers 中,和 consumer 匹配的 URL 数组,第二个toUrlsWithEmpty方法是调用了第一个方法后增加了若不存在匹配,则创建 empty:// 的 URL返回。

private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
    List<URL> urls = new ArrayList<URL>();
    if (providers != null && !providers.isEmpty()) {
        // 遍历服务提供者
        for (String provider : providers) {
            // 解码
            provider = URL.decode(provider);
            if (provider.contains("://")) {
                // 把服务转化成url的形式
                URL url = URL.valueOf(provider);
                // 判断是否匹配,如果匹配, 则加入到集合中
                if (UrlUtils.isMatch(consumer, url)) {
                    urls.add(url);
                }
            }
        }
    }
    return urls;
}

private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
    // 返回和服务消费者匹配的服务提供者url
    List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
    // 如果不存在,则创建`empty://` 的 URL返回
    if (urls == null || urls.isEmpty()) {
        int i = path.lastIndexOf('/');
        String category = i < 0 ? path : path.substring(i + 1);
        URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
        urls.add(empty);
    }
    return urls;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

ZookeeperRegistryFactory
该类继承了AbstractRegistryFactory类,实现了AbstractRegistryFactory抽象出来的createRegistry方法,实例化ZookeeperRegistry

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

7. Hystrix

Hystrix的内部处理逻辑
图片: https://uploader.shimo.im/f/opttViTXkqitsU3E.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

  1. 构建Hystrix的Command对象, 调用执行方法.
  2. Hystrix检查当前服务的熔断器开关是否开启, 若开启, 则执行降级服务getFallback方法.
  3. 若熔断器开关关闭, 则Hystrix检查当前服务的线程池是否能接收新的请求, 若超过线程池已满, 则执行降级服务getFallback方法.
  4. 若线程池接受请求, 则Hystrix开始执行服务调用具体逻辑run方法.
  5. 若服务执行失败, 则执行降级服务getFallback方法, 并将执行结果上报Metrics更新服务健康状况.
  6. 若服务执行超时, 则执行降级服务getFallback方法, 并将执行结果上报Metrics更新服务健康状况.
  7. 若服务执行成功, 返回正常结果.
  8. 若服务降级方法getFallback执行成功, 则返回降级结果.
  9. 若服务降级方法getFallback执行失败, 则抛出异常

Hystrix工作流程
图片: https://uploader.shimo.im/f/qBtlNfaZ8GrDGIRr.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

当调用出现错误时,开启一个时间窗(默认 10秒)

在这个时间窗内,统计调用次数是否达到最小请求数?
如果没有达到,则重置统计信息,回到第1步
如果没有达到最小请求数,即使请求全部失败,也会回到第1步
如果达到了,则统计失败的请求数占所有请求数的百分比,是否达到阈值?
如果达到,则跳闸(不再 请求对应的服务)
如果没有达到,则重置统计信息,回到第1步

如果跳闸,则会开启一个活动窗口(默认5秒),每隔5秒,Hystrix 会让一个请求通过,到达那个正在苦苦挣扎的服务,看是否调用成功 如果成功,重置断路器,回到第1步 如果失败,回到第3步

@HystrixCommand 常用配置参数
@HystrixCommmand(commendKey=" “,groupKey=” “,threadPoolKey=” “,fallbackMethod=” ")

commandKey:用来标识一个 Hystrix 命令,默认会取被注解的方法名
注意:Hystrix 里同一个键的唯一标识并不包括 groupKey,建议取一个独一二无的名字,防止多个方法之间因为键重复而互相影响

groupKey:一组 Hystrix 命令的集合, 用来统计、报告,默认取类名

threadPoolKey:用来标识一个线程池,如果没设置的话会取 groupKey。
一般同一个类内的方法在共用同一个线程池,如果两个共用同一线程池的方法上配置了同样的属性,在第一个方法被执行后线程池的属性就固定了,所以属性会以第一个被执行的方法上的配置为准。

fallbackMethod:方法执行时熔断、错误、超时时会执行的回退方法,需要保持此方法与 Hystrix 方法的签名和返回值一致

例:

/**
 * 调用内容库获取分类书籍信息接口
 */
@HystrixCommand(groupKey = "bookQueryFacade", threadPoolKey = "queryBook", commandKey = "getBookListByCategory", fallbackMethod = "getBookListByCategoryFallback")
public BaseResponseDTO<BookListDTO> getBookListByCategory(BaseRequestDTO<BookCategoryReqDTO> baseRequestDTO) {
    return bookQueryFacade.getBookListByCategory(baseRequestDTO);
}

public BaseResponseDTO<BookListDTO> getBookListByCategoryFallback(BaseRequestDTO<BookCategoryReqDTO> baseRequestDTO) {
    log.error("Hystrix ContentRepositoryCaller getBookListByCategory");
    BookListDTO bookListDTO = new BookListDTO();
    bookListDTO.setHasNext(false);
    bookListDTO.setBookList(Collections.EMPTY_LIST);
    return getDefaultResponseDTO(bookListDTO);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Hystrix源码总结
Hystrix在底层使用了Spring提供的切面技术:

通过HystrixCommandAspect.java定义了一个切面(该类有@Aspect注解),专门用来处理那些标注了@HystrixCommand的方法

AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation. */ @Aspect public class HystrixCommandAspect {…}
更详细的讲,该类中定义了一个切点。切点是@HystrixCommand
@Pointcut(“@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)”)public void hystrixCommandAnnotationPointcut() {}
而针对该切点,还设置一个环绕函数
@Around(“hystrixCommandAnnotationPointcut()”) public Object methodsAnnotatedWithHystrixCommand()
环绕函数中,最关键的一句是execute()。这里会根据executionType有SYNCHRONOUS,ASYNCHRONOUS,OBSERVABLE调用不同逻辑,使用rxjava的观察者模式并最终在这里计算出结果返回。
result = CommandExecutor.execute(invokable, executionType, metaHolder);
当调用execute()出现失败时,会调用getFallBack()方法。而如果没有设置降级方法则调用父类的getFallBack()方法,父类的getFallBack方法会抛出一个找不到降级方法的异常
protected T getFallback() { throw new RuntimeException(“No fallback available.”, getExecutionException());

具体源码:

/**
 * AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation.
 */
@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        //通过切点获取被拦截的方法
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        //metaholder中保存了很多和切点相关的信息,详见后文的贴图
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        //result中保留操作的结果,可能是成功操作的返回值也可能是fallback方法的返回值
        Object result;
        try {
            //如果返回的结果使用了observable模式则执行以下代码
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
            //否则执行else
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause() != null ? e.getCause() : e;
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    ..........
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

重要的是执行这一步

result = CommandExecutor.execute(invokable, executionType, metaHolder);


     * Calls a method of {@link HystrixExecutable} in accordance with specified execution type.
     *
     * @param invokable  {@link HystrixInvokable}
     * @param metaHolder {@link MetaHolder}
     * @return the result of invocation of specific method.
     * @throws RuntimeException
     */
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

在execute()方法中使用了rxjava的观察者模式并最终在这里计算出结果返回。当出现失败时,会调用GenericCommand.java中的getFallBack()方法,代码如下:

@Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

在这个方法中首先会对是否设置了commandAction进行判断,commandAction就是我们之前所设置的fallback字段指向的方法,如果发现有降级方法,则调用降级方法获取结果,进入process()方法,代码如下:

Object process(Action action) throws Exception {
        Object result;
        try {
            result = action.execute();
            flushCache();
        } catch (CommandActionExecutionException throwable) {
            Throwable cause = throwable.getCause();
            if (isIgnorable(cause)) {
                throw new HystrixBadRequestException(cause.getMessage(), cause);
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            } else if (cause instanceof Exception) {
                throw (Exception) cause;
            } else {
                // instance of Throwable
                throw new CommandActionExecutionException(cause);
            }
        }
        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

最终在process()方法中获取降级方法执行的结果,而如果没有设置降级方法则调用父类的getFallBack()方法,父类的getFallBack方法会抛出一个找不到降级方法的异常,代码如下:

protected T getFallback() {
    throw new RuntimeException("No fallback available.", getExecutionException());
}
  • 1
  • 2
  • 3

至于何时会跳转到降级方法,则是在AbstractCommand.java中,在这里定义了很多种执行失败的情况,通过rxjava框架的观察者模式对错误进行监听,根据不同的情况会进入不同的处理方法,最终这些处理方法都会调用HystrixCommand.java中的getFallbackObservable()方法,并最终进入上文所述的真正执行fallback方法的代码

8. Dubbo服务暴露过程

图片: https://uploader.shimo.im/f/nZ9ipLdFpxapceDL.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

服务暴露过程
服务暴露过程大致可分为三个部分:
前置工作,主要用于检查参数,组装 URL
导出服务,包含暴露服务到本地 (JVM),和暴露服务到远程两个过程。
向注册中心注册服务,用于服务发现。

暴露起点
Spring中有一个ApplicationListener接口,其中定义了一个onApplicationEvent()方法,在当容器内发生任何事件时,此方法都会被触发。
只要服务没有被暴露并且服务没有被取消暴露,就暴露服务

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
      // 如果服务没有被暴露并且服务没有被取消暴露,则打印日志
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
          // 导出
        export();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

前置工作
前置工作主要包含两个部分,分别是配置检查,以及 URL 装配。
在暴露服务之前,Dubbo 需要检查用户的配置是否合理,或者为用户补充缺省配置。配置检查完成后,接下来需要根据这些配置组装 URL。
【在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作为配置载体,所有的拓展点都是通过 URL 获取配置】

配置检查
首先对配置的检查和更新,执行的ServiceConfig中的checkAndUpdateSubConfigs方法。然后检测是否应该暴露,如果不应该暴露,则直接结束,然后检测是否配置了延迟加载,如果是,则使用定时器来实现延迟加载的目的。

public synchronized void export() {
    //检查并且更新配置
    checkAndUpdateSubConfigs();

    // 如果不应该暴露,则直接结束
    if (!shouldExport()) {
        return;
    }

    // 如果使用延迟加载,则延迟delay时间后暴露服务
    if (shouldDelay()) {
        delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
    } else {
        // 暴露服务
        doExport();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

checkAndUpdateSubConfigs()
该方法中是对各类配置的校验,并且更新部分配置

 public void checkAndUpdateSubConfigs() {
    // Use default configs defined explicitly on global configs
    // 用于检测 provider、application 等核心配置类对象是否为空,
    // 若为空,则尝试从其他配置类对象中获取相应的实例。
    completeCompoundConfigs();
    // Config Center should always being started first.
    // 开启配置中心
    startConfigCenter();
    // 检测 provider 是否为空,为空则新建一个,并通过系统变量为其初始化
    checkDefault();
    // 检查application是否为空
    checkApplication();
    // 检查注册中心是否为空
    checkRegistry();
    // 检查protocols是否为空
    checkProtocol();
    this.refresh();
    // 核对元数据中心配置是否为空
    checkMetadataReport();

    // 服务接口名不能为空,否则抛出异常
    if (StringUtils.isEmpty(interfaceName)) {
        throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
    }

    // 检测 ref 是否为泛化服务类型
    if (ref instanceof GenericService) {
        // 设置interfaceClass为GenericService
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            // 设置generic = true
            generic = Boolean.TRUE.toString();
        }
    } else {
        try {
            // 获得接口类型
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // 对 interfaceClass,以及 <dubbo:method> 标签中的必要字段进行检查
        checkInterfaceAndMethods(interfaceClass, methods);
        // 对 ref 合法性进行检测
        checkRef();
        generic = Boolean.FALSE.toString();
    }
    // stub local一样都是配置本地存根
    if (local != null) {
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localClass;
        try {
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(localClass)) {
            throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
        }
    }
    if (stub != null) {
        if ("true".equals(stub)) {
            stub = interfaceName + "Stub";
        }
        Class<?> stubClass;
        try {
            stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(stubClass)) {
            throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
        }
    }
    // 本地存根合法性校验
    checkStubAndLocal(interfaceClass);
    // mock合法性校验
    checkMock(interfaceClass);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

doExport()
对于服务是否暴露再一次校验,然后会执行ServiceConfig的doExportUrls()方法,对于多协议多注册中心暴露服务进行支持。

protected synchronized void doExport() {
    // 如果调用不暴露的方法,则unexported值为true
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    // 如果服务已经暴露了,则直接结束
    if (exported) {
        return;
    }
    // 设置已经暴露
    exported = true;

    // 如果path为空,则赋值接口名称
    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    // 多协议多注册中心暴露服务
    doExportUrls();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

doExportUrls()

private void doExportUrls() {
    // 加载注册中心链接
    List<URL> registryURLs = loadRegistries(true);
    // 遍历 protocols,并在每个协议下暴露服务
    for (ProtocolConfig protocolConfig : protocols) {
        // 以path、group、version来作为服务唯一性确定的key
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        // 组装 URL
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

从该方法可以看到:
loadRegistries()方法是加载注册中心链接。
服务的唯一性是通过path、group、version一起确定的。
doExportUrlsFor1Protocol()方法开始组装URL。

loadRegistries()

protected List<URL> loadRegistries(boolean provider) {
    // check && override if necessary
    List<URL> registryList = new ArrayList<URL>();
    // 如果registries为空,直接返回空集合
    if (CollectionUtils.isNotEmpty(registries)) {
        // 遍历注册中心配置集合registries
        for (RegistryConfig config : registries) {
            // 获得地址
            String address = config.getAddress();
            // 若地址为空,则设置为0.0.0.0
            if (StringUtils.isEmpty(address)) {
                address = Constants.ANYHOST_VALUE;
            }
            // 如果地址为N/A,则跳过
            if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                Map<String, String> map = new HashMap<String, String>();
                // 添加 ApplicationConfig 中的字段信息到 map 中
                appendParameters(map, application);
                // 添加 RegistryConfig 字段信息到 map 中
                appendParameters(map, config);
                // 添加path
                map.put(Constants.PATH_KEY, RegistryService.class.getName());
                // 添加 协议版本、发布版本,时间戳 等信息到 map 中
                appendRuntimeParameters(map);
                // 如果map中没有protocol,则默认为使用dubbo协议
                if (!map.containsKey(Constants.PROTOCOL_KEY)) {
                    map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL);
                }
                // 解析得到 URL 列表,address 可能包含多个注册中心 ip,因此解析得到的是一个 URL 列表
                List<URL> urls = UrlUtils.parseURLs(address, map);

                // 遍历URL 列表
                for (URL url : urls) {
                    // 将 URL 协议头设置为 registry
                    url = URLBuilder.from(url)
                            .addParameter(Constants.REGISTRY_KEY, url.getProtocol())
                            .setProtocol(Constants.REGISTRY_PROTOCOL)
                            .build();
                    // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
                    // 如果是服务提供者,并且是注册中心服务   或者   是消费者端,并且是订阅服务
                    // 则加入到registryList
                    if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                            || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

组装URL
dubbo内部用URL来携带各类配置,贯穿整个调用链,它就是配置的载体。服务的配置被组装到URL中就是从这里开始
遍历每个协议配置,在每个协议下都暴露服务,就会执行ServiceConfig的doExportUrlsFor1Protocol()方法,该方法前半部分实现了组装URL的逻辑,后半部分实现了暴露dubbo服务等逻辑,其中为用分割线分隔了。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // 获取协议名
    String name = protocolConfig.getName();
    // 如果为空,则是默认的dubbo
    if (StringUtils.isEmpty(name)) {
        name = Constants.DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    // 设置服务提供者册
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);

    // 添加 协议版本、发布版本,时间戳 等信息到 map 中
    appendRuntimeParameters(map);
    // 添加metrics、application、module、provider、protocol的所有信息到map
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    // 如果method的配置列表不为空
    if (CollectionUtils.isNotEmpty(methods)) {
        // 遍历method配置列表
        for (MethodConfig method : methods) {
            // 把方法名加入map
            appendParameters(map, method, method.getName());
            // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
            // 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig,
            // 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                // 如果retryValue为false,则不重试,设置值为0
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            // 获得ArgumentConfig列表
            List<ArgumentConfig> arguments = method.getArguments();
            if (CollectionUtils.isNotEmpty(arguments)) {
                // 遍历ArgumentConfig列表
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    // // 检测 type 属性是否为空,或者空串
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        // 利用反射获取该服务的所有方法集合
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            // 遍历所有方法
                            for (int i = 0; i < methods.length; i++) {
                                // 获得方法名
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                // 找到目标方法
                                if (methodName.equals(method.getName())) {
                                    // 通过反射获取目标方法的参数类型数组 argtypes
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    // 如果下标为-1
                                    if (argument.getIndex() != -1) {
                                        // 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            //  添加 ArgumentConfig 字段信息到 map 中
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            // 不一致,则抛出异常
                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        // 遍历参数类型数组 argtypes,查找 argument.type 类型的参数
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                // 如果找到,则添加 ArgumentConfig 字段信息到 map 中
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        // 用户未配置 type 属性,但配置了 index 属性,且 index != -1,则直接添加到map
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        // 抛出异常
                        throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }

    // 如果是泛化调用,则在map中设置generic和methods
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(Constants.GENERIC_KEY, generic);
        map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
    } else {
        // 获得版本号
        String revision = Version.getVersion(interfaceClass, version);
        // 放入map
        if (revision != null && revision.length() > 0) {
            map.put(Constants.REVISION_KEY, revision);
        }

        // 获得方法集合
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        // 如果为空,则告警
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            // 设置method为*
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            // 否则加入方法集合
            map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // 把token 的值加入到map中
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(Constants.TOKEN_KEY, token);
        }
    }
    // export service
    // 获得地址
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    // 获得端口号
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // 生成 URL
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    // —————————————————————————————————————分割线———————————————————————————————————————


    // 加载 ConfiguratorFactory,并生成 Configurator 实例,判断是否有该协议的实现存在
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        // 通过实例配置 url
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(Constants.SCOPE_KEY);
    // don't export when none is configured
    // // 如果 scope = none,则什么都不做
    if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        // // scope != remote,暴露到本地
        if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            // 暴露到本地
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        // // scope != local,导出到远程
        if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            // 如果注册中心链接集合不为空
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                // 遍历注册中心
                for (URL registryURL : registryURLs) {
                    // 添加dynamic配置
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                    // 加载监视器链接
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        // 添加监视器配置
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }

                    // For providers, this is used to enable custom proxy to generate invoker
                    // 获得代理方式
                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        // 添加代理方式到注册中心到url
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }

                    // 为服务提供类(ref)生成 Invoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // 暴露服务,并且生成Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    // 加入到暴露者集合中
                    exporters.add(exporter);
                }
            } else {
                // 不存在注册中心,则仅仅暴露服务,不会记录暴露到地址
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             */
            MetadataReportService metadataReportService = null;
            // 如果元数据中心服务不为空,则发布该服务,也就是在元数据中心记录url中到部分配置
            if ((metadataReportService = getMetadataReportService()) != null) {
                metadataReportService.publishProvider(url);
            }
        }
    }
    this.urls.add(url);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223

分割线上面部分,是组装URL的全过程,大致可以分为以下步骤:
它把metrics、application、module、provider、protocol等所有配置都放入map中,
针对method都配置,先做签名校验,先找到该服务是否有配置的方法存在,然后该方法签名是否有这个参数存在,都核对成功才将method的配置加入map。
将泛化调用、版本号、method或者methods、token等信息加入map
获得服务暴露地址和端口号,利用map内数据组装成URL。

暴露到远程的源码直接看doExportUrlsFor1Protocol()方法分割线下半部分

创建invoker
当生成暴露者的时候,服务已经暴露,接下来会细致的分析这暴露内部的过程。可以发现无论暴露到本地还是远程,都会通过代理工厂创建invoker。这个时候就走到了上述时序图的ProxyFactory。Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。JavassistProxyFactory中有一个getInvoker()方法。

getInvoker()
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    // // 为目标类创建 Wrapper
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

该方法就是创建了一个匿名的Invoker类对象,在doInvoke()方法中调用wrapper.invokeMethod()方法。

服务暴露
服务暴露分为暴露到本地 (JVM),和暴露到远程。doExportUrlsFor1Protocol()方法分割线下半部分就是服务暴露的逻辑。根据scope的配置分为:
scope = none,不暴露服务
scope != remote,暴露到本地
scope != local,暴露到远程

暴露到本地
导出本地执行的是ServiceConfig中的exportLocal()方法。
exportLocal()

private void exportLocal(URL url) {
    // 如果协议不是injvm
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        // 生成本地的url,分别把协议改为injvm,设置host和port
        URL local = URLBuilder.from(url)
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        // 通过代理工程创建invoker
        // 再调用export方法进行暴露服务,生成Exporter
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        // 把生成的暴露者加入集合
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

本地暴露调用的是injvm协议方法,也就是InjvmProtocol 的 export()方法。
export()

public Exporter export(Invoker invoker) throws RpcException {
return new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
该方法只是创建了一个,因为暴露到本地,所以在同一个jvm中。所以不需要其他操作。

暴露到远程
暴露到远程,大致可以分为服务暴露和服务注册两个过程。
先来看看服务暴露。我们知道dubbo有很多协议实现,在doExportUrlsFor1Protocol()方法分割线下半部分中,生成了Invoker后,就需要调用protocol 的 export()方法,

export()
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 获得注册中心的url
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    //获得已经注册的服务提供者url
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    // 获取override订阅 URL
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    // 创建override的监听器
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    // 把监听器添加到集合
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    // 根据override的配置来覆盖原来的url,使得配置是最新的。
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    // 服务暴露
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    // 根据 URL 加载 Registry 实现类,比如ZookeeperRegistry
    final Registry registry = getRegistry(originInvoker);
    // 返回注册到注册表的url并过滤url参数一次
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    // 生成ProviderInvokerWrapper,它会保存服务提供方和消费方的调用地址和代理对象
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);


    // ————————————————————————————————分割线——————————————————————————————————————
    //to judge if we need to delay publish
    // 获取 register 参数
    boolean register = registeredProviderUrl.getParameter("register", true);
    // 如果需要注册服务
    if (register) {
        // 向注册中心注册服务
        register(registryUrl, registeredProviderUrl);
        // 设置reg为true,表示服务注册
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    // 向注册中心进行订阅 override 数据
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    // 设置注册中心url
    exporter.setRegisterUrl(registeredProviderUrl);
    // 设置override数据订阅的url
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    // 创建并返回 DestroyableExporter
    return new DestroyableExporter<>(exporter);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

从代码上看,我用分割线分成两部分,分别是服务暴露和服务注册。该方法的逻辑大致分为以下几个步骤:
获得服务提供者的url,再通过override数据重新配置url,然后执行doLocalExport()进行服务暴露。
加载注册中心实现类,向注册中心注册服务。
向注册中心进行订阅 override 数据。
创建并返回 DestroyableExporter

服务暴露先调用的是RegistryProtocol的doLocalExport()方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);

    // 加入缓存
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        // 创建 Invoker 为委托类对象
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        // 调用 protocol 的 export 方法暴露服务
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

根据不同的协议配置,调用不同的protocol实现。跟暴露到本地的时候实现InjvmProtocol一样。

服务注册
服务注册先调用的是register()方法。

RegistryProtocolregister()

public void register(URL registryUrl, URL registeredProviderUrl) {
    // 获取 Registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // 注册服务
    registry.register(registeredProviderUrl);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

所以服务注册大致可以分为两步:
获得注册中心实例
注册服务

获得注册中心首先执行的是AbstractRegistryFactory的getRegistry()方法

AbstractRegistryFactory中的源码。
大概的逻辑就是先从缓存中取,如果没有命中,则创建注册中心实例,这里的createRegistry()是一个抽象方法,具体的实现逻辑由子类完成,假设这里使用zookeeper作为注册中心,则调用的是ZookeeperRegistryFactory的createRegistry()。

ZookeeperRegistryFactorycreateRegistry()
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}
  • 1
  • 2
  • 3
  • 4

就是创建了一个ZookeeperRegistry,执行了ZookeeperRegistry的构造方法。
ZookeeperRegistry的构造方法

ZookeeperRegistry中的源码分析。大致的逻辑可以分为以下几个步骤:
创建zookeeper客户端
添加监听器

主要看ZookeeperTransporter的connect方法,因为当connect方法执行完后,注册中心创建过程就结束了。首先执行的是AbstractZookeeperTransporter的connect方法。

AbstractZookeeperTransporter的connect()

public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    // 获得所有url地址
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    // 从缓存中查找可用的客户端,如果有,则直接返回
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }

        // 创建客户端
        zookeeperClient = createZookeeperClient(toClientURL(url));
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        // 加入缓存
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

上面的源码,主要是执行了createZookeeperClient()方法

9. dubbo服务引用过程

服务引用有两种方式,一种就是直连,一种是通过注册中心。直连更多的时候被用来做服务测试,不建议在生产环境使用这样的方法,因为直连不适合服务治理,dubbo本身就是一个服务治理的框架,提供了很多服务治理的功能。所以更多的时候,我们都不会选择绕过注册中心,而是通过注册中心的方式来进行服务引用。

服务引用过程
大致可以分为三个步骤:
配置加载
创建invoker
创建服务接口代理类

图片: https://uploader.shimo.im/f/cuc9DbRHcTCcNeEE.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

引用起点
dubbo中有一个类ReferenceBean,它实现了FactoryBean接口,继承了ReferenceConfig,ReferenceBean作为dubbo中能生产对象的工厂Bean,而我们要引用服务,也就是要有一个该服务的对象。
因为ReferenceBean实现了FactoryBean接口的getObject()方法,所以在加载bean的时候,会调用ReferenceBean的getObject()方法
ReferenceBean的getObject()
public Object getObject() {
return get();
}
这个get方法是ReferenceConfig的get()方法

ReferenceConfig的get()

public synchronized T get() {
    // 检查并且更新配置
    checkAndUpdateSubConfigs();

    // 如果被销毁,则抛出异常
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    // 检测 代理对象ref 是否为空,为空则通过 init 方法创建
    if (ref == null) {
        // 用于处理配置,以及调用 createProxy 生成代理类
        init();
    }
    return ref;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

init方法是处理各类配置的开始。

配置加载
ReferenceConfig的init()

private void init() {
    // 如果已经初始化过,则结束
    if (initialized) {
        return;
    }
    // 设置初始化标志为true
    initialized = true;
    // 本地存根合法性校验
    checkStubAndLocal(interfaceClass);
    // mock合法性校验
    checkMock(interfaceClass);
    // 用来存放配置
    Map<String, String> map = new HashMap<String, String>();

    // 存放这是消费者侧
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);

    // 添加 协议版本、发布版本,时间戳 等信息到 map 中
    appendRuntimeParameters(map);
    // 如果是泛化调用
    if (!isGeneric()) {
        // 获得版本号
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            // 设置版本号
            map.put(Constants.REVISION_KEY, revision);
        }

        // 获得所有方法
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            // 把所有方法签名拼接起来放入map
            map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), Constants.COMMA_SEPARATOR));
        }
    }
    // 加入服务接口名称
    map.put(Constants.INTERFACE_KEY, interfaceName);
    // 添加metrics、application、module、consumer、protocol的所有信息到map
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    Map<String, Object> attributes = null;
    if (CollectionUtils.isNotEmpty(methods)) {
        attributes = new HashMap<String, Object>();
        // 遍历方法配置
        for (MethodConfig methodConfig : methods) {
            // 把方法配置加入map
            appendParameters(map, methodConfig, methodConfig.getName());
            // 生成重试的配置key
            String retryKey = methodConfig.getName() + ".retry";
            // 如果map中已经有该配置,则移除该配置
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                // 如果配置为false,也就是不重试,则设置重试次数为0次
                if ("false".equals(retryValue)) {
                    map.put(methodConfig.getName() + ".retries", "0");
                }
            }
            // 设置异步配置
            attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
        }
    }

    // 获取服务消费者 ip 地址
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    // 如果为空,则获取本地ip
    if (StringUtils.isEmpty(hostToRegistry)) {
        hostToRegistry = NetUtils.getLocalHost();
    }
    // 设置消费者ip
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

    // 创建代理对象
    ref = createProxy(map);

    // 生产服务key
    String serviceKey = URL.buildKey(interfaceName, group, version);
    // 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
    // 并将 ConsumerModel 存入到 ApplicationModel 中
    ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86

该方法大致分为以下几个步骤:
检测本地存根和mock合法性。
添加协议版本、发布版本,时间戳、metrics、application、module、consumer、protocol等的所有信息到 map 中
单独处理方法配置,设置重试次数配置以及设置该方法对异步配置信息。
添加消费者ip地址到map
创建代理对象
生成ConsumerModel存入到 ApplicationModel 中

创建invoker
ReferenceConfig的createProxy()

private T createProxy(Map<String, String> map) {
    // 根据配置检查是否为本地调用
    if (shouldJvmRefer(map)) {
        // 生成url,protocol使用的是injvm
        URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        // 利用InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        // 如果url不为空,则用户可能想进行直连来调用
        if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
            // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            // 遍历所有的url
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        // 设置接口全限定名为 url 路径
                        url = url.setPath(interfaceName);
                    }
                    // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
                        // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
                        // 最后将合并后的配置设置为 url 查询字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // assemble URL from register center's configuration
            // 校验注册中心
            checkRegistry();
            // 加载注册中心的url
            List<URL> us = loadRegistries(false);
            if (CollectionUtils.isNotEmpty(us)) {
                // 遍历所有的注册中心
                for (URL u : us) {
                    // 生成监控url
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        // 加入监控中心url的配置
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // 添加 refer 参数到 url 中,并将 url 添加到 urls 中
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            // 如果urls为空,则抛出异常
            if (urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
            }
        }

        // 如果只有一个注册中心,则直接调用refer方法
        if (urls.size() == 1) {
            // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            // 遍历所有的注册中心url
            for (URL url : urls) {
                // 通过 refprotocol 调用 refer 构建 Invoker,
                // refprotocol 会在运行时根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
                // 把生成的Invoker加入到集合中
                invokers.add(refprotocol.refer(interfaceClass, url));
                // 如果是注册中心的协议
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    // 则设置registryURL
                    registryURL = url; // use last registry url
                }
            }
            // 优先用注册中心的url
            if (registryURL != null) { // registry url is available
                // use RegistryAwareCluster only when register's cluster is available
                // 只有当注册中心当链接可用当时候,采用RegistryAwareCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
                // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                // 由集群进行多个invoker合并
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url, must be direct invoke.
                // 直接进行合并
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    // 如果需要核对该服务是否可用,并且该服务不可用
    if (shouldCheck() && !invoker.isAvailable()) {
        // make it possible for consumer to retry later if provider is temporarily unavailable
        // 修改初始化标志为false
        initialized = false;
        // 抛出异常
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    /**
     * @since 2.7.0
     * ServiceData Store
     */
    // 元数据中心服务
    MetadataReportService metadataReportService = null;
    // 加载元数据服务,如果成功
    if ((metadataReportService = getMetadataReportService()) != null) {
        // 生成url
        URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
        // 把消费者配置加入到元数据中心中
        metadataReportService.publishConsumer(consumerURL);
    }
    // create service proxy
    // 创建服务代理
    return (T) proxyFactory.getProxy(invoker);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121

该方法的大致逻辑可用分为以下几步:
如果是本地调用,则直接使用InjvmProtocol 的 refer 方法生成 Invoker 实例。
如果不是本地调用,但是是选择直连的方式来进行调用,则分割配置的多个url。如果协议是配置是registry,则表明用户想使用指定的注册中心,配置url后将url并且保存到urls里面,否则就合并url,并且保存到urls。
如果是通过注册中心来进行调用,则先校验所有的注册中心,然后加载注册中心的url,遍历每个url,加入监控中心url配置,最后把每个url保存到urls。
针对urls集合的数量,如果是单注册中心,直接引用RegistryProtocol 的 refer 构建 Invoker 实例,如果是多注册中心,则对每个url都生成Invoker,利用集群进行多个Invoker合并。
最终输出一个invoker。

Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。

RegistryProtocol生成invoker
RegistryProtocol的refer()

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 取 registry 参数值,并将其设置为协议头,默认是dubbo
    url = URLBuilder.from(url)
            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
            .removeParameter(REGISTRY_KEY)
            .build();
    // 获得注册中心实例
    Registry registry = registryFactory.getRegistry(url);
    // 如果是注册中心服务,则返回注册中心服务的invoker
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    // 将 url 查询字符串转为 Map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    // 获得group值
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        // 如果有多个组,或者组配置为*,则使用MergeableCluster,并调用 doRefer 继续执行服务引用逻辑
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 只有一个组或者没有组配置,则直接执行doRefer
    return doRefer(cluster, registry, type, url);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

如果是注册服务中心,则直接创建代理。如果不是,先处理组配置,根据组配置来决定Cluster的实现方式,然后调用doRefer方法。

RegistryProtocol的doRefer()

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 创建 RegistryDirectory 实例
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 设置注册中心
    directory.setRegistry(registry);
    // 设置协议
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    // 所有属性放到map中
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // 生成服务消费者链接
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    // 注册服务消费者,在 consumers 目录下新节点
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        // 注册服务消费者
        registry.register(directory.getRegisteredConsumerUrl());
    }
    // 创建路由规则链
    directory.buildRouterChain(subscribeUrl);
    // 订阅 providers、configurators、routers 等节点数据
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个,生成一个invoker
    Invoker invoker = cluster.join(directory);
    // 在服务提供者处注册消费者
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

该方法大致可以分为以下步骤:
创建一个 RegistryDirectory 实例,然后生成服务者消费者链接。
向注册中心进行注册。
紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。
由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。

DubboProtocol生成invoker
首先还是从DubboProtocol的refer()开始。
DubboProtocol的refer()

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    // 创建一个DubboInvoker实例
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    // 加入到集合中
    invokers.add(invoker);

    return invoker;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

创建DubboInvoker比较简单,调用了构造方法,这里主要讲这么生成ExchangeClient,也就是getClients方法。

DubboProtocol的getClients()
ubboProtocol中的源码,如果是配置的共享,则获得共享客户端对象,也就是getSharedClient()方法,否则新建客户端也就是initClient()方法。

DubboProtocol的getSharedClient()
DubboProtocol中的源码,先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例。

DubboProtocol的initClient()
DubboProtocol中的源码,initClient 方法首先获取用户配置的客户端类型。然后设置用户心跳配置,然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。

这里的 LazyConnectExchangeClient 代码并不是很复杂,该类会在 request 方法被调用时通过 Exchangers 的 connect 方法创建 ExchangeClient 客户端。下面我们分析一下 Exchangers 的 connect 方法。

Exchangers的connect()

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 获取 Exchanger 实例,默认为 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}
getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单。接下来分析 HeaderExchangeClient 的connect的实现。

HeaderExchangeClientconnect()
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 创建 HeaderExchangeHandler 对象
    // 创建 DecodeHandler 对象
    // 通过 Transporters 构建 Client 实例
    // 创建 HeaderExchangeClient 对象
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Transporters的connect()
Transporters中源码分析。其中获得自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。假设是netty4的实现,则执行以下代码。

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}
  • 1
  • 2
  • 3

到这里为止,DubboProtocol生成invoker过程也结束了。再回到createProxy方法的最后一句代码,根据invoker创建服务代理对象。

创建代理
为服务接口生成代理对象。有了代理对象,即可进行远程调用。首先来看AbstractProxyFactory 的 getProxy()方法。

AbstractProxyFactory 的 getProxy()
AbstractProxyFactory的源码分析。可以看到第二个getProxy方法其实就是获取 interfaces 数组,调用到第三个getProxy方法时,该getProxy是个抽象方法,由子类来实现,我们还是默认它的代理实现方式为Javassist。所以可以看JavassistProxyFactory的getProxy方法。

JavassistProxyFactory的getProxy()
public T getProxy(Invoker invoker, Class<?>[] interfaces) {
// 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

Proxy的getProxy方法。

/**
 * Get proxy.
 *
 * @param ics interface class array.
 * @return Proxy instance.
 */
public static Proxy getProxy(Class<?>... ics) {
    // 获得Proxy的类加载器来进行生成代理类
    return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

/**
 * Get proxy.
 *
 * @param cl  class loader.
 * @param ics interface class array.
 * @return Proxy instance.
 */
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    if (ics.length > Constants.MAX_PROXY_COUNT) {
        throw new IllegalArgumentException("interface limit exceeded");
    }

    StringBuilder sb = new StringBuilder();
    // 遍历接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        // 检测是否是接口,如果不是,则抛出异常
        if (!ics[i].isInterface()) {
            throw new RuntimeException(itf + " is not a interface.");
        }

        Class<?> tmp = null;
        try {
            // 重新加载接口类
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }
        // 检测接口是否相同,这里 tmp 有可能为空,也就是该接口无法被类加载器加载的。
        if (tmp != ics[i]) {
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
        }

        // 拼接接口全限定名,分隔符为 ;
        sb.append(itf).append(';');
    }

    // use interface class name list as key.
    // 使用拼接后的接口名作为 key
    String key = sb.toString();

    // get cache by class loader.
    Map<String, Object> cache;
    // 把该类加载器加到本地缓存
    synchronized (ProxyCacheMap) {
        cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>());
    }

    Proxy proxy = null;
    synchronized (cache) {
        do {
            // 从缓存中获取 Reference<Proxy> 实例
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null) {
                    return proxy;
                }
            }
            // 并发控制,保证只有一个线程可以进行后续操作
            if (value == PendingGenerationMarker) {
                try {
                    // 其他线程在此处进行等待
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                // 放置标志位到缓存中,并跳出 while 循环进行后续操作
                cache.put(key, PendingGenerationMarker);
                break;
            }
        }
        while (true);
    }

    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        // 创建 ClassGenerator 对象
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<>();
        List<Method> methods = new ArrayList<>();

        for (int i = 0; i < ics.length; i++) {
            // 检测接口访问级别是否为 protected 或 privete
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                // 获取接口包名
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    // 非 public 级别的接口必须在同一个包下,否者抛出异常
                    if (!pkg.equals(npkg)) {
                        throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
            }
            // 添加接口到 ClassGenerator 中
            ccp.addInterface(ics[i]);

            // 遍历接口方法
            for (Method method : ics[i].getMethods()) {
                // 获取方法描述,可理解为方法签名
                String desc = ReflectUtils.getDesc(method);
                // 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
                // A 接口和 B 接口中包含一个完全相同的方法
                if (worked.contains(desc)) {
                    continue;
                }
                worked.add(desc);

                int ix = methods.size();
                // 获取方法返回值类型
                Class<?> rt = method.getReturnType();
                // 获取参数列表
                Class<?>[] pts = method.getParameterTypes();

                // 生成 Object[] args = new Object[1...N]
                StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                for (int j = 0; j < pts.length; j++) {
                    // 生成 args[1...N] = ($w)$1...N;
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                }
                // 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
                // Object ret = handler.invoke(this, methods[1...N], args);
                code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
                // 返回值不为 void
                if (!Void.TYPE.equals(rt)) {
                    // 生成返回语句,形如 return (java.lang.String) ret;
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");
                }

                methods.add(method);
                // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
            }
        }

        if (pkg == null) {
            pkg = PACKAGE_NAME;
        }

        // create ProxyInstance class.
        // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        // 生成 private java.lang.reflect.InvocationHandler handler;
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
        // 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
        // porxy0(java.lang.reflect.InvocationHandler arg0) {
        //     handler=$1;
        // }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
        // 为接口代理类添加默认构造方法
        ccp.addDefaultConstructor();
        // 生成接口代理类
        Class<?> clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        // create Proxy class.
        // 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
        String fcn = Proxy.class.getName() + id;
        ccm = ClassGenerator.newInstance(cl);
        ccm.setClassName(fcn);
        ccm.addDefaultConstructor();
        ccm.setSuperClass(Proxy.class);
        // 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
        // public Object newInstance(java.lang.reflect.InvocationHandler h) {
        //     return new org.apache.dubbo.proxy0($1);
        // }
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
        Class<?> pc = ccm.toClass();
        // 生成 Proxy 实现类
        proxy = (Proxy) pc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        // release ClassGenerator
        if (ccp != null) {
            // 释放资源
            ccp.release();
        }
        if (ccm != null) {
            ccm.release();
        }
        synchronized (cache) {
            if (proxy == null) {
                cache.remove(key);
            } else {
                // 写缓存
                cache.put(key, new WeakReference<Proxy>(proxy));
            }
            // 唤醒其他等待线程
            cache.notifyAll();
        }
    }
    return proxy;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213

大致可以分为以下几步:
对接口进行校验,检查是否是一个接口,是否不能被类加载器加载。
做并发控制,保证只有一个线程可以进行后续的代理生成操作。
创建cpp,用作为服务接口生成代理类。首先对接口定义以及包信息进行处理。
对接口的方法进行处理,包括返回类型,参数类型等。最后添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中。
创建接口代理类的信息,比如名称,默认构造方法等。
生成接口代理类。
创建ccm,ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。
设置名称、创建构造方法、添加方法
生成 Proxy 实现类。
释放资源
创建弱引用,写入缓存,唤醒其他线程。
到这里,接口代理类生成后,服务引用也就结束了。

10. RPC——dubbo协议

图片: https://uploader.shimo.im/f/TbeKxmRIav093BYH.png!thumbnail?accessToken=eyJhbGciOiJIUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ.eyJleHAiOjE2ODQ1OTE5MTYsImZpbGVHVUlEIjoiRWUzMk1ETVdYV1VZblZBMiIsImlhdCI6MTY4NDU5MTYxNiwiaXNzIjoidXBsb2FkZXJfYWNjZXNzX3Jlc291cmNlIiwidXNlcklkIjo4MjU1NDcyNX0.PWWBCYcaf7nMj-XFnUZnozD4srjLecpfRwYAgpGQE7g

DubboInvoker
该类是dubbo协议独自实现的的invoker,其中实现了调用方法的三种模式,分别是异步发送、单向发送和同步发送。
doInvoker

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    // rpc会话域
    RpcInvocation inv = (RpcInvocation) invocation;
    // 获得方法名
    final String methodName = RpcUtils.getMethodName(invocation);
    // 把path放入到附加值中
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    // 把版本号放入到附加值
    inv.setAttachment(Constants.VERSION_KEY, version);

    // 当前的客户端
    ExchangeClient currentClient;
    // 如果数组内就一个客户端,则直接取出
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        // 取模轮询 从数组中取,当取到最后一个时,从头开始
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 是否启用异步
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // 是否是单向发送
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 获得超时时间
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 如果是单项发送
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 单向发送只负责发送消息,不等待服务端应答,所以没有返回值
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // 异步调用
            ResponseFuture future = currentClient.request(inv, timeout);
            // 保存future,方便后期处理
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            // 同步调用,等待返回结果
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

在调用invoker的时候,通过远程通信将Invocation信息传递给服务端,服务端在接收到该invocation信息后,要找到对应的本地方法,然后执行该方法,将方法的执行结果返回给客户端,在这里,客户端发送有三种模式:
异步发送,也就是当我发送调用后,我不阻塞等待结果,直接返回,将返回的future保存到上下文,方便后期使用。
单向发送,执行方法不需要返回结果。
同步发送,执行方法后,等待结果返回,否则一直阻塞。

DubboExporter
该类继承了AbstractExporter,是dubbo协议中独有的服务暴露者。

/**
 * 服务key
 */
private final String key;

/**
 * 服务暴露者集合
 */
private final Map<String, Exporter<?>> exporterMap;

public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;
}

@Override
public void unexport() {
    super.unexport();
    // 从集合中移除该key
    exporterMap.remove(key);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

DubboProtocol
该类是dubbo协议的核心实现,其中增加了比如延迟加载等处理。 并且其中还包括了对服务暴露和服务引用的逻辑处理
reply与createInvocation

/*
* 回复请求结果,返回的是请求结果
* */
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
            + (message == null ? null : (message.getClass().getName() + ": " + message))
            + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    Invocation inv = (Invocation) message;
    // 获得暴露的invoker
    Invoker<?> invoker = getInvoker(channel, inv);
    inv.setServiceModel(invoker.getUrl().getServiceModel());
    // switch TCCL
    if (invoker.getUrl().getServiceModel() != null) {
        Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
    }
    // need to consider backward-compatibility if it's a callback
    // 如果是回调服务
    if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
        // 获得 方法定义
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            // 如果方法不止一个,则分割后遍历查询,找到了则设置为true
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                + " not found in callback service interface ,invoke will be ignored."
                + " please update the api interface. url is:"
                + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    // 设置远程地址
    RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
    // 调用下一个调用链
    Result result = invoker.invoke(inv);
    return result.thenApply(Function.identity());
}

 /**
     * 创建会话域, 把url内的值加入到会话域的附加值中
     * @param channel
     * @param url
     * @param methodKey
     * @return
     */
    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        // 获得方法,methodKey是onconnect或者ondisconnect
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        // 创建一个rpc会话域
        RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
        // 加入附加值path
        invocation.setAttachment(Constants.PATH_KEY, url.getPath());
        // 加入附加值group
        invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
        // 加入附加值interface
        invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
        // 加入附加值version
        invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
        // 如果是本地存根服务,则加入附加值dubbo.stub.event为true
        if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
            invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
        }
        return invocation;
    }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

export
该方法是基于dubbo协议的服务暴露,除了对于存根服务和本地服务进行标记以外,打开服务和序列化分别在openServer和optimizeSerialization中实现

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    // 得到服务key  group+"/"+serviceName+":"+serviceVersion+":"+port
    String key = serviceKey(url);
    // 创建exporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 加入到集合
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    // 如果是本地存根事件而不是回调服务
    if (isStubSupportEvent && !isCallbackservice) {
        // 获得本地存根的方法
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        // 如果为空,则抛出异常
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            // 加入集合
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }


    // 打开服务
    openServer(url);
    // 序列化
    optimizeSerialization(url);
    return exporter;
}

createServer
private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    // 服务器关闭时发送readonly事件,默认情况下启用
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    // 心跳默认间隔一分钟
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // 获得远程通讯服务端实现方式,默认用netty3
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    /**
     * 如果没有该配置,则抛出异常
     */
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    /**
     * 添加编解码器DubboCodec实现
     */
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // 启动服务器
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    // 获得客户端侧设置的远程通信方式
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        // 获得远程通信的实现集合
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // 如果客户端侧设置的远程通信方式不在支持的方式中,则抛出异常
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

该方法就是根据url携带的远程通信实现方法来创建一个服务器对象。

optimizeSerialization

private void optimizeSerialization(URL url) throws RpcException {
    // 获得类名
    String className = url.getParameter(Constants.OPTIMIZER_KEY, "");
    if (StringUtils.isEmpty(className) || optimizers.contains(className)) {
        return;
    }

    logger.info("Optimizing the serialization process for Kryo, FST, etc...");

    try {
        // 加载类
        Class clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
        if (!SerializationOptimizer.class.isAssignableFrom(clazz)) {
            throw new RpcException("The serialization optimizer " + className + " isn't an instance of " + SerializationOptimizer.class.getName());
        }

        // 强制类型转化为SerializationOptimizer
        SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance();

        if (optimizer.getSerializableClasses() == null) {
            return;
        }

        // 遍历序列化的类,把该类放入到集合进行缓存
        for (Class c : optimizer.getSerializableClasses()) {
            SerializableClassRegistry.registerClass(c);
        }

        // 加入到集合
        optimizers.add(className);
    } catch (ClassNotFoundException e) {
        throw new RpcException("Cannot find the serialization optimizer class: " + className, e);
    } catch (InstantiationException e) {
        throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
    } catch (IllegalAccessException e) {
        throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

该方法是把序列化的类放入到集合,以便进行序列化

ChannelWrappedInvoker
doInvoke
该方法是在invoker调用的时候对发送请求消息进行了包装

@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    // use interface's name as service path to export if it's not found on client side
    // 设置服务path,默认用接口名称
    inv.setAttachment(Constants.PATH_KEY, getInterface().getName());
    // 设置回调的服务key
    inv.setAttachment(Constants.CALLBACK_SERVICE_KEY, serviceKey);

    try {
        // 如果是异步的
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { // may have concurrency issue
            // 直接发送请求消息
            currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), Constants.SENT_KEY, false));
            return new RpcResult();
        }
        // 获得超时时间
        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (timeout > 0) {
            return (Result) currentClient.request(inv, timeout).get();
        } else {
            return (Result) currentClient.request(inv).get();
        }
    } catch (RpcException e) {
        throw e;
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, e.getMessage(), e);
    } catch (Throwable e) { // here is non-biz exception, wrap it.
        throw new RpcException(e.getMessage(), e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

DecodeableRpcInvocation
该类主要做了对于会话域内的数据进行序列化和解码。

DecodeableRpcResult
该类是做了基于dubbo协议对prc结果的解码

DubboCodec
该类是dubbo的编解码器,分别针对dubbo协议的request和response进行编码和解码。

FutureFilter
该类是处理异步和同步调用结果的过滤器

invoke

@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    // 是否是异步的调用
    final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    Result result = invoker.invoke(invocation);
    if (isAsync) {
        // 调用异步处理
        asyncCallback(invoker, invocation);
    } else {
        // 调用同步结果处理
        syncCallback(invoker, invocation, result);
    }
    return result;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

该方法中根据是否为异步调用来分别执行asyncCallback和syncCallback方法。

syncCallback

private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
    // 如果有异常
    if (result.hasException()) {
        // 则调用异常的结果处理
        fireThrowCallback(invoker, invocation, result.getException());
    } else {
        // 调用正常的结果处理
        fireReturnCallback(invoker, invocation, result.getValue());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

该方法是同步调用的返回结果处理。

asyncCallback

private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
    Future<?> f = RpcContext.getContext().getFuture();
    if (f instanceof FutureAdapter) {
        ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
        // 设置回调
        future.setCallback(new ResponseCallback() {
            @Override
            public void done(Object rpcResult) {
                // 如果结果为空,则打印错误日志
                if (rpcResult == null) {
                    logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                    return;
                }
                ///must be rpcResult
                // 如果不是Result则打印错误日志
                if (!(rpcResult instanceof Result)) {
                    logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                    return;
                }
                Result result = (Result) rpcResult;
                if (result.hasException()) {
                    // 如果有异常,则调用异常处理方法
                    fireThrowCallback(invoker, invocation, result.getException());
                } else {
                    // 如果正常的返回结果,则调用正常的处理方法
                    fireReturnCallback(invoker, invocation, result.getValue());
                }
            }

            @Override
            public void caught(Throwable exception) {
                fireThrowCallback(invoker, invocation, exception);
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

该方法是异步调用的结果处理,把异步返回结果的逻辑写在回调函数里面。

fireInvokeCallback

private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
    // 获得调用的方法
    final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
    // 获得调用的服务
    final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));

    if (onInvokeMethod == null && onInvokeInst == null) {
        return;
    }
    if (onInvokeMethod == null || onInvokeInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    }
    // 如果不可以访问,则设置为可访问
    if (!onInvokeMethod.isAccessible()) {
        onInvokeMethod.setAccessible(true);
    }

    // 获得参数数组
    Object[] params = invocation.getArguments();
    try {
        // 调用方法
        onInvokeMethod.invoke(onInvokeInst, params);
    } catch (InvocationTargetException e) {
        fireThrowCallback(invoker, invocation, e.getTargetException());
    } catch (Throwable e) {
        fireThrowCallback(invoker, invocation, e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

该方法是调用方法的执行。

fireReturnCallback

private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
    final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
    final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));

    //not set onreturn callback
    if (onReturnMethod == null && onReturnInst == null) {
        return;
    }

    if (onReturnMethod == null || onReturnInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    }
    if (!onReturnMethod.isAccessible()) {
        onReturnMethod.setAccessible(true);
    }

    Object[] args = invocation.getArguments();
    Object[] params;
    // 获得返回结果类型
    Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
    // 设置参数和返回结果
    if (rParaTypes.length > 1) {
        if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
            params = new Object[2];
            params[0] = result;
            params[1] = args;
        } else {
            params = new Object[args.length + 1];
            params[0] = result;
            System.arraycopy(args, 0, params, 1, args.length);
        }
    } else {
        params = new Object[]{result};
    }
    try {
        // 调用方法
        onReturnMethod.invoke(onReturnInst, params);
    } catch (InvocationTargetException e) {
        fireThrowCallback(invoker, invocation, e.getTargetException());
    } catch (Throwable e) {
        fireThrowCallback(invoker, invocation, e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

该方法是正常的返回结果的处理。

fireThrowCallback

private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
    final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
    final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));

    //onthrow callback not configured
    if (onthrowMethod == null && onthrowInst == null) {
        return;
    }
    if (onthrowMethod == null || onthrowInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    }
    if (!onthrowMethod.isAccessible()) {
        onthrowMethod.setAccessible(true);
    }
    // 获得抛出异常的类型
    Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
    if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
        try {
            Object[] args = invocation.getArguments();
            Object[] params;

            // 把类型和抛出的异常值放入返回结果
            if (rParaTypes.length > 1) {
                if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                    params = new Object[2];
                    params[0] = exception;
                    params[1] = args;
                } else {
                    params = new Object[args.length + 1];
                    params[0] = exception;
                    System.arraycopy(args, 0, params, 1, args.length);
                }
            } else {
                params = new Object[]{exception};
            }
            // 调用下一个调用连
            onthrowMethod.invoke(onthrowInst, params);
        } catch (Throwable e) {
            logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
        }
    } else {
        logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

该方法是异常抛出时的结果处理。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/166144?site
推荐阅读
相关标签
  

闽ICP备14008679号