赞
踩
RPC调用需要达到的效果是,远程调用某方法就像本地调用一样,以下列代码为例:
- @Component
- public class HelloController {
-
- @RpcReference(version = "version1", group = "test1")
- private HelloService helloService;
-
- public void test() throws InterruptedException {
- // 远程调用,就像本地调用
- String hello = this.helloService.hello(new Hello("111", "222"));
- Thread.sleep(12000);
- for (int i = 0; i < 10; i++) {
- System.out.println(helloService.hello(new Hello("111", "222")));
- }
- }
调用HelloService的hello方法实际上是向RPC Server端发起远程调用,并且应该像本地调用一样。如Apache Thrift框架,通过配置调用的server端服务信息,即可像本地调用一样发起远程调用。这是如何做到的呢?实际上是使用的动态代理技术。在代理类中与RPC Server端建立连接进行通信。比如HelloService是RPC调用的interface,这样的interface还会有很多,如UserService、TradeService,我们没有办法为每个Service编写实现类并在每个实现类中均编写远程通信的逻辑,这样也失去了作为RPC框架的意义,因此需要使用统一的动态代理类,调用各个Service时实际上是调用生成的代理类,在动态代理类中统一封装远程调用逻辑以实现远程通信。
代理类的生成与装配在Spring Bean的生命周期中完成,具体步骤计划在BeanPostProcessor的postProcessAfterInitialization中完成,代码如下:
- @Override
- public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
- Class<?> targetClass = bean.getClass();
- Field[] declaredFields = targetClass.getDeclaredFields();
- for (Field declaredField : declaredFields) {
- RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
- if (rpcReference != null) {
- RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
- .group(rpcReference.group())
- .version(rpcReference.version()).build();
- RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);
- Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
- declaredField.setAccessible(true);
- try {
- declaredField.set(bean, clientProxy);
- } catch (IllegalAccessException e) {
- e.printStackTrace();
- }
- }
-
- }
- return bean;
- }
在各个Bean初始化的过程中,都会历经此步骤,重写方法后首先通过反射获取Bean的成员变量,并检查各个成员变量上是否有RpcReference注解,有的话证明是RPC调用相关interface,需要为该接口装配动态代理类,装配过程通过反射完成。
接下来是动态代理的实现方式以及具体的调用逻辑,代码如下:
- @SneakyThrows
- @SuppressWarnings("unchecked")
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) {
- log.info("invoked method: [{}]", method.getName());
- RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
- .parameters(args)
- .interfaceName(method.getDeclaringClass().getName())
- .paramTypes(method.getParameterTypes())
- .requestId(UUID.randomUUID().toString())
- .group(rpcServiceConfig.getGroup())
- .version(rpcServiceConfig.getVersion())
- .build();
- RpcResponse<Object> rpcResponse = null;
- if (rpcRequestTransport instanceof NettyRpcClient) {
- // 考虑主线程与NIOEventLoopGroup工作线程间的通信
- CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
- rpcResponse = completableFuture.get();
- }
- if (rpcRequestTransport instanceof SocketRpcClient) {
- rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
- }
- this.check(rpcResponse, rpcRequest);
- return rpcResponse.getData();
- }
RpcClientProxy通过实现InvocationHandler接口来进行动态代理,每次调用接口是实际上调用的是代理类的invoke方法。在该方法中首先封装RPC远程调用的Request信息,如接口名、方法名、参数类型、参数值、请求ID,group与version等信息。
接下来是发送RPC请求的过程,代码如下:
- @Override
- public Object sendRpcRequest(RpcRequest rpcRequest) {
- // build return value
- CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
- // get server address
- InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
- // get server address related channel
- Channel channel = getChannel(inetSocketAddress);
- if (channel.isActive()) {
- // put unprocessed request
- unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
- RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
- .codec(SerializationTypeEnum.HESSIAN.getCode())
- .compress(CompressTypeEnum.GZIP.getCode())
- .messageType(RpcConstants.REQUEST_TYPE).build();
- channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
- if (future.isSuccess()) {
- log.info("client send message: [{}]", rpcMessage);
- } else {
- future.channel().close();
- // 发送失败,设置异常信息
- resultFuture.completeExceptionally(future.cause());
- log.error("Send failed:", future.cause());
- }
- });
- } else {
- throw new IllegalStateException();
- }
-
- return resultFuture;
- }
以Netty框架实现的通信为例,大致过程如下:首先从注册中心拉取服务列表并通过负载均衡算法获取到具体server端的机器,接下来连接对应的IP与端口号获取交换信息的channel。
之后将对应的future保存到对应的未处理完成请求的集合中(只有一行代码,却是关键的一步),最后设置序列化算法以及压缩方式,将消息发出。
- public class UnprocessedRequests {
- private static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>();
-
- public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {
- UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);
- }
-
- public void complete(RpcResponse<Object> rpcResponse) {
- CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());
- if (null != future) {
- future.complete(rpcResponse);
- } else {
- throw new IllegalStateException();
- }
- }
- }
CompletableFuture的使用是对这一过程的优化,更加优雅地实现了等待请求响应的过程。后续接收到server端响应时,处理线程是EventLoop线程,该线程拿到响应结果后可以为future设置结果,阻塞获取结果的主线程可以获取到结果。事实上,CompletableFuture的get方法也是一个同步阻塞的过程,但是这种获取响应的方式显得更加清晰,很好地实现了线程间通信,如接收到消息后可以调用complete方法设置响应结果,阻塞等待的主线程即可获取到结果。并且当消息发送失败时也可灵活设置异常信息。
接下来对这一过程中的关键步骤进行介绍,首先是lookupService的过程,代码如下:
- @Override
- public InetSocketAddress lookupService(RpcRequest rpcRequest) {
- String rpcServiceName = rpcRequest.getRpcServiceName();
- CuratorFramework zkClient = CuratorUtils.getZkClient();
- List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
- if (CollectionUtil.isEmpty(serviceUrlList)) {
- throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
- }
- // load balancing
- String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
- log.info("Successfully found the service address:[{}]", targetServiceUrl);
- String[] socketAddressArray = targetServiceUrl.split(":");
- String host = socketAddressArray[0];
- int port = Integer.parseInt(socketAddressArray[1]);
- return new InetSocketAddress(host, port);
- }
不难看出,首先根据服务名拼接了zk的前缀,从zk上获取了子节点,对应路径下的子节点即为服务列表,之后通过负载均衡算法(后续介绍)从这些机器实例中选择一台实例进行访问。机器的IP地址和端口号也可以获取(注册时的叶子结点带上了对应信息)。
- private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();
-
- public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) {
- if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) {
- return SERVICE_ADDRESS_MAP.get(rpcServiceName);
- }
- List<String> result = null;
- String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
- try {
- result = zkClient.getChildren().forPath(servicePath);
- SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
- registerWatcher(rpcServiceName, zkClient);
- } catch (Exception e) {
- log.error("get children nodes for path [{}] fail", servicePath);
- }
- return result;
- }
SERVICE_ADDRESS_MAP起到了本地缓存的作用,第一次访问时缓存中必然不会有数据,需要访问ZK进行获取,将获取到的结果进行缓存。并且需要注意的是,服务端随时可能有机器上下线,缓存列表也需要及时更新,这就需要使用ZK的Watcher机制,Curator框架对其进行了封装,代码如下:
- private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception {
- String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
- PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);
- // Curator框架此处的作用
- // 1. 缓存节点到本地,当节点发生变化时,会使用监听机制自动更新框架缓存
- // 2. 监听节点变化,当节点发生变化时,会触发监听器,拉取最新的服务列表到本地缓存(CHM)
- // 3. ZK原生的Watcher机制触发一次回调后就会失效,框架会再次自动注册Watcher事件
- PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
- List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
- SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
- };
- pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
- pathChildrenCache.start();
- }
Curator框架此处的作用如注释所言:首先是缓存节点到本地,当节点发生变化时,会使用监听机制自动更新框架缓存;监听节点变化,当节点发生变化时,会触发监听器,拉取最新的服务列表到本地缓存(ConcurrentHashMap);ZK原生的Watcher机制触发一次回调后就会失效,框架会再次自动注册Watcher事件。
拿到一台具体的机器后需要和对应的机器建立连接,代码如下:
- @SneakyThrows
- public Channel doConnect(InetSocketAddress inetSocketAddress) {
- CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
- //连接操作和回调是在 Netty 的 EventLoop 线程中执行的,而 CompletableFuture.get() 是在调用 doConnect 方法的线程中执行的,因此 CompletableFuture 起到了线程间通信的作用。
- bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
- if (future.isSuccess()) {
- log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
- completableFuture.complete(future.channel());
- } else {
- throw new IllegalStateException();
- }
- });
- return completableFuture.get();
- }
这里同样使用了CompletableFuture,起到了线程间通信的作用,可以在与对应机器建立好连接后获取channel连接。将消息封装好后从channel刷出,如此便完成了RPC动态代理需要封装的逻辑,从而实现通过动态代理类屏蔽RPC远程调用细节的需求,并且可以在运行时完成代理类的装配。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。