赞
踩
写在前面:借鉴@加点代码调调味相关内容,详见大佬思否主页 https://segmentfault.com/u/crazyhzm
Dubbo是阿里巴巴公司开源的一个高性能优秀的开源分布式服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成。
节点角色说明
Provider 暴露服务的服务提供方
Consumer 调用远程服务的服务消费方。
Registry 服务注册与发现的注册中心
Monitor 统计服务的调用次数和调用时间的监控中心
Container 服务运行容器
注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
目前 Dubbo 内置了如下负载均衡算法,用户可直接配置使用:
Random
随机,按权重设置随机概率。
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
缺点:存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
RoundRobin
轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
LeastActive
加权最少活跃调用优先,活跃数越低,越优先调用,相同活跃数的进行加权随机。活跃数指调用前后计数差(针对特定提供者:请求发送数 - 响应返回数),表示特定提供者的任务堆积量,活跃数越低,代表该提供者处理能力越强。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大;相对的,处理能力越强的节点,处理更多的请求。
ShortestResponse
加权最短响应优先,在最近一个滑动窗口中,响应时间越短,越优先调用。相同响应时间的进行加权随机。
使得响应时间越快的提供者,处理更多的请求。
缺点:可能会造成流量过于集中于高性能节点的问题。
这里的响应时间 = 某个提供者在窗口时间内的平均响应时间,窗口时间默认是 30s。
ConsistentHash
一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
Dubbo 注册与发现的对象是 Dubbo 服务(Java 接口),而其载体为注册元信息,即 Dubbo URL如:dubbo://192.168.1.2:20880/com.foo.BarService?version=1.0.0&group=default,通常包含必须信息,如服务提供方 IP 和端口、 Java 接口,可选包含版本(version)和分组(group)等。服务 URL 所包含的信息能够唯一界别服务提供方的进程。
Dubbo 服务的 Java 接口(interface)允许不同的版本(version)或分组(group),所以仅凭 Java 接口无法唯一标识某个 Dubbo 服务,还需要增加通讯协议(protocol)方可Dubbo 服务 ID 字符表达模式为: p r o t o c o l : {protocol}: protocol:{interface}: v e r s i o n : {version}: version:{group} , 其中,版本(version)或分组(group)是可选的。当 Dubbo Consumer 订阅 Dubbo 服务时,构建对应 ID,通过这个 ID 来查询 Dubbo Provider 的 Service 名称列表。
由于 Dubbo 服务与 Service 的映射关系取决于业务场景,架构层面无从预判。因此,这种映射关系只能在 Dubbo 服务暴露时(运行时)才能确定,否则,Dubbo 服务能被多个 Consumer 应用订阅时,Consumer 无法定位 Provider Service 名称,进而无法完成服务发现。
元数据服务 Metadata
元数据服务 Metadata,称之为“元数据服务的元数据”,主要包括:
· inteface:Dubbo 元数据服务所暴露的接口,即 MetadataService
· serviceName : 当前 MetadataService 所部署的 Dubbo Service 名称,作为 MetadataService 分组信息
· group:当前 MetadataService 分组,数据使用 serviceName
· version:当前 MetadataService 的版本,版本号通常在接口层面声明,不同的 Dubbo 发行版本 version 可能相同,比如 Dubbo 2.7.5 和 2.7.6 中的 version 均为 1.0.0。理论上,version 版本越高,支持元信息类型更丰富
· protocol: MetadataService 所暴露协议,为了确保 Provider 和 Consumer 通讯兼容性,默认协议为:“dubbo”,也可以支持其他协议。
· port:协议所使用的网络端口
· host:当前 MetadataService 所在的服务实例主机或 IP
· params:当前 MetadataService 暴露后 URL 中的参数信息
不难得出,凭借以上元数据服务的 Metadata,可将元数据服务的 Dubbo 服务 ID 确定,辅助 Provider 服务暴露和 Consumer 服务订阅 MetadataService 。不过对于 Provider,这些元信息都是已知的,而对 Consumer 而言,它们直接能获取的元信息仅有:
· serviceName:通过“Dubbo 接口与 Service 映射”关系,可得到 Provider Service 名称
· interface:即 MetadataService ,因为 Provider 和 Consumer 公用 MetadataService 接口
· group:即 serviceName
不过 Consumer 合成 MetadataService Dubbo URL 还需获取 version、host、port、protocol 以及 params:
· version:尽管 MetadataService 接口是统一接口,然而 Provider 和 Consumer 可能引入的 Dubbo 版本不同,从而它们使用的 MetadataService version 也会不同,所以这个信息需要 Provider 在暴露MetadataService 时,同步到服务实例的 Metadata 中,方便 Consumer 从 Metadata 中获取
· host:由于 Consumer 已得到 serviceName,可通过服务发现 API 获取服务实例对象,该对象包含 host 属性,直接被 Consumer 获取即可。
· port:与 version 类似,从 Provider 服务实例中的 Metadata 中获取
· params:同上
通过元数据服务 Metadata 的描述,解释了不同 Dubbo Services 是怎样体现差异性的,并且说明了 MetadataService 元信息的存储介质,这也就是服务自省架构为什么强依赖支持 Metadata 的注册中心的原因。下个小节将讨论 MetadataService 所存储 Dubbo 应用服务 URL 存放在何处。
当前 Dubbo Service 暴露或发布 Dubbo 服务 URL 集合,如:[ dubbo://192.168.1.2:20880/com.acme.Interface1?group=default&version=v1 , thirft://192.168.1.2:20881/com.acme.InterfaceX , rest://192.168.1.2:20882/com.acme.interfaceN ]
demo演示
总体架构
接口dubbo-interface
实体类UserAddress
订单接口OrderService**
用户服务UserService
服务提供者dubbo-priveder
UserServiceImpl
启动类
application.properties
服务消费者dubbo-consumer
控制类
启动类
application.properties
访问结果
RegistryService
public interface RegistryService {
void register(URL url);
void unregister(URL url);
void subscribe(URL url, NotifyListener listener);
void unsubscribe(URL url, NotifyListener listener);
List<URL> lookup(URL url);
}
register && unregister
这两个方法实现了RegistryService接口的方法。其中注册的逻辑是把url加入到属性registered,而取消注册的逻辑就是把url从该属性中移除。真正的实现是FailbackRegistry类中。
@Override
public void register(URL url) {
this.register(new URL(url));
}
@Override
public void unregister(URL url) {
this.unregister(new URL(url));
}
subscribe && unsubscribe
这两个方法实现了RegistryService接口的方法,具体的实现也是在FailbackRegistry类中,分别是订阅和取消订阅,订阅代码。
@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } // 获得该消费者url 已经订阅的服务 的监听器集合 Set<NotifyListener> listeners = subscribed.get(url); if (listeners == null) { subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()); listeners = subscribed.get(url); } // 添加某个服务的监听器 listeners.add(listener); }
lookUp
该方法是实现了RegistryService接口的方法,作用是获得消费者url订阅的服务URL列表
@Override public List<URL> lookup(URL url) { List<URL> result = new ArrayList<URL>(); // 获得该消费者url订阅的 所有被通知的 服务URL集合 Map<String, List<URL>> notifiedUrls = getNotified().get(url); // 判断该消费者是否订阅服务 if (notifiedUrls != null && notifiedUrls.size() > 0) { for (List<URL> urls : notifiedUrls.values()) { for (URL u : urls) { // 判断协议是否为空 if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) { // 添加 该消费者订阅的服务URL result.add(u); } } } } else { // 原子类 避免在获取注册在注册中心的服务url时能够保证是最新的url集合 final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>(); // 通知监听器。当收到服务变更通知时触发 NotifyListener listener = new NotifyListener() { @Override public void notify(List<URL> urls) { reference.set(urls); } }; // 订阅服务,就是消费者url订阅已经 注册在注册中心的服务(也就是添加该服务的监听器) subscribe(url, listener); // Subscribe logic guarantees the first notify to return List<URL> urls = reference.get(); if (urls != null && !urls.isEmpty()) { for (URL u : urls) { if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) { result.add(u); } } } } return result; }
recover
在注册中心断开,重连成功的时候,会恢复注册和订阅
protected void recover() throws Exception { // register //把内存缓存中的registered取出来遍历进行注册 Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { register(url); } } // subscribe //把内存缓存中的subscribed取出来遍历进行订阅 Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { subscribe(url, listener); } } } }
notify
notify方法是通知监听器,url的变化结果
发起订阅后,会获取全量数据,此时会调用notify方法。即Registry 获取到了全量数据
每次注册中心发生变更时会调用notify方法虽然变化是增量,调用这个方法的调用方,已经进行处理,传入的urls依然是全量的。
listener.notify,通知监听器。
protected void notify(List<URL> urls) { if (urls == null || urls.isEmpty()) return; // 遍历订阅URL的监听器集合,通知他们 for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); if (!UrlUtils.isMatch(url, urls.get(0))) { continue; } // 遍历监听器集合,通知他们 Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); } } } } } protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<String, List<URL>>(); // 将urls进行分类 for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { // 按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); // 分类结果放入result result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } // 获得某一个消费者被通知的url集合(通知的 URL 变化结果) Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { // 添加该消费者对应的url notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } // 处理通知监听器URL 变化结果 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); // 把分类标实和分类后的列表放入notified的value中 // 覆盖到 `notified` // 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。 categoryNotified.put(category, categoryList); // 保存到文件 saveProperties(url); //通知监听器 listener.notify(categoryList); } }
FailbackRegistry
属性
// Scheduled executor service // 定时任务执行器 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry // 失败重试定时器,定时去检查是否有请求失败的,如有,无限次重试。 private final ScheduledFuture<?> retryFuture; // 注册失败的URL集合 private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); // 取消注册失败的URL集合 private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); // 订阅失败的监听器集合 private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 取消订阅失败的监听器集合 private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 通知失败的URL集合 private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
构造函数
public FailbackRegistry(URL url) { super(url); // 从url中读取重试频率,如果为空,则默认5000ms this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // 创建失败重试定时器 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // Check and connect to the registry try { //重试 retry(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
register && unregister && subscribe && unsubscribe
注册、取消注册、订阅、取消订阅具体实现代码逻辑相似,下面为注册的源码。
就是做了一个doRegister的操作,如果失败抛出异常,则加入到失败的缓存中进行重试。
public void register(URL url) { super.register(url); //首先从失败的缓存中删除该url failedRegistered.remove(url); failedUnregistered.remove(url); try { // Sending a registration request to the server side // 向注册中心发送一个注册请求 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. // 如果开启了启动时检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly // 把这个注册失败的url放入缓存,并且定时重试。 failedRegistered.add(url); } }
notify
@Override protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } try { // 通知 url 数据变化 doNotify(url, listener, urls); } catch (Exception t) { // Record a failed registration request to a failed list, retry regularly // 放入失败的缓存中,重试 Map<NotifyListener, List<URL>> listeners = failedNotified.get(url); if (listeners == null) { failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>()); listeners = failedNotified.get(url); } listeners.put(listener, urls); logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { super.notify(url, listener, urls); }
revocer
@Override protected void recover() throws Exception { // register // register 恢复注册,添加到 `failedRegistered` ,定时重试 Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { failedRegistered.add(url); } } // subscribe // subscribe 恢复订阅,添加到 `failedSubscribed` ,定时重试 Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { addFailedSubscribed(url, listener); } } } }
dubbo的Root层是根目录,通过<dubbo:registry group=“dubbo” />的“group”来设置zookeeper的根节点,缺省值是“dubbo”。
Service层是服务接口的全名。
Type层是分类,一共有四种分类,分别是providers(服务提供者列表)、consumers(服务消费者列表)、routes(路由规则列表)、configurations(配置规则列表)。
URL层:根据不同的Type目录:可以有服务提供者 URL 、服务消费者 URL 、路由规则 URL 、配置规则 URL 。不同的Type关注的URL不同。
zookeeper以每个斜杠来分割每一层的znode,比如第一层根节点dubbo就是“/dubbo”,而第二层的Service层就是/com.foo.Barservice,zookeeper的每个节点通过路径来表示以及访问,例如服务提供者启动时,向/dubbo/com.foo.Barservice/providers目录下写入自己的URL地址。
调用流程:
服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址。
服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。
ZookeeperRegistry
该类继承了FailbackRegistry类,该类就是针对注册中心核心的功能注册、订阅、取消注册、取消订阅,查询注册列表进行展开,基于zookeeper来实现。
属性
// 日志记录 private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class); // 默认的zookeeper端口 private final static int DEFAULT_ZOOKEEPER_PORT = 2181; // 默认zookeeper根节点 private final static String DEFAULT_ROOT = "dubbo"; // zookeeper根节点 private final String root; // 服务接口集合 private final Set<String> anyServices = new ConcurrentHashSet<String>(); // 监听器集合 private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>(); // zookeeper客户端 private final ZookeeperClient zkClient;
构造方法
参数中ZookeeperTransporter是一个接口,并且在dubbo中有ZkclientZookeeperTransporter和CuratorZookeeperTransporter两个实现类。
dubbo在zookeeper节点层级有一层是root层,该层是通过group属性来设置的。
给客户端添加一个监听器,当状态为重连的时候调用FailbackRegistry的恢复方法
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 获得url携带的分组配置,并且作为zookeeper的根节点 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; // 创建zookeeper client zkClient = zookeeperTransporter.connect(url); // 添加状态监听器,当状态为重连的时候调用恢复方法 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { // 恢复 recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
isAvailable && destroy
这两个方法分别是检测zookeeper是否连接以及销毁连接
@Override
public boolean isAvailable() {
return zkClient != null && zkClient.isConnected();
}
@Override
public void destroy() {
super.destroy();
// Just release zkClient reference, but can not close zk client here for zk client is shared somewhere else.
// See org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
zkClient = null;
}
doRegister && doUnregister
这两个方法分别是注册和取消注册,调用客户端create和delete方法,创建节点和删除节点。
@Override protected void doRegister(URL url) { try { // 创建URL节点,也就是URL层的节点 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } @Override protected void doUnregister(URL url) { try { // 删除节点 zkClient.delete(toUrlPath(url)); } catch (Throwable e) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
doSubscribe
这里的实现把所有Service层发起的订阅以及指定的Service层发起的订阅分开处理。所有Service层类似于监控中心发起的订阅。指定的Service层发起的订阅可以看作是服务消费者的订阅。订阅的大致逻辑类似,有几个区别:
所有Service层发起的订阅中的ChildListener是在在 Service 层发生变更时,才会做出解码,用anyServices属性判断是否是新增的服务,最后调用父类的subscribe订阅。而指定的Service层发起的订阅是在URL层发生变更的时候,调用notify,回调回调NotifyListener的逻辑,做到通知服务变更。
所有Service层发起的订阅中客户端创建的节点是Service节点,该节点为持久节点,而指定的Service层发起的订阅中创偶建的节点是Type节点,该节点也是持久节点。
指定的Service层发起的订阅中调用了两次notify,第一次是增量的通知,只是通知这次增加的服务节点,而第二个是全量的通知。
@Override protected void doSubscribe(final URL url, final NotifyListener listener) { try { // 处理所有Service层发起的订阅,例如监控中心的订阅 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { // 获得根目录 String root = toRootPath(); // 获得url对应的监听器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); // 不存在就创建监听器集合 if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } // 获得节点监听器 ChildListener zkListener = listeners.get(listener); // 如果该节点监听器为空,则创建 if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { // 遍历现有的节点,如果现有的服务集合中没有该节点,则加入该节点,然后订阅该节点 for (String child : currentChilds) { // 解码 child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); // 重新获取,为了保证一致性 zkListener = listeners.get(listener); } // 创建service节点,该节点为持久节点 zkClient.create(root, false); // 向zookeeper的service节点发起订阅,获得Service接口全名数组 List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { // 遍历Service接口全名数组 for (String service : services) { service = URL.decode(service); anyServices.add(service); // 发起该service层的订阅 subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { // 处理指定 Service 层的发起订阅,例如服务消费者的订阅 List<URL> urls = new ArrayList<URL>(); // 遍历分类数组 for (String path : toCategoriesPath(url)) { // 获得监听器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); // 如果没有则创建 if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } // 获得节点监听器 ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { // 通知服务变化 回调NotifyListener ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); // 重新获取节点监听器,保证一致性 zkListener = listeners.get(listener); } // 创建type节点,该节点为持久节点 zkClient.create(path, false); // 向zookeeper的type节点发起订阅 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { // 加入到自子节点数据数组 urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 通知数据变化 notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
doUnsubscribe
取消订阅,也是分为两种情况,所有的Service发起的取消订阅和指定的Service发起的取消订阅。
所有的Service发起的取消订阅就直接移除了根目录下所有的监听器,而指定的Service发起的取消订阅是移除了该Service层下面的所有Type节点监听器
@Override protected void doUnsubscribe(URL url, NotifyListener listener) { // 获得监听器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners != null) { // 获得子节点的监听器 ChildListener zkListener = listeners.get(listener); if (zkListener != null) { // 如果为全部的服务接口,例如监控中心 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { // 获得根目录 String root = toRootPath(); // 移除监听器 zkClient.removeChildListener(root, zkListener); } else { // 遍历分类数组进行移除监听器 for (String path : toCategoriesPath(url)) { zkClient.removeChildListener(path, zkListener); } } } } }
lookup
查询符合条件的已经注册的服务
@Override public List<URL> lookup(URL url) { if (url == null) { throw new IllegalArgumentException("lookup url == null"); } try { List<String> providers = new ArrayList<String>(); // 遍历分组类别 for (String path : toCategoriesPath(url)) { // 获得子节点 List<String> children = zkClient.getChildren(path); if (children != null) { providers.addAll(children); } } // 获得 providers 中,和 consumer 匹配的 URL 数组 return toUrlsWithoutEmpty(url, providers); } catch (Throwable e) { throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
toServicePath
获得服务路径,拼接规则:Root + Type
private String toServicePath(URL url) {
String name = url.getServiceInterface();
// 如果是包括所有服务,则返回根节点
if (Constants.ANY_VALUE.equals(name)) {
return toRootPath();
}
return toRootDir() + URL.encode(name);
}
toCategoriesPath
第一个方法是获得分类数组,也就是url携带的服务下的所有Type节点数组。第二个是获得分类路径,分类路径拼接规则:Root + Service + Type
private String[] toCategoriesPath(URL url) { String[] categories; // 如果url携带的分类配置为*,则创建包括所有分类的数组 if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) { categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY, Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY}; } else { // 返回url携带的分类配置 categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY}); } String[] paths = new String[categories.length]; for (int i = 0; i < categories.length; i++) { // 加上服务路径 paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i]; } return paths; } private String toCategoryPath(URL url) { return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); }
toUrlPath
获得URL路径,拼接规则是Root + Service + Type + URL
private String toUrlPath(URL url) {
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
toUrlsWithoutEmpty && toUrlsWithEmpty
第一个toUrlsWithoutEmpty方法是获得 providers 中,和 consumer 匹配的 URL 数组,第二个toUrlsWithEmpty方法是调用了第一个方法后增加了若不存在匹配,则创建 empty:// 的 URL返回。
private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) { List<URL> urls = new ArrayList<URL>(); if (providers != null && !providers.isEmpty()) { // 遍历服务提供者 for (String provider : providers) { // 解码 provider = URL.decode(provider); if (provider.contains("://")) { // 把服务转化成url的形式 URL url = URL.valueOf(provider); // 判断是否匹配,如果匹配, 则加入到集合中 if (UrlUtils.isMatch(consumer, url)) { urls.add(url); } } } } return urls; } private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) { // 返回和服务消费者匹配的服务提供者url List<URL> urls = toUrlsWithoutEmpty(consumer, providers); // 如果不存在,则创建`empty://` 的 URL返回 if (urls == null || urls.isEmpty()) { int i = path.lastIndexOf('/'); String category = i < 0 ? path : path.substring(i + 1); URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category); urls.add(empty); } return urls; }
ZookeeperRegistryFactory
该类继承了AbstractRegistryFactory类,实现了AbstractRegistryFactory抽象出来的createRegistry方法,实例化ZookeeperRegistry
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
Hystrix的内部处理逻辑
Hystrix工作流程
当调用出现错误时,开启一个时间窗(默认 10秒)
在这个时间窗内,统计调用次数是否达到最小请求数?
如果没有达到,则重置统计信息,回到第1步
如果没有达到最小请求数,即使请求全部失败,也会回到第1步
如果达到了,则统计失败的请求数占所有请求数的百分比,是否达到阈值?
如果达到,则跳闸(不再 请求对应的服务)
如果没有达到,则重置统计信息,回到第1步
如果跳闸,则会开启一个活动窗口(默认5秒),每隔5秒,Hystrix 会让一个请求通过,到达那个正在苦苦挣扎的服务,看是否调用成功 如果成功,重置断路器,回到第1步 如果失败,回到第3步
@HystrixCommand 常用配置参数
@HystrixCommmand(commendKey=" “,groupKey=” “,threadPoolKey=” “,fallbackMethod=” ")
commandKey:用来标识一个 Hystrix 命令,默认会取被注解的方法名。
注意:Hystrix 里同一个键的唯一标识并不包括 groupKey,建议取一个独一二无的名字,防止多个方法之间因为键重复而互相影响
groupKey:一组 Hystrix 命令的集合, 用来统计、报告,默认取类名。
threadPoolKey:用来标识一个线程池,如果没设置的话会取 groupKey。
一般同一个类内的方法在共用同一个线程池,如果两个共用同一线程池的方法上配置了同样的属性,在第一个方法被执行后线程池的属性就固定了,所以属性会以第一个被执行的方法上的配置为准。
fallbackMethod:方法执行时熔断、错误、超时时会执行的回退方法,需要保持此方法与 Hystrix 方法的签名和返回值一致
例:
/**
* 调用内容库获取分类书籍信息接口
*/
@HystrixCommand(groupKey = "bookQueryFacade", threadPoolKey = "queryBook", commandKey = "getBookListByCategory", fallbackMethod = "getBookListByCategoryFallback")
public BaseResponseDTO<BookListDTO> getBookListByCategory(BaseRequestDTO<BookCategoryReqDTO> baseRequestDTO) {
return bookQueryFacade.getBookListByCategory(baseRequestDTO);
}
public BaseResponseDTO<BookListDTO> getBookListByCategoryFallback(BaseRequestDTO<BookCategoryReqDTO> baseRequestDTO) {
log.error("Hystrix ContentRepositoryCaller getBookListByCategory");
BookListDTO bookListDTO = new BookListDTO();
bookListDTO.setHasNext(false);
bookListDTO.setBookList(Collections.EMPTY_LIST);
return getDefaultResponseDTO(bookListDTO);
}
Hystrix源码总结
Hystrix在底层使用了Spring提供的切面技术:
通过HystrixCommandAspect.java定义了一个切面(该类有@Aspect注解),专门用来处理那些标注了@HystrixCommand的方法
AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation. */ @Aspect public class HystrixCommandAspect {…}
更详细的讲,该类中定义了一个切点。切点是@HystrixCommand
@Pointcut(“@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)”)public void hystrixCommandAnnotationPointcut() {}
而针对该切点,还设置一个环绕函数
@Around(“hystrixCommandAnnotationPointcut()”) public Object methodsAnnotatedWithHystrixCommand()
环绕函数中,最关键的一句是execute()。这里会根据executionType有SYNCHRONOUS,ASYNCHRONOUS,OBSERVABLE调用不同逻辑,使用rxjava的观察者模式并最终在这里计算出结果返回。
result = CommandExecutor.execute(invokable, executionType, metaHolder);
当调用execute()出现失败时,会调用getFallBack()方法。而如果没有设置降级方法则调用父类的getFallBack()方法,父类的getFallBack方法会抛出一个找不到降级方法的异常
protected T getFallback() { throw new RuntimeException(“No fallback available.”, getExecutionException());
具体源码:
/** * AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation. */ @Aspect public class HystrixCommandAspect { private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP; static { META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder() .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory()) .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory()) .build(); } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { //通过切点获取被拦截的方法 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); //metaholder中保存了很多和切点相关的信息,详见后文的贴图 MetaHolder metaHolder = metaHolderFactory.create(joinPoint); HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); //result中保留操作的结果,可能是成功操作的返回值也可能是fallback方法的返回值 Object result; try { //如果返回的结果使用了observable模式则执行以下代码 if (!metaHolder.isObservable()) { result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { //否则执行else result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause() != null ? e.getCause() : e; } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; } .......... }
重要的是执行这一步
result = CommandExecutor.execute(invokable, executionType, metaHolder); * Calls a method of {@link HystrixExecutable} in accordance with specified execution type. * * @param invokable {@link HystrixInvokable} * @param metaHolder {@link MetaHolder} * @return the result of invocation of specific method. * @throws RuntimeException */ public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) { case SYNCHRONOUS: { return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); }
在execute()方法中使用了rxjava的观察者模式并最终在这里计算出结果返回。当出现失败时,会调用GenericCommand.java中的getFallBack()方法,代码如下:
@Override protected Object getFallback() { final CommandAction commandAction = getFallbackAction(); if (commandAction != null) { try { return process(new Action() { @Override Object execute() { MetaHolder metaHolder = commandAction.getMetaHolder(); Object[] args = createArgsForFallback(metaHolder, getExecutionException()); return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args); } }); } catch (Throwable e) { LOGGER.error(FallbackErrorMessageBuilder.create() .append(commandAction, e).build()); throw new FallbackInvocationException(unwrapCause(e)); } } else { return super.getFallback(); } }
在这个方法中首先会对是否设置了commandAction进行判断,commandAction就是我们之前所设置的fallback字段指向的方法,如果发现有降级方法,则调用降级方法获取结果,进入process()方法,代码如下:
Object process(Action action) throws Exception { Object result; try { result = action.execute(); flushCache(); } catch (CommandActionExecutionException throwable) { Throwable cause = throwable.getCause(); if (isIgnorable(cause)) { throw new HystrixBadRequestException(cause.getMessage(), cause); } if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } else if (cause instanceof Exception) { throw (Exception) cause; } else { // instance of Throwable throw new CommandActionExecutionException(cause); } } return result; }
最终在process()方法中获取降级方法执行的结果,而如果没有设置降级方法则调用父类的getFallBack()方法,父类的getFallBack方法会抛出一个找不到降级方法的异常,代码如下:
protected T getFallback() {
throw new RuntimeException("No fallback available.", getExecutionException());
}
至于何时会跳转到降级方法,则是在AbstractCommand.java中,在这里定义了很多种执行失败的情况,通过rxjava框架的观察者模式对错误进行监听,根据不同的情况会进入不同的处理方法,最终这些处理方法都会调用HystrixCommand.java中的getFallbackObservable()方法,并最终进入上文所述的真正执行fallback方法的代码
服务暴露过程
服务暴露过程大致可分为三个部分:
前置工作,主要用于检查参数,组装 URL
导出服务,包含暴露服务到本地 (JVM),和暴露服务到远程两个过程。
向注册中心注册服务,用于服务发现。
暴露起点
Spring中有一个ApplicationListener接口,其中定义了一个onApplicationEvent()方法,在当容器内发生任何事件时,此方法都会被触发。
只要服务没有被暴露并且服务没有被取消暴露,就暴露服务
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 如果服务没有被暴露并且服务没有被取消暴露,则打印日志
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
// 导出
export();
}
}
前置工作
前置工作主要包含两个部分,分别是配置检查,以及 URL 装配。
在暴露服务之前,Dubbo 需要检查用户的配置是否合理,或者为用户补充缺省配置。配置检查完成后,接下来需要根据这些配置组装 URL。
【在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作为配置载体,所有的拓展点都是通过 URL 获取配置】
配置检查
首先对配置的检查和更新,执行的ServiceConfig中的checkAndUpdateSubConfigs方法。然后检测是否应该暴露,如果不应该暴露,则直接结束,然后检测是否配置了延迟加载,如果是,则使用定时器来实现延迟加载的目的。
public synchronized void export() { //检查并且更新配置 checkAndUpdateSubConfigs(); // 如果不应该暴露,则直接结束 if (!shouldExport()) { return; } // 如果使用延迟加载,则延迟delay时间后暴露服务 if (shouldDelay()) { delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS); } else { // 暴露服务 doExport(); } }
checkAndUpdateSubConfigs()
该方法中是对各类配置的校验,并且更新部分配置
public void checkAndUpdateSubConfigs() { // Use default configs defined explicitly on global configs // 用于检测 provider、application 等核心配置类对象是否为空, // 若为空,则尝试从其他配置类对象中获取相应的实例。 completeCompoundConfigs(); // Config Center should always being started first. // 开启配置中心 startConfigCenter(); // 检测 provider 是否为空,为空则新建一个,并通过系统变量为其初始化 checkDefault(); // 检查application是否为空 checkApplication(); // 检查注册中心是否为空 checkRegistry(); // 检查protocols是否为空 checkProtocol(); this.refresh(); // 核对元数据中心配置是否为空 checkMetadataReport(); // 服务接口名不能为空,否则抛出异常 if (StringUtils.isEmpty(interfaceName)) { throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"); } // 检测 ref 是否为泛化服务类型 if (ref instanceof GenericService) { // 设置interfaceClass为GenericService interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { // 设置generic = true generic = Boolean.TRUE.toString(); } } else { try { // 获得接口类型 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 对 interfaceClass,以及 <dubbo:method> 标签中的必要字段进行检查 checkInterfaceAndMethods(interfaceClass, methods); // 对 ref 合法性进行检测 checkRef(); generic = Boolean.FALSE.toString(); } // stub local一样都是配置本地存根 if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName); } } if (stub != null) { if ("true".equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName); } } // 本地存根合法性校验 checkStubAndLocal(interfaceClass); // mock合法性校验 checkMock(interfaceClass); }
doExport()
对于服务是否暴露再一次校验,然后会执行ServiceConfig的doExportUrls()方法,对于多协议多注册中心暴露服务进行支持。
protected synchronized void doExport() { // 如果调用不暴露的方法,则unexported值为true if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } // 如果服务已经暴露了,则直接结束 if (exported) { return; } // 设置已经暴露 exported = true; // 如果path为空,则赋值接口名称 if (StringUtils.isEmpty(path)) { path = interfaceName; } // 多协议多注册中心暴露服务 doExportUrls(); }
doExportUrls()
private void doExportUrls() {
// 加载注册中心链接
List<URL> registryURLs = loadRegistries(true);
// 遍历 protocols,并在每个协议下暴露服务
for (ProtocolConfig protocolConfig : protocols) {
// 以path、group、version来作为服务唯一性确定的key
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
// 组装 URL
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
从该方法可以看到:
loadRegistries()方法是加载注册中心链接。
服务的唯一性是通过path、group、version一起确定的。
doExportUrlsFor1Protocol()方法开始组装URL。
loadRegistries()
protected List<URL> loadRegistries(boolean provider) { // check && override if necessary List<URL> registryList = new ArrayList<URL>(); // 如果registries为空,直接返回空集合 if (CollectionUtils.isNotEmpty(registries)) { // 遍历注册中心配置集合registries for (RegistryConfig config : registries) { // 获得地址 String address = config.getAddress(); // 若地址为空,则设置为0.0.0.0 if (StringUtils.isEmpty(address)) { address = Constants.ANYHOST_VALUE; } // 如果地址为N/A,则跳过 if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 添加 ApplicationConfig 中的字段信息到 map 中 appendParameters(map, application); // 添加 RegistryConfig 字段信息到 map 中 appendParameters(map, config); // 添加path map.put(Constants.PATH_KEY, RegistryService.class.getName()); // 添加 协议版本、发布版本,时间戳 等信息到 map 中 appendRuntimeParameters(map); // 如果map中没有protocol,则默认为使用dubbo协议 if (!map.containsKey(Constants.PROTOCOL_KEY)) { map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL); } // 解析得到 URL 列表,address 可能包含多个注册中心 ip,因此解析得到的是一个 URL 列表 List<URL> urls = UrlUtils.parseURLs(address, map); // 遍历URL 列表 for (URL url : urls) { // 将 URL 协议头设置为 registry url = URLBuilder.from(url) .addParameter(Constants.REGISTRY_KEY, url.getProtocol()) .setProtocol(Constants.REGISTRY_PROTOCOL) .build(); // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下: // 如果是服务提供者,并且是注册中心服务 或者 是消费者端,并且是订阅服务 // 则加入到registryList if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
组装URL
dubbo内部用URL来携带各类配置,贯穿整个调用链,它就是配置的载体。服务的配置被组装到URL中就是从这里开始
遍历每个协议配置,在每个协议下都暴露服务,就会执行ServiceConfig的doExportUrlsFor1Protocol()方法,该方法前半部分实现了组装URL的逻辑,后半部分实现了暴露dubbo服务等逻辑,其中为用分割线分隔了。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 获取协议名 String name = protocolConfig.getName(); // 如果为空,则是默认的dubbo if (StringUtils.isEmpty(name)) { name = Constants.DUBBO; } Map<String, String> map = new HashMap<String, String>(); // 设置服务提供者册 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); // 添加 协议版本、发布版本,时间戳 等信息到 map 中 appendRuntimeParameters(map); // 添加metrics、application、module、provider、protocol的所有信息到map appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); // 如果method的配置列表不为空 if (CollectionUtils.isNotEmpty(methods)) { // 遍历method配置列表 for (MethodConfig method : methods) { // 把方法名加入map appendParameters(map, method, method.getName()); // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。 // 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig, // 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 如果retryValue为false,则不重试,设置值为0 if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } // 获得ArgumentConfig列表 List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { // 遍历ArgumentConfig列表 for (ArgumentConfig argument : arguments) { // convert argument type // // 检测 type 属性是否为空,或者空串 if (argument.getType() != null && argument.getType().length() > 0) { // 利用反射获取该服务的所有方法集合 Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods != null && methods.length > 0) { // 遍历所有方法 for (int i = 0; i < methods.length; i++) { // 获得方法名 String methodName = methods[i].getName(); // target the method, and get its signature // 找到目标方法 if (methodName.equals(method.getName())) { // 通过反射获取目标方法的参数类型数组 argtypes Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method // 如果下标为-1 if (argument.getIndex() != -1) { // 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致 if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { // 添加 ArgumentConfig 字段信息到 map 中 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { // 不一致,则抛出异常 throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method // 遍历参数类型数组 argtypes,查找 argument.type 类型的参数 for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { // 如果找到,则添加 ArgumentConfig 字段信息到 map 中 appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { // 用户未配置 type 属性,但配置了 index 属性,且 index != -1,则直接添加到map appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { // 抛出异常 throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } // 如果是泛化调用,则在map中设置generic和methods if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { // 获得版本号 String revision = Version.getVersion(interfaceClass, version); // 放入map if (revision != null && revision.length() > 0) { map.put(Constants.REVISION_KEY, revision); } // 获得方法集合 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 如果为空,则告警 if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); // 设置method为* map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { // 否则加入方法集合 map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // 把token 的值加入到map中 if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } // export service // 获得地址 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); // 获得端口号 Integer port = this.findConfigedPorts(protocolConfig, name, map); // 生成 URL URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); // —————————————————————————————————————分割线——————————————————————————————————————— // 加载 ConfiguratorFactory,并生成 Configurator 实例,判断是否有该协议的实现存在 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { // 通过实例配置 url url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured // // 如果 scope = none,则什么都不做 if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) // // scope != remote,暴露到本地 if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { // 暴露到本地 exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) // // scope != local,导出到远程 if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } // 如果注册中心链接集合不为空 if (CollectionUtils.isNotEmpty(registryURLs)) { // 遍历注册中心 for (URL registryURL : registryURLs) { // 添加dynamic配置 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // 加载监视器链接 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { // 添加监视器配置 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } // For providers, this is used to enable custom proxy to generate invoker // 获得代理方式 String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { // 添加代理方式到注册中心到url registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } // 为服务提供类(ref)生成 Invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 暴露服务,并且生成Exporter Exporter<?> exporter = protocol.export(wrapperInvoker); // 加入到暴露者集合中 exporters.add(exporter); } } else { // 不存在注册中心,则仅仅暴露服务,不会记录暴露到地址 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; // 如果元数据中心服务不为空,则发布该服务,也就是在元数据中心记录url中到部分配置 if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); }
分割线上面部分,是组装URL的全过程,大致可以分为以下步骤:
它把metrics、application、module、provider、protocol等所有配置都放入map中,
针对method都配置,先做签名校验,先找到该服务是否有配置的方法存在,然后该方法签名是否有这个参数存在,都核对成功才将method的配置加入map。
将泛化调用、版本号、method或者methods、token等信息加入map
获得服务暴露地址和端口号,利用map内数据组装成URL。
暴露到远程的源码直接看doExportUrlsFor1Protocol()方法分割线下半部分
创建invoker
当生成暴露者的时候,服务已经暴露,接下来会细致的分析这暴露内部的过程。可以发现无论暴露到本地还是远程,都会通过代理工厂创建invoker。这个时候就走到了上述时序图的ProxyFactory。Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。JavassistProxyFactory中有一个getInvoker()方法。
getInvoker() public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' // // 为目标类创建 Wrapper final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 创建匿名 Invoker 类对象,并实现 doInvoke 方法。 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
该方法就是创建了一个匿名的Invoker类对象,在doInvoke()方法中调用wrapper.invokeMethod()方法。
服务暴露
服务暴露分为暴露到本地 (JVM),和暴露到远程。doExportUrlsFor1Protocol()方法分割线下半部分就是服务暴露的逻辑。根据scope的配置分为:
scope = none,不暴露服务
scope != remote,暴露到本地
scope != local,暴露到远程
暴露到本地
导出本地执行的是ServiceConfig中的exportLocal()方法。
exportLocal()
private void exportLocal(URL url) { // 如果协议不是injvm if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 生成本地的url,分别把协议改为injvm,设置host和port URL local = URLBuilder.from(url) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST_VALUE) .setPort(0) .build(); // 通过代理工程创建invoker // 再调用export方法进行暴露服务,生成Exporter Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); // 把生成的暴露者加入集合 exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } }
本地暴露调用的是injvm协议方法,也就是InjvmProtocol 的 export()方法。
export()
public Exporter export(Invoker invoker) throws RpcException {
return new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
该方法只是创建了一个,因为暴露到本地,所以在同一个jvm中。所以不需要其他操作。
暴露到远程
暴露到远程,大致可以分为服务暴露和服务注册两个过程。
先来看看服务暴露。我们知道dubbo有很多协议实现,在doExportUrlsFor1Protocol()方法分割线下半部分中,生成了Invoker后,就需要调用protocol 的 export()方法,
export() public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 获得注册中心的url URL registryUrl = getRegistryUrl(originInvoker); // url to export locally //获得已经注册的服务提供者url URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. // 获取override订阅 URL final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); // 创建override的监听器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); // 把监听器添加到集合 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 根据override的配置来覆盖原来的url,使得配置是最新的。 providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker // 服务暴露 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry // 根据 URL 加载 Registry 实现类,比如ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // 返回注册到注册表的url并过滤url参数一次 final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); // 生成ProviderInvokerWrapper,它会保存服务提供方和消费方的调用地址和代理对象 ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // ————————————————————————————————分割线—————————————————————————————————————— //to judge if we need to delay publish // 获取 register 参数 boolean register = registeredProviderUrl.getParameter("register", true); // 如果需要注册服务 if (register) { // 向注册中心注册服务 register(registryUrl, registeredProviderUrl); // 设置reg为true,表示服务注册 providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. // 向注册中心进行订阅 override 数据 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 设置注册中心url exporter.setRegisterUrl(registeredProviderUrl); // 设置override数据订阅的url exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export // 创建并返回 DestroyableExporter return new DestroyableExporter<>(exporter); }
从代码上看,我用分割线分成两部分,分别是服务暴露和服务注册。该方法的逻辑大致分为以下几个步骤:
获得服务提供者的url,再通过override数据重新配置url,然后执行doLocalExport()进行服务暴露。
加载注册中心实现类,向注册中心注册服务。
向注册中心进行订阅 override 数据。
创建并返回 DestroyableExporter
服务暴露先调用的是RegistryProtocol的doLocalExport()方法
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
// 加入缓存
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
// 创建 Invoker 为委托类对象
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 调用 protocol 的 export 方法暴露服务
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
根据不同的协议配置,调用不同的protocol实现。跟暴露到本地的时候实现InjvmProtocol一样。
服务注册
服务注册先调用的是register()方法。
RegistryProtocol的register()
public void register(URL registryUrl, URL registeredProviderUrl) {
// 获取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注册服务
registry.register(registeredProviderUrl);
}
所以服务注册大致可以分为两步:
获得注册中心实例
注册服务
获得注册中心首先执行的是AbstractRegistryFactory的getRegistry()方法
AbstractRegistryFactory中的源码。
大概的逻辑就是先从缓存中取,如果没有命中,则创建注册中心实例,这里的createRegistry()是一个抽象方法,具体的实现逻辑由子类完成,假设这里使用zookeeper作为注册中心,则调用的是ZookeeperRegistryFactory的createRegistry()。
ZookeeperRegistryFactory的createRegistry()
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
就是创建了一个ZookeeperRegistry,执行了ZookeeperRegistry的构造方法。
ZookeeperRegistry的构造方法
ZookeeperRegistry中的源码分析。大致的逻辑可以分为以下几个步骤:
创建zookeeper客户端
添加监听器
主要看ZookeeperTransporter的connect方法,因为当connect方法执行完后,注册中心创建过程就结束了。首先执行的是AbstractZookeeperTransporter的connect方法。
AbstractZookeeperTransporter的connect()
public ZookeeperClient connect(URL url) { ZookeeperClient zookeeperClient; // 获得所有url地址 List<String> addressList = getURLBackupAddress(url); // The field define the zookeeper server , including protocol, host, port, username, password // 从缓存中查找可用的客户端,如果有,则直接返回 if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // avoid creating too many connections, so add lock synchronized (zookeeperClientMap) { if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // 创建客户端 zookeeperClient = createZookeeperClient(toClientURL(url)); logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url); // 加入缓存 writeToClientMap(addressList, zookeeperClient); } return zookeeperClient; }
上面的源码,主要是执行了createZookeeperClient()方法
服务引用有两种方式,一种就是直连,一种是通过注册中心。直连更多的时候被用来做服务测试,不建议在生产环境使用这样的方法,因为直连不适合服务治理,dubbo本身就是一个服务治理的框架,提供了很多服务治理的功能。所以更多的时候,我们都不会选择绕过注册中心,而是通过注册中心的方式来进行服务引用。
服务引用过程
大致可以分为三个步骤:
配置加载
创建invoker
创建服务接口代理类
引用起点
dubbo中有一个类ReferenceBean,它实现了FactoryBean接口,继承了ReferenceConfig,ReferenceBean作为dubbo中能生产对象的工厂Bean,而我们要引用服务,也就是要有一个该服务的对象。
因为ReferenceBean实现了FactoryBean接口的getObject()方法,所以在加载bean的时候,会调用ReferenceBean的getObject()方法
ReferenceBean的getObject()
public Object getObject() {
return get();
}
这个get方法是ReferenceConfig的get()方法
ReferenceConfig的get()
public synchronized T get() {
// 检查并且更新配置
checkAndUpdateSubConfigs();
// 如果被销毁,则抛出异常
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 检测 代理对象ref 是否为空,为空则通过 init 方法创建
if (ref == null) {
// 用于处理配置,以及调用 createProxy 生成代理类
init();
}
return ref;
}
init方法是处理各类配置的开始。
配置加载
ReferenceConfig的init()
private void init() { // 如果已经初始化过,则结束 if (initialized) { return; } // 设置初始化标志为true initialized = true; // 本地存根合法性校验 checkStubAndLocal(interfaceClass); // mock合法性校验 checkMock(interfaceClass); // 用来存放配置 Map<String, String> map = new HashMap<String, String>(); // 存放这是消费者侧 map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); // 添加 协议版本、发布版本,时间戳 等信息到 map 中 appendRuntimeParameters(map); // 如果是泛化调用 if (!isGeneric()) { // 获得版本号 String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { // 设置版本号 map.put(Constants.REVISION_KEY, revision); } // 获得所有方法 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { // 把所有方法签名拼接起来放入map map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), Constants.COMMA_SEPARATOR)); } } // 加入服务接口名称 map.put(Constants.INTERFACE_KEY, interfaceName); // 添加metrics、application、module、consumer、protocol的所有信息到map appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, this); Map<String, Object> attributes = null; if (CollectionUtils.isNotEmpty(methods)) { attributes = new HashMap<String, Object>(); // 遍历方法配置 for (MethodConfig methodConfig : methods) { // 把方法配置加入map appendParameters(map, methodConfig, methodConfig.getName()); // 生成重试的配置key String retryKey = methodConfig.getName() + ".retry"; // 如果map中已经有该配置,则移除该配置 if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 如果配置为false,也就是不重试,则设置重试次数为0次 if ("false".equals(retryValue)) { map.put(methodConfig.getName() + ".retries", "0"); } } // 设置异步配置 attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig)); } } // 获取服务消费者 ip 地址 String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); // 如果为空,则获取本地ip if (StringUtils.isEmpty(hostToRegistry)) { hostToRegistry = NetUtils.getLocalHost(); } // 设置消费者ip map.put(Constants.REGISTER_IP_KEY, hostToRegistry); // 创建代理对象 ref = createProxy(map); // 生产服务key String serviceKey = URL.buildKey(interfaceName, group, version); // 根据服务名,ReferenceConfig,代理类构建 ConsumerModel, // 并将 ConsumerModel 存入到 ApplicationModel 中 ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes)); }
该方法大致分为以下几个步骤:
检测本地存根和mock合法性。
添加协议版本、发布版本,时间戳、metrics、application、module、consumer、protocol等的所有信息到 map 中
单独处理方法配置,设置重试次数配置以及设置该方法对异步配置信息。
添加消费者ip地址到map
创建代理对象
生成ConsumerModel存入到 ApplicationModel 中
创建invoker
ReferenceConfig的createProxy()
private T createProxy(Map<String, String> map) { // 根据配置检查是否为本地调用 if (shouldJvmRefer(map)) { // 生成url,protocol使用的是injvm URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); // 利用InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例 invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { // 如果url不为空,则用户可能想进行直连来调用 if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); // 遍历所有的url if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (StringUtils.isEmpty(url.getPath())) { // 设置接口全限定名为 url 路径 url = url.setPath(interfaceName); } // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性), // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等 // 最后将合并后的配置设置为 url 查询字符串中。 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration // 校验注册中心 checkRegistry(); // 加载注册中心的url List<URL> us = loadRegistries(false); if (CollectionUtils.isNotEmpty(us)) { // 遍历所有的注册中心 for (URL u : us) { // 生成监控url URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { // 加入监控中心url的配置 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 添加 refer 参数到 url 中,并将 url 添加到 urls 中 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } // 如果urls为空,则抛出异常 if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } // 如果只有一个注册中心,则直接调用refer方法 if (urls.size() == 1) { // 调用 RegistryProtocol 的 refer 构建 Invoker 实例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; // 遍历所有的注册中心url for (URL url : urls) { // 通过 refprotocol 调用 refer 构建 Invoker, // refprotocol 会在运行时根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法 // 把生成的Invoker加入到集合中 invokers.add(refprotocol.refer(interfaceClass, url)); // 如果是注册中心的协议 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 则设置registryURL registryURL = url; // use last registry url } } // 优先用注册中心的url if (registryURL != null) { // registry url is available // use RegistryAwareCluster only when register's cluster is available // 只有当注册中心当链接可用当时候,采用RegistryAwareCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME); // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker // 由集群进行多个invoker合并 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // not a registry url, must be direct invoke. // 直接进行合并 invoker = cluster.join(new StaticDirectory(invokers)); } } } // 如果需要核对该服务是否可用,并且该服务不可用 if (shouldCheck() && !invoker.isAvailable()) { // make it possible for consumer to retry later if provider is temporarily unavailable // 修改初始化标志为false initialized = false; // 抛出异常 throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } /** * @since 2.7.0 * ServiceData Store */ // 元数据中心服务 MetadataReportService metadataReportService = null; // 加载元数据服务,如果成功 if ((metadataReportService = getMetadataReportService()) != null) { // 生成url URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map); // 把消费者配置加入到元数据中心中 metadataReportService.publishConsumer(consumerURL); } // create service proxy // 创建服务代理 return (T) proxyFactory.getProxy(invoker); }
该方法的大致逻辑可用分为以下几步:
如果是本地调用,则直接使用InjvmProtocol 的 refer 方法生成 Invoker 实例。
如果不是本地调用,但是是选择直连的方式来进行调用,则分割配置的多个url。如果协议是配置是registry,则表明用户想使用指定的注册中心,配置url后将url并且保存到urls里面,否则就合并url,并且保存到urls。
如果是通过注册中心来进行调用,则先校验所有的注册中心,然后加载注册中心的url,遍历每个url,加入监控中心url配置,最后把每个url保存到urls。
针对urls集合的数量,如果是单注册中心,直接引用RegistryProtocol 的 refer 构建 Invoker 实例,如果是多注册中心,则对每个url都生成Invoker,利用集群进行多个Invoker合并。
最终输出一个invoker。
Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。
RegistryProtocol生成invoker
RegistryProtocol的refer()
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 取 registry 参数值,并将其设置为协议头,默认是dubbo url = URLBuilder.from(url) .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) .removeParameter(REGISTRY_KEY) .build(); // 获得注册中心实例 Registry registry = registryFactory.getRegistry(url); // 如果是注册中心服务,则返回注册中心服务的invoker if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" // 将 url 查询字符串转为 Map Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); // 获得group值 String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { // 如果有多个组,或者组配置为*,则使用MergeableCluster,并调用 doRefer 继续执行服务引用逻辑 if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } // 只有一个组或者没有组配置,则直接执行doRefer return doRefer(cluster, registry, type, url); }
如果是注册服务中心,则直接创建代理。如果不是,先处理组配置,根据组配置来决定Cluster的实现方式,然后调用doRefer方法。
RegistryProtocol的doRefer()
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 创建 RegistryDirectory 实例 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 设置注册中心 directory.setRegistry(registry); // 设置协议 directory.setProtocol(protocol); // all attributes of REFER_KEY // 所有属性放到map中 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 生成服务消费者链接 URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); // 注册服务消费者,在 consumers 目录下新节点 if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); // 注册服务消费者 registry.register(directory.getRegisteredConsumerUrl()); } // 创建路由规则链 directory.buildRouterChain(subscribeUrl); // 订阅 providers、configurators、routers 等节点数据 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个,生成一个invoker Invoker invoker = cluster.join(directory); // 在服务提供者处注册消费者 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
该方法大致可以分为以下步骤:
创建一个 RegistryDirectory 实例,然后生成服务者消费者链接。
向注册中心进行注册。
紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。
由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
DubboProtocol生成invoker
首先还是从DubboProtocol的refer()开始。
DubboProtocol的refer()
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
// 创建一个DubboInvoker实例
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
// 加入到集合中
invokers.add(invoker);
return invoker;
}
创建DubboInvoker比较简单,调用了构造方法,这里主要讲这么生成ExchangeClient,也就是getClients方法。
DubboProtocol的getClients()
ubboProtocol中的源码,如果是配置的共享,则获得共享客户端对象,也就是getSharedClient()方法,否则新建客户端也就是initClient()方法。
DubboProtocol的getSharedClient()
DubboProtocol中的源码,先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例。
DubboProtocol的initClient()
DubboProtocol中的源码,initClient 方法首先获取用户配置的客户端类型。然后设置用户心跳配置,然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。
这里的 LazyConnectExchangeClient 代码并不是很复杂,该类会在 request 方法被调用时通过 Exchangers 的 connect 方法创建 ExchangeClient 客户端。下面我们分析一下 Exchangers 的 connect 方法。
Exchangers的connect()
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 获取 Exchanger 实例,默认为 HeaderExchangeClient return getExchanger(url).connect(url, handler); } getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单。接下来分析 HeaderExchangeClient 的connect的实现。 HeaderExchangeClient 的connect() public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 创建 HeaderExchangeHandler 对象 // 创建 DecodeHandler 对象 // 通过 Transporters 构建 Client 实例 // 创建 HeaderExchangeClient 对象 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
Transporters的connect()
Transporters中源码分析。其中获得自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。假设是netty4的实现,则执行以下代码。
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
到这里为止,DubboProtocol生成invoker过程也结束了。再回到createProxy方法的最后一句代码,根据invoker创建服务代理对象。
创建代理
为服务接口生成代理对象。有了代理对象,即可进行远程调用。首先来看AbstractProxyFactory 的 getProxy()方法。
AbstractProxyFactory 的 getProxy()
AbstractProxyFactory的源码分析。可以看到第二个getProxy方法其实就是获取 interfaces 数组,调用到第三个getProxy方法时,该getProxy是个抽象方法,由子类来实现,我们还是默认它的代理实现方式为Javassist。所以可以看JavassistProxyFactory的getProxy方法。
JavassistProxyFactory的getProxy()
public T getProxy(Invoker invoker, Class<?>[] interfaces) {
// 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
Proxy的getProxy方法。
/** * Get proxy. * * @param ics interface class array. * @return Proxy instance. */ public static Proxy getProxy(Class<?>... ics) { // 获得Proxy的类加载器来进行生成代理类 return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } /** * Get proxy. * * @param cl class loader. * @param ics interface class array. * @return Proxy instance. */ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { if (ics.length > Constants.MAX_PROXY_COUNT) { throw new IllegalArgumentException("interface limit exceeded"); } StringBuilder sb = new StringBuilder(); // 遍历接口列表 for (int i = 0; i < ics.length; i++) { String itf = ics[i].getName(); // 检测是否是接口,如果不是,则抛出异常 if (!ics[i].isInterface()) { throw new RuntimeException(itf + " is not a interface."); } Class<?> tmp = null; try { // 重新加载接口类 tmp = Class.forName(itf, false, cl); } catch (ClassNotFoundException e) { } // 检测接口是否相同,这里 tmp 有可能为空,也就是该接口无法被类加载器加载的。 if (tmp != ics[i]) { throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); } // 拼接接口全限定名,分隔符为 ; sb.append(itf).append(';'); } // use interface class name list as key. // 使用拼接后的接口名作为 key String key = sb.toString(); // get cache by class loader. Map<String, Object> cache; // 把该类加载器加到本地缓存 synchronized (ProxyCacheMap) { cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>()); } Proxy proxy = null; synchronized (cache) { do { // 从缓存中获取 Reference<Proxy> 实例 Object value = cache.get(key); if (value instanceof Reference<?>) { proxy = (Proxy) ((Reference<?>) value).get(); if (proxy != null) { return proxy; } } // 并发控制,保证只有一个线程可以进行后续操作 if (value == PendingGenerationMarker) { try { // 其他线程在此处进行等待 cache.wait(); } catch (InterruptedException e) { } } else { // 放置标志位到缓存中,并跳出 while 循环进行后续操作 cache.put(key, PendingGenerationMarker); break; } } while (true); } long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null; ClassGenerator ccp = null, ccm = null; try { // 创建 ClassGenerator 对象 ccp = ClassGenerator.newInstance(cl); Set<String> worked = new HashSet<>(); List<Method> methods = new ArrayList<>(); for (int i = 0; i < ics.length; i++) { // 检测接口访问级别是否为 protected 或 privete if (!Modifier.isPublic(ics[i].getModifiers())) { // 获取接口包名 String npkg = ics[i].getPackage().getName(); if (pkg == null) { pkg = npkg; } else { // 非 public 级别的接口必须在同一个包下,否者抛出异常 if (!pkg.equals(npkg)) { throw new IllegalArgumentException("non-public interfaces from different packages"); } } } // 添加接口到 ClassGenerator 中 ccp.addInterface(ics[i]); // 遍历接口方法 for (Method method : ics[i].getMethods()) { // 获取方法描述,可理解为方法签名 String desc = ReflectUtils.getDesc(method); // 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况, // A 接口和 B 接口中包含一个完全相同的方法 if (worked.contains(desc)) { continue; } worked.add(desc); int ix = methods.size(); // 获取方法返回值类型 Class<?> rt = method.getReturnType(); // 获取参数列表 Class<?>[] pts = method.getParameterTypes(); // 生成 Object[] args = new Object[1...N] StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];"); for (int j = 0; j < pts.length; j++) { // 生成 args[1...N] = ($w)$1...N; code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); } // 生成 InvokerHandler 接口的 invoker 方法调用语句,如下: // Object ret = handler.invoke(this, methods[1...N], args); code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);"); // 返回值不为 void if (!Void.TYPE.equals(rt)) { // 生成返回语句,形如 return (java.lang.String) ret; code.append(" return ").append(asArgument(rt, "ret")).append(";"); } methods.add(method); // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } if (pkg == null) { pkg = PACKAGE_NAME; } // create ProxyInstance class. // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0 String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); ccp.addField("public static java.lang.reflect.Method[] methods;"); // 生成 private java.lang.reflect.InvocationHandler handler; ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); // 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如: // porxy0(java.lang.reflect.InvocationHandler arg0) { // handler=$1; // } ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;"); // 为接口代理类添加默认构造方法 ccp.addDefaultConstructor(); // 生成接口代理类 Class<?> clazz = ccp.toClass(); clazz.getField("methods").set(null, methods.toArray(new Method[0])); // create Proxy class. // 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等 String fcn = Proxy.class.getName() + id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); ccm.addDefaultConstructor(); ccm.setSuperClass(Proxy.class); // 为 Proxy 的抽象方法 newInstance 生成实现代码,形如: // public Object newInstance(java.lang.reflect.InvocationHandler h) { // return new org.apache.dubbo.proxy0($1); // } ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); Class<?> pc = ccm.toClass(); // 生成 Proxy 实现类 proxy = (Proxy) pc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { // release ClassGenerator if (ccp != null) { // 释放资源 ccp.release(); } if (ccm != null) { ccm.release(); } synchronized (cache) { if (proxy == null) { cache.remove(key); } else { // 写缓存 cache.put(key, new WeakReference<Proxy>(proxy)); } // 唤醒其他等待线程 cache.notifyAll(); } } return proxy; }
大致可以分为以下几步:
对接口进行校验,检查是否是一个接口,是否不能被类加载器加载。
做并发控制,保证只有一个线程可以进行后续的代理生成操作。
创建cpp,用作为服务接口生成代理类。首先对接口定义以及包信息进行处理。
对接口的方法进行处理,包括返回类型,参数类型等。最后添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中。
创建接口代理类的信息,比如名称,默认构造方法等。
生成接口代理类。
创建ccm,ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。
设置名称、创建构造方法、添加方法
生成 Proxy 实现类。
释放资源
创建弱引用,写入缓存,唤醒其他线程。
到这里,接口代理类生成后,服务引用也就结束了。
DubboInvoker
该类是dubbo协议独自实现的的invoker,其中实现了调用方法的三种模式,分别是异步发送、单向发送和同步发送。
doInvoker
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { // rpc会话域 RpcInvocation inv = (RpcInvocation) invocation; // 获得方法名 final String methodName = RpcUtils.getMethodName(invocation); // 把path放入到附加值中 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // 把版本号放入到附加值 inv.setAttachment(Constants.VERSION_KEY, version); // 当前的客户端 ExchangeClient currentClient; // 如果数组内就一个客户端,则直接取出 if (clients.length == 1) { currentClient = clients[0]; } else { // 取模轮询 从数组中取,当取到最后一个时,从头开始 currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是否启用异步 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否是单向发送 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 获得超时时间 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 如果是单项发送 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 单向发送只负责发送消息,不等待服务端应答,所以没有返回值 currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { // 异步调用 ResponseFuture future = currentClient.request(inv, timeout); // 保存future,方便后期处理 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { // 同步调用,等待返回结果 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
在调用invoker的时候,通过远程通信将Invocation信息传递给服务端,服务端在接收到该invocation信息后,要找到对应的本地方法,然后执行该方法,将方法的执行结果返回给客户端,在这里,客户端发送有三种模式:
异步发送,也就是当我发送调用后,我不阻塞等待结果,直接返回,将返回的future保存到上下文,方便后期使用。
单向发送,执行方法不需要返回结果。
同步发送,执行方法后,等待结果返回,否则一直阻塞。
DubboExporter
该类继承了AbstractExporter,是dubbo协议中独有的服务暴露者。
/** * 服务key */ private final String key; /** * 服务暴露者集合 */ private final Map<String, Exporter<?>> exporterMap; public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) { super(invoker); this.key = key; this.exporterMap = exporterMap; } @Override public void unexport() { super.unexport(); // 从集合中移除该key exporterMap.remove(key); }
DubboProtocol
该类是dubbo协议的核心实现,其中增加了比如延迟加载等处理。 并且其中还包括了对服务暴露和服务引用的逻辑处理
reply与createInvocation
/* * 回复请求结果,返回的是请求结果 * */ @Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } Invocation inv = (Invocation) message; // 获得暴露的invoker Invoker<?> invoker = getInvoker(channel, inv); inv.setServiceModel(invoker.getUrl().getServiceModel()); // switch TCCL if (invoker.getUrl().getServiceModel() != null) { Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader()); } // need to consider backward-compatibility if it's a callback // 如果是回调服务 if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) { // 获得 方法定义 String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { // 如果方法不止一个,则分割后遍历查询,找到了则设置为true String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } // 设置远程地址 RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress()); // 调用下一个调用链 Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); } /** * 创建会话域, 把url内的值加入到会话域的附加值中 * @param channel * @param url * @param methodKey * @return */ private Invocation createInvocation(Channel channel, URL url, String methodKey) { // 获得方法,methodKey是onconnect或者ondisconnect String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } // 创建一个rpc会话域 RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]); // 加入附加值path invocation.setAttachment(Constants.PATH_KEY, url.getPath()); // 加入附加值group invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); // 加入附加值interface invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); // 加入附加值version invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); // 如果是本地存根服务,则加入附加值dubbo.stub.event为true if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } };
export
该方法是基于dubbo协议的服务暴露,除了对于存根服务和本地服务进行标记以外,打开服务和序列化分别在openServer和optimizeSerialization中实现
@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. // 得到服务key group+"/"+serviceName+":"+serviceVersion+":"+port String key = serviceKey(url); // 创建exporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 加入到集合 exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); // 如果是本地存根事件而不是回调服务 if (isStubSupportEvent && !isCallbackservice) { // 获得本地存根的方法 String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); // 如果为空,则抛出异常 if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { // 加入集合 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 打开服务 openServer(url); // 序列化 optimizeSerialization(url); return exporter; } createServer private ExchangeServer createServer(URL url) { // send readonly event when server closes, it's enabled by default // 服务器关闭时发送readonly事件,默认情况下启用 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // enable heartbeat by default // 心跳默认间隔一分钟 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 获得远程通讯服务端实现方式,默认用netty3 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); /** * 如果没有该配置,则抛出异常 */ if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); /** * 添加编解码器DubboCodec实现 */ url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 启动服务器 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } // 获得客户端侧设置的远程通信方式 str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { // 获得远程通信的实现集合 Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // 如果客户端侧设置的远程通信方式不在支持的方式中,则抛出异常 if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
该方法就是根据url携带的远程通信实现方法来创建一个服务器对象。
optimizeSerialization
private void optimizeSerialization(URL url) throws RpcException { // 获得类名 String className = url.getParameter(Constants.OPTIMIZER_KEY, ""); if (StringUtils.isEmpty(className) || optimizers.contains(className)) { return; } logger.info("Optimizing the serialization process for Kryo, FST, etc..."); try { // 加载类 Class clazz = Thread.currentThread().getContextClassLoader().loadClass(className); if (!SerializationOptimizer.class.isAssignableFrom(clazz)) { throw new RpcException("The serialization optimizer " + className + " isn't an instance of " + SerializationOptimizer.class.getName()); } // 强制类型转化为SerializationOptimizer SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance(); if (optimizer.getSerializableClasses() == null) { return; } // 遍历序列化的类,把该类放入到集合进行缓存 for (Class c : optimizer.getSerializableClasses()) { SerializableClassRegistry.registerClass(c); } // 加入到集合 optimizers.add(className); } catch (ClassNotFoundException e) { throw new RpcException("Cannot find the serialization optimizer class: " + className, e); } catch (InstantiationException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); } catch (IllegalAccessException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); } }
该方法是把序列化的类放入到集合,以便进行序列化
ChannelWrappedInvoker
doInvoke
该方法是在invoker调用的时候对发送请求消息进行了包装
@Override protected Result doInvoke(Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; // use interface's name as service path to export if it's not found on client side // 设置服务path,默认用接口名称 inv.setAttachment(Constants.PATH_KEY, getInterface().getName()); // 设置回调的服务key inv.setAttachment(Constants.CALLBACK_SERVICE_KEY, serviceKey); try { // 如果是异步的 if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { // may have concurrency issue // 直接发送请求消息 currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), Constants.SENT_KEY, false)); return new RpcResult(); } // 获得超时时间 int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (timeout > 0) { return (Result) currentClient.request(inv, timeout).get(); } else { return (Result) currentClient.request(inv).get(); } } catch (RpcException e) { throw e; } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, e.getMessage(), e); } catch (Throwable e) { // here is non-biz exception, wrap it. throw new RpcException(e.getMessage(), e); } }
DecodeableRpcInvocation
该类主要做了对于会话域内的数据进行序列化和解码。
DecodeableRpcResult
该类是做了基于dubbo协议对prc结果的解码
DubboCodec
该类是dubbo的编解码器,分别针对dubbo协议的request和response进行编码和解码。
FutureFilter
该类是处理异步和同步调用结果的过滤器
invoke
@Override public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { // 是否是异步的调用 final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); fireInvokeCallback(invoker, invocation); // need to configure if there's return value before the invocation in order to help invoker to judge if it's // necessary to return future. Result result = invoker.invoke(invocation); if (isAsync) { // 调用异步处理 asyncCallback(invoker, invocation); } else { // 调用同步结果处理 syncCallback(invoker, invocation, result); } return result; }
该方法中根据是否为异步调用来分别执行asyncCallback和syncCallback方法。
syncCallback
private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
// 如果有异常
if (result.hasException()) {
// 则调用异常的结果处理
fireThrowCallback(invoker, invocation, result.getException());
} else {
// 调用正常的结果处理
fireReturnCallback(invoker, invocation, result.getValue());
}
}
该方法是同步调用的返回结果处理。
asyncCallback
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) { Future<?> f = RpcContext.getContext().getFuture(); if (f instanceof FutureAdapter) { ResponseFuture future = ((FutureAdapter<?>) f).getFuture(); // 设置回调 future.setCallback(new ResponseCallback() { @Override public void done(Object rpcResult) { // 如果结果为空,则打印错误日志 if (rpcResult == null) { logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName())); return; } ///must be rpcResult // 如果不是Result则打印错误日志 if (!(rpcResult instanceof Result)) { logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName())); return; } Result result = (Result) rpcResult; if (result.hasException()) { // 如果有异常,则调用异常处理方法 fireThrowCallback(invoker, invocation, result.getException()); } else { // 如果正常的返回结果,则调用正常的处理方法 fireReturnCallback(invoker, invocation, result.getValue()); } } @Override public void caught(Throwable exception) { fireThrowCallback(invoker, invocation, exception); } }); } }
该方法是异步调用的结果处理,把异步返回结果的逻辑写在回调函数里面。
fireInvokeCallback
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) { // 获得调用的方法 final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); // 获得调用的服务 final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); if (onInvokeMethod == null && onInvokeInst == null) { return; } if (onInvokeMethod == null || onInvokeInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } // 如果不可以访问,则设置为可访问 if (!onInvokeMethod.isAccessible()) { onInvokeMethod.setAccessible(true); } // 获得参数数组 Object[] params = invocation.getArguments(); try { // 调用方法 onInvokeMethod.invoke(onInvokeInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } }
该方法是调用方法的执行。
fireReturnCallback
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) { final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY)); final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY)); //not set onreturn callback if (onReturnMethod == null && onReturnInst == null) { return; } if (onReturnMethod == null || onReturnInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onReturnMethod.isAccessible()) { onReturnMethod.setAccessible(true); } Object[] args = invocation.getArguments(); Object[] params; // 获得返回结果类型 Class<?>[] rParaTypes = onReturnMethod.getParameterTypes(); // 设置参数和返回结果 if (rParaTypes.length > 1) { if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { params = new Object[2]; params[0] = result; params[1] = args; } else { params = new Object[args.length + 1]; params[0] = result; System.arraycopy(args, 0, params, 1, args.length); } } else { params = new Object[]{result}; } try { // 调用方法 onReturnMethod.invoke(onReturnInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } }
该方法是正常的返回结果的处理。
fireThrowCallback
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) { final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY)); final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY)); //onthrow callback not configured if (onthrowMethod == null && onthrowInst == null) { return; } if (onthrowMethod == null || onthrowInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onthrowMethod.isAccessible()) { onthrowMethod.setAccessible(true); } // 获得抛出异常的类型 Class<?>[] rParaTypes = onthrowMethod.getParameterTypes(); if (rParaTypes[0].isAssignableFrom(exception.getClass())) { try { Object[] args = invocation.getArguments(); Object[] params; // 把类型和抛出的异常值放入返回结果 if (rParaTypes.length > 1) { if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { params = new Object[2]; params[0] = exception; params[1] = args; } else { params = new Object[args.length + 1]; params[0] = exception; System.arraycopy(args, 0, params, 1, args.length); } } else { params = new Object[]{exception}; } // 调用下一个调用连 onthrowMethod.invoke(onthrowInst, params); } catch (Throwable e) { logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e); } } else { logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception); } }
该方法是异常抛出时的结果处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。