赞
踩
本文为GRPC的基础使用及注册优化,概念行东西比较多,后期会增加GRPC身份认证,网关,服务的健康检测,以及GRPC的完整框架搭建,以开源的形式,提高GRPC的生态。
GRPC是一种与语言无关的高性能远程过程调用RPC框架
grpc-web 允许浏览器通过grpc-web客户端和protobuf调用grpc服务,要求浏览器应用生成grpc客户端。
grpc json转码允许浏览器应用调用grpc服务,就像是使用json的restful api 一样,浏览器应用不需要生成grpc客户端或者了解grpc的信息,通过使用proto文件从grpc服务自动创建RestFul API 。
因为每次添加完GRPC服务后都需要在Program中通过MapGrpcService来暴露服务,供外部客户端访问,所以想到可以通过反射,再运行时进行自动注册服务,只需要再定义服务时,增加继承空抽象接口的标识,根据这个标识,可以实现自动注册GRPC服务。
代码说明:反射类型将通过此接口对GrpcService进行检索,注册服务
/// <summary>
/// 定义GRPC空接口标识
/// 反射类型将通过此接口对GrpcService进行检索,注册
/// </summary>
public interface IGrpcService
{
//空接口
}
代码说明:通过反射进行检索,并循环注册服务,定义再program.cs中
//通过IGrpcService接口查询所有实现该接口的grpcservice服务Type集合
var inhertIGrpcServiceTypeList = AppDomain.CurrentDomain.GetAssemblies().SelectMany(a => a.GetTypes().Where(p => p.GetInterfaces().Contains(typeof(IGrpcService)))).ToList();
//通过反射获取GrpcEndpointRouteBuilderExtensions类中所有方法,并查询名称等于MapGrpcService的方法;
var mapGrpcServiceMethod = typeof(GrpcEndpointRouteBuilderExtensions).GetMethods().Where(p => p.Name.Equals("MapGrpcService")).FirstOrDefault();
//遍历Type集合
foreach (var item in inhertIGrpcServiceTypeList)
{
//通过反射的方法,重新生成泛型方法并调用
mapGrpcServiceMethod.MakeGenericMethod(item).Invoke(null, new object[] { app });
Console.WriteLine(item.FullName);
}
<ItemGroup>
<Protobuf Include="Protos\greet.proto" GrpcServices="Server" />
</ItemGroup>
代码说明:代码生成器会根据这个路径找到proto文件,根据proto文件生成GRPC代码
<ItemGroup>
<Protobuf Include="Proto\User\userinfo.proto" />
</ItemGroup>
//文档类型 syntax = "proto3"; //定义生成的代码的命名空间 option csharp_namespace = "Grpc.Test.Demo.ProtoLib.User"; //生成代码的包名 package userinfo; service userinfo{ //一元 rpc DoTest(TestRequest) returns (testReply); //服务器流 rpc ServiceStream(ServiceStreamRequest) returns (stream ServiceStreamReply); //客户端流 rpc ClientStream(stream ClientStreamRequest) returns (ClientStreamReply); //双向流 rpc AllStream(stream AllStreamRequest) returns (stream AllStreamReply); } //一元参数 message TestRequest{ string name = 1; } //一元响应 message testReply{ string message = 1; } //服务端流参数 message ServiceStreamRequest{ int32 PageIndex=1; int32 PageSize=2; bool isDescending=3; } //服务端流返回值 message ServiceStreamReply{ string message=1; } //客户端流参数 message ClientStreamRequest{ int32 PageIndex=1; int32 PageSize=2; bool isDescending=3; } //客户端流返回值 message ClientStreamReply{ string message=1; } //双向流参数 message AllStreamRequest{ int32 PageIndex=1; int32 PageSize=2; bool isDescending=3; } //双向流返回值 message AllStreamReply{ string message=1; }
重新生成项目,生成的代码会根据Proto文档的csharp_namespace来生成代码到指定文件夹下生成的文件( PackageName, PackageNameGrpc)。
代码说明:服务端代码(一元)
using Grpc.Core; using Grpc.Test.Demo.ProtoLib.User; using Newtonsoft.Json; namespace Grpc.Project.Demo.GrpcService1.Services.user { /// <summary> /// 用户服务 /// 继承IGrpcService空接口,实现自动注册 /// </summary> public class userinfoService : userinfo.userinfoBase, IGrpcService { private readonly ILogger<userinfoService> _logger; public userinfoService(ILogger<userinfoService> logger) { _logger = logger; } /// <summary> /// 一元 /// </summary> /// <param name="request"></param> /// <param name="context"></param> /// <returns></returns> public override Task<testReply> DoTest(TestRequest request, ServerCallContext context) { Console.WriteLine("userinfoService---DoTest.testReply.Name:" + request.Name+$"---------{System.DateTime.Now}"); return Task.FromResult(new testReply { Message = "Hello " + request.Name }); } }
代码说明:服务器流
/// <summary> /// 服务器流 /// </summary> /// <param name="request"></param> /// <param name="responseStream"></param> /// <param name="context"></param> /// <returns></returns> public override async Task ServiceStream(ServiceStreamRequest request, IServerStreamWriter<ServiceStreamReply> responseStream, ServerCallContext context) { for (var i = 0; i < 5; i++) { Console.WriteLine($"发送第{i}条消息流-----{System.DateTime.Now}"); await responseStream.WriteAsync(new ServiceStreamReply() { Message = $"第{i}条Message,was Done-----{System.DateTime.Now}" }); Console.WriteLine($"第{i}条消息流任务停止2秒-----{System.DateTime.Now}"); await Task.Delay(TimeSpan.FromSeconds(1)); } }
代码说明:客户端流
/// <summary> /// 客户端流 /// </summary> /// <param name="requestStream"></param> /// <param name="context"></param> /// <returns></returns> public override async Task<ClientStreamReply> ClientStream(IAsyncStreamReader<ClientStreamRequest> requestStream, ServerCallContext context) { while (await requestStream.MoveNext()) { Console.WriteLine($"***********************************************{System.DateTime.Now}*"); Console.WriteLine(JsonConvert.SerializeObject(requestStream)); var message = requestStream.Current; //打印接收到的参数 Console.WriteLine(JsonConvert.SerializeObject(message)); } return new ClientStreamReply() { Message = $"ClientStream Done---------{System.DateTime.Now}" }; }
代码说明:双向流
/// <summary> /// 双向流 /// </summary> /// <param name="requestStream"></param> /// <param name="responseStream"></param> /// <param name="context"></param> /// <returns></returns> public override async Task AllStream(IAsyncStreamReader<AllStreamRequest> requestStream, IServerStreamWriter<AllStreamReply> responseStream, ServerCallContext context) { var count = 0; var readTask = Task.Run(async () => { Console.WriteLine("开启循环读取"); while (await requestStream.MoveNext()) { count++; Console.WriteLine(count+ $"---------{System.DateTime.Now}----------" + JsonConvert.SerializeObject(requestStream.Current)); } }); //循环判断读取任务是否结束 while (!readTask.IsCompleted) { await responseStream.WriteAsync(new AllStreamReply() { Message = $"done {System.DateTime.Now}" }); await Task.Delay(TimeSpan.FromSeconds(2), context.CancellationToken); } }
<ItemGroup>
<Protobuf Include="Protos\greet.proto" GrpcServices="Client" />
</ItemGroup>
代码说明:一元调用
using var channel = GrpcChannel.ForAddress("https://localhost:7183");
var client = new Greeter.GreeterClient(channel);
for (var i = 0; i < 10; i++)
{
var reply = await client.SayHelloAsync(new HelloRequest { Name = "Chen Congcong" });
Console.WriteLine($"Greeting:{reply.Message}");
}
var userClient = new userinfo.userinfoClient(channel);
for (var i = 0; i < 10; i++)
{
var reply = await userClient.DoTestAsync(new TestRequest { Name = "Chen Congcong1111111111111" });
Console.WriteLine($"Greeting:{reply.Message}");
}
代码说明:服务端流式调用
Console.WriteLine($"****************************************** 服务器流式调用 ******************************************");
using var channel = GrpcChannel.ForAddress("https://localhost:7183");
var userClient = new userinfo.userinfoClient(channel);
using var serviceStreamCall = userClient.ServiceStream(new ServiceStreamRequest() { IsDescending = true, PageIndex = 0, PageSize = 50 });
await foreach (var message in serviceStreamCall.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"Message========{message.Message}");
}
代码说明:客户端流式调用
using var channel = GrpcChannel.ForAddress("https://localhost:7183"); var userClient = new userinfo.userinfoClient(channel); Task.Run(() => { var cts = new CancellationTokenSource(); //任务取消令牌 var ret3 = userClient.ClientStream(cancellationToken: cts.Token); var pageindex = 0; for (var i = 0; i < 10; i++) { ret3.RequestStream.WriteAsync(new ClientStreamRequest { PageIndex = pageindex++, PageSize = 20, IsDescending = true }); Console.WriteLine($"{pageindex}----->发送的第{pageindex}条消息"); Thread.Sleep(2000); } //告知服务端发送完成 ret3.RequestStream.CompleteAsync(); var result = ret3.ResponseAsync.Result; Console.WriteLine($"************** 客户端流式RPC响应结果:{result.Message} **************"); }).Wait();
代码说明:双向流式调用
Console.WriteLine($"****************************************** 双向流式调用 ******************************************"); var allstreamCancelToken = new CancellationTokenSource(); //任务取消令牌 using var allStreamclient = userClient.AllStream(cancellationToken: allstreamCancelToken.Token); var allStreamResponseTask = Task.Run(async () => { Console.WriteLine($"再跑的任务"); var responseCount = 0; while (await allStreamclient.ResponseStream.MoveNext()) { responseCount++; Console.WriteLine($"第{responseCount}次响应,返回{allStreamclient.ResponseStream.Current.Message}。-----{System.DateTime.Now}"); } }); for (var i = 0; i < 10; i++) { await allStreamclient.RequestStream.WriteAsync(new AllStreamRequest() { PageIndex = i, PageSize = 20, IsDescending = true }); Console.WriteLine($"{i}----->发送的第{i}条消息-----{System.DateTime.Now}"); Thread.Sleep(2000); } //声明发送完毕 await allStreamclient.RequestStream.CompleteAsync(); await allStreamResponseTask; #endregion
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。