当前位置:   article > 正文

GRPC开发基础解密实现_grpc ipc

grpc ipc


前言

本文为GRPC的基础使用及注册优化,概念行东西比较多,后期会增加GRPC身份认证,网关,服务的健康检测,以及GRPC的完整框架搭建,以开源的形式,提高GRPC的生态。


一, 概述

GRPC是一种与语言无关的高性能远程过程调用RPC框架

优点

  • 高性能,轻量级,跨语言
  • 协定优先API开发,默认使用协议缓冲区,允许与语言无关的实现
  • 可用于多语言的工具,以生成强类型服务器和客户端
  • 支持客户端,服务器和双向流式调用
  • 使用protobuf二进制序列化,减少对网络的使用。

应用场景

  • 效率只管重要的轻量级微服务架构
  • 需要多种语言用于开发的polyglot系统
  • 需要处理流式处理请求或响应的点对点实时服务

流式处理

  • 一元无流式传输
  • 服务器到客户端流式传输
  • 客户端到服务器流式传输
  • 双向流式传输

应用方案

  1. 微服务:grpc设计用于低延迟和高吞吐量通讯,对于效率只管重要的轻量级微服务非常有用
  2. 点对点实时通讯:对双向流式传输提供出色的支持,可以实时推送消息而无需轮询
  3. 多语言环境:支持所有常用的开发语言
  4. 网络受限环境:使用protobuf进行序列化,消息始终小于等效的json消息
  5. 进程间通讯:IPC传输可于grpc一起用于同一台计算机上的应用间通讯。

二,兼容浏览器方案

grpc-web

grpc-web 允许浏览器通过grpc-web客户端和protobuf调用grpc服务,要求浏览器应用生成grpc客户端。

Grpc json 转码

grpc json转码允许浏览器应用调用grpc服务,就像是使用json的restful api 一样,浏览器应用不需要生成grpc客户端或者了解grpc的信息,通过使用proto文件从grpc服务自动创建RestFul API 。

三,GRPC与Http API对比

1.概括

  • GRPC必须经过Proto协定,遵循HTTP2.0协议,严格按照规定规范,将内容进行序列化为Protobuf,通讯采用双向流式处理,基于TLS安全传输
  • HttpAPi可以选用OpenApi协定,遵循Http协议,规定宽松,使用Json传输(大型,人工可读),采用客户端—服务端流式处理,浏览器支持,基于TLS安全传输。

2.性能

  • grpc专为Http2.0设计,使用二进制组和压缩,协议在发送和接收方梦均紧凑而且高效,并通过google加密方式进行消息加密,在单个TCP连接上多路复用多个Http 2.0调用,多路复用可消除对头阻塞。
  • HttpApi也可以使用Http2.0协议

3.代码生成

  • Proto文件时Grpc开发的核心文件,定义了服务和消息的属性,通过proto文件grpc框架生成服务基类,消息和完整的客户端。

四,代码实现

1.服务端

a、创建服务端项目

  1. 新建项目
  2. 项目类型选择Asp.net Core GRPC
  3. 创建

b、项目文件说明

  • Protos/greet.proto---->定义GreetGRPC,并用于生成grpc服务资产
  • services---->greeter服务的实现
  • appsetting.json---->包含配置数据
  • Program---->服务的入口点, 配置应用行为代码

c、GRPC服务自动注册

1.前言

因为每次添加完GRPC服务后都需要在Program中通过MapGrpcService来暴露服务,供外部客户端访问,所以想到可以通过反射,再运行时进行自动注册服务,只需要再定义服务时,增加继承空抽象接口的标识,根据这个标识,可以实现自动注册GRPC服务。

2.解决思路
  1. 创建IGrpcService空接口
  2. 创建的GrpcService服务继承IGrpcService接口(供反射类可以通过该接口查询到所有的Grpc服务)
  3. program中通过反射查询到所有的服务,类型为List
  4. 通过泛着进行注册。
3.代码实现(Program中增加)

代码说明:反射类型将通过此接口对GrpcService进行检索,注册服务


	/// <summary>
    /// 定义GRPC空接口标识
    /// 反射类型将通过此接口对GrpcService进行检索,注册
    /// </summary>
    public interface IGrpcService
    {
    //空接口
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

代码说明:通过反射进行检索,并循环注册服务,定义再program.cs中


//通过IGrpcService接口查询所有实现该接口的grpcservice服务Type集合
var inhertIGrpcServiceTypeList = AppDomain.CurrentDomain.GetAssemblies().SelectMany(a => a.GetTypes().Where(p => p.GetInterfaces().Contains(typeof(IGrpcService)))).ToList();
//通过反射获取GrpcEndpointRouteBuilderExtensions类中所有方法,并查询名称等于MapGrpcService的方法;
var mapGrpcServiceMethod = typeof(GrpcEndpointRouteBuilderExtensions).GetMethods().Where(p => p.Name.Equals("MapGrpcService")).FirstOrDefault();
//遍历Type集合
foreach (var item in inhertIGrpcServiceTypeList)
{
    //通过反射的方法,重新生成泛型方法并调用
    mapGrpcServiceMethod.MakeGenericMethod(item).Invoke(null, new object[] { app });
    Console.WriteLine(item.FullName);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

d、GRPC 方法类型

  • 一元
  • 服务器流
  • 客户端流
  • 双向流

e、服务端编码实现

1.添加Proto文件
  1. GRPC服务内部Proto文档, .csproj 文件中增加配置
	<ItemGroup>		
	<Protobuf Include="Protos\greet.proto" GrpcServices="Server" />	
	</ItemGroup>
  • 1
  • 2
  • 3
  1. GRPC外部Lib库Proto文档
    类库项目引入Nuget包Grpc.AspNetCore
    引入Nuget包Grpc.AspNetCore, Lib库的.csproj文件中增加配置

代码说明:代码生成器会根据这个路径找到proto文件,根据proto文件生成GRPC代码

	<ItemGroup>		
	<Protobuf Include="Proto\User\userinfo.proto" />
	</ItemGroup>
  • 1
  • 2
  • 3
  1. Proto文档说明
//文档类型
syntax = "proto3";
//定义生成的代码的命名空间
option csharp_namespace = "Grpc.Test.Demo.ProtoLib.User";
//生成代码的包名
package userinfo;

service userinfo{
//一元
rpc DoTest(TestRequest) returns (testReply);
//服务器流
rpc ServiceStream(ServiceStreamRequest) returns (stream ServiceStreamReply);
//客户端流
rpc ClientStream(stream ClientStreamRequest) returns (ClientStreamReply);
//双向流
rpc AllStream(stream AllStreamRequest) returns (stream AllStreamReply);
}

//一元参数
message TestRequest{
	string name = 1;
}
//一元响应
message testReply{
	string message = 1;
}

//服务端流参数
message ServiceStreamRequest{
	int32 PageIndex=1;
	int32 PageSize=2;
	bool isDescending=3;
}
//服务端流返回值
message ServiceStreamReply{
	string message=1;
}

//客户端流参数
message ClientStreamRequest{
	int32 PageIndex=1;
	int32 PageSize=2;
	bool isDescending=3;
}
//客户端流返回值
message ClientStreamReply{
	string message=1;
}

//双向流参数
message AllStreamRequest{
	int32 PageIndex=1;
	int32 PageSize=2;
	bool isDescending=3;
}
//双向流返回值
message AllStreamReply{
	string message=1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
2.生成C#Grpc代码

重新生成项目,生成的代码会根据Proto文档的csharp_namespace来生成代码到指定文件夹下生成的文件( PackageName, PackageNameGrpc)。

3.创建对应服务实现,重写proto定义的服务接口
1.一元服务

代码说明:服务端代码(一元)

using Grpc.Core;
using Grpc.Test.Demo.ProtoLib.User;
using Newtonsoft.Json;

namespace Grpc.Project.Demo.GrpcService1.Services.user
{
    /// <summary>
    /// 用户服务
    /// 继承IGrpcService空接口,实现自动注册
    /// </summary>
    public class userinfoService : userinfo.userinfoBase, IGrpcService
    {
        private readonly ILogger<userinfoService> _logger;
        public userinfoService(ILogger<userinfoService> logger)
        {
            _logger = logger;
        }
        /// <summary>
        /// 一元
        /// </summary>
        /// <param name="request"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override Task<testReply> DoTest(TestRequest request, ServerCallContext context)
        {
            Console.WriteLine("userinfoService---DoTest.testReply.Name:" + request.Name+$"---------{System.DateTime.Now}");

            return Task.FromResult(new testReply
            {
                Message = "Hello " + request.Name
            });
        }        
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
2.服务端流

代码说明:服务器流

		/// <summary>
        /// 服务器流
        /// </summary>
        /// <param name="request"></param>
        /// <param name="responseStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task ServiceStream(ServiceStreamRequest request, IServerStreamWriter<ServiceStreamReply> responseStream, ServerCallContext context)
        {
            for (var i = 0; i < 5; i++)
            {
                Console.WriteLine($"发送第{i}条消息流-----{System.DateTime.Now}");
                await responseStream.WriteAsync(new ServiceStreamReply() { Message = $"第{i}条Message,was Done-----{System.DateTime.Now}" });
                Console.WriteLine($"第{i}条消息流任务停止2秒-----{System.DateTime.Now}");
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
3.客户端流

代码说明:客户端流

		/// <summary>
        /// 客户端流
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<ClientStreamReply> ClientStream(IAsyncStreamReader<ClientStreamRequest> requestStream, ServerCallContext context)
        {
            while (await requestStream.MoveNext())
            {
                Console.WriteLine($"***********************************************{System.DateTime.Now}*");
                Console.WriteLine(JsonConvert.SerializeObject(requestStream));
                var message = requestStream.Current;
                //打印接收到的参数
                Console.WriteLine(JsonConvert.SerializeObject(message));
            }
            return new ClientStreamReply() { Message = $"ClientStream Done---------{System.DateTime.Now}" };
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
4.双向流

代码说明:双向流

		/// <summary>
        /// 双向流
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="responseStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task AllStream(IAsyncStreamReader<AllStreamRequest> requestStream, IServerStreamWriter<AllStreamReply> responseStream, ServerCallContext context)
        {
            var count = 0;
            var readTask = Task.Run(async () =>
            {
                Console.WriteLine("开启循环读取");
                while (await requestStream.MoveNext()) {
                    count++;
                    Console.WriteLine(count+ $"---------{System.DateTime.Now}----------" + JsonConvert.SerializeObject(requestStream.Current));
                }
            });
            //循环判断读取任务是否结束
            while (!readTask.IsCompleted) {

                await responseStream.WriteAsync(new AllStreamReply() { Message = $"done {System.DateTime.Now}" });
                await Task.Delay(TimeSpan.FromSeconds(2), context.CancellationToken);
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

2.客户端调用

a.创建控制台应用程序

b.引入Nuget包

  1. Grpc.Net.Client:包含.net客户端
  2. Google.Protobuf :适用于C#语言的Protobuf消息
  3. GRPC.Tools :适用于Protobuf的C#工具支持。

c.Proto文件

  • 内部拷贝Proto文件
<ItemGroup>		
<Protobuf Include="Protos\greet.proto" GrpcServices="Client" />	
</ItemGroup>
  • 1
  • 2
  • 3
  • 引用外部Lib库Proto文件:使用lib库中配置的protobuf Include,无需区分grpcservices

d.调用服务端

  1. 一元客户端

代码说明:一元调用

using var channel = GrpcChannel.ForAddress("https://localhost:7183");
var client = new Greeter.GreeterClient(channel);
for (var i = 0; i < 10; i++)
{
   var reply = await client.SayHelloAsync(new HelloRequest { Name = "Chen Congcong" });
   Console.WriteLine($"Greeting:{reply.Message}");
}
var userClient = new userinfo.userinfoClient(channel);
for (var i = 0; i < 10; i++)
{
   var reply = await userClient.DoTestAsync(new TestRequest { Name = "Chen Congcong1111111111111" });
   Console.WriteLine($"Greeting:{reply.Message}");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 服务端流

代码说明:服务端流式调用

Console.WriteLine($"****************************************** 服务器流式调用 ******************************************");
using var channel = GrpcChannel.ForAddress("https://localhost:7183");
var userClient = new userinfo.userinfoClient(channel);
using var serviceStreamCall = userClient.ServiceStream(new ServiceStreamRequest() { IsDescending = true, PageIndex = 0, PageSize = 50 });
await foreach (var message in serviceStreamCall.ResponseStream.ReadAllAsync())
{
    Console.WriteLine($"Message========{message.Message}");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 客户端流

代码说明:客户端流式调用

using var channel = GrpcChannel.ForAddress("https://localhost:7183");
var userClient = new userinfo.userinfoClient(channel);
Task.Run(() =>
{
    var cts = new CancellationTokenSource(); //任务取消令牌
    var ret3 = userClient.ClientStream(cancellationToken: cts.Token);
    var pageindex = 0;
    for (var i = 0; i < 10; i++)
    {
        ret3.RequestStream.WriteAsync(new ClientStreamRequest
        {
            PageIndex = pageindex++,
            PageSize = 20,
            IsDescending = true
        });
        Console.WriteLine($"{pageindex}----->发送的第{pageindex}条消息");
        Thread.Sleep(2000);
    }
    //告知服务端发送完成
    ret3.RequestStream.CompleteAsync();
    var result = ret3.ResponseAsync.Result;
    Console.WriteLine($"************** 客户端流式RPC响应结果:{result.Message} **************");
}).Wait();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  1. 双向流

代码说明:双向流式调用

Console.WriteLine($"****************************************** 双向流式调用 ******************************************");
var allstreamCancelToken = new CancellationTokenSource(); //任务取消令牌
using var allStreamclient = userClient.AllStream(cancellationToken: allstreamCancelToken.Token);
var allStreamResponseTask = Task.Run(async () =>
{
    Console.WriteLine($"再跑的任务");
    var responseCount = 0;
    while (await allStreamclient.ResponseStream.MoveNext()) {
        responseCount++;
        Console.WriteLine($"第{responseCount}次响应,返回{allStreamclient.ResponseStream.Current.Message}。-----{System.DateTime.Now}");
    }
});
for (var i = 0; i < 10; i++)
{
    await allStreamclient.RequestStream.WriteAsync(new AllStreamRequest()
    {
        PageIndex = i,
        PageSize = 20,
        IsDescending = true
    });
    Console.WriteLine($"{i}----->发送的第{i}条消息-----{System.DateTime.Now}");
    Thread.Sleep(2000);
}
//声明发送完毕
await allStreamclient.RequestStream.CompleteAsync();
await allStreamResponseTask;
#endregion

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/430224
推荐阅读
相关标签
  

闽ICP备14008679号