当前位置:   article > 正文

gRPC 源码详解(一)配置化的结构体

grpc defaultserviceconfig

grpc 源码结构详解

DialOptions

DialOptions 是最重要的一环,负责配置每一次 rpc 请求的时候的一应选择。


结构

先来看看这个的结构

链接

  1. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  2. // values passed to Dial.
  3. type dialOptions struct {
  4. unaryInt UnaryClientInterceptor
  5. streamInt StreamClientInterceptor
  6. chainUnaryInts []UnaryClientInterceptor
  7. chainStreamInts []StreamClientInterceptor
  8. cp Compressor
  9. dc Decompressor
  10. bs backoff.Strategy
  11. block bool
  12. insecure bool
  13. timeout time.Duration
  14. scChan <-chan ServiceConfig
  15. authority string
  16. copts transport.ConnectOptions
  17. callOptions []CallOption
  18. // This is used by v1 balancer dial option WithBalancer to support v1
  19. // balancer, and also by WithBalancerName dial option.
  20. balancerBuilder balancer.Builder
  21. // This is to support grpclb.
  22. resolverBuilder resolver.Builder
  23. channelzParentID int64
  24. disableServiceConfig bool
  25. disableRetry bool
  26. disableHealthCheck bool
  27. healthCheckFunc internal.HealthChecker
  28. minConnectTimeout func() time.Duration
  29. defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
  30. defaultServiceConfigRawJSON *string
  31. }

由于命名非常规范,加上注释很容易看懂每一个 field 配置的哪一条属性。如果掠过看的 大概有 压缩解压器,超时阻塞设置,认证安全转发,负载均衡,服务持久化的信息存储 ,配置,心跳检测等。


其一应函数方法都是设置 其中字段的。


如何设置

这里是 grpc 设计较好的地方,通过函数设置,同时设有生成函数的函数。什么意思呢?首先结合图来理解,这也是整个 grpc 设置的精华部分 


grpc-setOperation.svg

这里的意思是 , DialOptions 是一个导出接口,实现函数是 apply 同时接受参数 dialOptions 来修改它。

而实际上,是使用 newFuncDialOption 函数包装一个 修改 dialOptions 的方法给 funcDialOption 结构体,在实际 Dial 调用的时候 是使用闭包 调用 funcDialOption 结构体的 apply 方法。

可以在这里看一下 Dial 方法的源码(Dial 调用的是 DialContext

起作用的就是 opt.apply()

  1. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  2. cc := &ClientConn{
  3. target: target,
  4. csMgr: &connectivityStateManager{},
  5. conns: make(map[*addrConn]struct{}),
  6. dopts: defaultDialOptions(),
  7. blockingpicker: newPickerWrapper(),
  8. czData: new(channelzData),
  9. firstResolveEvent: grpcsync.NewEvent(),
  10. }
  11. ···
  12. for _, opt := range opts {
  13. opt.apply(&cc.dopts)
  14. }
  15. ···
  16. }


这里的 options 可以说是 client 发起 rpc 请求的核心中转站。

另一个重要的接口,同时也集中在 dialOptions 结构体中初始化处理的是 

callOptions []CallOption 


CallOption

CallOption 是一个接口,定义在 rpc_util 包内


结构
  1. // CallOption configures a Call before it starts or extracts information from
  2. // a Call after it completes.
  3. type CallOption interface {
  4. // before is called before the call is sent to any server. If before
  5. // returns a non-nil error, the RPC fails with that error.
  6. before(*callInfo) error
  7. // after is called after the call has completed. after cannot return an
  8. // error, so any failures should be reported via output parameters.
  9. after(*callInfo)
  10. }


操作的是 callInfo 结构里的数据,其被包含在 dialOptions  结构体中,

即每一次 dial 的时候进行调用。


callInfo

同时它自身定义很有意思,操作的是 callInfo  结构体

  1. // callInfo contains all related configuration and information about an RPC.
  2. type callInfo struct {
  3. compressorType string
  4. failFast bool
  5. stream ClientStream
  6. maxReceiveMessageSize *int
  7. maxSendMessageSize *int
  8. creds credentials.PerRPCCredentials
  9. contentSubtype string
  10. codec baseCodec
  11. maxRetryRPCBufferSize int
  12. }

可以看到 callInfo 中字段用来表示 单次调用中独有的自定义选项如 压缩,流控,认证,编解码器等。


一个实现

简单看一个 CallOption 接口的实现

  1. // Header returns a CallOptions that retrieves the header metadata
  2. // for a unary RPC.
  3. func Header(md *metadata.MD) CallOption {
  4. return HeaderCallOption{HeaderAddr: md}
  5. }
  6. // HeaderCallOption is a CallOption for collecting response header metadata.
  7. // The metadata field will be populated *after* the RPC completes.
  8. // This is an EXPERIMENTAL API.
  9. type HeaderCallOption struct {
  10. HeaderAddr *metadata.MD
  11. }
  12. func (o HeaderCallOption) before(c *callInfo) error { return nil }
  13. func (o HeaderCallOption) after(c *callInfo) {
  14. if c.stream != nil {
  15. *o.HeaderAddr, _ = c.stream.Header()
  16. }
  17. }

重点看到,实际操作是在 before 和 after 方法中执行,它们会在 Client 发起请求的时候自动执行,顾名思义,一个在调用前执行,一个在调用后执行。


实现注意

这里可以看出,这里也是通过函数返回一个拥有这两个方法的结构体,注意这一个设计,可以作为你自己的 Option 设计的时候的参考。


两种方法

有两种方法让 Client 接受你的 CallOption 设置

  1. 在 Client 使用方法的时候直接作为 参数传递,将刚才所说的函数-返回一个实现了 CallOption 接口的结构体。
  2. 在 生成 Client 的时候就传递设置。具体如下
  1. 通过 dialOptions.go 中的 函数 grpc.WithDefaultCallOptions() 
  2. 这个函数会将 CallOption 设置到 dialOptions 中的字段 []CallOption 中。
  1. // WithDefaultCallOptions returns a DialOption which sets the default
  2. // CallOptions for calls over the connection.
  3. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  4. return newFuncDialOption(func(o *dialOptions) {
  5. o.callOptions = append(o.callOptions, cos...)
  6. })
  7. }


有没有感觉有点不好理解?给你们一个实例

  1. 使用的第一种方法
response, err := myclient.MyCall(ctx, request, grpc.CallContentSubtype("mycodec"))
  1. 使用第二种方法
myclient := grpc.Dial(ctx, target, grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mycodec")))

这里假设 我们设置了一个 mycodec 的译码器。马上下面解释它的设计。


值得注意的是, 我好像只提到了在 Client 调用时设置,callOption  只在客户端设置的情况是不是让大家感到困惑。

实际上 gRPC server 端会自动检测 callOption 的设置,并检测自己是否支持此项选择,如果不支持则会返回失败。也就是说,在 Server 端注册的所有 Codec 译码器之后,Client 直接使用相应的设置就好了。


Codec

在 gRPC 中 Codec 有两个接口定义,一个是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另一个是拥有名字的 Codec 定义在 encoding 包内,这是由于在注册 registry 的时候会使用到这个方法。


接口
  1. type Codec interface {
  2. // Marshal returns the wire format of v.
  3. Marshal(v interface{}) ([]byte, error)
  4. // Unmarshal parses the wire format into v.
  5. Unmarshal(data []byte, v interface{}) error
  6. // String returns the name of the Codec implementation. This is unused by
  7. // gRPC.
  8. String() string
  9. }

就是这个方法

  1. // RegisterCodec registers the provided Codec for use with all gRPC clients and
  2. // servers.
  3. //
  4. // The Codec will be stored and looked up by result of its Name() method, which
  5. // should match the content-subtype of the encoding handled by the Codec. This
  6. // is case-insensitive, and is stored and looked up as lowercase. If the
  7. // result of calling Name() is an empty string, RegisterCodec will panic. See
  8. // Content-Type on
  9. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
  10. // more details.
  11. //
  12. // NOTE: this function must only be called during initialization time (i.e. in
  13. // an init() function), and is not thread-safe. If multiple Compressors are
  14. // registered with the same name, the one registered last will take effect.
  15. func RegisterCodec(codec Codec) {
  16. if codec == nil {
  17. panic("cannot register a nil Codec")
  18. }
  19. if codec.Name() == "" {
  20. panic("cannot register Codec with empty string result for Name()")
  21. }
  22. contentSubtype := strings.ToLower(codec.Name())
  23. registeredCodecs[contentSubtype] = codec
  24. }


Compressor

同时 encoding 包中还定义了 Compressor 接口,参照 Codec 理解即可。

  1. // Compressor is used for compressing and decompressing when sending or
  2. // receiving messages.
  3. type Compressor interface {
  4. // Compress writes the data written to wc to w after compressing it. If an
  5. // error occurs while initializing the compressor, that error is returned
  6. // instead.
  7. Compress(w io.Writer) (io.WriteCloser, error)
  8. // Decompress reads data from r, decompresses it, and provides the
  9. // uncompressed data via the returned io.Reader. If an error occurs while
  10. // initializing the decompressor, that error is returned instead.
  11. Decompress(r io.Reader) (io.Reader, error)
  12. // Name is the name of the compression codec and is used to set the content
  13. // coding header. The result must be static; the result cannot change
  14. // between calls.
  15. Name() string
  16. }


MetaData 

这个包对应 context 中的 Value field 也就是 key-value 形式的存储


在其他包中简写是 MD


结构
type MD map[string][]string


函数

实现了完善的存储功能,从单一读写到批量(采用 pair 模式,...string 作为参数,len(string)%2==1 时会报错,由于会有孤立的没有配对的元信息。


另外几个函数是实现了从 context 中的读取和写入(这里的写入是 使用 context.WithValue 方法,即生成 parent context 的 copy。


注意⚠️
  • 值得注意的是,在 MetaData 结构体中, value 的结构是 []string 。
  • 同时 key 不可以以 "grpc-" 开头,这是因为在 grpc 的 internal 包中已经保留了。
  • 更为重要的是 在 context 中的读取方式,其实是 MetaData 结构对应的是 context Value 中的 value 值,而 key 值设为 一个空结构体同时区分输入输入
  • type mdIncomingKey struct{}
  • type mdOutgoingKey struct{}



转载于:https://my.oschina.net/u/4113130/blog/3079256

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/252823
推荐阅读
相关标签
  

闽ICP备14008679号