当前位置:   article > 正文

DangerWind-RPC-framework---二、动态代理

DangerWind-RPC-framework---二、动态代理

       RPC调用需要达到的效果是,远程调用某方法就像本地调用一样,以下列代码为例:

  1. @Component
  2. public class HelloController {
  3. @RpcReference(version = "version1", group = "test1")
  4. private HelloService helloService;
  5. public void test() throws InterruptedException {
  6. // 远程调用,就像本地调用
  7. String hello = this.helloService.hello(new Hello("111", "222"));
  8. Thread.sleep(12000);
  9. for (int i = 0; i < 10; i++) {
  10. System.out.println(helloService.hello(new Hello("111", "222")));
  11. }
  12. }

       调用HelloService的hello方法实际上是向RPC Server端发起远程调用,并且应该像本地调用一样。如Apache Thrift框架,通过配置调用的server端服务信息,即可像本地调用一样发起远程调用。这是如何做到的呢?实际上是使用的动态代理技术。在代理类中与RPC Server端建立连接进行通信。比如HelloService是RPC调用的interface,这样的interface还会有很多,如UserService、TradeService,我们没有办法为每个Service编写实现类并在每个实现类中均编写远程通信的逻辑,这样也失去了作为RPC框架的意义,因此需要使用统一的动态代理类,调用各个Service时实际上是调用生成的代理类,在动态代理类中统一封装远程调用逻辑以实现远程通信。

        代理类的生成与装配在Spring Bean的生命周期中完成,具体步骤计划在BeanPostProcessor的postProcessAfterInitialization中完成,代码如下:

  1. @Override
  2. public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
  3. Class<?> targetClass = bean.getClass();
  4. Field[] declaredFields = targetClass.getDeclaredFields();
  5. for (Field declaredField : declaredFields) {
  6. RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
  7. if (rpcReference != null) {
  8. RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
  9. .group(rpcReference.group())
  10. .version(rpcReference.version()).build();
  11. RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);
  12. Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
  13. declaredField.setAccessible(true);
  14. try {
  15. declaredField.set(bean, clientProxy);
  16. } catch (IllegalAccessException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. return bean;
  22. }

       在各个Bean初始化的过程中,都会历经此步骤,重写方法后首先通过反射获取Bean的成员变量,并检查各个成员变量上是否有RpcReference注解,有的话证明是RPC调用相关interface,需要为该接口装配动态代理类,装配过程通过反射完成。

       接下来是动态代理的实现方式以及具体的调用逻辑,代码如下:

  1. @SneakyThrows
  2. @SuppressWarnings("unchecked")
  3. @Override
  4. public Object invoke(Object proxy, Method method, Object[] args) {
  5. log.info("invoked method: [{}]", method.getName());
  6. RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
  7. .parameters(args)
  8. .interfaceName(method.getDeclaringClass().getName())
  9. .paramTypes(method.getParameterTypes())
  10. .requestId(UUID.randomUUID().toString())
  11. .group(rpcServiceConfig.getGroup())
  12. .version(rpcServiceConfig.getVersion())
  13. .build();
  14. RpcResponse<Object> rpcResponse = null;
  15. if (rpcRequestTransport instanceof NettyRpcClient) {
  16. // 考虑主线程与NIOEventLoopGroup工作线程间的通信
  17. CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
  18. rpcResponse = completableFuture.get();
  19. }
  20. if (rpcRequestTransport instanceof SocketRpcClient) {
  21. rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
  22. }
  23. this.check(rpcResponse, rpcRequest);
  24. return rpcResponse.getData();
  25. }

        RpcClientProxy通过实现InvocationHandler接口来进行动态代理,每次调用接口是实际上调用的是代理类的invoke方法。在该方法中首先封装RPC远程调用的Request信息,如接口名、方法名、参数类型、参数值、请求ID,group与version等信息。

        接下来是发送RPC请求的过程,代码如下:

  1. @Override
  2. public Object sendRpcRequest(RpcRequest rpcRequest) {
  3. // build return value
  4. CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
  5. // get server address
  6. InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
  7. // get server address related channel
  8. Channel channel = getChannel(inetSocketAddress);
  9. if (channel.isActive()) {
  10. // put unprocessed request
  11. unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
  12. RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
  13. .codec(SerializationTypeEnum.HESSIAN.getCode())
  14. .compress(CompressTypeEnum.GZIP.getCode())
  15. .messageType(RpcConstants.REQUEST_TYPE).build();
  16. channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
  17. if (future.isSuccess()) {
  18. log.info("client send message: [{}]", rpcMessage);
  19. } else {
  20. future.channel().close();
  21. // 发送失败,设置异常信息
  22. resultFuture.completeExceptionally(future.cause());
  23. log.error("Send failed:", future.cause());
  24. }
  25. });
  26. } else {
  27. throw new IllegalStateException();
  28. }
  29. return resultFuture;
  30. }

       以Netty框架实现的通信为例,大致过程如下:首先从注册中心拉取服务列表并通过负载均衡算法获取到具体server端的机器,接下来连接对应的IP与端口号获取交换信息的channel。

        之后将对应的future保存到对应的未处理完成请求的集合中(只有一行代码,却是关键的一步),最后设置序列化算法以及压缩方式,将消息发出。

  1. public class UnprocessedRequests {
  2. private static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>();
  3. public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {
  4. UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);
  5. }
  6. public void complete(RpcResponse<Object> rpcResponse) {
  7. CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());
  8. if (null != future) {
  9. future.complete(rpcResponse);
  10. } else {
  11. throw new IllegalStateException();
  12. }
  13. }
  14. }

       CompletableFuture的使用是对这一过程的优化,更加优雅地实现了等待请求响应的过程。后续接收到server端响应时,处理线程是EventLoop线程,该线程拿到响应结果后可以为future设置结果,阻塞获取结果的主线程可以获取到结果。事实上,CompletableFuture的get方法也是一个同步阻塞的过程,但是这种获取响应的方式显得更加清晰,很好地实现了线程间通信,如接收到消息后可以调用complete方法设置响应结果,阻塞等待的主线程即可获取到结果。并且当消息发送失败时也可灵活设置异常信息。

       接下来对这一过程中的关键步骤进行介绍,首先是lookupService的过程,代码如下:

  1. @Override
  2. public InetSocketAddress lookupService(RpcRequest rpcRequest) {
  3. String rpcServiceName = rpcRequest.getRpcServiceName();
  4. CuratorFramework zkClient = CuratorUtils.getZkClient();
  5. List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
  6. if (CollectionUtil.isEmpty(serviceUrlList)) {
  7. throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
  8. }
  9. // load balancing
  10. String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
  11. log.info("Successfully found the service address:[{}]", targetServiceUrl);
  12. String[] socketAddressArray = targetServiceUrl.split(":");
  13. String host = socketAddressArray[0];
  14. int port = Integer.parseInt(socketAddressArray[1]);
  15. return new InetSocketAddress(host, port);
  16. }

       不难看出,首先根据服务名拼接了zk的前缀,从zk上获取了子节点,对应路径下的子节点即为服务列表,之后通过负载均衡算法(后续介绍)从这些机器实例中选择一台实例进行访问。机器的IP地址和端口号也可以获取(注册时的叶子结点带上了对应信息)。

  1. private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();
  2. public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) {
  3. if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) {
  4. return SERVICE_ADDRESS_MAP.get(rpcServiceName);
  5. }
  6. List<String> result = null;
  7. String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
  8. try {
  9. result = zkClient.getChildren().forPath(servicePath);
  10. SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
  11. registerWatcher(rpcServiceName, zkClient);
  12. } catch (Exception e) {
  13. log.error("get children nodes for path [{}] fail", servicePath);
  14. }
  15. return result;
  16. }

        SERVICE_ADDRESS_MAP起到了本地缓存的作用,第一次访问时缓存中必然不会有数据,需要访问ZK进行获取,将获取到的结果进行缓存。并且需要注意的是,服务端随时可能有机器上下线,缓存列表也需要及时更新,这就需要使用ZK的Watcher机制,Curator框架对其进行了封装,代码如下:       

  1. private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception {
  2. String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
  3. PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);
  4. // Curator框架此处的作用
  5. // 1. 缓存节点到本地,当节点发生变化时,会使用监听机制自动更新框架缓存
  6. // 2. 监听节点变化,当节点发生变化时,会触发监听器,拉取最新的服务列表到本地缓存(CHM)
  7. // 3. ZK原生的Watcher机制触发一次回调后就会失效,框架会再次自动注册Watcher事件
  8. PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
  9. List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
  10. SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
  11. };
  12. pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
  13. pathChildrenCache.start();
  14. }

        Curator框架此处的作用如注释所言:首先是缓存节点到本地,当节点发生变化时,会使用监听机制自动更新框架缓存;监听节点变化,当节点发生变化时,会触发监听器,拉取最新的服务列表到本地缓存(ConcurrentHashMap);ZK原生的Watcher机制触发一次回调后就会失效,框架会再次自动注册Watcher事件。

        拿到一台具体的机器后需要和对应的机器建立连接,代码如下:

  1. @SneakyThrows
  2. public Channel doConnect(InetSocketAddress inetSocketAddress) {
  3. CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
  4. //连接操作和回调是在 Netty 的 EventLoop 线程中执行的,而 CompletableFuture.get() 是在调用 doConnect 方法的线程中执行的,因此 CompletableFuture 起到了线程间通信的作用。
  5. bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
  6. if (future.isSuccess()) {
  7. log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
  8. completableFuture.complete(future.channel());
  9. } else {
  10. throw new IllegalStateException();
  11. }
  12. });
  13. return completableFuture.get();
  14. }

       这里同样使用了CompletableFuture,起到了线程间通信的作用,可以在与对应机器建立好连接后获取channel连接。将消息封装好后从channel刷出,如此便完成了RPC动态代理需要封装的逻辑,从而实现通过动态代理类屏蔽RPC远程调用细节的需求,并且可以在运行时完成代理类的装配。

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/805433
推荐阅读
相关标签
  

闽ICP备14008679号