当前位置:   article > 正文

由浅入深RPC通信原理实战

rpc通信


1 背景知识

在这里插入图片描述

单体架构
在这里插入图片描述
RPC产生解决的问题:

其实这是应用开发到一定的阶段的强烈需求驱动的。

如果我们开发简单的单一应用,逻辑简单、用户不多、流量不大,那我们用不着;

当我们的系统访问量增大、业务增多时,我们会发现一台单机运行此系统已经无法承受。此时,我们可以将业务拆分成几个互不关联的应用,分别部署在各自机器上,以划清逻辑并减小压力。此时,我们也可以不需要RPC,因为应用之间是互不关联的。

当我们的业务越来越多、应用也越来越多时,自然的,我们会发现有些功能已经不能简单划分开来或者划分不出来。此时,可以将公共业务逻辑抽离出来,将之组成独立的服务Service应用 。而原有的、新增的应用都可以与那些独立的Service应用 交互,以此来完成完整的业务功能。所以此时,我们急需一种高效的应用程序之间的通讯手段来完成这种需求,所以你看,RPC大显身手的时候来了!

其实3描述的场景也是服务化 、微服务 和分布式系统架构 的基础场景。即RPC框架就是实现以上结构的有力方式。
在这里插入图片描述
在这里插入图片描述
序列化是指把一个Java对象变成二进制内容(010101011010),本质上就是一个byte[]数组。
网络模块就是IO,底层已结写好了调用,开发者只需注入即可
在这里插入图片描述

2 RPC概述

RPC 的主要功能目标是让构建分布式计算(应用)更容易,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议规范,简单的来说就是像调用本地服务一样调用远程服务,对开发者而言是透明的。

在这里插入图片描述
为什么用RPC
1、分布式设计
2、部署灵活
3、解耦服务
4、扩展性强

常见RPC框架
1、Dubbo:阿里巴巴,java
2、gRPC:Google,多语言
3、Thrift:Facebook/apache,多语言
4、Spring Cloud:不仅仅是RPC,更多的是微服务架构下的一站式解决方案

  1. Thrift:thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝结合的、高效的服务。

  2. Dubbo:Dubbo是一个分布式服务框架,以及SOA治理方案。其功能主要包括:高性能NIO通讯及多协议集成,服务动态寻址与路由,软负载均衡与容错,依赖分析与降级等。 Dubbo是阿里巴巴内部的SOA服务化治理方案的核心框架,Dubbo自2011年开源后,已被许多非阿里系公司使用。

  3. Spring Cloud:Spring Cloud由众多子项目组成,如Spring Cloud Config、Spring Cloud Netflix、Spring Cloud Consul 等,提供了搭建分布式系统及微服务常用的工具,如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性token、全局锁、选主、分布式会话和集群状态等,满足了构建微服务所需的所有解决方案。Spring Cloud基于Spring Boot, 使得开发部署极其简单。

RPC的优势

1、RPC框架一般使用长链接,不必每次通信都要3次握手,减少网络开销
2、RPC框架一般都有注册中心,有丰富的监控管理
3、发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作
4、协议私密,安全性较高
5、rpc 能做到协议更简单内容更小,效率更高
6、rpc是面向服务的更高级的抽象,支持服务注册发现,负载均衡,超时重试,熔断降级等高级特性

在这里插入图片描述
Nelson 的论文中指出实现 RPC 的程序包括 5 个部分:

  1. User
  2. User-stub
  3. RPCRuntime
  4. Server-stub
  5. Server

这 5 个部分的关系如下图所示
在这里插入图片描述
这里 user 就是 client 端,当 user 想发起一个远程调用时,它实际是通过本地调用user-stub。user-stub 负责将调用的接口、方法和参数通过约定的协议规范进行编码并通过本地的 RPCRuntime 实例传输到远端的实例。远端 RPCRuntime 实例收到请求后交给 server-stub 进行解码后发起本地端调用,调用结果再返回给 user 端。
在这里插入图片描述
RPC 服务方通过 RpcServer 去导出(export)远程接口方法,而客户方通过 RpcClient 去引入(import)远程接口方法。客户方像调用本地方法一样去调用远程接口方法,RPC 框架提供接口的代理实现,实际的调用将委托给代理RpcProxy 。代理封装调用信息并将调用转交给RpcInvoker 去实际执行。在客户端的RpcInvoker 通过连接器RpcConnector 去维持与服务端的通道RpcChannel,并使用RpcProtocol 执行协议编码(encode)并将编码后的请求消息通过通道发送给服务方。
RPC 服务端接收器 RpcAcceptor 接收客户端的调用请求,同样使用RpcProtocol 执行协议解码(decode)。解码后的调用信息传递给RpcProcessor 去控制处理调用过程,最后再委托调用给RpcInvoker 去实际执行并返回调用结果。如下是各个部分的详细职责:

1. RpcServer  
   负责导出(export)远程接口  
2. RpcClient  
   负责导入(import)远程接口的代理实现  
3. RpcProxy  
   远程接口的代理实现  
4. RpcInvoker  
   客户方实现:负责编码调用信息和发送调用请求到服务方并等待调用结果返回  
   服务方实现:负责调用服务端接口的具体实现并返回调用结果  
5. RpcProtocol  
   负责协议编/解码  
6. RpcConnector  
   负责维持客户方和服务方的连接通道和发送数据到服务方  
7. RpcAcceptor  
   负责接收客户方请求并返回请求结果  
8. RpcProcessor  
   负责在服务方控制调用过程,包括管理调用线程池、超时时间等  
9. RpcChannel  
   数据传输通道  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

3 RPC框架实现要点

3.1 注册中心

服务注册发现的作用
在高可用的生产环境中,服务一般都以集群方式提供服务,集群里面的IP等重要参数信息可能随时会发生变化,节点也可能会动态扩缩容,客户端需要能够及时感知服务端的变化,获取集群最新服务节点的连接信息,而这些变化要求是要对调用方应用无感知的
在这里插入图片描述
主流服务注册工具
Zookeeper
Consul
Nacos

在这里插入图片描述
在这里插入图片描述
优雅关闭:如何避免服务停机带来的业务损失

服务对象在关闭过程中,会拒绝的新的请求,同时根据引用计数器等待正在处理的请求全部结束之后才会真正关闭。
Runtime.getRuntime().addShutdownHook(this);,注册一个JVM关闭的钩子,这个钩子可以在以下几种场景被调用:

  • 程序正常退出
  • 使用System.exit()
  • 终端使用Ctrl+C触发的中断
  • 系统关闭
  • 使用Kill pid命令干掉进程(不要带-9)

优雅启动

在服务启动之后,要对系统进行预热,可以执行一些测试数据,在整个系统启动并且预热之后,然后在进行服务注册。

3.2 代理技术

为什么要用代理
RPC的调用对用户来讲是透明的,内部核心技术采用的就是代理技术,RPC 会自动给接口生成一个代理实现,当我们在项目中注入接口的时候,运行过程中实际绑定的是这个接口生成的代理实现。在接口方法被调用的时候,它实际上是被生成代理类拦截到了,这样就可以在生成的代理类里面,加入其他调用处理逻辑

JDK动态代理
在运行期动态的创建代理类,它是通过接口生成代理类的,与静态代理相比更加灵活,但是也有一定的限制,第一是代理对象必须实现一个接口,否则会报异常。第二是有性能问题,因为是通过反射来实现调用的,所以比正常的直接调用来得慢,并且通过生成类文件也会多消耗部分方法区空间,可能引起Full GC。

ASM
ASM 是一个 Java 字节码操控框架。它能够以二进制形式修改已有类或者动态生成类。ASM 可以直接产生二进制 class 文件,也可以在类被加载入 Java 虚拟机之前动态改变类行为(也就是生成的代码可以覆盖原来的类也可以是原始类的子类)。不过ASM在创建class字节码的过程中,操纵的是底层JVM的汇编指令级别,这要求ASM使用者要对class组织结构和JVM汇编指令有一定的了解

CGLIB
CGLIB(Code Generation Library)是一个基于ASM的字节码生成库。其原理是动态生成一个要代理类的子类,子类重写要代理的类的所有不是final的方法,在子类中采用方法拦截的技术拦截所有父类方法
的调用,顺势织入横切逻辑。它比使用java反射的JDK动态代理要快
在这里插入图片描述
bytebuddy
Byte Buddy本身也是基于 ASM API 实现的,是一个较高层级的抽象的字节码操作工具,通过使用 Byte Buddy,任何熟悉 Java 编程语言的人都有望非常容易地进行字节码操作。

Javassist
Javassist 使操作Java字节码变得简单,一个可以用于编辑Java字节码的类库,提供了两种级别的API:源码级别和字节码级别。如果用户使用源码级API,他们可以在不需要过多了解Java字节码规范的前提下使用它提供的基于java语言的API来编辑字节码文件。如果使用字节码级API则允许用户直接编辑字节码文件。Javassist在复杂的字节码级操作上提供了更高级别的抽象层。另外Javassist使用了反射机制,这使得运行时比ASM慢。
在这里插入图片描述

3.3 序列化技术

序列化的作用
在网络传输中,数据必须采用二进制形式, 所以在RPC调用过程中, 需要采用序列化技术,对入参和出参进行序列化与反序列化

优势:
解析效率
压缩率压缩后体积
扩展性
兼容性
可读性
可调试
跨语言
通用性

常见序列化技术框架

JDK原生序列化
1、JAVA语言本身提供,使用比较方便和简单
2、不支持跨语言处理,性能相对不是很好,序列化以后产生的数据相对较大

Hessian二进制
1、Hessian 是一个动态类型,二进制序列化,并且支持跨语言特性的序列化框架。
2、Hessian 性能上要比 JDK、JSON 序列化高效很多,并且生成的字节数也更小。有非常好的兼容性和稳定性,所以Hessian 更加适合作为 RPC 框架远程通信的序列化协议

Json轻量级数据交换格式
1、可读性好,方便阅读和调试,多语言支持,序列化以后的字节码文件相对较大,效率相对
不高,但对比XML序列化后的字节流更小,在企业运用普遍,特别是对前端和三方提供api

Protobuf开源,高效
1、Google 推出的开源序列库,它是一种轻便、高效的结构化数据存储格式,多语言支持。
2、速度快,压缩比高,体积小,序列化后体积相比 JSON、Hessian 小很多
3、消息格式的扩展、升级和兼容性都不错,可以做到向后兼容。

3.4 RPC通信协议

在这里插入图片描述
在这里插入图片描述

3.5 系统IO

IO选型
RPC的调用过程中涉及到网络IO的操作,一般来说网络IO往往会成为系统的瓶颈所在,而不管上层应用如何使用,底层都是基于操作系统的IO模型。
为了支持高并发,传统的阻塞式 IO 显然不太合适,因此我们需要异步的 IO,即 NIO。Java 提供了 NIO 的解决方案,Java 7 也提供了更优秀的 NIO.2 支持。

阻塞IO

应用进程发起IO系统调用后,应用进程被阻塞,赚到内核空间处理。内核开始等待数据,等待到数据之后,在将内核中的数据拷贝到用户内存中,
整个IO处理完毕后返回进程。最后应用的进程接触阻塞状态,运行业务逻辑。等待数据和拷贝数据操作时线程会一直处于阻塞状态。

IO多路服用

多路就是指多个通道,也就是多个网络连接的IO,复用是指多个通道复用在一个复用器上。

IO多路复用更适合高并发的场景。

零拷贝

所谓零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,都可以通过一种方式,让应用进程向用户空间写入或者读取数据,
就如同直接向内核空间写入或者读取数据一样,在通过dma将内核中的数据拷贝到网卡,或者将网卡中的数据copy到内核。
零拷贝的两种实现方式:mmap+write方式,sendFile方式。

mmap+write

mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
mmap 通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数。
这样就可以省掉原来内核read缓冲区copy数据到用户缓冲区,但是还是需要内核read缓冲区将数据copy到内核socket缓冲区。

sendFile

数据被 DMA 引擎从文件复制到内核缓冲区,然后调用 write 方法时,从内核缓冲区进入到 Socket,这时,是没有上下文切换的,因为都在内核空间。

mmap 和 sendFile 的区别

  • mmap 适合小数据量读写,sendFile 适合大文件传输。
  • mmap 需要 4 次上下文切换,3 次数据拷贝;sendFile 需要 3 次上下文切换,最少 2 次数据拷贝。
  • sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝到 Socket 缓冲区)。
    在这个选择上:rocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。

Netty的零拷贝是完全站在用户空间上,也就是JVM上,它的零拷贝主要是偏向于数据操作的优化上。

Netty对数据操作进行的优化

  • Netty 提供了CompositeByteBuf类,可以将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝。
  • ByteBuffer支持slice操作,因此可以将ByteBuf分解为多个共享同一个存储区域的ByteBuf,避免了内存的拷贝。
  • 通过wrap操作,可以将byte[] 数组、ByteBuf、ByteBuffer等包装盛一个Netty ByteBuf对象,进而避免拷贝操作。
  • Netty 的ByteBuf可以采用Direct Buffers,使用堆外直接内存进行socket的读写操作。
  • Netty还提供了FileRegion中保证NIO的FileChannel.transferTo方法实现了零拷贝。

3.6 超时重试机制

异常重试需要关注的点:

  • 保证被重试的业务服务是具有幂等性的;
  • 超时重试前重置计时;
  • 针对业务返回的异常,设置重试是异常名单;
  • 重试时负载均衡选取节点时要剔除前一次访问的节点
    在这里插入图片描述

3.7 时间轮算法

概念
在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度;而时钟轮就相当于指针跳动的一个周期,我们可以将每个任务放到对应的时间槽位上。
在这里插入图片描述

在这里插入图片描述

1、每个任务会按要求只扫描执行一次,能很好的解决CPU 浪费的问题
2、秒级轮,分钟轮,小时轮除了用于检测rpc调用是否超时,也可以将定时心跳的任务添加到时间轮中,当前时间的心跳执行完后再将下一秒的心跳任务添加到时间轮中,这样就能做到每秒的定时心跳
在这里插入图片描述

3.8 负载均衡策略

用途
RPC Server为了高可用,可用选择做集群,因此在RPC Client端调用时要使用相应的均衡策略,这属于客户端负载均衡。
dubbo的负载均衡方法:

在这里插入图片描述

  • 基于权重随机算法:将请求按照权重进行分配,权重越大,分配的越多。
  • 基于最小活跃调用数算法:活跃少越少,表明该服务提供者效率越高,单位时间内可处理更多的请求。
  • 基于hash一致性:适用于服务有状态的场景。
  • 基于加权轮询算法:权重越大,节点选中的更多。

在这里插入图片描述

3.9 熔断限流

熔断作用
熔断器如同电力过载保护器。它可以实现快速失败,如果它在一段时间内侦测到许多类似的错误,会强迫其以后的多个调用快速失败,不再访问远程服务器,从而防止应用程序不断地尝试执行可能会失败的操作,使得应用程序继续执行而不用等待修正错误,或者浪费CPU时间去等到长时间的超时产生。熔断器也可以使应用程序能够诊断错误是否已经修正,如果已经修正,应用程序会再次尝试恢复调用操作
在这里插入图片描述

在这里插入图片描述
限流
作用
实际生产环境中,每个服务节点都可能由于访问量过大而引起一系列问题,就需要业务提供方能够进行自我保护,从而保证在高访问量、高并发的场景下,系统依然能够稳定,高效运行。
限流器的作用是用来限制其请求的速率,保护后台响应服务,以免服务过载导致服务不可用现象出现。
在这里插入图片描述
在这里插入图片描述

3.10 滑动窗口算法

在这里插入图片描述
在这里插入图片描述

3.11 限流组件

在这里插入图片描述

4 RPC框架简易实现

4.1 服务端

服务端提供客户端所期待的服务,一般包括三个部分:服务接口,服务实现以及服务的注册暴露三部分,如下:服务接口

public interface HelloService {
    String hello(String name);
    String hi(String msg);
}
  • 1
  • 2
  • 3
  • 4

服务实现

public class HelloServiceImpl implements HelloService{
    @Override
    public String hello(String name) {
        return "Hello " + name;
    }
    @Override
    public String hi(String msg) {
        return "Hi, " + msg;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

服务暴露:只有把服务暴露出来,才能让客户端进行调用,这是RPC框架功能之一。

public class RpcProvider {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        // RPC框架将服务暴露出来,供客户端消费
        RpcFramework.export(service, 1234);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4.2 客户端

客户端消费服务端所提供的服务,一般包括两个部分:服务接口和服务引用两个部分,如下:服务接口:与服务端共享同一个服务接口

public interface HelloService {
  String hello(String name);
  String hi(String msg);
}
  • 1
  • 2
  • 3
  • 4

服务引用:消费端通过RPC框架进行远程调用,这也是RPC框架功能之一

public class RpcConsumer {
    public static void main(String[] args) throws Exception {
        // 由RpcFramework生成的HelloService的代理
        HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
        String hello = service.hello("World");
        System.out.println("客户端收到远程调用的结果 : " + hello);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4.3 RPC框架原型实现

RPC框架主要包括两大功能:一个用于服务端暴露服务,一个用于客户端引用服务。服务端暴露服务

    /**
     * 暴露服务
     *
     * @param service 服务实现
     * @param port    服务端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException("service instance == null");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        // 建立Socket服务端
        ServerSocket server = new ServerSocket(port);
        for (; ; ) {
            try {
                // 监听Socket请求
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {
                                /* 获取请求流,Server解析并获取请求*/
                                // 构建对象输入流,从源中读取对象到程序中
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());
                                try {
                                    System.out.println("\nServer解析请求 : ");
                                    String methodName = input.readUTF();
                                    System.out.println("methodName : " + methodName);
                                    // 泛型与数组是不兼容的,除了通配符作泛型参数以外
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                    System.out.println(
                                        "parameterTypes : " + Arrays.toString(parameterTypes));
                                    Object[] arguments = (Object[])input.readObject();
                                    System.out.println("arguments : " + Arrays.toString(arguments));
                                    /* Server 处理请求,进行响应*/
                                    ObjectOutputStream output = new ObjectOutputStream(
                                        socket.getOutputStream());
                                    try {
                                        // service类型为Object的(可以发布任何服务),故只能通过反射调用处理请求
                                        // 反射调用,处理请求
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        System.out.println("\nServer 处理并生成响应 :");
                                        System.out.println("result : " + result);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

从该RPC框架的简易实现来看,RPC服务端逻辑是:首先创建ServerSocket负责监听特定端口并接收客户连接请求,然后使用Java原生的序列化/反序列化机制来解析得到请求,包括所调用方法的名称、参数列表和实参,最后反射调用服务端对服务接口的具体实现并将得到的结果回传至客户端。至此,一次简单PRC调用的服务端流程执行完毕。
客户端引用服务

    /**
     * 引用服务
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口类型
     * @param host           服务器主机名
     * @param port           服务器端口
     * @return 远程服务,返回代理对象
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
        throws Exception {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("Interface class == null");
        }
        // JDK 动态代理的约束,只能实现对接口的代理
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException(
                "The " + interfaceClass.getName() + " must be interface class!");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("Host == null!");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println(
            "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
        // JDK 动态代理
        T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] {interfaceClass}, new InvocationHandler() {
                // invoke方法本意是对目标方法的增强,在这里用于发送RPC请求和接收响应
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                    throws Throwable {
                    // 创建Socket客户端,并与服务端建立链接
                    Socket socket = new Socket(host, port);
                    try {
                        /* 客户端像服务端进行请求,并将请求参数写入流中*/
                        // 将对象写入到对象输出流,并将其发送到Socket流中去
                        ObjectOutputStream output = new ObjectOutputStream(
                            socket.getOutputStream());
                        try {
                            // 发送请求
                            System.out.println("\nClient发送请求 : ");
                            output.writeUTF(method.getName());
                            System.out.println("methodName : " + method.getName());
                            output.writeObject(method.getParameterTypes());
                            System.out.println("parameterTypes : " + Arrays.toString(method
                                .getParameterTypes()));
                            output.writeObject(arguments);
                            System.out.println("arguments : " + Arrays.toString(arguments));
                            /* 客户端读取并返回服务端的响应*/
                            ObjectInputStream input = new ObjectInputStream(
                                socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable)result;
                                }
                                System.out.println("\nClient收到响应 : ");
                                System.out.println("result : " + result);
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        return proxy;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

从该RPC框架的简易实现来看,RPC客户端逻辑是:首先创建Socket客户端并与服务端建立链接,然后使用Java原生的序列化/反序列化机制将调用请求发送给客户端,包括所调用方法的名称、参数列表将服务端的响应返回给用户即可。至此,一次简单PRC调用的客户端流程执行完毕。特别地,从代码实现来看,实现透明的PRC调用的关键就是 动态代理,这是RPC框架实现的灵魂所在。RPC原型实现

public class RpcFramework {
    /**
     * 暴露服务
     *
     * @param service 服务实现
     * @param port    服务端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException("service instance == null");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        // 建立Socket服务端
        ServerSocket server = new ServerSocket(port);
        for (; ; ) {
            try {
                // 监听Socket请求
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {
                                /* 获取请求流,Server解析并获取请求*/
                                // 构建对象输入流,从源中读取对象到程序中
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());
                                try {
                                    System.out.println("\nServer解析请求 : ");
                                    String methodName = input.readUTF();
                                    System.out.println("methodName : " + methodName);
                                    // 泛型与数组是不兼容的,除了通配符作泛型参数以外
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                    System.out.println(
                                        "parameterTypes : " + Arrays.toString(parameterTypes));
                                    Object[] arguments = (Object[])input.readObject();
                                    System.out.println("arguments : " + Arrays.toString(arguments));
                                    /* Server 处理请求,进行响应*/
                                    ObjectOutputStream output = new ObjectOutputStream(
                                        socket.getOutputStream());
                                    try {
                                        // service类型为Object的(可以发布任何服务),故只能通过反射调用处理请求
                                        // 反射调用,处理请求
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        System.out.println("\nServer 处理并生成响应 :");
                                        System.out.println("result : " + result);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 引用服务
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口类型
     * @param host           服务器主机名
     * @param port           服务器端口
     * @return 远程服务,返回代理对象
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
        throws Exception {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("Interface class == null");
        }
        // JDK 动态代理的约束,只能实现对接口的代理
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException(
                "The " + interfaceClass.getName() + " must be interface class!");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("Host == null!");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println(
            "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
        // JDK 动态代理
        T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] {interfaceClass}, new InvocationHandler() {
                // invoke方法本意是对目标方法的增强,在这里用于发送RPC请求和接收响应
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                    throws Throwable {
                    // 创建Socket客户端,并与服务端建立链接
                    Socket socket = new Socket(host, port);
                    try {
                        /* 客户端像服务端进行请求,并将请求参数写入流中*/
                        // 将对象写入到对象输出流,并将其发送到Socket流中去
                        ObjectOutputStream output = new ObjectOutputStream(
                            socket.getOutputStream());
                        try {
                            // 发送请求
                            System.out.println("\nClient发送请求 : ");
                            output.writeUTF(method.getName());
                            System.out.println("methodName : " + method.getName());
                            output.writeObject(method.getParameterTypes());
                            System.out.println("parameterTypes : " + Arrays.toString(method
                                .getParameterTypes()));
                            output.writeObject(arguments);
                            System.out.println("arguments : " + Arrays.toString(arguments));
                            /* 客户端读取并返回服务端的响应*/
                            ObjectInputStream input = new ObjectInputStream(
                                socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable)result;
                                }
                                System.out.println("\nClient收到响应 : ");
                                System.out.println("result : " + result);
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        return proxy;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/145704
推荐阅读
相关标签
  

闽ICP备14008679号