当前位置:   article > 正文

【六】dubbo源码分析之服务调用方refer(服务引用、服务订阅、创建invoker、创建代理、查看动态生成的.class文件)_qos won't be started because it is disabled. pleas

qos won't be started because it is disabled. please check dubbo.application.

一、简介

前面一篇讲了服务调用方启动的大致流程

本章主要讲refer服务引用,分成两个重点,一个是创建invoker,一个是创建代理。

入口:ReferenceConfig类的createProxy方法

1.关于connection的问题

  1. 如果connections不配置,则共享连接,否则每服务每连接,

  2. 共享连接的意思是对于同一个ip+port的所有服务只创建一个连接

  3. 如果是非共享连接则每个服务+(ip+port)创建一个连接 

二、服务引用

该方法里面有一句

invoker = refprotocol.refer(interfaceClass, urls.get(0));

这一句是本文的入口

这里的  invoker = refprotocol.refer(interfaceClass, urls.get(0));就是通过dubbo的SPI机制,跟进去的调用流程是:

ProtocolListenerWrapper.refer方法 --------> ProtocolFilterWrapper.refer --------> QosProtocolWrapper.refer --------> QosProtocolWrapper.startQosServer(url); --------> RegistryProtocol.refer --------> RegistryProtocol.doRefer

--------> MockClusterWrapper.join ------> FailoverCluster.join ------> ProviderConsumerRegTable.registerConsumer 

1.QosProtocolWrapper.refer

  1. @Override
  2. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  3. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  4. startQosServer(url);
  5. return protocol.refer(type, url);
  6. }
  7. return protocol.refer(type, url);
  8. }

这个startQosServer方法跟进去

  1. private void startQosServer(URL url) {
  2. try {
  3. boolean qosEnable = url.getParameter(QOS_ENABLE,true);
  4. if (!qosEnable) {
  5. logger.info("qos won't be started because it is disabled. " +
  6. "Please check dubbo.application.qos.enable is configured either in system property, " +
  7. "dubbo.properties or XML/spring boot configuration.");
  8. return;
  9. }
  10. if (!hasStarted.compareAndSet(false, true)) {
  11. return;
  12. }
  13. int port = url.getParameter(QOS_PORT, DEFAULT_PORT);
  14. boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP,"false"));
  15. Server server = com.alibaba.dubbo.qos.server.Server.getInstance();
  16. server.setPort(port);
  17. server.setAcceptForeignIp(acceptForeignIp);
  18. server.start();
  19. } catch (Throwable throwable) {
  20. logger.warn("Fail to start qos server: ", throwable);
  21. }
  22. }

做了几件事:

开始QOS服务

2.RegistryProtocol.refer

  1. @Override
  2. @SuppressWarnings("unchecked")
  3. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  4. // 取 registry 参数值,并将其设置为协议头
  5. url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
  6. // 获取注册中心实例
  7. Registry registry = registryFactory.getRegistry(url);
  8. if (RegistryService.class.equals(type)) {
  9. //debug的时候没走这里
  10. return proxyFactory.getInvoker((T) registry, type, url);
  11. }
  12. // group="a,b" or group="*"
  13. // 将 url 查询字符串转为 Map
  14. Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
  15. // 获取 group 配置
  16. String group = qs.get(Constants.GROUP_KEY);
  17. if (group != null && group.length() > 0) {
  18. // debug的时候没走这里
  19. if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
  20. || "*".equals(group)) {
  21. // 通过 SPI 加载 Cluster 实例,并调用 doRefer 继续执行服务引用逻辑
  22. return doRefer(getMergeableCluster(), registry, type, url);
  23. }
  24. }
  25. // 调用 doRefer 继续执行服务引用逻辑
  26. return doRefer(cluster, registry, type, url);
  27. }

 这里的url=zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-annotation-consumer&dubbo=2.0.2&pid=9044&qos.port=33333&refer=application%3Ddubbo-annotation-consumer%26default.timeout%3D3000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.samples.api.client.HelloService%26methods%3DsayHello%26pid%3D9044%26qos.port%3D33333%26register.ip%3DXXX.XXX.XXX.XXX%26side%3Dconsumer%26timestamp%3D1603880645072×tamp=1603880668326

里面的registryFactory.getRegistry(url);用到了dubbo的SPI,这里得到的registry是ZookeeperRegistry

3.RegistryProtocol.doRefer

  1. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  2. // 创建 RegistryDirectory 实例
  3. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
  4. // 设置注册中心和协议
  5. directory.setRegistry(registry);
  6. directory.setProtocol(protocol);
  7. // all attributes of REFER_KEY
  8. Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
  9. // 生成服务消费者链接
  10. URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
  11. // 注册服务消费者,在 consumers 目录下新节点
  12. if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
  13. && url.getParameter(Constants.REGISTER_KEY, true)) {
  14. URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
  15. registry.register(registeredConsumerUrl);
  16. directory.setRegisteredConsumerUrl(registeredConsumerUrl);
  17. }
  18. // 订阅 providers、configurators、routers 等节点数据
  19. directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
  20. Constants.PROVIDERS_CATEGORY
  21. + "," + Constants.CONFIGURATORS_CATEGORY
  22. + "," + Constants.ROUTERS_CATEGORY));
  23. // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
  24. Invoker invoker = cluster.join(directory);
  25. ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
  26. return invoker;
  27. }

做了几件事

背景例子:

消费者服务所在的项目叫:dubbo-consumer

里面就一个类ProductServiceImpl一个方法addCostByItemId,需要去调用dubbo-provider项目中的itemService服务和costService服务

服务提供者所在的项目叫:dubbo-provider

里面有2个服务itemService和costService

现在debug是搞的消费者服务

当ProductServiceImpl实例化过后,依赖注入成员的时候会走到这里来,第一次来是注入costService

1.创建 RegistryDirectory 实例。

(这个是服务目录。服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。类似于注册中心在消费者端本地的缓存,不知道dubbo这个是不是从注册中心拉下来的。后面会专门开一篇来讲这个)

2.设置注册中心和协议

3.URL subscribeUrl

生成服务消费者链接

第一次进来是是注入costService出发的,此时subscribeUrl=consumer://XXX.XXX.XXX.XX/com.sid.api.service.CostService?application=dubbo-consumer&default.check=false&default.timeout=3000&dubbo=2.6.2&interface=com.sid.api.service.CostService&loadbalance=roundrobin&methods=add,subtract&pid=10116&revision=1.0.0&side=consumer×tamp=1620626010244&version=1.0.0

4.registry.register方法

注册服务消费者,在 consumers 目录下新节点

5.订阅服务directory.subscribe方法

订阅 providers、configurators、routers 等节点数据

下面三、服务订阅详细讲

6.由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker

三、服务订阅

RegistryDirectory.subscribe方法

  1. public void subscribe(URL url) {
  2. setConsumerUrl(url);
  3. registry.subscribe(url, this);
  4. }

这里面调用的是FailbackRegistry.subscribe

FailbackRegistry.subscribe方法

  1. public void subscribe(URL url, NotifyListener listener) {
  2. super.subscribe(url, listener);
  3. removeFailedSubscribed(url, listener);
  4. try {
  5. // 向服务器端发送订阅请求
  6. // Sending a subscription request to the server side
  7. doSubscribe(url, listener);
  8. } catch (Exception e) {
  9. Throwable t = e;
  10. List<URL> urls = getCacheUrls(url);
  11. if (urls != null && !urls.isEmpty()) {
  12. notify(url, listener, urls);
  13. logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
  14. } else {
  15. // 如果开启了启动时检测,则直接抛出异常
  16. // If the startup detection is opened, the Exception is thrown directly.
  17. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  18. && url.getParameter(Constants.CHECK_KEY, true);
  19. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  20. if (check || skipFailback) {
  21. if (skipFailback) {
  22. t = t.getCause();
  23. }
  24. throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
  25. } else {
  26. logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  27. }
  28. }
  29. // 将失败的订阅请求记录到失败列表,定时重试
  30. // Record a failed registration request to a failed list, retry regularly
  31. addFailedSubscribed(url, listener);
  32. }
  33. }

 主要做了几件事:

1.ZookeeperRegistry.doSubscribe方法

向服务器端发送订阅请求

2.doSubscribe方法如果抛出异常,则

notify()或者 如果开启了启动时检测,则直接抛出异常

将失败的订阅请求记录到失败列表,定时重试addFailedSubscribed(url, listener);

 ZookeeperRegistry.doSubscribe方法

  1. @Override
  2. protected void doSubscribe(final URL url, final NotifyListener listener) {
  3. try {
  4. //如果provider的service的接口配置的是“*”
  5. if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
  6. //获取服务分组根路径
  7. String root = toRootPath();
  8. //获取服务的NotifyListener
  9. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
  10. if (listeners == null) {
  11. //如果没有则创建一个
  12. zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
  13. listeners = zkListeners.get(url);
  14. }
  15. ChildListener zkListener = listeners.get(listener);
  16. //如果没有子监听器则创建一个
  17. if (zkListener == null) {
  18. listeners.putIfAbsent(listener, new ChildListener() {
  19. @Override
  20. public void childChanged(String parentPath, List<String> currentChilds) {
  21. for (String child : currentChilds) {
  22. child = URL.decode(child);
  23. if (!anyServices.contains(child)) {
  24. anyServices.add(child);
  25. subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
  26. Constants.CHECK_KEY, String.valueOf(false)), listener);
  27. }
  28. }
  29. }
  30. });
  31. zkListener = listeners.get(listener);
  32. }
  33. //向服务器订阅服务,注册中心会调用NotifyListener的notify函数返回服务列表
  34. zkClient.create(root, false);
  35. //获取服务地址列表
  36. List<String> services = zkClient.addChildListener(root, zkListener);
  37. if (services != null && !services.isEmpty()) {
  38. //如果存在服务
  39. for (String service : services) {
  40. service = URL.decode(service);
  41. anyServices.add(service);
  42. //如果serviceInterface是“*”则从分组根路径遍历service并订阅所有服务
  43. subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
  44. Constants.CHECK_KEY, String.valueOf(false)), listener);
  45. }
  46. }
  47. } else {
  48. //如果serviceInterface不是“*”则创建Zookeeper客户端索取服务列表,并通知(notify)消费者(consumer)这些服务可以用了
  49. List<URL> urls = new ArrayList<URL>();
  50. //获取类似于http://xxx.xxx.xxx.xxx/context/com.service.xxxService/consumer的地址
  51. for (String path : toCategoriesPath(url)) {
  52. //获取例如com.service.xxxService对应的NotifyListener map
  53. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
  54. if (listeners == null) {
  55. zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
  56. listeners = zkListeners.get(url);
  57. }
  58. //获取ChildListener
  59. ChildListener zkListener = listeners.get(listener);
  60. if (zkListener == null) {
  61. listeners.putIfAbsent(listener, new ChildListener() {
  62. @Override
  63. public void childChanged(String parentPath, List<String> currentChilds) {
  64. ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
  65. }
  66. });
  67. zkListener = listeners.get(listener);
  68. }
  69. //创建Zookeeper客户端
  70. zkClient.create(path, false);
  71. List<String> children = zkClient.addChildListener(path, zkListener);
  72. if (children != null) {
  73. urls.addAll(toUrlsWithEmpty(url, path, children));
  74. }
  75. }
  76. //提醒消费者
  77. notify(url, listener, urls);
  78. }
  79. } catch (Throwable e) {
  80. throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  81. }
  82. }

主要做了几件事

1.在zk中添加节点和节点监听器

2.notify通知消费者去创建跟服务提供者之间的TCP连接

FailbackRegistry.notify方法

里面的调用链需关注的是

这个里面是其实通知registryDirectory刷新invoker,因为这里是第一次服务调用方第一期启动,所以实际上是在创建invoker、通过NETTY与服务提供方建立TCP长链接。

可以看到通过RegistryDirectory.toInvokers方法触发了Protocol@Adaptive.refer()-------->QosProtocolWrapper.refer()-------->ProtocolFilterWrapper.refer()-------->ProtocolListenerWrapper.refer()-------->DubboProtocol.refer()

DubboProtocol.refer()方法源码:

  1. @Override
  2. public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
  3. optimizeSerialization(url);
  4. // create rpc invoker.
  5. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  6. invokers.add(invoker);
  7. return invoker;
  8. }

主要做了几件事:

1.创建RPC用的invoker并且返回

 这里创建Invoker不复杂,就是一个new,但是重点关注一下getClients(url)方法,该方法会用netty去建立跟服务提供方的TCP链接。

在四、创建Invoker中详细讲

四、创建Invoker、用netty去建立跟服务提供方的TCP链接

创建invoker的时候用netty去建立跟服务提供方的TCP链接

入口

DubboProtocol.refer()方法中的

DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

 getClients(url)方法源码

  1. private ExchangeClient[] getClients(URL url) {
  2. // whether to share connection
  3. //是否共享连接
  4. boolean service_share_connect = false;
  5. int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
  6. // if not configured, connection is shared, otherwise, one connection for one service
  7. // 如果connections不配置,则共享连接,否则每服务每连接,
  8. // 共享连接的意思是对于同一个ip+port的所有服务只创建一个连接,
  9. // 如果是非共享连接则每个服务+(ip+port)创建一个连接
  10. if (connections == 0) {
  11. service_share_connect = true;
  12. connections = 1;
  13. }
  14. ExchangeClient[] clients = new ExchangeClient[connections];
  15. for (int i = 0; i < clients.length; i++) {
  16. if (service_share_connect) {
  17. clients[i] = getSharedClient(url);
  18. } else {
  19. clients[i] = initClient(url);
  20. }
  21. }
  22. return clients;
  23. }

 主要做了几件事

1.获取交换客户端ExchangeClient,默认使用共享链接

这里注意下

  1. 如果connections不配置,则共享连接,否则每服务每连接,

  2. 共享连接的意思是对于同一个ip+port的所有服务只创建一个连接

  3. 如果是非共享连接则每个服务+(ip+port)创建一个连接

getSharedClient(url)方法

  1. private ExchangeClient getSharedClient(URL url) {
  2. // 以address(ip:port)为key进行缓存
  3. String key = url.getAddress();
  4. ReferenceCountExchangeClient client = referenceClientMap.get(key);
  5. if (client != null) {
  6. // 如果连接存在了则引用数加1,引用数表示有多少个服务使用了此client,
  7. // 当某个client调用close()时,引用数减一,
  8. // 如果引用数大于0,表示还有服务在使用此连接, 不会真正关闭client
  9. // 如果引用数为0,表示没有服务在用此连接,此时连接彻底关闭
  10. if (!client.isClosed()) {
  11. client.incrementAndGetCount();
  12. return client;
  13. } else {
  14. referenceClientMap.remove(key);
  15. }
  16. }
  17. synchronized (key.intern()) {
  18. // 调用initClient来初始化Client
  19. ExchangeClient exchangeClient = initClient(url);
  20. // 使用ReferenceCountExchangeClient进行包装
  21. client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
  22. referenceClientMap.put(key, client);
  23. ghostClientMap.remove(key);
  24. return client;
  25. }
  26. }

主要做了几件事

1.以address(ip:port)为key进行缓存

 如果连接存在了则引用数加1,引用数表示有多少个服务使用了此client,  
 当某个client调用close()时,引用数减一,  
 如果引用数大于0,表示还有服务在使用此连接, 不会真正关闭client  
 如果引用数为0,表示没有服务在用此连接,此时连接彻底关闭 

2.根据URL初始化exchangeClient 

3.得到referenceCountExchangeClient

4.把这些client保存到MAP中

initClient方法

  1. private ExchangeClient initClient(URL url) {
  2. // 获取client参数的值,为空则获取server参数的值,默认为netty
  3. // client type setting.
  4. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
  5. // 加入codec参数,默认为dubbo,即DubboCodec
  6. url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
  7. //默认开启心跳,默认每60s发送一次心跳包
  8. // enable heartbeat by default
  9. url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
  10. // BIO存在严重性能问题,暂时不允许使用
  11. // BIO is not allowed since it has severe performance issue.
  12. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
  13. throw new RpcException("Unsupported client type: " + str + "," +
  14. " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
  15. }
  16. ExchangeClient client;
  17. try {
  18. // connection should be lazy
  19. if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
  20. client = new LazyConnectExchangeClient(url, requestHandler);
  21. } else {
  22. client = Exchangers.connect(url, requestHandler);
  23. }
  24. } catch (RemotingException e) {
  25. throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
  26. }
  27. return client;
  28. }

主要做了几件事

1.设置client的类型,默认是netty,设置codec,默认是dubboCodec,默认开启心跳60S发一次包

2.如果Connection不是懒加载,那就立马创建跟服务提供方的connection

 Exchangers.connect(url, requestHandler)-------->getExchanger(url).connect(url, handler);-------->HeaderExchanger.connect()-------->Transporters.connect()-------->NettyTransporter.connect()-------->NettyClient构造函数-------->AbstractClient构造函数

AbstractClient构造函数

  1. public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
  2. super(url, handler);
  3. send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
  4. shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
  5. // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
  6. reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
  7. try {
  8. doOpen();
  9. } catch (Throwable t) {
  10. close();
  11. throw new RemotingException(url.toInetSocketAddress(), null,
  12. "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
  13. + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
  14. }
  15. try {
  16. // connect.
  17. connect();
  18. if (logger.isInfoEnabled()) {
  19. logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
  20. }
  21. } catch (RemotingException t) {
  22. if (url.getParameter(Constants.CHECK_KEY, true)) {
  23. close();
  24. throw t;
  25. } else {
  26. logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
  27. + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
  28. }
  29. } catch (Throwable t) {
  30. close();
  31. throw new RemotingException(url.toInetSocketAddress(), null,
  32. "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
  33. + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
  34. }
  35. executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
  36. .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
  37. ExtensionLoader.getExtensionLoader(DataStore.class)
  38. .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
  39. }

这部分看一下:https://blog.csdn.net/youaremoon/article/details/50826649

五、创建代理proxy

创建代理的入口在ReferenceConfig类的createProxy方法中有一句

return (T) proxyFactory.getProxy(invoker);

这里的proxyFactory通过dubbo的SPI得到

  1. package com.alibaba.dubbo.rpc;
  2. import com.alibaba.dubbo.common.Constants;
  3. import com.alibaba.dubbo.common.URL;
  4. import com.alibaba.dubbo.common.extension.Adaptive;
  5. import com.alibaba.dubbo.common.extension.SPI;
  6. /**
  7. * ProxyFactory. (API/SPI, Singleton, ThreadSafe)
  8. */
  9. @SPI("javassist")
  10. public interface ProxyFactory {
  11. @Adaptive({Constants.PROXY_KEY})
  12. <T> T getProxy(Invoker<T> invoker) throws RpcException;
  13. @Adaptive({Constants.PROXY_KEY})
  14. <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
  15. @Adaptive({Constants.PROXY_KEY})
  16. <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
  17. }

 调用了StubProxyFactoryWrapper的getProxy方法

  1. @Override
  2. @SuppressWarnings({"unchecked", "rawtypes"})
  3. public <T> T getProxy(Invoker<T> invoker) throws RpcException {
  4. T proxy = proxyFactory.getProxy(invoker);
  5. if (GenericService.class != invoker.getInterface()) {
  6. String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
  7. if (ConfigUtils.isNotEmpty(stub)) {
  8. Class<?> serviceType = invoker.getInterface();
  9. if (ConfigUtils.isDefault(stub)) {
  10. if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
  11. stub = serviceType.getName() + "Stub";
  12. } else {
  13. stub = serviceType.getName() + "Local";
  14. }
  15. }
  16. try {
  17. Class<?> stubClass = ReflectUtils.forName(stub);
  18. if (!serviceType.isAssignableFrom(stubClass)) {
  19. throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
  20. }
  21. try {
  22. Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
  23. proxy = (T) constructor.newInstance(new Object[]{proxy});
  24. //export stub service
  25. URL url = invoker.getUrl();
  26. if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
  27. url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
  28. url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
  29. try {
  30. export(proxy, (Class) invoker.getInterface(), url);
  31. } catch (Exception e) {
  32. LOGGER.error("export a stub service error.", e);
  33. }
  34. }
  35. } catch (NoSuchMethodException e) {
  36. throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
  37. }
  38. } catch (Throwable t) {
  39. LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
  40. // ignore
  41. }
  42. }
  43. }
  44. return proxy;
  45. }

AbstractProxyFactory.getProxy方法

  1. package com.alibaba.dubbo.rpc.proxy;
  2. import com.alibaba.dubbo.common.Constants;
  3. import com.alibaba.dubbo.common.utils.ReflectUtils;
  4. import com.alibaba.dubbo.rpc.Invoker;
  5. import com.alibaba.dubbo.rpc.ProxyFactory;
  6. import com.alibaba.dubbo.rpc.RpcException;
  7. import com.alibaba.dubbo.rpc.service.EchoService;
  8. import com.alibaba.dubbo.rpc.service.GenericService;
  9. /**
  10. * AbstractProxyFactory
  11. */
  12. public abstract class AbstractProxyFactory implements ProxyFactory {
  13. @Override
  14. public <T> T getProxy(Invoker<T> invoker) throws RpcException {
  15. return getProxy(invoker, false);
  16. }
  17. @Override
  18. public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
  19. Class<?>[] interfaces = null;
  20. // 获取接口列表
  21. String config = invoker.getUrl().getParameter("interfaces");
  22. // debug的时候这里没进来
  23. if (config != null && config.length() > 0) {
  24. // 切分接口列表
  25. String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
  26. if (types != null && types.length > 0) {
  27. interfaces = new Class<?>[types.length + 2];
  28. // 设置服务接口类和 EchoService.class 到 interfaces 中
  29. interfaces[0] = invoker.getInterface();
  30. interfaces[1] = EchoService.class;
  31. for (int i = 0; i < types.length; i++) {
  32. // 加载接口类
  33. interfaces[i + 1] = ReflectUtils.forName(types[i]);
  34. }
  35. }
  36. }
  37. if (interfaces == null) {
  38. interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
  39. }
  40. // 为 http 和 hessian 协议提供泛化调用支持
  41. // debug的时候这里没进来
  42. if (!invoker.getInterface().equals(GenericService.class) && generic) {
  43. int len = interfaces.length;
  44. Class<?>[] temp = interfaces;
  45. // 创建新的 interfaces 数组
  46. interfaces = new Class<?>[len + 1];
  47. System.arraycopy(temp, 0, interfaces, 0, len);
  48. // 设置 GenericService.class 到数组中
  49. interfaces[len] = GenericService.class;
  50. }
  51. // 调用重载方法
  52. return getProxy(invoker, interfaces);
  53. }
  54. public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
  55. }

做了几件事:

1.获取接口列表、切分接口列表、设置服务接口类和 EchoService.class 到 interfaces 中、加载接口类

2.为 http 和 hessian 协议提供泛化调用支持

3.调用子类重载方法

 JavassistProxyFactory.getProxy

  1. public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
  2. return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
  3. }

做了几件事

1.通过 Proxy 的 getProxy 方法获取 Proxy 子类

2.创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。

InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。

Proxy.getProxy方法

  1. public static Proxy getProxy(Class<?>... ics) {
  2. return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
  3. }
  1. public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
  2. if (ics.length > 65535)
  3. throw new IllegalArgumentException("interface limit exceeded");
  4. StringBuilder sb = new StringBuilder();
  5. // 遍历接口列表
  6. // 这里debug的时候接口列表有:HelloService和EchoService
  7. for (int i = 0; i < ics.length; i++) {
  8. String itf = ics[i].getName();
  9. // 检测类型是否为接口
  10. if (!ics[i].isInterface())
  11. throw new RuntimeException(itf + " is not a interface.");
  12. Class<?> tmp = null;
  13. try {
  14. // 重新加载接口类
  15. tmp = Class.forName(itf, false, cl);
  16. } catch (ClassNotFoundException e) {
  17. }
  18. // 检测接口是否相同,这里 tmp 有可能为空
  19. if (tmp != ics[i])
  20. throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
  21. // 拼接接口全限定名,分隔符为 ;
  22. sb.append(itf).append(';');
  23. }
  24. // 使用拼接后的接口名作为 key
  25. //debug的时候key=org.apache.dubbo.samples.api.client.HelloService;com.alibaba.dubbo.rpc.service.EchoService;
  26. // use interface class name list as key.
  27. String key = sb.toString();
  28. // get cache by class loader.
  29. Map<String, Object> cache;
  30. synchronized (ProxyCacheMap) {
  31. cache = ProxyCacheMap.get(cl);
  32. if (cache == null) {
  33. cache = new HashMap<String, Object>();
  34. ProxyCacheMap.put(cl, cache);
  35. }
  36. }
  37. Proxy proxy = null;
  38. synchronized (cache) {
  39. do {
  40. // 从缓存中获取 Reference<Proxy> 实例
  41. Object value = cache.get(key);
  42. if (value instanceof Reference<?>) {
  43. proxy = (Proxy) ((Reference<?>) value).get();
  44. if (proxy != null)
  45. return proxy;
  46. }
  47. // 并发控制,保证只有一个线程可以进行后续操作
  48. if (value == PendingGenerationMarker) {
  49. try {
  50. // 其他线程在此处进行等待
  51. cache.wait();
  52. } catch (InterruptedException e) {
  53. }
  54. } else {
  55. // 放置标志位到缓存中,并跳出 while 循环进行后续操作
  56. cache.put(key, PendingGenerationMarker);
  57. break;
  58. }
  59. }
  60. while (true);
  61. }
  62. long id = PROXY_CLASS_COUNTER.getAndIncrement();
  63. String pkg = null;
  64. ClassGenerator ccp = null, ccm = null;
  65. try {
  66. // 创建 ClassGenerator 对象
  67. ccp = ClassGenerator.newInstance(cl);
  68. Set<String> worked = new HashSet<String>();
  69. List<Method> methods = new ArrayList<Method>();
  70. for (int i = 0; i < ics.length; i++) {
  71. // 检测接口访问级别是否为 protected 或 privete
  72. if (!Modifier.isPublic(ics[i].getModifiers())) {
  73. // 获取接口包名
  74. String npkg = ics[i].getPackage().getName();
  75. if (pkg == null) {
  76. pkg = npkg;
  77. } else {
  78. if (!pkg.equals(npkg))
  79. // 非 public 级别的接口必须在同一个包下,否者抛出异常
  80. throw new IllegalArgumentException("non-public interfaces from different packages");
  81. }
  82. }
  83. // 添加接口到 ClassGenerator 中
  84. ccp.addInterface(ics[i]);
  85. // 遍历接口方法
  86. for (Method method : ics[i].getMethods()) {
  87. // 获取方法描述,可理解为方法签名
  88. String desc = ReflectUtils.getDesc(method);
  89. // 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
  90. // A 接口和 B 接口中包含一个完全相同的方法
  91. if (worked.contains(desc))
  92. continue;
  93. worked.add(desc);
  94. int ix = methods.size();
  95. // 获取方法返回值类型
  96. Class<?> rt = method.getReturnType();
  97. // 获取参数列表
  98. Class<?>[] pts = method.getParameterTypes();
  99. // 生成 Object[] args = new Object[1...N]
  100. StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
  101. for (int j = 0; j < pts.length; j++)
  102. // 生成 args[1...N] = ($w)$1...N;
  103. code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
  104. // 生成 InvokerHandler 接口的 invoker 方法调用语句
  105. code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
  106. // 返回值不为 void
  107. if (!Void.TYPE.equals(rt))
  108. // 生成返回语句,形如 return (java.lang.String) ret;
  109. code.append(" return ").append(asArgument(rt, "ret")).append(";");
  110. methods.add(method);
  111. // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
  112. ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
  113. }
  114. }
  115. if (pkg == null)
  116. pkg = PACKAGE_NAME;
  117. // create ProxyInstance class.
  118. // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
  119. String pcn = pkg + ".proxy" + id;
  120. ccp.setClassName(pcn);
  121. ccp.addField("public static java.lang.reflect.Method[] methods;");
  122. // 生成 private java.lang.reflect.InvocationHandler handler;
  123. ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
  124. // 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
  125. // porxy0(java.lang.reflect.InvocationHandler arg0) {
  126. // handler=$1;
  127. // }
  128. ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
  129. // 为接口代理类添加默认构造方法
  130. ccp.addDefaultConstructor();
  131. // 生成接口代理类
  132. Class<?> clazz = ccp.toClass();
  133. clazz.getField("methods").set(null, methods.toArray(new Method[0]));
  134. // 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
  135. // create Proxy class.
  136. String fcn = Proxy.class.getName() + id;
  137. ccm = ClassGenerator.newInstance(cl);
  138. ccm.setClassName(fcn);
  139. ccm.addDefaultConstructor();
  140. ccm.setSuperClass(Proxy.class);
  141. // 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
  142. // public Object newInstance(java.lang.reflect.InvocationHandler h) {
  143. // return new org.apache.dubbo.proxy0($1);
  144. // }
  145. ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
  146. // 生成 Proxy 实现类
  147. Class<?> pc = ccm.toClass();
  148. // 通过反射创建 Proxy 实例
  149. proxy = (Proxy) pc.newInstance();
  150. } catch (RuntimeException e) {
  151. throw e;
  152. } catch (Exception e) {
  153. throw new RuntimeException(e.getMessage(), e);
  154. } finally {
  155. // release ClassGenerator
  156. if (ccp != null)
  157. // 释放资源
  158. ccp.release();
  159. if (ccm != null)
  160. ccm.release();
  161. synchronized (cache) {
  162. if (proxy == null)
  163. cache.remove(key);
  164. else
  165. // 写缓存
  166. cache.put(key, new WeakReference<Proxy>(proxy));
  167. // 唤醒其他等待线程
  168. cache.notifyAll();
  169. }
  170. }
  171. return proxy;
  172. }

 做了几件事:

1.遍历接口列表,重加载接口类

这里debug的时候接口列表有:HelloService和EchoService

2.ClassGenerator ccp 用于为服务接口生成代理类,比如测试代码中的 HelloService接口,这个接口代理类就是由 ccp 生成的。

3.ClassGenerator ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。

四、查看动态生成的.class文件

找到HelloService接口生成的代理类的.class文件,看一下里面有些什么

1.创建文件hsdb.bat 文件内容如下:

java -classpath "E:\Java\jdk1.8.0_221\lib\sa-jdi.jar" sun.jvm.hotspot.HSDB

2.双击这个文件

3.输入进程ID

4.点击Tools---->Class Browser

5.搜索代理的名字,这个名字是在代码里面打断点看到的。然后点击create .class for this class ,生成的文件会在前面启动HSDB的那个目录下

6.用idea打开看proxy0.class

  1. package com.alibaba.dubbo.common.bytecode;
  2. import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC;
  3. import com.alibaba.dubbo.rpc.service.EchoService;
  4. import java.lang.reflect.InvocationHandler;
  5. import java.lang.reflect.Method;
  6. import org.apache.dubbo.samples.api.client.HelloService;
  7. public class proxy0 implements DC, HelloService, EchoService {
  8. public static Method[] methods;
  9. private InvocationHandler handler;
  10. public proxy0(InvocationHandler var1) {
  11. this.handler = var1;
  12. }
  13. public proxy0() {
  14. }
  15. public String sayHello(String var1) {
  16. Object[] var2 = new Object[]{var1};
  17. Object var3 = this.handler.invoke(this, methods[0], var2);
  18. return (String)var3;
  19. }
  20. public Object $echo(Object var1) {
  21. Object[] var2 = new Object[]{var1};
  22. Object var3 = this.handler.invoke(this, methods[1], var2);
  23. return (Object)var3;
  24. }
  25. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/165833
推荐阅读
相关标签
  

闽ICP备14008679号