赞
踩
环境:
Golang: go1.18.2 windows/amd64
grpc: v1.47.0
protobuf: v1.28.0
完整代码:
https://github.com/WanshanTian/GolangLearning
cd GolangLearning/RPC/gRPC-ClientStreaming
前文【Golang | gRPC】HTTP的连接管理——从HTTP/1.0到HTTP/2.0的演进 简单介绍了gRPC中流模式主要分为客户端流、服务端流、双向流以及流传输模式的优点,下面通过一个demo来说明gRPC客户端流的使用
现有下面一种场景:服务端保存着用户的年龄信息,客户端通过stream
多次发送含用户姓名的message
,服务端通过stream
接收message
,一次性返回所有请求用户的年龄和
2.1.1 新建gRPC-ClientStreaming文件夹,使用go mod init
初始化,创建pb文件夹,新建query.proto文件
syntax = "proto3"; package pb; option go_package= ".;pb"; // 定义查询服务包含的方法 service Query { // 客户端流模式 rpc GetAge (stream userInfo) returns (ageInfo){} } // 请求用的结构体,包含一个name字段 message userInfo { string name = 1; } // 响应用的结构体,包含一个age字段 message ageInfo { int32 age = 1; }
服务端实现一个查询(Query
)服务,包含一个方法GetAge
。方法GetAge
的入参前加关键字stream
来表明该方法启用客户端流
2.1.2 在.\gRPC-ClientStreaming\pb目录下使用protoc工具进行编译,在pb文件夹下直接生成.pb.go和_grpc.pb.go文件。关于protoc的详细使用可以查看【Golang | gRPC】使用protoc编译.proto文件
protoc --go_out=./ --go-grpc_out=./ .\query.proto
2.2.1 查看query_grpc.pb.go
中生成的客户端流和服务端流的接口定义以及服务端QueryServer
服务的定义
// 客户端流 type Query_GetAgeClient interface { Send(*UserInfo) error CloseAndRecv() (*AgeInfo, error) grpc.ClientStream } // 服务端流 type Query_GetAgeServer interface { SendAndClose(*AgeInfo) error Recv() (*UserInfo, error) grpc.ServerStream } // Query服务的客户端接口 type QueryClient interface { GetAge(ctx context.Context, opts ...grpc.CallOption) (Query_GetAgeClient, error) } // Query服务的服务端接口 type QueryServer interface { GetAge(Query_GetAgeServer) error mustEmbedUnimplementedQueryServer() }
Send
发送message,使用CloseAndRecv
接收messageGetAge
方法的第一个返回值是Query_GetAgeClient
,表明生成了一条流,用于发送和接收message;如果有多个方法,则每个方法可以各自生成一条流GetAge
方法的入参是Query_GetAgeServer
(流),具体方法需要用户自行实现,可以从流中接收和发送message在gRPC-ClientStreaming目录下新建Server文件夹,新建main.go文件
2.3.1 下面我们通过Query这个结构体具体实现QueryServer接口
// 用户信息 var userinfo = map[string]int32{ "foo": 18, "bar": 20, } type Query struct { pb.UnimplementedQueryServer // 涉及版本兼容 } func (q *Query) GetAge(serverStream pb.Query_GetAgeServer) error { log.Println("start of stream") var names_received []*pb.UserInfo for { userinfoRecv, err := serverStream.Recv() // 待客户端主动关闭流后,退出for循环 if err == io.EOF { log.Println("end of the recv direction of the stream") break } log.Printf("The name of user received is %s\n", userinfoRecv.GetName()) names_received = append(names_received, userinfoRecv) } // 统计年龄和 var ages_sum int32 for _, v := range names_received { ages_sum += userinfo[v.GetName()] } // 返回message log.Printf("send message about the total of ages:%d ", ages_sum) err := serverStream.SendAndClose(&pb.AgeInfo{Age: ages_sum}) if err != nil { log.Panic(err) } log.Println("end of the send direction of the stream") return nil }
Recv
方法会一直阻塞直到从stream中接收到message,或者直到客户端调用CloseAndRecv
方法CloseAndRecv
方法时,服务端调用Recv
方法会得到io.EOF
返回值SendAndClose
方法发送响应message并关闭发送方向的流2.3.2 服务注册并启动
func main() {
// 创建socket监听器
listener, _ := net.Listen("tcp", ":1234")
// new一个gRPC服务器,用来注册服务
grpcserver := grpc.NewServer()
// 注册服务方法
pb.RegisterQueryServer(grpcserver, new(Query))
// 开启gRPC服务
_ = grpcserver.Serve(listener)
}
使用RegisterQueryServer
这个方法向gRPC服务器
里注册QueryServer
在gRPC-ClientStreaming目录下新建Client文件夹,新建main.go文件
2.4.1 先建立无认证的连接,生成Client,然后通过方法GetAge
返回对应的流,最后通过流进行message的收发
func main() { //建立无认证的连接 conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Panic(err) } defer conn.Close() client := pb.NewQueryClient(conn) //返回GetAge方法对应的流 queryStream, _ := client.GetAge(context.Background()) // 向stream中发送message _ = queryStream.Send(&pb.UserInfo{Name: "foo"}) time.Sleep(time.Second) _ = queryStream.Send(&pb.UserInfo{Name: "bar"}) time.Sleep(time.Second) // 发送两次数据后主动关闭流并等待接收来自server端的message ages_sum, err := queryStream.CloseAndRecv() if err != nil { log.Println(err) } fmt.Printf("The total of ages of foo and bar is %d", ages_sum.GetAge()) }
CloseAndRecv
方法主动关闭发送方向的流同时等待接收来自服务端的message运行结果如下:
Send
方法多次发送message,通过CloseAndRecv
方法主动关闭发送方向的流同时等待接收来自服务端的messageRecv
方法多次接收message,通过SendAndClose
方法发送响应message并关闭发送方向的流Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。