赞
踩
先看一下如果使用
func main() { //创建一个grpc连接 conn, err := grpc.Dial("localhost:8002", grpc.WithInsecure()) if err != nil { fmt.Println("connect: ", err) return } defer conn.Close() //创建RPC客户端 client := pb.NewGreetsClient(conn) //设置超时时间 _, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() reply, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "小超", Message: "回来吃饭吗"}) if err != nil { log.Fatalf("couldn not greet: %v", err) return } log.Println(reply.Name, reply.Message) time.Sleep(5 * time.Second) }
这里的使用是客户端,服务的使用见上一篇文章。
这里的就是创建连接,参数是"localhost:8002"和grpc.WithInsecure()。这里为了便于理解,同时这篇文章的重点也并不是ssltls,所以不使用https。
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
这里首先看一下ClientConn的这个结构体,
// ClientConnInterface defines the functions clients need to perform unary and // streaming RPCs. It is implemented by *ClientConn, and is only intended to // be referenced by generated code. type ClientConnInterface interface { // Invoke performs a unary RPC and returns after the response is received // into reply. Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error // NewStream begins a streaming RPC. NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) } // Assert *ClientConn implements ClientConnInterface. var _ ClientConnInterface = (*ClientConn)(nil) // ClientConn represents a virtual connection to a conceptual endpoint, to // perform RPCs. // // A ClientConn is free to have zero or more actual connections to the endpoint // based on configuration, load, etc. It is also free to determine which actual // endpoints to use and may change it every RPC, permitting client-side load // balancing. // // A ClientConn encapsulates a range of functionality including name // resolution, TCP connection establishment (with retries and backoff) and TLS // handshakes. It also handles errors on established connections by // re-resolving the name and reconnecting. type ClientConn struct { ctx context.Context cancel context.CancelFunc target string parsedTarget resolver.Target authority string dopts dialOptions csMgr *connectivityStateManager balancerBuildOpts balancer.BuildOptions blockingpicker *pickerWrapper safeConfigSelector iresolver.SafeConfigSelector mu sync.RWMutex resolverWrapper *ccResolverWrapper sc *ServiceConfig conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters curBalancerName string balancerWrapper *ccBalancerWrapper retryThrottler atomic.Value firstResolveEvent *grpcsync.Event channelzID int64 // channelz unique identification number czData *channelzData lceMu sync.Mutex // protects lastConnectionError lastConnectionError error }
首先这里记录了一个interface,然后ClientConnInterface,然后说明ClientConn实现了,注意这个Invoke是后面真正会使用的。然后就是看一下ClientConn这个方法。这个结构体的成员很多,这里主要看一下这几个结构体
这个成员是根据传入的服务端的地址进行解析,示例代码传入的是location:8002.然后根据这个地址解析解析。结构体如下
// Target represents a target for gRPC, as specified in: // https://github.com/grpc/grpc/blob/master/doc/naming.md. // It is parsed from the target string that gets passed into Dial or DialContext // by the user. And gRPC passes it to the resolver and the balancer. // // If the target follows the naming spec, and the parsed scheme is registered // with gRPC, we will parse the target string according to the spec. If the // target does not contain a scheme or if the parsed scheme is not registered // (i.e. no corresponding resolver available to resolve the endpoint), we will // apply the default scheme, and will attempt to reparse it. // // Examples: // // - "dns://some_authority/foo.bar" // Target{Scheme: "dns", Authority: "some_authority", Endpoint: "foo.bar"} // - "foo.bar" // Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "foo.bar"} // - "unknown_scheme://authority/endpoint" // Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "unknown_scheme://authority/endpoint"} type Target struct { // Deprecated: use URL.Scheme instead. Scheme string // Deprecated: use URL.Host instead. Authority string // Deprecated: use URL.Path or URL.Opaque instead. The latter is set when // the former is empty. Endpoint string // URL contains the parsed dial target with an optional default scheme added // to it if the original dial target contained no scheme or contained an // unregistered scheme. Any query params specified in the original dial // target can be accessed from here. URL url.URL }
从注释中,也可以可以看出成员就是传入的参数解析而来,比如Scheme,Authority等等。当然传入不同的scheme,也会有这不同的负载均衡的策略,当然这个放在后面说。
这个描述的是ClientConn的状态。因为会根据对端连接状态的不同,设置不同的状态。这个最后是balance去进行设置
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID int64
}
然后看一下状态有哪些
const (
// Idle indicates the ClientConn is idle.
Idle State = iota
// Connecting indicates the ClientConn is connecting.
Connecting
// Ready indicates the ClientConn is ready for work.
Ready
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
TransientFailure
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)
看一下对应的结构体是
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
picker balancer.Picker
}
然后看一下对应的Picker的实现
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateState().
type Picker interface {
// Pick returns the connection to use for this RPC and related information.
//
// Pick should not block. If the balancer needs to do I/O or any blocking
// or time-consuming work to service this call, it should return
Pick(info PickInfo) (PickResult, error)
}
然后PickInfo是
// PickInfo contains additional information for the Pick operation.
type PickInfo struct {
// FullMethodName is the method name that NewClientStream() is called
// with. The canonical format is /service/Method.
FullMethodName string
// Ctx is the RPC's context, and may contain relevant RPC-level information
// like the outgoing header metadata.
Ctx context.Context
}
PickResult是
// PickResult contains information related to a connection chosen for an RPC.
type PickResult struct {
// SubConn is the connection to use for this pick, if its state is Ready.
// If the state is not Ready, gRPC will block the RPC until a new Picker is
// provided by the balancer (using ClientConn.UpdateState). The SubConn
// must be one returned by ClientConn.NewSubConn.
SubConn SubConn
// Done is called when the RPC is completed. If the SubConn is not ready,
// this will be called with a nil parameter. If the SubConn is not a valid
// type, Done may not be called. May be nil if the balancer does not wish
// to be notified when the RPC completes.
Done func(DoneInfo)
}
然后这个SubConn的这个是一个interface。实现如下
// A SubConn represents a single connection to a gRPC backend service. // // Each SubConn contains a list of addresses. // // All SubConns start in IDLE, and will not try to connect. To trigger the // connecting, Balancers must call Connect. If a connection re-enters IDLE, // Balancers must call Connect again to trigger a new connection attempt. // // gRPC will try to connect to the addresses in sequence, and stop trying the // remainder once the first connection is successful. If an attempt to connect // to all addresses encounters an error, the SubConn will enter // TRANSIENT_FAILURE for a backoff period, and then transition to IDLE. // // Once established, if a connection is lost, the SubConn will transition // directly to IDLE. // // This interface is to be implemented by gRPC. Users should not need their own // implementation of this interface. For situations like testing, any // implementations should embed this interface. This allows gRPC to add new // methods to this interface. type SubConn interface { // UpdateAddresses updates the addresses used in this SubConn. // gRPC checks if currently-connected address is still in the new list. // If it's in the list, the connection will be kept. // If it's not in the list, the connection will gracefully closed, and // a new connection will be created. // // This will trigger a state transition for the SubConn. // // Deprecated: This method is now part of the ClientConn interface and will // eventually be removed from here. UpdateAddresses([]resolver.Address) // Connect starts the connecting for this SubConn. Connect() }
从注释里看就是,Balancer 生成picker,然后Balancer调用connect去生成SubConn这个interface。当然我们这里先进行文字描述,后面会分析具体的实现
// ccResolverWrapper is a wrapper on top of cc for resolvers.
// It implements resolver.ClientConn interface.
type ccResolverWrapper struct {
cc *ClientConn
resolverMu sync.Mutex
resolver resolver.Resolver
done *grpcsync.Event
curState resolver.State
incomingMu sync.Mutex // Synchronizes all the incoming calls.
}
这里主要是看一下resolver.Resolver这个结构体和resolver.State这个结构体。
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}
// State contains the current Resolver state relevant to the ClientConn.
type State struct {
// Addresses is the latest set of resolved addresses for the target.
Addresses []Address
// ServiceConfig contains the result from parsing the latest service
// config. If it is nil, it indicates no service config is present or the
// resolver does not provide service configs.
ServiceConfig *serviceconfig.ParseResult
// Attributes contains arbitrary data about the resolver intended for
// consumption by the load balancing policy.
Attributes *attributes.Attributes
}
还是看一下这个具体的结构体的实现
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
hasExitIdle bool
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
}
这里主要是看一下balancer.Balancer和acBalancerWrapper。
balancer.Balancer的实现是
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates // the connectivity states. // // It also generates and updates the Picker used by gRPC to pick SubConns for RPCs. // // UpdateClientConnState, ResolverError, UpdateSubConnState, and Close are // guaranteed to be called synchronously from the same goroutine. There's no // guarantee on picker.Pick, it may be called anytime. type Balancer interface { // UpdateClientConnState is called by gRPC when the state of the ClientConn // changes. If the error returned is ErrBadResolverState, the ClientConn // will begin calling ResolveNow on the active name resolver with // exponential backoff until a subsequent call to UpdateClientConnState // returns a nil error. Any other errors are currently ignored. UpdateClientConnState(ClientConnState) error // ResolverError is called by gRPC when the name resolver reports an error. ResolverError(error) // UpdateSubConnState is called by gRPC when the state of a SubConn // changes. UpdateSubConnState(SubConn, SubConnState) // Close closes the balancer. The balancer is not required to call // ClientConn.RemoveSubConn for its existing SubConns. Close() }
然后从描述上面来看,balance是承上启下,一方面接收参数,然后生成Picker,SubConn,ClientConnState这些。这个后面结合具体的实现去说。
然后就是acBalancerWrapper。这个其实就是addr这个进行更新等操作,然后是一个结构体
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
mu sync.Mutex
ac *addrConn
}
addrConn 就是实际产生的链接。这个在下面进行描述
这个主要是看addrConn这个结构体。
// addrConn is a network connection to a given address. type addrConn struct { ctx context.Context cancel context.CancelFunc cc *ClientConn dopts dialOptions acbw balancer.SubConn scopts balancer.NewSubConnOptions // transport is set when there's a viable transport (note: ac state may not be READY as LB channel // health checking may require server to report healthy to set ac to READY), and is reset // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway // is received, transport is closed, ac has been torn down). transport transport.ClientTransport // The current transport. mu sync.Mutex curAddr resolver.Address // The current address. addrs []resolver.Address // All addresses that the resolver resolved to. // Use updateConnectivityState for updating addrConn's connectivity state. state connectivity.State backoffIdx int // Needs to be stateful for resetConnectBackoff. resetBackoff chan struct{} channelzID int64 // channelz unique identification number. czData *channelzData }
这里 balancer.SubConn 在上面已经说够。然后接下来主要就是transport.ClientTransport 这个interface。看一下实现是
// ClientTransport is the common interface for all gRPC client-side transport // implementations. type ClientTransport interface { // Close tears down this transport. Once it returns, the transport // should not be accessed any more. The caller must make sure this // is called only once. Close(err error) // GracefulClose starts to tear down the transport: the transport will stop // accepting new RPCs and NewStream will return error. Once all streams are // finished, the transport will close. // // It does not block. GracefulClose() // Write sends the data for the given stream. A nil stream indicates // the write is to be performed on the transport as a whole. Write(s *Stream, hdr []byte, data []byte, opts *Options) error // NewStream creates a Stream for an RPC. NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) // CloseStream clears the footprint of a stream when the stream is // not needed any more. The err indicates the error incurred when // CloseStream is called. Must be called when a stream is finished // unless the associated transport is closing. CloseStream(stream *Stream, err error) // Error returns a channel that is closed when some I/O error // happens. Typically the caller should have a goroutine to monitor // this in order to take action (e.g., close the current transport // and create a new one) in error case. It should not return nil // once the transport is initiated. Error() <-chan struct{} // GoAway returns a channel that is closed when ClientTransport // receives the draining signal from the server (e.g., GOAWAY frame in // HTTP/2). GoAway() <-chan struct{} // GetGoAwayReason returns the reason why GoAway frame was received, along // with a human readable string with debug info. GetGoAwayReason() (GoAwayReason, string) // RemoteAddr returns the remote network address. RemoteAddr() net.Addr // IncrMsgSent increments the number of message sent through this transport. IncrMsgSent() // IncrMsgRecv increments the number of message received through this transport. IncrMsgRecv() }
然后这个是一个interface,然后这个在真正实例化的再进行具体说明。
以上就是ClientConn最重要的几个成员。接下来看一下实例化也就是DialContext这个方法。是如何进行的初始化。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。