赞
踩
RPC是分布式系统中不可缺少的一部分。之前接触过几种RPC模块,这里就总结一下常见RPC模块的设计思想和实现。最后我们来设计一个可以方便进行RPC调用的RPC模块。
RPC模块将网络通信的过程封装成了方法调用的过程。从使用者的角度来看,在调用端进行RPC调用,就像进行本地函数调用一样;而在背后,RPC模块会将先调用端的函数名称、参数等调用信息序列化,其中序列化的方式有很多种,比如Java原生序列化、JSON、Protobuf等。接着RPC模块会将序列化后的消息通过某种协议(如TCP, AMQP等)发送到被调用端,被调用端在收到消息以后会对其解码,还原成调用信息,然后在本地进行方法调用,然后把调用结果发送回调用端,这样一次RPC调用过程就完成了。在这个过程中,我们要考虑到一些问题:
我们一点一点来思考。第一点是设计成什么样的调用模型。常见的几种模型:
服务代理。即实现一个服务接口,被调用端实现此服务接口,实现对应的方法逻辑,并写好RPC调用信息接收部分;调用端通过RPC模块获取一个服务代理实例,这个服务代理
实例继承了服务接口并封装了相应的远程调用逻辑(包括消息的编码、解码、传输等)。调用端通过这个服务代理实例进行RPC调用。像Vert.x
Service
Proxy
和grpc
都是这种模型。这样的RPC模块需要具备生成服务代理类的功能
直接调用,即设计特定的API用于RPC调用。比如Go的rpc包,里面的Client就提供了一个Call方法用于任意RPC调用,调用者需要传入方法名称、参数以及返回值指针(异步模式下传入callback handler)
我更倾向于选择服务代理这种模型,因为服务代理这种模型在进行RPC调用的时候就像直接LPC一样方便,但是需要RPC模块生成服务代理类,实现起来可能会麻烦些;当然Go的rpc包封装的也比较好,调用也比较方便,考虑到Go的类型系统,这已经不错了。
RPC
调用耗时会包含通信耗时和本地调用耗时。当网络状况不好的时候,RPC调用可能会很长时间才能得到结果。对传统的同步RPC模式来说,这期间会阻塞调用者的调用线程。当需要进行大量RPC调用的时候,这种阻塞就伤不起了。这时候,异步RPC模式就派上用场了。我们可以对传统RPC模式稍加改造,把服务接口设计成异步模式的,即每个方法都要绑定一个回调函数,或利用Future-Promise模型返回一个Future。设计成异步模式以后,整个架构的灵活性就能得到很大的提升。
第二点是调用信息的序列化反序列化以及传输。序列化主要分为文本(如JSON, XML等)和二进制(如Thrift, Protocol等)两种,不同的序列化策略性能不同,因此我们应该尽量选择性能高,同时便于开发的序列化策略。在大型项目中我们常用Protobuf,性能比较好,支持多语言,但是需要单独定义.proto文件;有的时候我们会选择JSON,尽管效率不是很高但是方便,比如Vert.x Service Proxy
就选择了JSON格式(底层依赖Event Bus
)。另一点就是传输协议的选择。通常情况下我们会选择TCP协议(各种基于TCP的应用层协议,如HTTP/2)进行通信,当然用基于AMQP
比如 RabbitMQ就是AMQP协议的一种实现,协议的消息队列也可以,两者都比较可靠。
这里还需提一点:如何高效地并发处理request/response
,这依赖于通信模块的实现。拿Java来说,基于Netty NIO
或者Java AIO
的I/O多路复用
都可以很好地并发处理请求;而像Go RPC则是来一个request就创建一个Goroutine
并在其中处理请求(Goroutine作为轻量级用户态线程,创建性能消耗小)。
最后一点也是最重要的一点:实现容错,这也是分布式系统设计要考虑的一个核心。想象一下一次RPC调用过程中可能产生的各种failure:
一种简单的应对方式是不断地超时重传,即 at least once
模式。调用端设置一个超时定时器
,若一定时间内没有收到response
就继续发送调用请求,直到收到response
或请求次数达到阈值。这种模式会发送重复请求,因此只适用于幂等性的操作,即执行多次结果相同的操作,比如读取操作。当然服务提供端也可以实现对应的逻辑来检查重复的请求。
更符合我们期望的容错方案是 at most once
模式。at most once
模式要求服务提供端检查重复请求,如果检查到当前请求是重复请求则返回之前的调用结果。服务提供端需要缓存之前的调用结果。
这里面有几点需要考虑:
如何实现重传和重复请求检测?是依靠协议(如TCP的超时重传)还是自己实现?
如果自己实现的话:
如何检查重复请求?我们可以给每个请求生成一个独一无二的标识符(xid),并且在重传请求的时候使用相同的xid进行重传。用伪代码可以表示为:
if (seen(xid)) {
result = oldResult;
} else {
result = call(...);
oldResult = result;
setCurrentId(xid);
}
如何保证xid是独一无二的?可以考虑使用UUID或者不同seed下的随机数。
服务请求端需要在一个合适的时间丢弃掉保存的之前缓存的调用结果。
当某个RPC调用过程还正在执行时,如何应对另外的重复请求?这种情况可以设置一个flag用于标识是否正在执行。
如果服务调用端挂了并且重启怎么办?如果服务调用端将xid和调用结果缓存在内存中,那么保存的信息就丢失了。因此我们可以考虑将缓存信息定时写入硬盘,或者写入replication server
中,当然这些情况就比较复杂了,涉及到高可用和一致性的问题。
由此可见,虽然RPC模块看似比较简单,但是设计的时候要考虑的问题还是非常多的。尤其是在保证性能的基础上又要保证可靠性,还要保证开发者的易用性,这就需要细致地思考了。
这里我来简单总结一下用过的常见的几个RPC模块的使用及实现思路。
Go
的rpc
包使用了Go自己的gob
协议作为序列化协议(通过encoding/gob
模块内的Encoder/Decoder
进行编码和解码),而传输协议可以直接使用TCP(Dial方法)或者使用HTTP(DialHTTP)方法。开发者需要在服务端定义struct并且实现各种方法,然后将struct注册到服务端。需要进行RPC调用的时候,我们就可以在调用端通过Call方法(同步)或者Go方法(异步)进行调用。同步模式下调用结果即为reply指针所指的对象,而异步模式则会在调用结果准备就绪后通知绑定的channel
并执行处理。
在rpc包的实现中(net/rpc/server.go)
,每个注册的服务类都被封装成了一个service
结构体,而其中的每个方法则被封装成了一个methodType
结构体:
type methodType struct {
sync.Mutex // protects counters
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
每个服务端都被封装成了一个Server结构体,其中的serviceMap存储着各个服务类的元数据:
type Server struct {
mu sync.RWMutex // protects the serviceMap
serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
RPC Server
处理调用请求的默认路径是/_goRPC_
。当请求到达时,Go就会调用Server结构体实现的ServeHTTP
方法,经ServeConn
方法传入gob codec
预处理以后最终在ServeCodec
方法内处理请求并进行调用:
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
go service.call(server, sending, mtype, req, argv, replyv, codec)
}
codec.Close()
}
如果成功读取请求数据,那么接下来RPC Server就会新建一个Goroutine用来在本地执行方法,并向调用端返回response:
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
在执行调用的过程中应该注意并发问题,防止资源争用,修改数据时需要对数据加锁;至于方法的执行就是利用了Go的反射机制。调用完以后,RPC Server接着调用sendResponse方法发送response,其中写入response的时候同样需要加锁,防止资源争用。
grpc是Google开源的一个通用的RPC框架,支持C, Java和Go等语言。既然是Google出品,序列化协议必然用protobuf啦(毕竟高效),传输协议使用HTTP/2,非常不错。开发时需要在.proto文件里定义数据类型以及服务接口,然后配上protoc的grpc插件就能够自动生成各个语言的服务接口和代理类。粗略地看了下grpc-java的源码,底层利用Netty和OkHttp实现HTTP通信,性能应该不错。
Vert.x Service Proxy是Vert.x的一个异步RPC组件,支持通过各种JVM语言(Java, Scala, JS, JRuby, Groovy等)进行RPC调用。使用Vert.x Service Proxy时我们只需要按照异步开发模式编写服务接口,加上相应的注解,Vert.x Service Proxy就会自动生成相应的服务代理类和服务调用处理类。Vert.x Service Proxy底层借助Event Bus进行通信,调用时将调用消息包装成JSON数据然后通过Event Bus传输到服务端,得到结果后再返回给调用端。Vert.x的一大特性就是异步、响应式编程,因此Vert.x Service Proxy的RPC模型为异步RPC,用起来非常方便。几个异步过程可以通过各种组合子串成一串,妥妥的reactive programming的风格~
更多的关于Vert.x Service Proxy的实现原理的内容可以看这一篇:Vert.x 技术内幕 | 异步RPC实现原理
Java RMI(Remote Method Invocation)是Java里的一种RPC编程接口,类似于服务代理的一种模式。用起来不是很方便。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。