赞
踩
前面一篇讲了服务调用方启动的大致流程
本章主要讲refer服务引用,分成两个重点,一个是创建invoker,一个是创建代理。
入口:ReferenceConfig类的createProxy方法
1.关于connection的问题
如果connections不配置,则共享连接,否则每服务每连接,
共享连接的意思是对于同一个ip+port的所有服务只创建一个连接,
如果是非共享连接则每个服务+(ip+port)创建一个连接
该方法里面有一句
invoker = refprotocol.refer(interfaceClass, urls.get(0));
这一句是本文的入口
这里的 invoker = refprotocol.refer(interfaceClass, urls.get(0));就是通过dubbo的SPI机制,跟进去的调用流程是:
ProtocolListenerWrapper.refer方法 --------> ProtocolFilterWrapper.refer --------> QosProtocolWrapper.refer --------> QosProtocolWrapper.startQosServer(url); --------> RegistryProtocol.refer --------> RegistryProtocol.doRefer
--------> MockClusterWrapper.join ------> FailoverCluster.join ------> ProviderConsumerRegTable.registerConsumer
- @Override
- public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
- if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
- startQosServer(url);
- return protocol.refer(type, url);
- }
- return protocol.refer(type, url);
- }
这个startQosServer方法跟进去
- private void startQosServer(URL url) {
- try {
- boolean qosEnable = url.getParameter(QOS_ENABLE,true);
- if (!qosEnable) {
- logger.info("qos won't be started because it is disabled. " +
- "Please check dubbo.application.qos.enable is configured either in system property, " +
- "dubbo.properties or XML/spring boot configuration.");
- return;
- }
-
- if (!hasStarted.compareAndSet(false, true)) {
- return;
- }
-
- int port = url.getParameter(QOS_PORT, DEFAULT_PORT);
- boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP,"false"));
- Server server = com.alibaba.dubbo.qos.server.Server.getInstance();
- server.setPort(port);
- server.setAcceptForeignIp(acceptForeignIp);
- server.start();
-
- } catch (Throwable throwable) {
- logger.warn("Fail to start qos server: ", throwable);
- }
- }
做了几件事:
开始QOS服务
- @Override
- @SuppressWarnings("unchecked")
- public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
-
- // 取 registry 参数值,并将其设置为协议头
- url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
-
- // 获取注册中心实例
- Registry registry = registryFactory.getRegistry(url);
- if (RegistryService.class.equals(type)) {
-
- //debug的时候没走这里
- return proxyFactory.getInvoker((T) registry, type, url);
- }
-
- // group="a,b" or group="*"
-
- // 将 url 查询字符串转为 Map
- Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
-
- // 获取 group 配置
- String group = qs.get(Constants.GROUP_KEY);
- if (group != null && group.length() > 0) {
-
- // debug的时候没走这里
- if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
- || "*".equals(group)) {
-
- // 通过 SPI 加载 Cluster 实例,并调用 doRefer 继续执行服务引用逻辑
- return doRefer(getMergeableCluster(), registry, type, url);
- }
- }
-
- // 调用 doRefer 继续执行服务引用逻辑
- return doRefer(cluster, registry, type, url);
- }
这里的url=zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-annotation-consumer&dubbo=2.0.2&pid=9044&qos.port=33333&refer=application%3Ddubbo-annotation-consumer%26default.timeout%3D3000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.samples.api.client.HelloService%26methods%3DsayHello%26pid%3D9044%26qos.port%3D33333%26register.ip%3DXXX.XXX.XXX.XXX%26side%3Dconsumer%26timestamp%3D1603880645072×tamp=1603880668326
里面的registryFactory.getRegistry(url);用到了dubbo的SPI,这里得到的registry是ZookeeperRegistry
- private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
- // 创建 RegistryDirectory 实例
- RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
-
- // 设置注册中心和协议
- directory.setRegistry(registry);
- directory.setProtocol(protocol);
- // all attributes of REFER_KEY
- Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
-
- // 生成服务消费者链接
- URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
-
- // 注册服务消费者,在 consumers 目录下新节点
- if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
- && url.getParameter(Constants.REGISTER_KEY, true)) {
- URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
- registry.register(registeredConsumerUrl);
- directory.setRegisteredConsumerUrl(registeredConsumerUrl);
- }
-
- // 订阅 providers、configurators、routers 等节点数据
- directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
- Constants.PROVIDERS_CATEGORY
- + "," + Constants.CONFIGURATORS_CATEGORY
- + "," + Constants.ROUTERS_CATEGORY));
-
- // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
- Invoker invoker = cluster.join(directory);
- ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
- return invoker;
- }
做了几件事
背景例子:
消费者服务所在的项目叫:dubbo-consumer
里面就一个类ProductServiceImpl一个方法addCostByItemId,需要去调用dubbo-provider项目中的itemService服务和costService服务
服务提供者所在的项目叫:dubbo-provider
里面有2个服务itemService和costService
现在debug是搞的消费者服务
当ProductServiceImpl实例化过后,依赖注入成员的时候会走到这里来,第一次来是注入costService
1.创建 RegistryDirectory 实例。
(这个是服务目录。服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。类似于注册中心在消费者端本地的缓存,不知道dubbo这个是不是从注册中心拉下来的。后面会专门开一篇来讲这个)
2.设置注册中心和协议
3.URL subscribeUrl
生成服务消费者链接
第一次进来是是注入costService出发的,此时subscribeUrl=consumer://XXX.XXX.XXX.XX/com.sid.api.service.CostService?application=dubbo-consumer&default.check=false&default.timeout=3000&dubbo=2.6.2&interface=com.sid.api.service.CostService&loadbalance=roundrobin&methods=add,subtract&pid=10116&revision=1.0.0&side=consumer×tamp=1620626010244&version=1.0.0
4.registry.register方法
注册服务消费者,在 consumers 目录下新节点
5.订阅服务directory.subscribe方法
订阅 providers、configurators、routers 等节点数据
下面三、服务订阅详细讲
6.由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker
RegistryDirectory.subscribe方法
- public void subscribe(URL url) {
- setConsumerUrl(url);
- registry.subscribe(url, this);
- }
这里面调用的是FailbackRegistry.subscribe
- public void subscribe(URL url, NotifyListener listener) {
- super.subscribe(url, listener);
- removeFailedSubscribed(url, listener);
- try {
- // 向服务器端发送订阅请求
- // Sending a subscription request to the server side
- doSubscribe(url, listener);
- } catch (Exception e) {
- Throwable t = e;
-
- List<URL> urls = getCacheUrls(url);
- if (urls != null && !urls.isEmpty()) {
- notify(url, listener, urls);
- logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
- } else {
- // 如果开启了启动时检测,则直接抛出异常
- // If the startup detection is opened, the Exception is thrown directly.
- boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
- && url.getParameter(Constants.CHECK_KEY, true);
- boolean skipFailback = t instanceof SkipFailbackWrapperException;
- if (check || skipFailback) {
- if (skipFailback) {
- t = t.getCause();
- }
- throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
- } else {
- logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
- }
- }
-
- // 将失败的订阅请求记录到失败列表,定时重试
- // Record a failed registration request to a failed list, retry regularly
- addFailedSubscribed(url, listener);
- }
- }
主要做了几件事:
1.ZookeeperRegistry.doSubscribe方法
向服务器端发送订阅请求
2.doSubscribe方法如果抛出异常,则
notify()或者 如果开启了启动时检测,则直接抛出异常
将失败的订阅请求记录到失败列表,定时重试addFailedSubscribed(url, listener);
- @Override
- protected void doSubscribe(final URL url, final NotifyListener listener) {
- try {
-
- //如果provider的service的接口配置的是“*”
- if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
- //获取服务分组根路径
- String root = toRootPath();
- //获取服务的NotifyListener
- ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
- if (listeners == null) {
- //如果没有则创建一个
- zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
- listeners = zkListeners.get(url);
- }
- ChildListener zkListener = listeners.get(listener);
- //如果没有子监听器则创建一个
- if (zkListener == null) {
- listeners.putIfAbsent(listener, new ChildListener() {
- @Override
- public void childChanged(String parentPath, List<String> currentChilds) {
- for (String child : currentChilds) {
- child = URL.decode(child);
- if (!anyServices.contains(child)) {
- anyServices.add(child);
- subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
- Constants.CHECK_KEY, String.valueOf(false)), listener);
- }
- }
- }
- });
- zkListener = listeners.get(listener);
- }
- //向服务器订阅服务,注册中心会调用NotifyListener的notify函数返回服务列表
- zkClient.create(root, false);
- //获取服务地址列表
- List<String> services = zkClient.addChildListener(root, zkListener);
- if (services != null && !services.isEmpty()) {
- //如果存在服务
- for (String service : services) {
- service = URL.decode(service);
- anyServices.add(service);
-
- //如果serviceInterface是“*”则从分组根路径遍历service并订阅所有服务
- subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
- Constants.CHECK_KEY, String.valueOf(false)), listener);
- }
- }
- } else {
-
- //如果serviceInterface不是“*”则创建Zookeeper客户端索取服务列表,并通知(notify)消费者(consumer)这些服务可以用了
- List<URL> urls = new ArrayList<URL>();
-
- //获取类似于http://xxx.xxx.xxx.xxx/context/com.service.xxxService/consumer的地址
- for (String path : toCategoriesPath(url)) {
-
- //获取例如com.service.xxxService对应的NotifyListener map
- ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
- if (listeners == null) {
- zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
- listeners = zkListeners.get(url);
- }
- //获取ChildListener
- ChildListener zkListener = listeners.get(listener);
- if (zkListener == null) {
- listeners.putIfAbsent(listener, new ChildListener() {
- @Override
- public void childChanged(String parentPath, List<String> currentChilds) {
- ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
- }
- });
- zkListener = listeners.get(listener);
- }
- //创建Zookeeper客户端
- zkClient.create(path, false);
- List<String> children = zkClient.addChildListener(path, zkListener);
- if (children != null) {
- urls.addAll(toUrlsWithEmpty(url, path, children));
- }
- }
-
- //提醒消费者
- notify(url, listener, urls);
- }
- } catch (Throwable e) {
- throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
主要做了几件事
1.在zk中添加节点和节点监听器
2.notify通知消费者去创建跟服务提供者之间的TCP连接
里面的调用链需关注的是
这个里面是其实通知registryDirectory刷新invoker,因为这里是第一次服务调用方第一期启动,所以实际上是在创建invoker、通过NETTY与服务提供方建立TCP长链接。
可以看到通过RegistryDirectory.toInvokers方法触发了Protocol@Adaptive.refer()-------->QosProtocolWrapper.refer()-------->ProtocolFilterWrapper.refer()-------->ProtocolListenerWrapper.refer()-------->DubboProtocol.refer()
DubboProtocol.refer()方法源码:
- @Override
- public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
- optimizeSerialization(url);
- // create rpc invoker.
- DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
- invokers.add(invoker);
- return invoker;
- }
主要做了几件事:
1.创建RPC用的invoker并且返回
这里创建Invoker不复杂,就是一个new,但是重点关注一下getClients(url)方法,该方法会用netty去建立跟服务提供方的TCP链接。
在四、创建Invoker中详细讲
创建invoker的时候用netty去建立跟服务提供方的TCP链接
入口
DubboProtocol.refer()方法中的
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
- private ExchangeClient[] getClients(URL url) {
- // whether to share connection
- //是否共享连接
- boolean service_share_connect = false;
- int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
- // if not configured, connection is shared, otherwise, one connection for one service
- // 如果connections不配置,则共享连接,否则每服务每连接,
- // 共享连接的意思是对于同一个ip+port的所有服务只创建一个连接,
- // 如果是非共享连接则每个服务+(ip+port)创建一个连接
- if (connections == 0) {
- service_share_connect = true;
- connections = 1;
- }
-
- ExchangeClient[] clients = new ExchangeClient[connections];
- for (int i = 0; i < clients.length; i++) {
- if (service_share_connect) {
- clients[i] = getSharedClient(url);
- } else {
- clients[i] = initClient(url);
- }
- }
- return clients;
- }
主要做了几件事
1.获取交换客户端ExchangeClient,默认使用共享链接
这里注意下
如果connections不配置,则共享连接,否则每服务每连接,
共享连接的意思是对于同一个ip+port的所有服务只创建一个连接,
如果是非共享连接则每个服务+(ip+port)创建一个连接
- private ExchangeClient getSharedClient(URL url) {
- // 以address(ip:port)为key进行缓存
- String key = url.getAddress();
- ReferenceCountExchangeClient client = referenceClientMap.get(key);
- if (client != null) {
- // 如果连接存在了则引用数加1,引用数表示有多少个服务使用了此client,
- // 当某个client调用close()时,引用数减一,
- // 如果引用数大于0,表示还有服务在使用此连接, 不会真正关闭client
- // 如果引用数为0,表示没有服务在用此连接,此时连接彻底关闭
- if (!client.isClosed()) {
- client.incrementAndGetCount();
- return client;
- } else {
- referenceClientMap.remove(key);
- }
- }
- synchronized (key.intern()) {
- // 调用initClient来初始化Client
- ExchangeClient exchangeClient = initClient(url);
- // 使用ReferenceCountExchangeClient进行包装
- client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
- referenceClientMap.put(key, client);
- ghostClientMap.remove(key);
- return client;
- }
- }
主要做了几件事
1.以address(ip:port)为key进行缓存
如果连接存在了则引用数加1,引用数表示有多少个服务使用了此client,
当某个client调用close()时,引用数减一,
如果引用数大于0,表示还有服务在使用此连接, 不会真正关闭client
如果引用数为0,表示没有服务在用此连接,此时连接彻底关闭2.根据URL初始化exchangeClient
3.得到referenceCountExchangeClient
4.把这些client保存到MAP中
- private ExchangeClient initClient(URL url) {
-
- // 获取client参数的值,为空则获取server参数的值,默认为netty
- // client type setting.
- String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
-
- // 加入codec参数,默认为dubbo,即DubboCodec
- url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
-
- //默认开启心跳,默认每60s发送一次心跳包
- // enable heartbeat by default
- url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
-
- // BIO存在严重性能问题,暂时不允许使用
- // BIO is not allowed since it has severe performance issue.
- if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
- throw new RpcException("Unsupported client type: " + str + "," +
- " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
- }
-
- ExchangeClient client;
- try {
- // connection should be lazy
- if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
- client = new LazyConnectExchangeClient(url, requestHandler);
- } else {
- client = Exchangers.connect(url, requestHandler);
- }
- } catch (RemotingException e) {
- throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
- }
- return client;
- }
主要做了几件事
1.设置client的类型,默认是netty,设置codec,默认是dubboCodec,默认开启心跳60S发一次包
2.如果Connection不是懒加载,那就立马创建跟服务提供方的connection
Exchangers.connect(url, requestHandler)-------->getExchanger(url).connect(url, handler);-------->HeaderExchanger.connect()-------->Transporters.connect()-------->NettyTransporter.connect()-------->NettyClient构造函数-------->AbstractClient构造函数
- public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
- super(url, handler);
-
- send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
-
- shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
-
- // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
- reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
-
- try {
- doOpen();
- } catch (Throwable t) {
- close();
- throw new RemotingException(url.toInetSocketAddress(), null,
- "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
- }
- try {
- // connect.
- connect();
- if (logger.isInfoEnabled()) {
- logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
- }
- } catch (RemotingException t) {
- if (url.getParameter(Constants.CHECK_KEY, true)) {
- close();
- throw t;
- } else {
- logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
- }
- } catch (Throwable t) {
- close();
- throw new RemotingException(url.toInetSocketAddress(), null,
- "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
- }
-
- executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
- .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
- ExtensionLoader.getExtensionLoader(DataStore.class)
- .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
- }
这部分看一下:https://blog.csdn.net/youaremoon/article/details/50826649
创建代理的入口在ReferenceConfig类的createProxy方法中有一句
return (T) proxyFactory.getProxy(invoker);
这里的proxyFactory通过dubbo的SPI得到
- package com.alibaba.dubbo.rpc;
-
- import com.alibaba.dubbo.common.Constants;
- import com.alibaba.dubbo.common.URL;
- import com.alibaba.dubbo.common.extension.Adaptive;
- import com.alibaba.dubbo.common.extension.SPI;
-
- /**
- * ProxyFactory. (API/SPI, Singleton, ThreadSafe)
- */
- @SPI("javassist")
- public interface ProxyFactory {
-
- @Adaptive({Constants.PROXY_KEY})
- <T> T getProxy(Invoker<T> invoker) throws RpcException;
-
- @Adaptive({Constants.PROXY_KEY})
- <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
-
- @Adaptive({Constants.PROXY_KEY})
- <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
- }
调用了StubProxyFactoryWrapper的getProxy方法
- @Override
- @SuppressWarnings({"unchecked", "rawtypes"})
- public <T> T getProxy(Invoker<T> invoker) throws RpcException {
- T proxy = proxyFactory.getProxy(invoker);
- if (GenericService.class != invoker.getInterface()) {
- String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
- if (ConfigUtils.isNotEmpty(stub)) {
- Class<?> serviceType = invoker.getInterface();
- if (ConfigUtils.isDefault(stub)) {
- if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
- stub = serviceType.getName() + "Stub";
- } else {
- stub = serviceType.getName() + "Local";
- }
- }
- try {
- Class<?> stubClass = ReflectUtils.forName(stub);
- if (!serviceType.isAssignableFrom(stubClass)) {
- throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
- }
- try {
- Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
- proxy = (T) constructor.newInstance(new Object[]{proxy});
- //export stub service
- URL url = invoker.getUrl();
- if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
- url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
- url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
- try {
- export(proxy, (Class) invoker.getInterface(), url);
- } catch (Exception e) {
- LOGGER.error("export a stub service error.", e);
- }
- }
- } catch (NoSuchMethodException e) {
- throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
- }
- } catch (Throwable t) {
- LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
- // ignore
- }
- }
- }
- return proxy;
- }
AbstractProxyFactory.getProxy方法
- package com.alibaba.dubbo.rpc.proxy;
-
- import com.alibaba.dubbo.common.Constants;
- import com.alibaba.dubbo.common.utils.ReflectUtils;
- import com.alibaba.dubbo.rpc.Invoker;
- import com.alibaba.dubbo.rpc.ProxyFactory;
- import com.alibaba.dubbo.rpc.RpcException;
- import com.alibaba.dubbo.rpc.service.EchoService;
- import com.alibaba.dubbo.rpc.service.GenericService;
-
- /**
- * AbstractProxyFactory
- */
- public abstract class AbstractProxyFactory implements ProxyFactory {
-
- @Override
- public <T> T getProxy(Invoker<T> invoker) throws RpcException {
- return getProxy(invoker, false);
- }
-
- @Override
- public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
- Class<?>[] interfaces = null;
-
- // 获取接口列表
- String config = invoker.getUrl().getParameter("interfaces");
-
- // debug的时候这里没进来
- if (config != null && config.length() > 0) {
-
- // 切分接口列表
- String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
- if (types != null && types.length > 0) {
- interfaces = new Class<?>[types.length + 2];
-
- // 设置服务接口类和 EchoService.class 到 interfaces 中
- interfaces[0] = invoker.getInterface();
- interfaces[1] = EchoService.class;
- for (int i = 0; i < types.length; i++) {
-
- // 加载接口类
- interfaces[i + 1] = ReflectUtils.forName(types[i]);
- }
- }
- }
- if (interfaces == null) {
- interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
- }
-
- // 为 http 和 hessian 协议提供泛化调用支持
- // debug的时候这里没进来
- if (!invoker.getInterface().equals(GenericService.class) && generic) {
- int len = interfaces.length;
- Class<?>[] temp = interfaces;
-
- // 创建新的 interfaces 数组
- interfaces = new Class<?>[len + 1];
- System.arraycopy(temp, 0, interfaces, 0, len);
-
- // 设置 GenericService.class 到数组中
- interfaces[len] = GenericService.class;
- }
-
- // 调用重载方法
- return getProxy(invoker, interfaces);
- }
-
- public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
-
- }
做了几件事:
1.获取接口列表、切分接口列表、设置服务接口类和 EchoService.class 到 interfaces 中、加载接口类
2.为 http 和 hessian 协议提供泛化调用支持
3.调用子类重载方法
JavassistProxyFactory.getProxy
- public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
- return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
- }
做了几件事
1.通过 Proxy 的 getProxy 方法获取 Proxy 子类
2.创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。
InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。
Proxy.getProxy方法
- public static Proxy getProxy(Class<?>... ics) {
- return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
- }
- public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
- if (ics.length > 65535)
- throw new IllegalArgumentException("interface limit exceeded");
-
- StringBuilder sb = new StringBuilder();
-
- // 遍历接口列表
- // 这里debug的时候接口列表有:HelloService和EchoService
- for (int i = 0; i < ics.length; i++) {
- String itf = ics[i].getName();
-
- // 检测类型是否为接口
- if (!ics[i].isInterface())
- throw new RuntimeException(itf + " is not a interface.");
-
- Class<?> tmp = null;
- try {
-
- // 重新加载接口类
- tmp = Class.forName(itf, false, cl);
- } catch (ClassNotFoundException e) {
- }
-
- // 检测接口是否相同,这里 tmp 有可能为空
- if (tmp != ics[i])
- throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
-
- // 拼接接口全限定名,分隔符为 ;
- sb.append(itf).append(';');
- }
-
- // 使用拼接后的接口名作为 key
- //debug的时候key=org.apache.dubbo.samples.api.client.HelloService;com.alibaba.dubbo.rpc.service.EchoService;
- // use interface class name list as key.
- String key = sb.toString();
-
- // get cache by class loader.
- Map<String, Object> cache;
- synchronized (ProxyCacheMap) {
- cache = ProxyCacheMap.get(cl);
- if (cache == null) {
- cache = new HashMap<String, Object>();
- ProxyCacheMap.put(cl, cache);
- }
- }
-
- Proxy proxy = null;
- synchronized (cache) {
- do {
-
- // 从缓存中获取 Reference<Proxy> 实例
- Object value = cache.get(key);
- if (value instanceof Reference<?>) {
- proxy = (Proxy) ((Reference<?>) value).get();
- if (proxy != null)
- return proxy;
- }
-
- // 并发控制,保证只有一个线程可以进行后续操作
- if (value == PendingGenerationMarker) {
- try {
- // 其他线程在此处进行等待
- cache.wait();
- } catch (InterruptedException e) {
- }
- } else {
-
- // 放置标志位到缓存中,并跳出 while 循环进行后续操作
- cache.put(key, PendingGenerationMarker);
- break;
- }
- }
- while (true);
- }
-
- long id = PROXY_CLASS_COUNTER.getAndIncrement();
- String pkg = null;
- ClassGenerator ccp = null, ccm = null;
- try {
-
- // 创建 ClassGenerator 对象
- ccp = ClassGenerator.newInstance(cl);
-
- Set<String> worked = new HashSet<String>();
- List<Method> methods = new ArrayList<Method>();
-
- for (int i = 0; i < ics.length; i++) {
-
- // 检测接口访问级别是否为 protected 或 privete
- if (!Modifier.isPublic(ics[i].getModifiers())) {
-
- // 获取接口包名
- String npkg = ics[i].getPackage().getName();
- if (pkg == null) {
- pkg = npkg;
- } else {
- if (!pkg.equals(npkg))
-
- // 非 public 级别的接口必须在同一个包下,否者抛出异常
- throw new IllegalArgumentException("non-public interfaces from different packages");
- }
- }
-
- // 添加接口到 ClassGenerator 中
- ccp.addInterface(ics[i]);
-
- // 遍历接口方法
- for (Method method : ics[i].getMethods()) {
-
- // 获取方法描述,可理解为方法签名
- String desc = ReflectUtils.getDesc(method);
-
- // 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
- // A 接口和 B 接口中包含一个完全相同的方法
- if (worked.contains(desc))
- continue;
- worked.add(desc);
-
- int ix = methods.size();
-
- // 获取方法返回值类型
- Class<?> rt = method.getReturnType();
-
- // 获取参数列表
- Class<?>[] pts = method.getParameterTypes();
-
- // 生成 Object[] args = new Object[1...N]
- StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
- for (int j = 0; j < pts.length; j++)
-
- // 生成 args[1...N] = ($w)$1...N;
- code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
-
-
- // 生成 InvokerHandler 接口的 invoker 方法调用语句
- code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
-
- // 返回值不为 void
- if (!Void.TYPE.equals(rt))
-
- // 生成返回语句,形如 return (java.lang.String) ret;
- code.append(" return ").append(asArgument(rt, "ret")).append(";");
-
- methods.add(method);
-
- // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
- ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
- }
- }
-
- if (pkg == null)
- pkg = PACKAGE_NAME;
-
- // create ProxyInstance class.
-
- // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
- String pcn = pkg + ".proxy" + id;
- ccp.setClassName(pcn);
- ccp.addField("public static java.lang.reflect.Method[] methods;");
-
- // 生成 private java.lang.reflect.InvocationHandler handler;
- ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
-
- // 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
- // porxy0(java.lang.reflect.InvocationHandler arg0) {
- // handler=$1;
- // }
- ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
-
- // 为接口代理类添加默认构造方法
- ccp.addDefaultConstructor();
-
- // 生成接口代理类
- Class<?> clazz = ccp.toClass();
- clazz.getField("methods").set(null, methods.toArray(new Method[0]));
-
-
- // 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
- // create Proxy class.
- String fcn = Proxy.class.getName() + id;
- ccm = ClassGenerator.newInstance(cl);
- ccm.setClassName(fcn);
- ccm.addDefaultConstructor();
- ccm.setSuperClass(Proxy.class);
-
- // 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
- // public Object newInstance(java.lang.reflect.InvocationHandler h) {
- // return new org.apache.dubbo.proxy0($1);
- // }
- ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
-
- // 生成 Proxy 实现类
- Class<?> pc = ccm.toClass();
-
- // 通过反射创建 Proxy 实例
- proxy = (Proxy) pc.newInstance();
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- } finally {
- // release ClassGenerator
- if (ccp != null)
-
- // 释放资源
- ccp.release();
- if (ccm != null)
- ccm.release();
- synchronized (cache) {
- if (proxy == null)
- cache.remove(key);
- else
-
- // 写缓存
- cache.put(key, new WeakReference<Proxy>(proxy));
-
- // 唤醒其他等待线程
- cache.notifyAll();
- }
- }
- return proxy;
- }
做了几件事:
1.遍历接口列表,重加载接口类
这里debug的时候接口列表有:HelloService和EchoService
2.ClassGenerator ccp 用于为服务接口生成代理类,比如测试代码中的 HelloService接口,这个接口代理类就是由 ccp 生成的。
3.ClassGenerator ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。
找到HelloService接口生成的代理类的.class文件,看一下里面有些什么
1.创建文件hsdb.bat 文件内容如下:
java -classpath "E:\Java\jdk1.8.0_221\lib\sa-jdi.jar" sun.jvm.hotspot.HSDB
2.双击这个文件
3.输入进程ID
4.点击Tools---->Class Browser
5.搜索代理的名字,这个名字是在代码里面打断点看到的。然后点击create .class for this class ,生成的文件会在前面启动HSDB的那个目录下
6.用idea打开看proxy0.class
- package com.alibaba.dubbo.common.bytecode;
-
- import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC;
- import com.alibaba.dubbo.rpc.service.EchoService;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import org.apache.dubbo.samples.api.client.HelloService;
-
- public class proxy0 implements DC, HelloService, EchoService {
- public static Method[] methods;
- private InvocationHandler handler;
-
- public proxy0(InvocationHandler var1) {
- this.handler = var1;
- }
-
- public proxy0() {
- }
-
- public String sayHello(String var1) {
- Object[] var2 = new Object[]{var1};
- Object var3 = this.handler.invoke(this, methods[0], var2);
- return (String)var3;
- }
-
- public Object $echo(Object var1) {
- Object[] var2 = new Object[]{var1};
- Object var3 = this.handler.invoke(this, methods[1], var2);
- return (Object)var3;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。