赞
踩
近期比较火的开源项目go-zero是一个集成了各种工程实践的包含了 Web 和 RPC 协议的功能完善的微服务框架,今天我们就一起来分析一下其中的 RPC 部分zRPC。
zRPC 底层依赖 gRPC,内置了服务注册、负载均衡、拦截器等模块,其中还包括自适应降载,自适应熔断,限流等微服务治理方案,是一个简单易用的可直接用于生产的企业级 RPC 框架。
zRPC 支持直连和基于 etcd 服务发现两种方式,我们以基于 etcd 做服务发现为例演示 zRPC 的基本使用:
创建 hello.yaml 配置文件,配置如下:
- Name: hello.rpc // 服务名
- ListenOn: 127.0.0.1:9090 // 服务监听地址
- Etcd:
- Hosts:
- - 127.0.0.1:2379 // etcd服务地址
- Key: hello.rpc // 服务注册key
创建 hello.proto 文件,并生成对应的 go 代码
- syntax = "proto3";
-
- package pb;
-
- service Greeter {
- rpc SayHello (HelloRequest) returns (HelloReply) {}
- }
-
- message HelloRequest {
- string name = 1;
- }
-
- message HelloReply {
- string message = 1;
- }
生成 go 代码
protoc --go_out=plugins=grpc:. hello.proto
- package main
-
- import (
- "context"
- "flag"
- "log"
-
- "example/zrpc/pb"
-
- "github.com/tal-tech/go-zero/core/conf"
- "github.com/tal-tech/go-zero/zrpc"
- "google.golang.org/grpc"
- )
-
- type Config struct {
- zrpc.RpcServerConf
- }
-
- var cfgFile = flag.String("f", "./hello.yaml", "cfg file")
-
- func main() {
- flag.Parse()
-
- var cfg Config
- conf.MustLoad(*cfgFile, &cfg)
-
- srv, err := zrpc.NewServer(cfg.RpcServerConf, func(s *grpc.Server) {
- pb.RegisterGreeterServer(s, &Hello{})
- })
- if err != nil {
- log.Fatal(err)
- }
- srv.Start()
- }
-
- type Hello struct{}
-
- func (h *Hello) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
- return &pb.HelloReply{Message: "hello " + in.Name}, nil
- }
- package main
-
- import (
- "context"
- "log"
-
- "example/zrpc/pb"
-
- "github.com/tal-tech/go-zero/core/discov"
- "github.com/tal-tech/go-zero/zrpc"
- )
-
- func main() {
- client := zrpc.MustNewClient(zrpc.RpcClientConf{
- Etcd: discov.EtcdConf{
- Hosts: []string{"127.0.0.1:2379"},
- Key: "hello.rpc",
- },
- })
-
- conn := client.Conn()
- hello := pb.NewGreeterClient(conn)
- reply, err := hello.SayHello(context.Background(), &pb.HelloRequest{Name: "go-zero"})
- if err != nil {
- log.Fatal(err)
- }
- log.Println(reply.Message)
- }
启动服务,查看服务是否注册:
ETCDCTL_API=3 etcdctl get hello.rpc --prefix
显示服务已经注册:
- hello.rpc/7587849401504590084
- 127.0.0.1:9090
运行客户端即可看到输出:
hello go-zero
这个例子演示了 zRPC 的基本使用,可以看到通过 zRPC 构建 RPC 服务非常简单,只需要很少的几行代码,接下来我们继续进行探索
下图展示 zRPC 的架构图和主要组成部分
zRPC 主要有以下几个模块组成:
discov: 服务发现模块,基于 etcd 实现服务发现功能
resolver: 服务注册模块,实现了 gRPC 的 resolver.Builder 接口并注册到 gRPC
interceptor: 拦截器,对请求和响应进行拦截处理
balancer: 负载均衡模块,实现了 p2c 负载均衡算法,并注册到 gRPC
client: zRPC 客户端,负责发起请求
server: zRPC 服务端,负责处理请求
这里介绍了 zRPC 的主要组成模块和每个模块的主要功能,其中 resolver 和 balancer 模块实现了 gRPC 开放的接口,实现了自定义的 resolver 和 balancer,拦截器模块是整个 zRPC 的功能重点,自适应降载、自适应熔断、prometheus 服务指标收集等功能都在这里实现
gRPC 提供了拦截器功能,主要是对请求前后进行额外处理的拦截操作,其中拦截器包含客户端拦截器和服务端拦截器,又分为一元 (Unary) 拦截器和流 (Stream) 拦截器,这里我们主要讲解一元拦截器,流拦截器同理。
客户端拦截器定义如下:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
其中 method 为方法名,req,reply 分别为请求和响应参数,cc 为客户端连接对象,invoker 参数是真正执行 rpc 方法的 handler 其实在拦截器中被调用执行
服务端拦截器定义如下:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中 req 为请求参数,info 中包含了请求方法属性,handler 为对 server 端方法的包装,也是在拦截器中被调用执行
zRPC 中内置了丰富的拦截器,其中包括自适应降载、自适应熔断、权限验证、prometheus 指标收集等等,由于拦截器较多,篇幅有限没法所有的拦截器给大家一一解析,这里我们主要分析两个,自适应熔断和 prometheus 服务监控指标收集:
当客户端向服务端发起请求,客户端会记录服务端返回的错误,当错误达到一定的比例,客户端会自行的进行熔断处理,丢弃掉一定比例的请求以保护下游依赖,且可以自动恢复。zRPC 中自适应熔断遵循《Google SRE》中过载保护策略,算法如下:
requests: 总请求数量
accepts: 正常请求数量
K: 倍值 (Google SRE 推荐值为 2)
可以通过修改 K 的值来修改熔断发生的激进程度,降低 K 的值会使得自适应熔断算法更加激进,增加 K 的值则自适应熔断算法变得不再那么激进
熔断拦截器定义如下:
- func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
- cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
- // target + 方法名
- breakerName := path.Join(cc.Target(), method)
- return breaker.DoWithAcceptable(breakerName, func() error {
- // 真正执行调用
- return invoker(ctx, method, req, reply, cc, opts...)
- }, codes.Acceptable)
- }
accept 方法实现了 Google SRE 过载保护算法,判断否进行熔断
- func (b *googleBreaker) accept() error {
- // accepts为正常请求数,total为总请求数
- accepts, total := b.history()
- weightedAccepts := b.k * float64(accepts)
- // 算法实现
- dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
- if dropRatio <= 0 {
- return nil
- }
- // 是否超过比例
- if b.proba.TrueOnProba(dropRatio) {
- return ErrServiceUnavailable
- }
-
- return nil
- }
doReq 方法首先判断是否熔断,满足条件直接返回 error(circuit breaker is open),不满足条件则对请求数进行累加
- func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
- if err := b.accept(); err != nil {
- if fallback != nil {
- return fallback(err)
- } else {
- return err
- }
- }
-
- defer func() {
- if e := recover(); e != nil {
- b.markFailure()
- panic(e)
- }
- }()
-
- // 此处执行RPC请求
- err := req()
- // 正常请求total和accepts都会加1
- if acceptable(err) {
- b.markSuccess()
- } else {
- // 请求失败只有total会加1
- b.markFailure()
- }
-
- return err
- }
服务监控是了解服务当前运行状态以及变化趋势的重要手段,监控依赖于服务指标的收集,通过 prometheus 进行监控指标的收集是业界主流方案,zRPC 中也采用了 prometheus 来进行指标的收集
prometheus 拦截器定义如下:
这个拦截器主要是对服务的监控指标进行收集,这里主要是对 RPC 方法的耗时和调用错误进行收集,这里主要使用了 Prometheus 的 Histogram 和 Counter 数据类型
- func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
- interface{}, error) {
- // 执行前记录一个时间
- startTime := timex.Now()
- resp, err := handler(ctx, req)
- // 执行后通过Since算出执行该调用的耗时
- metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
- // 方法对应的错误码
- metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
- return resp, err
- }
- }
除了内置了丰富的拦截器之外,zRPC 同时支持添加自定义拦截器
Client 端通过 AddInterceptor 方法添加一元拦截器:
- func (rc *RpcClient) AddInterceptor(interceptor grpc.UnaryClientInterceptor) {
- rc.client.AddInterceptor(interceptor)
- }
Server 端通过 AddUnaryInterceptors 方法添加一元拦截器:
- func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
- rs.server.AddUnaryInterceptors(interceptors...)
- }
zRPC 服务注册架构图:
zRPC 中自定义了 resolver 模块,用来实现服务的注册功能。zRPC 底层依赖 gRPC,在 gRPC 中要想自定义 resolver 需要实现 resolver.Builder 接口:
- type Builder interface {
- Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
- Scheme() string
- }
其中 Build 方法返回 Resolver,Resolver 定义如下:
- type Resolver interface {
- ResolveNow(ResolveNowOptions)
- Close()
- }
在 zRPC 中定义了两种 resolver,direct 和 discov,这里我们主要分析基于 etcd 做服务发现的 discov,自定义的 resolver 需要通过 gRPC 提供了 Register 方法进行注册代码如下:
- func RegisterResolver() {
- resolver.Register(&dirBuilder)
- resolver.Register(&disBuilder)
- }
当我们启动我们的 zRPC Server 的时候,调用 Start 方法,会像 etcd 中注册对应的服务地址:
- func (ags keepAliveServer) Start(fn RegisterFn) error {
- // 注册服务地址
- if err := ags.registerEtcd(); err != nil {
- return err
- }
- // 启动服务
- return ags.Server.Start(fn)
- }
当我们启动 zRPC 客户端的时候,在 gRPC 内部会调用我们自定义 resolver 的 Build 方法,zRPC 通过在 Build 方法内调用执行了 resolver.ClientConn 的 UpdateState 方法,该方法会把服务地址注册到 gRPC 客户端内部:
- func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
- resolver.Resolver, error) {
- hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
- return r == EndpointSepChar
- })
- // 服务发现
- sub, err := discov.NewSubscriber(hosts, target.Endpoint)
- if err != nil {
- return nil, err
- }
-
- update := func() {
- var addrs []resolver.Address
- for _, val := range subset(sub.Values(), subsetSize) {
- addrs = append(addrs, resolver.Address{
- Addr: val,
- })
- }
- // 向gRPC注册服务地址
- cc.UpdateState(resolver.State{
- Addresses: addrs,
- })
- }
- // 监听
- sub.AddListener(update)
- update()
- // 返回自定义的resolver.Resolver
- return &nopResolver{cc: cc}, nil
- }
在 discov 中,通过调用 load 方法从 etcd 中获取指定服务的所有地址:
- func (c *cluster) load(cli EtcdClient, key string) {
- var resp *clientv3.GetResponse
- for {
- var err error
- ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
- // 从etcd中获取指定服务的所有地址
- resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
- cancel()
- if err == nil {
- break
- }
-
- logx.Error(err)
- time.Sleep(coolDownInterval)
- }
-
- var kvs []KV
- c.lock.Lock()
- for _, ev := range resp.Kvs {
- kvs = append(kvs, KV{
- Key: string(ev.Key),
- Val: string(ev.Value),
- })
- }
- c.lock.Unlock()
-
- c.handleChanges(key, kvs)
- }
并通过 watch 监听服务地址的变化:
- func (c *cluster) watch(cli EtcdClient, key string) {
- rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
- for {
- select {
- case wresp, ok := <-rch:
- if !ok {
- logx.Error("etcd monitor chan has been closed")
- return
- }
- if wresp.Canceled {
- logx.Error("etcd monitor chan has been canceled")
- return
- }
- if wresp.Err() != nil {
- logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
- return
- }
- // 监听变化通知更新
- c.handleWatchEvents(key, wresp.Events)
- case <-c.done:
- return
- }
- }
- }
这部分主要介绍了 zRPC 中是如何自定义的 resolver,以及基于 etcd 的服务发现原理,通过这部分的介绍大家可以了解到 zRPC 内部服务注册发现的原理,源代码比较多只是粗略的从整个流程上进行了分析,如果大家对 zRPC 的源码比较感兴趣可以自行进行学习
负载均衡原理图:
避免过载是负载均衡策略的一个重要指标,好的负载均衡算法能很好的平衡服务端资源。常用的负载均衡算法有轮训、随机、Hash、加权轮训等。但为了应对各种复杂的场景,简单的负载均衡算法往往表现的不够好,比如轮训算法当服务响应时间变长就很容易导致负载不再平衡, 因此 zRPC 中自定义了默认负载均衡算法 P2C(Power of Two Choices),和 resolver 类似,要想自定义 balancer 也需要实现 gRPC 定义的 balancer.Builder 接口,由于和 resolver 类似这里不再带大家一起分析如何自定义 balancer,感兴趣的朋友可以查看 gRPC 相关的文档来进行学习
注意,zRPC 是在客户端进行负载均衡,常见的还有通过 nginx 中间代理的方式
zRPC 框架中默认的负载均衡算法为 P2C,该算法的主要思想是:
从可用节点列表中做两次随机选择操作,得到节点 A、B
比较 A、B 两个节点,选出负载最低的节点作为被选中的节点
伪代码如下:
主要算法逻辑在 Pick 方法中实现:
- func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
- conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- var chosen *subConn
- switch len(p.conns) {
- case 0:
- return nil, nil, balancer.ErrNoSubConnAvailable
- case 1:
- chosen = p.choose(p.conns[0], nil)
- case 2:
- chosen = p.choose(p.conns[0], p.conns[1])
- default:
- var node1, node2 *subConn
- for i := 0; i < pickTimes; i++ {
- // 随机数
- a := p.r.Intn(len(p.conns))
- b := p.r.Intn(len(p.conns) - 1)
- if b >= a {
- b++
- }
- // 随机获取所有节点中的两个节点
- node1 = p.conns[a]
- node2 = p.conns[b]
- // 效验节点是否健康
- if node1.healthy() && node2.healthy() {
- break
- }
- }
- // 选择其中一个节点
- chosen = p.choose(node1, node2)
- }
-
- atomic.AddInt64(&chosen.inflight, 1)
- atomic.AddInt64(&chosen.requests, 1)
- return chosen.conn, p.buildDoneFunc(chosen), nil
- }
choose 方法对随机选择出来的节点进行负载比较从而最终确定选择哪个节点
- func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
- start := int64(timex.Now())
- if c2 == nil {
- atomic.StoreInt64(&c1.pick, start)
- return c1
- }
-
- if c1.load() > c2.load() {
- c1, c2 = c2, c1
- }
-
- pick := atomic.LoadInt64(&c2.pick)
- if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
- return c2
- } else {
- atomic.StoreInt64(&c1.pick, start)
- return c1
- }
- }
上面主要介绍了 zRPC 默认负载均衡算法的设计思想和代码实现,那自定义的 balancer 是如何注册到 gRPC 的呢,resolver 提供了 Register 方法来进行注册,同样 balancer 也提供了 Register 方法来进行注册:
- func init() {
- balancer.Register(newBuilder())
- }
-
- func newBuilder() balancer.Builder {
- return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
- }
注册 balancer 之后 gRPC 怎么知道使用哪个 balancer 呢?这里我们需要使用配置项进行配置,在 NewClient 的时候通过 grpc.WithBalancerName 方法进行配置:
- func NewClient(target string, opts ...ClientOption) (*client, error) {
- var cli client
- opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
- if err := cli.dial(target, opts...); err != nil {
- return nil, err
- }
-
- return &cli, nil
- }
这部分主要介绍了 zRPC 中内中的负载均衡算法的实现原理以及具体的实现方式,之后介绍了 zRPC 是如何注册自定义的 balancer 以及如何选择自定义的 balancer,通过这部分大家应该对负载均衡有了更进一步的认识
首先,介绍了 zRPC 的基本使用方法,可以看到 zRPC 使用非常简单,只需要少数几行代码就可以构建高性能和自带服务治理能力的 RPC 服务,当然这里没有面面俱到的介绍 zRPC 的基本使用,大家可以查看相关文档进行学习
接着,介绍了 zRPC 的几个重要组成模块以及其实现原理,并分析了部分源码。拦截器模块是整个 zRPC 的重点,其中内置了丰富的功能,像熔断、监控、降载等等也是构建高可用微服务必不可少的。resolver 和 balancer 模块自定义了 gRPC 的 resolver 和 balancer,通过该部分可以了解到整个服务注册与发现的原理以及如何构建自己的服务发现系统,同时自定义负载均衡算法也变得不再神秘
最后,zRPC 是一个经历过各种工程实践的 RPC 框架,不论是想要用于生产还是学习其中的设计模式都是一个不可多得的开源项目。希望通过这篇文章的介绍大家能够进一步了解 zRPC
https://github.com/tal-tech/go-zero
https://github.com/tal-tech/go-zero/tree/master/zrpc
https://www.yuque.com/tal-tech/go-zero/rhakzy
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。