当前位置:   article > 正文

dubbo源码:consumer中reference引用生成

dubbo:reference customer

    前面我们已经看了dubbo-config在解析了配置中dubbo:reference标签之后会在容器中创建一个referenceBean,在系统中调用远程服务方法时会先从容器中取出该referenceBean实例。我们看其源码发现ReferenceBean除了实现InitializingBean接口还实现了FactoryBean接口,所以在从容器中获取实例时会调用该类实现方法getObject()。

  1. public class ReferenceBean<T> extends ReferenceConfig<T>
  2. implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
  3. private static final long serialVersionUID = 213195494150089726L;
  4. private transient ApplicationContext applicationContext;
  5. public ReferenceBean() {
  6. super();
  7. }
  8. public ReferenceBean(Reference reference) {
  9. super(reference);
  10. }
  11. public void setApplicationContext(ApplicationContext applicationContext) {
  12. this.applicationContext = applicationContext;
  13. SpringExtensionFactory.addApplicationContext(applicationContext);
  14. }
  15. //从IOC中获取实例时调用
  16. public Object getObject() throws Exception {
  17. return get();
  18. }
  19. public Class<?> getObjectType() {
  20. return getInterfaceClass();
  21. }
  22. @Parameter(excluded = true)
  23. public boolean isSingleton() {
  24. return true;
  25. }
  26. @SuppressWarnings({ "unchecked" })
  27. public void afterPropertiesSet() throws Exception {
  28. 。。。
  29. }
  30. }

我们再getObject()方法中看到调用了其父类实现的同步方法get()在get()方法中进行了init初始化操作。该初始化操作就是我们接下来要看的主要内容。

  1. public synchronized T get() {
  2. if (destroyed){
  3. throw new IllegalStateException("Already destroyed!");
  4. }
  5. if (ref == null) { //ref代理接口的引用
  6. init();
  7. }
  8. return ref;
  9. }
  10. private void init() {
  11. if (initialized) {
  12. return;
  13. }
  14. initialized = true;
  15. if (interfaceName == null || interfaceName.length() == 0) {
  16. throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
  17. }
  18. // 获取消费者全局配置
  19. checkDefault();
  20. appendProperties(this);//
  21. //判断标签配置接口是否为泛化应用接口
  22. if (getGeneric() == null && getConsumer() != null) {
  23. setGeneric(getConsumer().getGeneric());
  24. }
  25. //加载接口class
  26. if (ProtocolUtils.isGeneric(getGeneric())) {
  27. interfaceClass = GenericService.class;
  28. } else {
  29. try {
  30. interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
  31. .getContextClassLoader());
  32. } catch (ClassNotFoundException e) {
  33. throw new IllegalStateException(e.getMessage(), e);
  34. }
  35. checkInterfaceAndMethods(interfaceClass, methods);//检查接口以及配置方法均为接口方法
  36. }
  37. //判断该类是否配置直连提供者,也可以通过文件方式配置dubbo2.0以上版本自动加载${user.home}/dubbo-resolve.properties文件,不需要配置。
  38. String resolve = System.getProperty(interfaceName);
  39. String resolveFile = null;
  40. if (resolve == null || resolve.length() == 0) {
  41. resolveFile = System.getProperty("dubbo.resolve.file");
  42. if (resolveFile == null || resolveFile.length() == 0) {
  43. File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
  44. if (userResolveFile.exists()) {
  45. resolveFile = userResolveFile.getAbsolutePath();
  46. }
  47. }
  48. if (resolveFile != null && resolveFile.length() > 0) {
  49. Properties properties = new Properties();
  50. FileInputStream fis = null;
  51. try {
  52. fis = new FileInputStream(new File(resolveFile));
  53. properties.load(fis);
  54. } catch (IOException e) {
  55. throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
  56. } finally {
  57. try {
  58. if(null != fis) fis.close();
  59. } catch (IOException e) {
  60. logger.warn(e.getMessage(), e);
  61. }
  62. }
  63. resolve = properties.getProperty(interfaceName);
  64. }
  65. }
  66. if (resolve != null && resolve.length() > 0) {
  67. url = resolve;
  68. if (logger.isWarnEnabled()) {
  69. if (resolveFile != null && resolveFile.length() > 0) {
  70. logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
  71. } else {
  72. logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
  73. }
  74. }
  75. }
  76. //加载consumer、module、registries、monitor、application等配置属性
  77. if (consumer != null) {
  78. if (application == null) {
  79. application = consumer.getApplication();
  80. }
  81. if (module == null) {
  82. module = consumer.getModule();
  83. }
  84. if (registries == null) {
  85. registries = consumer.getRegistries();
  86. }
  87. if (monitor == null) {
  88. monitor = consumer.getMonitor();
  89. }
  90. }
  91. if (module != null) {
  92. if (registries == null) {
  93. registries = module.getRegistries();
  94. }
  95. if (monitor == null) {
  96. monitor = module.getMonitor();
  97. }
  98. }
  99. if (application != null) {
  100. if (registries == null) {
  101. registries = application.getRegistries();
  102. }
  103. if (monitor == null) {
  104. monitor = application.getMonitor();
  105. }
  106. }
  107. checkApplication();//检查application配置
  108. checkStubAndMock(interfaceClass);//检查该接口引用是否为local、stub、mock等配置
  109. //装载配置信息
  110. Map<String, String> map = new HashMap<String, String>();
  111. Map<Object, Object> attributes = new HashMap<Object, Object>();
  112. map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
  113. map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
  114. map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
  115. if (ConfigUtils.getPid() > 0) {
  116. map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
  117. }
  118. if (! isGeneric()) {
  119. String revision = Version.getVersion(interfaceClass, version);
  120. if (revision != null && revision.length() > 0) {
  121. map.put("revision", revision);
  122. }
  123. //将引用的接口类转化成包装类,然后再获取接口中方法名称
  124. String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
  125. if(methods.length == 0) {
  126. logger.warn("NO method found in service interface " + interfaceClass.getName());
  127. map.put("methods", Constants.ANY_VALUE);
  128. }
  129. else {
  130. map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));//拼接方法名称用“,”分割
  131. }
  132. }
  133. //组装属性配置
  134. map.put(Constants.INTERFACE_KEY, interfaceName);
  135. appendParameters(map, application);
  136. appendParameters(map, module);
  137. appendParameters(map, consumer, Constants.DEFAULT_KEY);
  138. appendParameters(map, this);
  139. String prifix = StringUtils.getServiceKey(map);
  140. if (methods != null && methods.size() > 0) {
  141. for (MethodConfig method : methods) {
  142. appendParameters(map, method, method.getName());
  143. String retryKey = method.getName() + ".retry";
  144. if (map.containsKey(retryKey)) {
  145. String retryValue = map.remove(retryKey);
  146. if ("false".equals(retryValue)) {
  147. map.put(method.getName() + ".retries", "0");
  148. }
  149. }
  150. appendAttributes(attributes, method, prifix + "." + method.getName());
  151. checkAndConvertImplicitConfig(method, map, attributes);
  152. }
  153. }
  154. //attributes通过系统context进行存储.(dubbo内部维护的一个ConcurrentHashMap用于存储属性)
  155. StaticContext.getSystemContext().putAll(attributes);
  156. //根据配置信息创建代理类
  157. ref = createProxy(map);
  158. }

初始化中首先进行了属性的组装,最后进行接口代理引用的创建,这里提供了引用创建的入口我们继续往下看(createProxy()方法)。

  1. private T createProxy(Map<String, String> map) {
  2. URL tmpUrl = new URL("temp", "localhost", 0, map);
  3. final boolean isJvmRefer;
  4. if (isInjvm() == null) {//优先从JVM内获取引用实例,是否配置了本地调用<dubbo:protocol name="injvm" />
  5. if (url != null && url.length() > 0) { //指定URL值的情况下,不做本地引用
  6. isJvmRefer = false;
  7. } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {//本地有服务暴露使用本地调用
  8. //默认情况下如果本地有服务暴露,则引用本地服务.
  9. isJvmRefer = true;
  10. } else {
  11. isJvmRefer = false;
  12. }
  13. } else {
  14. isJvmRefer = isInjvm().booleanValue();
  15. }
  16. if (isJvmRefer) {//本地调用情况
  17. URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
  18. invoker = refprotocol.refer(interfaceClass, url);//生成了invoker,它代表一个可执行体,可向它发起invoke调用
  19. if (logger.isInfoEnabled()) {
  20. logger.info("Using injvm service " + interfaceClass.getName());
  21. }
  22. } else {
  23. //非本地调用,用户配置url,直连调用
  24. if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
  25. String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
  26. if (us != null && us.length > 0) {
  27. for (String u : us) {
  28. URL url = URL.valueOf(u);
  29. if (url.getPath() == null || url.getPath().length() == 0) {
  30. url = url.setPath(interfaceName);
  31. }
  32. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  33. urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
  34. } else {
  35. urls.add(ClusterUtils.mergeUrl(url, map));
  36. }
  37. }
  38. }
  39. } else { // 通过注册中心配置拼装URL(注册中心serviceURL)
  40. List<URL> us = loadRegistries(false);
  41. if (us != null && us.size() > 0) {
  42. for (URL u : us) {
  43. URL monitorUrl = loadMonitor(u);//监控中心url
  44. if (monitorUrl != null) {
  45. map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
  46. }
  47. urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));//注册中心url拼上refer(接口引用ref相关信息)
  48. }
  49. }
  50. if (urls == null || urls.size() == 0) {
  51. throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
  52. }
  53. }
  54. if (urls.size() == 1) {//只有一个注册中心时
  55. invoker = refprotocol.refer(interfaceClass, urls.get(0));
  56. } else {//多注册中心时产生多个invoker
  57. List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
  58. URL registryURL = null;
  59. for (URL url : urls) {
  60. invokers.add(refprotocol.refer(interfaceClass, url));//我们通过注册中心生成了invoker,它代表一个可执行体,可向它发起invoke调用
  61. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  62. registryURL = url; // 用了最后一个registry url,也就是最后一个注册中心
  63. }
  64. }
  65. //多注册中心时先对注册中心进行路由,然后再对注册中心每个服务器进行路由
  66. if (registryURL != null) { // 有 注册中心协议的URL
  67. // 对有注册中心的Cluster 只用 AvailableCluster
  68. URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
  69. invoker = cluster.join(new StaticDirectory(u, invokers));
  70. } else { // 不是 注册中心的URL
  71. invoker = cluster.join(new StaticDirectory(invokers));
  72. }
  73. }
  74. }
  75. Boolean c = check;
  76. if (c == null && consumer != null) {
  77. c = consumer.isCheck();
  78. }
  79. if (c == null) {
  80. c = true; // default true
  81. }
  82. if (c && ! invoker.isAvailable()) {
  83. throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
  84. }
  85. if (logger.isInfoEnabled()) {
  86. logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
  87. }
  88. // 创建服务代理
  89. return (T) proxyFactory.getProxy(invoker);
  90. }

在createProxy()的代码中,首先是判断是否可以进行本地调用,因为本地调用要比远程调用效率高很多,所以如果可以进行本地调用就优先进行本地调用。如果是非本地调用,就根据注册中心生成invoker对象(可执行体可以发起invoke调用),每个注册中心生成一个invoker对象,如果是多个注册中心,就对生成的invokers做路由(只适用AvailableCluster)最后选出一个invoker创建代理。这里我们要详细看的代码是invoker的生成过程(refprotocol.refer(interfaceClass, urls.get(0));)和通过invoker创建代理的过程((T) proxyFactory.getProxy(invoker);)。Protocol接口有多个实现类支持多种协议这里我们就拿dubbo默认协议为例来看。

Protocol中ref()方法执行顺序:

ProtocolFilterWrapper.refer()-->ProtocolListenerWrapper.refer()-->RegistryProtocol.ref()这个过程进行客户端到zookeeper的注册和订阅,然后还会执行到ProtocolFilterWrapper.refer()-->ProtocolListenerWrapper.refer()-->DubboProtocol.refer()在这个过程中会进行invoker过滤器的创建最后包装成InvokerChain调用链。这里我们暂时先看DubboProtocol中的实现。

  1. //引用远程服务
  2. public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
  3. // create rpc invoker.
  4. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  5. invokers.add(invoker);
  6. return invoker;
  7. }
  8. //获取链接客户端
  9. private ExchangeClient[] getClients(URL url){
  10. //是否共享连接
  11. boolean service_share_connect = false;
  12. int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
  13. //如果connections不配置,则共享连接,否则每服务每连接
  14. if (connections == 0){
  15. service_share_connect = true;
  16. connections = 1;
  17. }
  18. ExchangeClient[] clients = new ExchangeClient[connections];
  19. for (int i = 0; i < clients.length; i++) {
  20. if (service_share_connect){
  21. clients[i] = getSharedClient(url);
  22. } else {
  23. clients[i] = initClient(url);//如果不共享链接,会每次都创建一个新的client
  24. }
  25. }
  26. return clients;
  27. }
  28. /**
  29. *获取共享连接
  30. */
  31. private ExchangeClient getSharedClient(URL url){
  32. String key = url.getAddress();
  33. ReferenceCountExchangeClient client = referenceClientMap.get(key);//map中获取
  34. if ( client != null ){
  35. if ( !client.isClosed()){
  36. client.incrementAndGetCount();
  37. return client;
  38. } else {
  39. // logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client));
  40. referenceClientMap.remove(key);//已经关闭的链接从map中删除
  41. }
  42. }
  43. ExchangeClient exchagneclient = initClient(url);//如果从map中获取的链接为null,则创建新链接
  44. //ReferenceCountExchangeClient是exchagneclient的包装类,包装了一个计数器
  45. client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
  46. referenceClientMap.put(key, client);//新建之后都会存入map
  47. ghostClientMap.remove(key);
  48. return client;
  49. }
  50. /**
  51. * 创建新连接.
  52. */
  53. private ExchangeClient initClient(URL url) {
  54. // client type setting.
  55. //client属性值,为空时获取server值,如果server也是空就是用默认netty
  56. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
  57. String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
  58. //兼容1.0版本codec
  59. boolean compatible = (version != null && version.startsWith("1.0."));
  60. url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
  61. //默认开启heartbeat,60s
  62. url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
  63. // BIO存在严重性能问题,暂时不允许使用
  64. if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
  65. throw new RpcException("Unsupported client type: " + str + "," +
  66. " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
  67. }
  68. ExchangeClient client ;
  69. try {
  70. //设置连接应该是lazy的
  71. if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
  72. client = new LazyConnectExchangeClient(url ,requestHandler);
  73. } else {
  74. client = Exchangers.connect(url ,requestHandler);//创建链接,Exchangers工具类里面进行操作的是HeaderExchanger
  75. }
  76. } catch (RemotingException e) {
  77. throw new RpcException("Fail to create remoting client for service(" + url
  78. + "): " + e.getMessage(), e);
  79. }
  80. return client;
  81. }

invoker中包含了链接客户端client。消费者端和生产者端就是通过这个client进行信息交换的。我们深入看一下client的创建过程。Exchangers是一个工具包装类里面执行的是HeaderExchanger的connect()方法

  1. public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. if (handler == null) {
  6. throw new IllegalArgumentException("handler == null");
  7. }
  8. url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
  9. //HeaderExchanger信息交换层
  10. return getExchanger(url).connect(url, handler);
  11. }

HeaderExchanger的connect()方法

  1. public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
  2. //DecodeHandler编码解码处理器,
  3. //Transporters是工具包装类,里面真正执行connect操作的是NettyTransporter
  4. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  5. }

Transporters也是一个工具包装类里面执行的是NettyTransporter中connect()方法

  1. public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. ChannelHandler handler;
  6. if (handlers == null || handlers.length == 0) {
  7. handler = new ChannelHandlerAdapter();
  8. } else if (handlers.length == 1) {
  9. handler = handlers[0];
  10. } else {
  11. handler = new ChannelHandlerDispatcher(handlers);
  12. }
  13. //NettyTransporter网络传输层
  14. return getTransporter().connect(url, handler);
  15. }

NettyTransporter的connect()方法

  1. public Client connect(URL url, ChannelHandler listener) throws RemotingException {
  2. //创建nettyClient客户端
  3. return new NettyClient(url, listener);
  4. }

NettyClient中直接调用父类的构造方法,在父类构造方法中又调用了子类中实现的doOpen()方法和doConnect()方法。

  1. protected void doOpen() throws Throwable {
  2. NettyHelper.setNettyLoggerFactory();
  3. bootstrap = new ClientBootstrap(channelFactory);
  4. // config
  5. // @see org.jboss.netty.channel.socket.SocketChannelConfig
  6. bootstrap.setOption("keepAlive", true);
  7. bootstrap.setOption("tcpNoDelay", true);
  8. bootstrap.setOption("connectTimeoutMillis", getTimeout());
  9. final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
  10. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  11. public ChannelPipeline getPipeline() {
  12. NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
  13. ChannelPipeline pipeline = Channels.pipeline();
  14. pipeline.addLast("decoder", adapter.getDecoder());
  15. pipeline.addLast("encoder", adapter.getEncoder());
  16. pipeline.addLast("handler", nettyHandler);
  17. return pipeline;
  18. }
  19. });
  20. }
  21. protected void doConnect() throws Throwable {
  22. long start = System.currentTimeMillis();
  23. ChannelFuture future = bootstrap.connect(getConnectAddress());
  24. try{
  25. boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
  26. if (ret && future.isSuccess()) {
  27. Channel newChannel = future.getChannel();
  28. newChannel.setInterestOps(Channel.OP_READ_WRITE);
  29. try {
  30. // 关闭旧的连接
  31. Channel oldChannel = NettyClient.this.channel; // copy reference
  32. if (oldChannel != null) {
  33. try {
  34. if (logger.isInfoEnabled()) {
  35. logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
  36. }
  37. oldChannel.close();
  38. } finally {
  39. NettyChannel.removeChannelIfDisconnected(oldChannel);
  40. }
  41. }
  42. } finally {
  43. if (NettyClient.this.isClosed()) {
  44. try {
  45. if (logger.isInfoEnabled()) {
  46. logger.info("Close new netty channel " + newChannel + ", because the client closed.");
  47. }
  48. newChannel.close();
  49. } finally {
  50. NettyClient.this.channel = null;
  51. NettyChannel.removeChannelIfDisconnected(newChannel);
  52. }
  53. } else {
  54. NettyClient.this.channel = newChannel;
  55. }
  56. }
  57. } else if (future.getCause() != null) {
  58. throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
  59. + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
  60. } else {
  61. throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
  62. + getRemoteAddress() + " client-side timeout "
  63. + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
  64. + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
  65. }
  66. }finally{
  67. if (! isConnected()) {
  68. future.cancel();
  69. }
  70. }
  71. }

 

转载于:https://my.oschina.net/u/3100849/blog/855462

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/165826
推荐阅读
相关标签
  

闽ICP备14008679号