当前位置:   article > 正文

【Golang | gRPC】gRPC-Client Streaming客户端流实战_go grpc 流

go grpc 流

环境:
Golang: go1.18.2 windows/amd64
grpc: v1.47.0
protobuf: v1.28.0

完整代码:
https://github.com/WanshanTian/GolangLearning
cd GolangLearning/RPC/gRPC-ClientStreaming

1. 简介

前文【Golang | gRPC】HTTP的连接管理——从HTTP/1.0到HTTP/2.0的演进 简单介绍了gRPC中流模式主要分为客户端流、服务端流、双向流以及流传输模式的优点,下面通过一个demo来说明gRPC客户端流的使用

2. 实践

现有下面一种场景:服务端保存着用户的年龄信息,客户端通过stream多次发送含用户姓名的message,服务端通过stream接收message,一次性返回所有请求用户的年龄和

2.1 proto文件

2.1.1 新建gRPC-ClientStreaming文件夹,使用go mod init初始化,创建pb文件夹,新建query.proto文件

syntax = "proto3";
package pb;
option go_package= ".;pb";

// 定义查询服务包含的方法
service Query {
  // 客户端流模式
  rpc GetAge (stream userInfo) returns (ageInfo){}
}

// 请求用的结构体,包含一个name字段
message userInfo {
  string name = 1;
}

// 响应用的结构体,包含一个age字段
message ageInfo {
  int32 age = 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

服务端实现一个查询(Query)服务,包含一个方法GetAge。方法GetAge的入参前加关键字stream来表明该方法启用客户端流

2.1.2 在.\gRPC-ClientStreaming\pb目录下使用protoc工具进行编译,在pb文件夹下直接生成.pb.go和_grpc.pb.go文件。关于protoc的详细使用可以查看【Golang | gRPC】使用protoc编译.proto文件

protoc --go_out=./ --go-grpc_out=./ .\query.proto
  • 1

在这里插入图片描述

2.2 grpc.pb.go文件

2.2.1 查看query_grpc.pb.go中生成的客户端流和服务端流的接口定义以及服务端QueryServer服务的定义

// 客户端流
type Query_GetAgeClient interface {
	Send(*UserInfo) error
	CloseAndRecv() (*AgeInfo, error)
	grpc.ClientStream
}
// 服务端流
type Query_GetAgeServer interface {
	SendAndClose(*AgeInfo) error
	Recv() (*UserInfo, error)
	grpc.ServerStream
}
//  Query服务的客户端接口
type QueryClient interface {
	GetAge(ctx context.Context, opts ...grpc.CallOption) (Query_GetAgeClient, error)
}
// Query服务的服务端接口
type QueryServer interface {
	GetAge(Query_GetAgeServer) error
	mustEmbedUnimplementedQueryServer()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 客户端流使用Send发送message,使用CloseAndRecv接收message
  • 客户端GetAge方法的第一个返回值是Query_GetAgeClient,表明生成了一条流,用于发送和接收message;如果有多个方法,则每个方法可以各自生成一条流
  • 服务端GetAge方法的入参是Query_GetAgeServer(流),具体方法需要用户自行实现,可以从流中接收和发送message

2.3 服务端

在gRPC-ClientStreaming目录下新建Server文件夹,新建main.go文件

2.3.1 下面我们通过Query这个结构体具体实现QueryServer接口

// 用户信息
var userinfo = map[string]int32{
	"foo": 18,
	"bar": 20,
}

type Query struct {
	pb.UnimplementedQueryServer // 涉及版本兼容
}

func (q *Query) GetAge(serverStream pb.Query_GetAgeServer) error {
	log.Println("start of stream")
	var names_received []*pb.UserInfo
	for {
		userinfoRecv, err := serverStream.Recv()
		// 待客户端主动关闭流后,退出for循环
		if err == io.EOF {
			log.Println("end of the recv direction of the stream")
			break
		}
		log.Printf("The name of user received is %s\n", userinfoRecv.GetName())
		names_received = append(names_received, userinfoRecv)
	}
	// 统计年龄和
	var ages_sum int32
	for _, v := range names_received {
		ages_sum += userinfo[v.GetName()]
	}
	// 返回message
	log.Printf("send message about the total of ages:%d ", ages_sum)
	err := serverStream.SendAndClose(&pb.AgeInfo{Age: ages_sum})
	if err != nil {
		log.Panic(err)
	}
	log.Println("end of the send direction of the stream")
	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 服务端每收到一个message,保存其用户名,直到客户端关闭发送方向的流
  • Recv方法会一直阻塞直到从stream中接收到message,或者直到客户端调用CloseAndRecv方法
  • 当客户端调用CloseAndRecv方法时,服务端调用Recv方法会得到io.EOF返回值
  • 服务端调用SendAndClose方法发送响应message并关闭发送方向的流

2.3.2 服务注册并启动

func main() {
	// 创建socket监听器
	listener, _ := net.Listen("tcp", ":1234")
	// new一个gRPC服务器,用来注册服务
	grpcserver := grpc.NewServer()
	// 注册服务方法
	pb.RegisterQueryServer(grpcserver, new(Query))
	// 开启gRPC服务
	_ = grpcserver.Serve(listener)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

使用RegisterQueryServer这个方法向gRPC服务器里注册QueryServer

2.4 客户端

在gRPC-ClientStreaming目录下新建Client文件夹,新建main.go文件

2.4.1 先建立无认证的连接,生成Client,然后通过方法GetAge返回对应的流,最后通过流进行message的收发

func main() {
	//建立无认证的连接
	conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()
	client := pb.NewQueryClient(conn)
	//返回GetAge方法对应的流
	queryStream, _ := client.GetAge(context.Background())
	// 向stream中发送message
	_ = queryStream.Send(&pb.UserInfo{Name: "foo"})
	time.Sleep(time.Second)
	_ = queryStream.Send(&pb.UserInfo{Name: "bar"})
	time.Sleep(time.Second)

	// 发送两次数据后主动关闭流并等待接收来自server端的message
	ages_sum, err := queryStream.CloseAndRecv()
	if err != nil {
		log.Println(err)
	}
	fmt.Printf("The total of ages of foo and bar is %d", ages_sum.GetAge())
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 客户端通过CloseAndRecv方法主动关闭发送方向的流同时等待接收来自服务端的message

运行结果如下:
在这里插入图片描述

3. 总结

  • 客户端通过Send方法多次发送message,通过CloseAndRecv方法主动关闭发送方向的流同时等待接收来自服务端的message
  • 服务端通过Recv方法多次接收message,通过SendAndClose方法发送响应message并关闭发送方向的流
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/145967
推荐阅读
相关标签
  

闽ICP备14008679号