当前位置:   article > 正文

彻底搞清RPC模块设计与实现_rpc xid

rpc xid

RPC是分布式系统中不可缺少的一部分。之前接触过几种RPC模块,这里就总结一下常见RPC模块的设计思想和实现。最后我们来设计一个可以方便进行RPC调用的RPC模块。

RPC模块设计需要考虑的问题

RPC模块将网络通信的过程封装成了方法调用的过程。从使用者的角度来看,在调用端进行RPC调用,就像进行本地函数调用一样;而在背后,RPC模块会将先调用端的函数名称、参数等调用信息序列化,其中序列化的方式有很多种,比如Java原生序列化、JSON、Protobuf等。接着RPC模块会将序列化后的消息通过某种协议(如TCP, AMQP等)发送到被调用端,被调用端在收到消息以后会对其解码,还原成调用信息,然后在本地进行方法调用,然后把调用结果发送回调用端,这样一次RPC调用过程就完成了。在这个过程中,我们要考虑到一些问题:

  • 设计成什么样的调用模型?
  • 调用信息通过什么样的方式序列化?通过哪种协议传输?性能如何?可靠性如何?
  • 分布式系统中最关注的问题:出现failure如何应对?如何容错?

我们一点一点来思考。第一点是设计成什么样的调用模型。常见的几种模型:

  • 服务代理。即实现一个服务接口,被调用端实现此服务接口,实现对应的方法逻辑,并写好RPC调用信息接收部分;调用端通过RPC模块获取一个服务代理实例,这个服务代理实例继承了服务接口并封装了相应的远程调用逻辑(包括消息的编码、解码、传输等)。调用端通过这个服务代理实例进行RPC调用。像Vert.x Service Proxygrpc都是这种模型。这样的RPC模块需要具备生成服务代理类的功能

  • 直接调用,即设计特定的API用于RPC调用。比如Go的rpc包,里面的Client就提供了一个Call方法用于任意RPC调用,调用者需要传入方法名称、参数以及返回值指针(异步模式下传入callback handler)

我更倾向于选择服务代理这种模型,因为服务代理这种模型在进行RPC调用的时候就像直接LPC一样方便,但是需要RPC模块生成服务代理类,实现起来可能会麻烦些;当然Go的rpc包封装的也比较好,调用也比较方便,考虑到Go的类型系统,这已经不错了。

RPC调用耗时会包含通信耗时和本地调用耗时。当网络状况不好的时候,RPC调用可能会很长时间才能得到结果。对传统的同步RPC模式来说,这期间会阻塞调用者的调用线程。当需要进行大量RPC调用的时候,这种阻塞就伤不起了。这时候,异步RPC模式就派上用场了。我们可以对传统RPC模式稍加改造,把服务接口设计成异步模式的,即每个方法都要绑定一个回调函数,或利用Future-Promise模型返回一个Future。设计成异步模式以后,整个架构的灵活性就能得到很大的提升。

第二点是调用信息的序列化反序列化以及传输。序列化主要分为文本(如JSON, XML等)和二进制(如Thrift, Protocol等)两种,不同的序列化策略性能不同,因此我们应该尽量选择性能高,同时便于开发的序列化策略。在大型项目中我们常用Protobuf,性能比较好,支持多语言,但是需要单独定义.proto文件;有的时候我们会选择JSON,尽管效率不是很高但是方便,比如Vert.x Service Proxy就选择了JSON格式(底层依赖Event Bus)。另一点就是传输协议的选择。通常情况下我们会选择TCP协议(各种基于TCP的应用层协议,如HTTP/2)进行通信,当然用基于AMQP 比如 RabbitMQ就是AMQP协议的一种实现,协议的消息队列也可以,两者都比较可靠。

这里还需提一点:如何高效地并发处理request/response,这依赖于通信模块的实现。拿Java来说,基于Netty NIO或者Java AIO的I/O多路复用都可以很好地并发处理请求;而像Go RPC则是来一个request就创建一个Goroutine并在其中处理请求(Goroutine作为轻量级用户态线程,创建性能消耗小)。

最后一点也是最重要的一点:实现容错,这也是分布式系统设计要考虑的一个核心。想象一下一次RPC调用过程中可能产生的各种failure:

  • 网络拥塞
  • 丢包,通信异常
  • 服务提供端挂了,调用端得不到response

一种简单的应对方式是不断地超时重传,即 at least once 模式。调用端设置一个超时定时器,若一定时间内没有收到response就继续发送调用请求,直到收到response或请求次数达到阈值。这种模式会发送重复请求,因此只适用于幂等性的操作,即执行多次结果相同的操作,比如读取操作。当然服务提供端也可以实现对应的逻辑来检查重复的请求。

更符合我们期望的容错方案是 at most once 模式。at most once 模式要求服务提供端检查重复请求,如果检查到当前请求是重复请求则返回之前的调用结果。服务提供端需要缓存之前的调用结果。

这里面有几点需要考虑:

如何实现重传和重复请求检测?是依靠协议(如TCP的超时重传)还是自己实现?
如果自己实现的话:

如何检查重复请求?我们可以给每个请求生成一个独一无二的标识符(xid),并且在重传请求的时候使用相同的xid进行重传。用伪代码可以表示为:

if (seen(xid)) {
  result = oldResult;
} else {
  result = call(...);
  oldResult = result;
  setCurrentId(xid);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

如何保证xid是独一无二的?可以考虑使用UUID或者不同seed下的随机数。

服务请求端需要在一个合适的时间丢弃掉保存的之前缓存的调用结果。
当某个RPC调用过程还正在执行时,如何应对另外的重复请求?这种情况可以设置一个flag用于标识是否正在执行。

如果服务调用端挂了并且重启怎么办?如果服务调用端将xid和调用结果缓存在内存中,那么保存的信息就丢失了。因此我们可以考虑将缓存信息定时写入硬盘,或者写入replication server中,当然这些情况就比较复杂了,涉及到高可用和一致性的问题。

由此可见,虽然RPC模块看似比较简单,但是设计的时候要考虑的问题还是非常多的。尤其是在保证性能的基础上又要保证可靠性,还要保证开发者的易用性,这就需要细致地思考了。

常见RPC模块实现

这里我来简单总结一下用过的常见的几个RPC模块的使用及实现思路。

Go RPC

Gorpc包使用了Go自己的gob协议作为序列化协议(通过encoding/gob模块内的Encoder/Decoder进行编码和解码),而传输协议可以直接使用TCP(Dial方法)或者使用HTTP(DialHTTP)方法。开发者需要在服务端定义struct并且实现各种方法,然后将struct注册到服务端。需要进行RPC调用的时候,我们就可以在调用端通过Call方法(同步)或者Go方法(异步)进行调用。同步模式下调用结果即为reply指针所指的对象,而异步模式则会在调用结果准备就绪后通知绑定的channel并执行处理。

在rpc包的实现中(net/rpc/server.go),每个注册的服务类都被封装成了一个service结构体,而其中的每个方法则被封装成了一个methodType结构体:

type methodType struct {
    sync.Mutex // protects counters
    method     reflect.Method
    ArgType    reflect.Type
    ReplyType  reflect.Type
    numCalls   uint
}
type service struct {
    name   string                 // name of service
    rcvr   reflect.Value          // receiver of methods for the service
    typ    reflect.Type           // type of the receiver
    method map[string]*methodType // registered methods
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

每个服务端都被封装成了一个Server结构体,其中的serviceMap存储着各个服务类的元数据:

type Server struct {
    mu         sync.RWMutex // protects the serviceMap
    serviceMap map[string]*service
    reqLock    sync.Mutex // protects freeReq
    freeReq    *Request
    respLock   sync.Mutex // protects freeResp
    freeResp   *Response
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

RPC Server处理调用请求的默认路径是/_goRPC_。当请求到达时,Go就会调用Server结构体实现的ServeHTTP方法,经ServeConn方法传入gob codec预处理以后最终在ServeCodec方法内处理请求并进行调用:

func (server *Server) ServeCodec(codec ServerCodec) {
    sending := new(sync.Mutex)
    for {
        service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
        if err != nil {
            if debugLog && err != io.EOF {
                log.Println("rpc:", err)
            }
            if !keepReading {
                break
            }
            // send a response if we actually managed to read a header.
            if req != nil {
                server.sendResponse(sending, req, invalidRequest, codec, err.Error())
                server.freeRequest(req)
            }
            continue
        }
        go service.call(server, sending, mtype, req, argv, replyv, codec)
    }
    codec.Close()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

如果成功读取请求数据,那么接下来RPC Server就会新建一个Goroutine用来在本地执行方法,并向调用端返回response:

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
    mtype.Lock()
    mtype.numCalls++
    mtype.Unlock()
    function := mtype.method.Func
    // Invoke the method, providing a new value for the reply.
    returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
    // The return value for the method is an error.
    errInter := returnValues[0].Interface()
    errmsg := ""
    if errInter != nil {
        errmsg = errInter.(error).Error()
    }
    server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
    server.freeRequest(req)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

在执行调用的过程中应该注意并发问题,防止资源争用,修改数据时需要对数据加锁;至于方法的执行就是利用了Go的反射机制。调用完以后,RPC Server接着调用sendResponse方法发送response,其中写入response的时候同样需要加锁,防止资源争用。

grpc

grpc是Google开源的一个通用的RPC框架,支持C, Java和Go等语言。既然是Google出品,序列化协议必然用protobuf啦(毕竟高效),传输协议使用HTTP/2,非常不错。开发时需要在.proto文件里定义数据类型以及服务接口,然后配上protoc的grpc插件就能够自动生成各个语言的服务接口和代理类。粗略地看了下grpc-java的源码,底层利用Netty和OkHttp实现HTTP通信,性能应该不错。

Vert.x Service Proxy

Vert.x Service Proxy是Vert.x的一个异步RPC组件,支持通过各种JVM语言(Java, Scala, JS, JRuby, Groovy等)进行RPC调用。使用Vert.x Service Proxy时我们只需要按照异步开发模式编写服务接口,加上相应的注解,Vert.x Service Proxy就会自动生成相应的服务代理类和服务调用处理类。Vert.x Service Proxy底层借助Event Bus进行通信,调用时将调用消息包装成JSON数据然后通过Event Bus传输到服务端,得到结果后再返回给调用端。Vert.x的一大特性就是异步、响应式编程,因此Vert.x Service Proxy的RPC模型为异步RPC,用起来非常方便。几个异步过程可以通过各种组合子串成一串,妥妥的reactive programming的风格~

更多的关于Vert.x Service Proxy的实现原理的内容可以看这一篇:Vert.x 技术内幕 | 异步RPC实现原理

Java RMI

Java RMI(Remote Method Invocation)是Java里的一种RPC编程接口,类似于服务代理的一种模式。用起来不是很方便。

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/145614
推荐阅读
相关标签
  

闽ICP备14008679号