当前位置:   article > 正文

Dubbo源码分析(三)Dubbo的服务引用Refer

"no such constructor \"public stub("

Dubbo的服务引用

服务引用

先从Dubbo的配置文件看起

  1. <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
  2. 复制代码

源码入口: 根据上一篇说的,我们通过DubboNamespaceHandler类找到ReferenceBean类,在afterPropertiesSet()方法中我们找到关键代码getObject()
进入ReferenceConfig类中的get()方法,这个get() 方法是一个同步方法,调用了init()方法
我们看到init()方法中的最后一行代码ref = createProxy(map);我们从这这个方法开始分析:

  1. private T createProxy(Map<String, String> map) {
  2. ···
  3. if (urls.size() == 1) {
  4. invoker = refprotocol.refer(interfaceClass, urls.get(0));
  5. } else {
  6. List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
  7. URL registryURL = null;
  8. for (URL url : urls) {
  9. invokers.add(refprotocol.refer(interfaceClass, url));
  10. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  11. registryURL = url; // 用了最后一个registry url
  12. }
  13. }
  14. if (registryURL != null) { // 有 注册中心协议的URL
  15. // 对有注册中心的Cluster 只用 AvailableCluster
  16. URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
  17. invoker = cluster.join(new StaticDirectory(u, invokers));
  18. } else { // 不是 注册中心的URL
  19. invoker = cluster.join(new StaticDirectory(invokers));
  20. }
  21. }
  22. }
  23. ···
  24. // 创建服务代理
  25. return (T) proxyFactory.getProxy(invoker);
  26. }
  27. 复制代码

先看invoker = refprotocol.refer(interfaceClass, urls.get(0))这行代码
此时的refprotocol= Protocol$Adatptive,进入refer方法:

  1. public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
  2. if (arg1 == null) throw new IllegalArgumentException("url == null");
  3. com.alibaba.dubbo.common.URL url = arg1;
  4. String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
  5. if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
  6. com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
  7. return extension.refer(arg0, arg1);
  8. }
  9. 复制代码

此时的extName=registry,所以extension=ProtocolFilterWrapper(ProtocolListenerWrapper(RegistryProtocol)),我们直接进入RegistryProtocol.refer()方法中

  1. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  2. url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
  3. Registry registry = registryFactory.getRegistry(url);
  4. if (RegistryService.class.equals(type)) {
  5. return proxyFactory.getInvoker((T) registry, type, url);
  6. }
  7. // group="a,b" or group="*"
  8. Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
  9. String group = qs.get(Constants.GROUP_KEY);
  10. if (group != null && group.length() > 0) {
  11. if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
  12. || "*".equals(group)) {
  13. return doRefer(getMergeableCluster(), registry, type, url);
  14. }
  15. }
  16. return doRefer(cluster, registry, type, url);
  17. }
  18. 复制代码

看到第二行代码Registry registry = registryFactory.getRegistry(url);这里从字面上理解应该是建立和注册中心的连接,这里的代码和服务端发布是一样的,这里跳过,继续往下走group,Dubbo里面是可以对服务进行分组,这里不影响主流程走向,我们跳过,看到最后一行代码,我们进入

  1. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  2. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
  3. directory.setRegistry(registry);
  4. directory.setProtocol(protocol);
  5. URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
  6. if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
  7. && url.getParameter(Constants.REGISTER_KEY, true)) {
  8. registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
  9. Constants.CHECK_KEY, String.valueOf(false)));
  10. }
  11. directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
  12. Constants.PROVIDERS_CATEGORY
  13. + "," + Constants.CONFIGURATORS_CATEGORY
  14. + "," + Constants.ROUTERS_CATEGORY));
  15. return cluster.join(directory);
  16. }
  17. 复制代码

先看subscribeUrl是啥,这里的url是consumer开头的url,看到registry.register()方法,这里是向注册中心去注册消费端信息,具体注册的节点是:/dubbo/com.alibaba.dubbo.demo.DemoService/consumers
directory.subscribe(),这句代码一看就明白,应该是向注册中心订阅我们刚刚注册的地址,我们进入到这个方法里面去看看如果目录地址有变化,怎么通知,该做什么样的处理,最终的实现类是ZookeeperRegistry.doSubscribe()方法中,这里用到了模板方法,我们看到doSubscribe()方法中的这段代码notify(url, listener, urls)

  1. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  2. ···
  3. try {
  4. doNotify(url, listener, urls);
  5. } catch (Exception t) {
  6. // 将失败的通知请求记录到失败列表,定时重试
  7. Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
  8. ···
  9. listeners.put(listener, urls);
  10. ···
  11. }
  12. }
  13. 复制代码

这里面执行了doNotify方法,如果执行失败,对应的通过定时策略去重试,继续进入doNotify方法

  1. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  2. ···
  3. for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
  4. String category = entry.getKey();
  5. List<URL> categoryList = entry.getValue();
  6. categoryNotified.put(category, categoryList);
  7. saveProperties(url);
  8. listener.notify(categoryList);
  9. }
  10. }
  11. 复制代码

这个是AbstractRegistry类中的方法,我们看到saveProperties方法,作用是把消费端注册的url信息缓存到本地

  1. registryCacheExecutor.execute(new SaveProperties(version));
  2. 复制代码

然后通过线程池来定时缓存数据,我们继续看一下listener.notify(categoryList)这句代码,这里的listener是RegistryDirectory

  1. public synchronized void notify(List<URL> urls) {
  2. List<URL> invokerUrls = new ArrayList<URL>();
  3. List<URL> routerUrls = new ArrayList<URL>();
  4. List<URL> configuratorUrls = new ArrayList<URL>();
  5. for (URL url : urls) {
  6. String protocol = url.getProtocol();
  7. String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
  8. if (Constants.ROUTERS_CATEGORY.equals(category)
  9. || Constants.ROUTE_PROTOCOL.equals(protocol)) {
  10. routerUrls.add(url);
  11. } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
  12. || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
  13. configuratorUrls.add(url);
  14. } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
  15. invokerUrls.add(url);
  16. } else {
  17. logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
  18. }
  19. }
  20. ···
  21. // providers
  22. refreshInvoker(invokerUrls);
  23. }
  24. 复制代码

看到最后一段代码refreshInvoker(invokerUrls)

  1. /**
  2. * 根据invokerURL列表转换为invoker列表。转换规则如下:
  3. * 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
  4. * 2.如果传入的invoker列表不为空,则表示最新的invoker列表
  5. * 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
  6. *
  7. * @param invokerUrls 传入的参数不能为null
  8. */
  9. // TODO: FIXME 使用线程池去刷新地址,否则可能会导致任务堆积
  10. private void refreshInvoker(List<URL> invokerUrls) {
  11. if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
  12. && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
  13. this.forbidden = true; // 禁止访问
  14. this.methodInvokerMap = null; // 置空列表
  15. destroyAllInvokers(); // 关闭所有Invoker
  16. } else {
  17. this.forbidden = false; // 允许访问
  18. Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
  19. if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
  20. invokerUrls.addAll(this.cachedInvokerUrls);
  21. } else {
  22. this.cachedInvokerUrls = new HashSet<URL>();
  23. this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
  24. }
  25. if (invokerUrls.size() == 0) {
  26. return;
  27. }
  28. Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表
  29. Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
  30. // state change
  31. //如果计算错误,则不进行处理.
  32. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
  33. logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
  34. return;
  35. }
  36. this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
  37. this.urlInvokerMap = newUrlInvokerMap;
  38. try {
  39. destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 关闭未使用的Invoker
  40. } catch (Exception e) {
  41. logger.warn("destroyUnusedInvokers error. ", e);
  42. }
  43. }
  44. }
  45. 复制代码

这段代码的最终目的是刷新urlInvokerMap缓存,并且关闭关闭未使用的Invoker 接下来我们继续cluster.join(directory)这个方法 ,此时的cluster=Cluster$Adaptive

  1. public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
  2. if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
  3. if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
  4. String extName = url.getParameter("cluster", "failover");
  5. if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
  6. com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
  7. return extension.join(arg0);
  8. }
  9. 复制代码

此时extension=MockClusterWrapper(FaileOverCluster), 这里有一个Mock包装类,猜想一下,这个Mock应该是Dubbo的容错机制中用到的Mock,进入MockClusterWrapper.join方法

  1. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  2. return new MockClusterInvoker<T>(directory,
  3. this.cluster.join(directory));
  4. }
  5. 复制代码

这里new了一个MockClusterInvoker,进入FaileOverCluster.join方法

  1. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  2. return new FailoverClusterInvoker<T>(directory);
  3. }
  4. 复制代码

这里new 了一个FailoverClusterInvoker,然后回到最初的ReferenceConfig.createProxy方法,看到最后一段代码return (T) proxyFactory.getProxy(invoker);这段代码的作用是创建服务代理,这里的invoker就是我们刚刚new的MockClusterInvoker,这里的proxyFactory=ProxyFactory$Adaptive,直接贴结果,进入StubProxyFactoryWrapper.getProxy

  1. public <T> T getProxy(Invoker<T> invoker) throws RpcException {
  2. T proxy = proxyFactory.getProxy(invoker);
  3. if (GenericService.class != invoker.getInterface()) {
  4. String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
  5. if (ConfigUtils.isNotEmpty(stub)) {
  6. Class<?> serviceType = invoker.getInterface();
  7. if (ConfigUtils.isDefault(stub)) {
  8. if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
  9. stub = serviceType.getName() + "Stub";
  10. } else {
  11. stub = serviceType.getName() + "Local";
  12. }
  13. }
  14. try {
  15. Class<?> stubClass = ReflectUtils.forName(stub);
  16. if (!serviceType.isAssignableFrom(stubClass)) {
  17. throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + serviceType.getName());
  18. }
  19. try {
  20. Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
  21. proxy = (T) constructor.newInstance(new Object[]{proxy});
  22. //export stub service
  23. URL url = invoker.getUrl();
  24. if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
  25. url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
  26. url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
  27. try {
  28. export(proxy, (Class) invoker.getInterface(), url);
  29. } catch (Exception e) {
  30. LOGGER.error("export a stub service error.", e);
  31. }
  32. }
  33. } catch (NoSuchMethodException e) {
  34. throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implemention class " + stubClass.getName(), e);
  35. }
  36. } catch (Throwable t) {
  37. LOGGER.error("Failed to create stub implemention class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
  38. // ignore
  39. }
  40. }
  41. }
  42. return proxy;
  43. }
  44. 复制代码

我们先看第一行代码 T proxy = proxyFactory.getProxy(invoker);
这里的proxyFactory=JavassitProxyFactory,我们首先进入的是AbstractProxyFactory.getProxy方法,这里又是一个模版方法,

  1. public <T> T getProxy(Invoker<T> invoker) throws RpcException {
  2. Class<?>[] interfaces = null;
  3. String config = invoker.getUrl().getParameter("interfaces");
  4. ···
  5. if (interfaces == null) {
  6. interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
  7. }
  8. return getProxy(invoker, interfaces);
  9. }
  10. 复制代码

进入JavassitProxyFactory.getProxy方法,

  1. public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
  2. return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
  3. }
  4. 复制代码

这里传入的interfaces=[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
再进入new InvokerInvocationHandler(invoker),这里初始化一个InvokerInvocationHandler对象,我们看下这个对象

  1. public class InvokerInvocationHandler implements InvocationHandler {
  2. private final Invoker<?> invoker;
  3. public InvokerInvocationHandler(Invoker<?> handler) {
  4. this.invoker = handler;
  5. }
  6. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  7. String methodName = method.getName();
  8. Class<?>[] parameterTypes = method.getParameterTypes();
  9. if (method.getDeclaringClass() == Object.class) {
  10. return method.invoke(invoker, args);
  11. }
  12. if ("toString".equals(methodName) && parameterTypes.length == 0) {
  13. return invoker.toString();
  14. }
  15. if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
  16. return invoker.hashCode();
  17. }
  18. if ("equals".equals(methodName) && parameterTypes.length == 1) {
  19. return invoker.equals(args[0]);
  20. }
  21. return invoker.invoke(new RpcInvocation(method, args)).recreate();
  22. }
  23. }
  24. 复制代码

这里用了JDK自带的动态代理Proxy类和InvocationHandler接口,到这里proxy代理类创建完成。

总结

从Dubbo官网上找到一张引用服务的时序图

转载于:https://juejin.im/post/5bb57e3f5188255c520d28b0

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/165817
推荐阅读
相关标签
  

闽ICP备14008679号