当前位置:   article > 正文

Uragano,基于dotnetty实现的高性能RPC框架_dotnetty rpc

dotnetty rpc

部分API已经做了调整,请查看最新文档

我做c#开发已经11年了,一直没写过博客,不是不想写,实在是没文采,不擅长写文章。虽然不写,但是还是经常看别人写的技术博客,也关注了很多大牛的blog和公众号。最近这两年微服务很火,自己也一直在不断的学习,因为所在的公司基于成本考虑一直没有打算把项目往微服务方向转,但是自己还是花了不少时间去学习这方面的知识。微软终于开窍走上了开源的道路,希望netcore越来越好,自己也写过一些java代码,个人觉得c#在语言设计上比java确实优秀很多,但无赖c#的生态不好,这是事实。所以也想着希望能给c#的生态做一点点贡献,最近自己开发了一个基于dotnetty实现的RPC框架Uragano,编解码使用MessagePack。目前还没测试,所以不要用于生产环境,为了尽快验证自己的想法是否能够实现,所以还没有写单元测试,后面再补上吧,这是一个不好的习惯。

可能你会想,现在不是已经有好多RPC的框架了吗?你这是重复造轮子!这么说也没错,现在确实有大牛已经开发出了优秀的RPC框架,比如surging,google的gRPC等。这些框架我自己也研究过,用起来始终觉得不顺手,服务实现都不支持构造函数注入,gRPC直接就不支持依赖注入,如果你已经习惯了使用依赖注入的话,这用起来相当难受。gRPC只支持protobuffer编解码,protobuffer用来也不方便,MessagePack的作者基于gRPC开发了MagicOnion,MagicOnion使用MessagePack编解码用起来就方便很多,但是还是不支持依赖注入。当然我不是说这些框架不好,而是根据个人习惯,使用起来不顺手。所以我打算自己写一个RPC框架,主要坚持两个原则,一是使用必须简单,二是必须支持构造函数注入,当然性能也是必须要考虑的。好了,废话就说到这里。。。。

声明服务接口

新建服务项目
在这里插入图片描述
添加IHelloService接口,所有的服务接口必须继承IService接口

	[ServiceDiscoveryName("RPC")]
	[ServiceRoute("hello")]
    public interface IHelloService : IService
    {
        [ServiceRoute("say/async")]
        Task<ResultModel> SayHello(string name);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

其中涉及到了两个自定义属性ServiceDiscoveryNameAttribute和ServiceRouteAttribute,ServiceDiscoveryNameAttribute是服务注册发现的服务名称,是必须的;ServiceRouteAttribute可以不加,默认以{命名空间}/{接口名称}/{方法名称}为路由,路由你可以理解成服务的唯一标识,所以路由是不能有重复的,另外路由字符串只允许输入字母,数字,下划线,减号,斜杠。以上示例代码的路由为:hello/say/async。

服务实现与注册

创建服务实现的项目,需要引用上一步创建的服务接口项目
在这里插入图片描述

1.新建HelloService类实现IHelloService接口

 public class HelloService : IHelloService
    {
        private TestLib TestLib { get; }

        public HelloService(TestLib testLib)
        {
            TestLib = testLib;
        }
        public async Task<ResultModel> SayHello(string name)
        {
            TestLib.Exec();
            return await Task.FromResult(new ResultModel { Message = name });
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

到此一个服务的声明和实现都已经完成,是不是很简单。如果你有注意的话上面的示例代码里通过构造函数注入了TestLib对象,当然TestLib是需要先注册的。

注意:在异步编程里,异步转同步调用是非常糟糕的,并且有可能死锁。为了避免这个问题,Uragano不支持同步方法,所有的服务方法必须是异步的。

2.注册服务器端

  public void ConfigureServices(IServiceCollection services)
        {
            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
            services.AddUragano(config =>
            {
                config.AddServer(Configuration.GetSection("Uragano:Server"));
                config.AddConsul(Configuration.GetSection("Uragano:Consul:Client"),
                    Configuration.GetSection("Uragano:Consul:Service"));
            });
            services.AddScoped<TestLib>();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

AddServer用于注册服务器端监听的ip和端口。
AddConsul配置基于Consul的服务注册与发现,目前只支持Consul,后期会考虑对其他框架的支持。此方法进行了重载,第一个参数是Consul的agent配置,第二个参数是服务注册的相关参数。如果是客户端的话传入第一参数就可以了。你也可以自己进行扩展,调用AddServiceDiscovery方法进行注册。以上方法均进行了重载,示例代码是读取 配置文件进行配置,以下是配置文件示例:

{
  "Logging": {
    "LogLevel": {
      "Default": "Warning"
    }
  }
  "Uragano": {
    "Server": {
      "ip": "{LocalIP}",
      "port": 5001,
      "weight": 1
    },
    "Consul": {
      "Client": {
        "Address": "http://192.168.1.133:8500",
        "Token": ""
      },
      "Service": {
        "Id": null,
        "Name": "RPC",
        "tags": null,
        "EnableTagOverride": false,
        "meta": null,
        "HealthCheckInterval": 10000
      }
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

ip,port是服务监听的ip和端口号,ip可以用{LocalIP}替代使用本机的内网IP。
weight是设置此服务的权重,支持访问量越大就越高,主要用于后面负责均衡,后面会讲到。
address,token是连接Consul的agent配置。
id是注册服务的唯一id,不配置或者是null的话系统会自动生成guid作为服务id。
name是服务发现的name,与ServiceDiscoveryNameAttribute设置的名称要保持一致。
tags,EnableTagOverride,meta都是consul的参数,这里就不做说明了。
HealthCheckInterval是Consul健康检查的频率,单位是毫秒,默认是10秒。

3.启动服务端

 public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseMvc();
            app.UseUragano();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

好啦,服务器端的工作全部完成。

客户端的实现与注册

1.实现客户端
首先新建一个客户端项目,引入第一步创建的服务声明的接口项目。
在这里插入图片描述

public class ValuesController : ControllerBase
    {
        private IHelloService HelloService { get; }
        public ValuesController(IHelloService helloService)
        {
            HelloService = helloService;
        }

        // GET api/values
        [HttpGet]
        public async Task<IActionResult> Get()
        {
            var a = Guid.NewGuid().ToString();
            var r = await HelloService.SetMeta(("token", "bearer .....")).SayHello(a);
            if (r.Message == a)
                return Ok(r);
            return BadRequest(new
            {
                @in = a,
                @out = r.Message
            });
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

在控制器里通过构造函数注入IHelloService接口,通过接口调用服务方法即可。你可能已经发现里面有一个SetMeta方法,这是一个扩展方法,为本次调用设置元数据,暂且这么叫吧,当然这不是必须的,你可以把它理解成设置headers,有什么用处呢?这里暂时不讲这个,后面会涉及到,可能聪明的你已经知道了。

2.注册客户端

public void ConfigureServices(IServiceCollection services)
        {

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
            services.AddUragano(config =>
            {
                config.AddConsul(Configuration.GetSection("Uragano:Consul:Client"));
                config.AddClient();
            });
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseMvc();
            app.UseUragano();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

跟服务器端的代码基本一样。好了,一个简单RPC项目就搭建起来了。当然Consul需要自己去搭建,网上的教程很多。
使用起来是不是很方便,直接注入服务接口就可以了,就跟调用本地方法一模一样。

在做微服务的时候肯定避免不了服务之间相互调用,所以Uragano支持同时启动服务端和客户端,配置如下:

services.AddUragano(config =>
            {
                config.AddClient();
                config.AddServer(Configuration.GetSection("Uragano:Server"));
                config.AddConsul(Configuration.GetSection("Uragano:Consul:Client"),
                    Configuration.GetSection("Uragano:Consul:Service"));
            });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

性能

使用Jmeter做了简单的并发测试,使用了3台电脑,一台运行服务端,一台运行客户端,一台运行Jmeter,3台电脑配置都一样,下面是配置截图
在这里插入图片描述

下图是开500个线程,循环500次的测试结果。性能还算可以,高的时候,吞度量能达到9000以上。只是做了简单的测试,仅供参考。
在这里插入图片描述
以上只是最基础的简单应用,下面我们说说更高级的应用。

拦截器

Uragano支持自定义拦截器,拦截器分客户端拦截器和服务器端拦截器。比如需要做服务缓存,可以在客户端拦截器里实现,如果做一些权限验证可以在服务器端拦截器里实现。

1.全局拦截器
不管是客户端还是服务器端都支持全局拦截器,拦截器定义都需要继承InterceptorAbstract抽象类

public class ClientGlobal_1_Interceptor : InterceptorAbstract
    {
        private ILogger Logger { get; }
        public ClientGlobal_1_Interceptor(ILogger<ClientGlobal_1_Interceptor> logger)
        {
            Logger = logger;
        }
        public override async Task<IServiceResult> Intercept(IInterceptorContext context)
        {
            return await context.Next();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

注册全局拦截器

services.AddUragano(config =>
            {
                config.AddClient();
                config.AddServer(Configuration.GetSection("Uragano:Server"));
                config.AddConsul(Configuration.GetSection("Uragano:Consul:Client"),
                    Configuration.GetSection("Uragano:Consul:Service"));
                config.AddClientGlobalInterceptor<ClientGlobal_1_Interceptor>();
                config.AddServerGlobalInterceptor<ServerGlobalInterceptor>();
            });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.局部拦截器
局部拦截器以自定义属性来实现,所有局部拦截器必须继承InterceptorAttributeAbstract抽象类

    public class ServerInterceptorAttribute : InterceptorAttributeAbstract
    {
        private ILogger Logger { get; }
        public ServerInterceptorAttribute(ILogger<ServerInterceptorAttribute> logger)
        {
            Logger = logger;
        }
        public override async Task<IServiceResult> Intercept(IInterceptorContext context)
        {
            var r = await context.Next();
            r.Status = RemotingStatus.Forbidden;
            return r;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
 [ServerInterceptor]
 public class HelloService : IHelloService
    {
        private TestLib TestLib { get; }

        public HelloService(TestLib testLib)
        {
            TestLib = testLib;
        }
        public async Task<ResultModel> SayHello(string name)
        {
            TestLib.Exec();
            return await Task.FromResult(new ResultModel { Message = name });
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

局部拦截器可以加在服务接口和实现的具体类上面,也可以加在具体的方法上面,执行的先后顺序是:全局拦截器->类/接口拦截器->方法上的拦截器。

那么,加在接口上的拦截器和加在具体的接口实现类上的拦截器有什么区别呢?加在接口文件里的拦截器属于客户端拦截器,加在具体实现类里的拦截器属于服务器端拦截器。

眼尖的你可能已经发现上面的拦截器示例里通过构造函数注入了ILogger对象,没错拦截器也是支持依赖注入的,这就是我希望达到的效果,依赖注入无处不在。

在拦截器里就需要说说Meta了,在前面我们提到调用具体的服务方法时我们可以通过SetMeta扩展方法设置元数据,那么我们在拦截器里就可以用到,它在context对象里可以取到。

熔断

既然是做微服务,怎么能少得了熔断呢,c#里首选就是Polly了,直接上代码

全局策略

public void ConfigureServices(IServiceCollection services)
        {

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
            services.AddUragano(config =>
            {
                config.AddConsul(Configuration.GetSection("Uragano:Consul:Client"));
                config.AddCircuitBreaker(Configuration.GetSection("CircuitBreaker"));
                config.AddClient();
            });
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

配置json示例

"Uragano": {
    "Consul": {
      "Client": {
        "Address": "http://192.168.1.133:8500",
        "Token": ""
      }
    },
    "CircuitBreaker": {
      "timeout": 2000,
      "retry": 3,
      "ExceptionsAllowedBeforeBreaking": 10,
      "DurationOfBreak": 60000,
      "EventHandler": "Sample.Service.Interfaces.CircuitBreakerEvent"
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

timeout 服务调用超时时间,单位毫秒。
retry 重试次数,0则不重试
ExceptionsAllowedBeforeBreaking 抛出异常多少次就打开断路器
DurationOfBreak 断路器打开多长时间后尝试关闭断路器,单位毫秒
EventHandler 如果你需要接收熔断事件通知,你可以注册一个熔断事件的处理器,需要继承ICircuitBreakerEvent接口,示例代码如下:

public class CircuitBreakerEvent : ICircuitBreakerEvent
    {
        private ILogger Logger { get; }

        public CircuitBreakerEvent(ILogger<CircuitBreakerEvent> logger)
        {
            Logger = logger;
        }
        //当降级时触发
        public async Task OnFallback(string route, MethodInfo methodInfo)
        {
            Logger.LogDebug("Raise OnFallback");
        }
		//当断路器打开时触发
        public async Task OnBreak(string route, MethodInfo methodInfo, Exception exception, TimeSpan time)
        {
            Logger.LogDebug($"Raise OnBreak;{exception.Message}");
        }
		//当断路器关闭时触发
        public async Task OnRest(string route, MethodInfo methodInfo)
        {
            Logger.LogDebug("Raise OnRest");
        }
		//当断路器处于半开状态时触发
        public async Task OnHalfOpen(string route, MethodInfo methodInfo)
        {
            Logger.LogDebug("Raise OnHalfOpen");
        }
		//当调用超时触发
        public async Task OnTimeOut(string route, MethodInfo methodInfo, Exception exception)
        {
            Logger.LogDebug($"Raise OnTimeOut;{exception.Message}");
        }
		//当执行重试时触发
        public async Task OnRetry(string route, MethodInfo methodInfo, Exception exception, int retryTimes)
        {
            Logger.LogDebug($"Raise OnRetry;{exception.Message};{retryTimes}");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

是的,这也是支持依赖注入的。

局部策略
局部熔断策略可以通过自定义属性CircuitBreakerAttribute来配置,CircuitBreakerAttribute只有加在接口的方法上面才能生效。如果方法上加载了局部策略,那么局部策略就会覆盖全局策略。

	[ServiceDiscoveryName("RPC")]
	[ServiceRoute("hello")]
    public interface IHelloService : IService
    {
        [ServiceRoute("say/async")]
        [CircuitBreaker(FallbackExecuteScript = "return new ResultModel{Message=\"fallback\"};", ScriptUsingNameSpaces = new[] { "Sample.Service.Interfaces" })]
        Task<ResultModel> SayHello(string name);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

CircuitBreakerAttribute的参数基本跟全局策略一样,但是多了两个属性:
1.FallbackExecuteScript:可以配置一段c#代码用于降级处理,如果没有配置此属性,降级时将返回返回类型的默认值。
2.ScriptUsingNameSpaces:执行FallbackExecuteScript注入的代码需要引入的命名空间。

另外值得一提的是,如果你开启了重试机制,当一个节点调用失败了,会自动切换到下一个节点,这也是我希望能达到的效果。实现此效果就必须自己实现负载算法,下面我们讲负载部分

负载均衡

实现负载均衡的方式有很多种,各有各的优劣,Uragano选择了在客户端里自己实现负载,Uragano目前只支持轮询和加权轮询,后面会逐渐丰富其他的负载算法。这里的加权轮询是怎么加权的呢?这就要回到前面我们注册服务的时候,有一个weight的配置,就是通过它来加权的,值越大,分配访问量越大。

public void ConfigureServices(IServiceCollection services)
        {

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
            services.AddUragano(config =>
            {
                config.AddConsul(Configuration.GetSection("Uragano:Consul:Client"));
                config.AddClient<LoadBalancingWeightedPolling>();
            });
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

通过AddClient的泛型接口注入负载处理器,默认使用的是轮询算法。当然,你也可以自己实现负载算法,只要实现ILoadBalancing接口就可以。

其他选项配置

Uragano提供了一些选项配置,可以通过Options和Option方法配置。

public void ConfigureServices(IServiceCollection services)
        {

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
            services.AddUragano(config =>
            {
                config.AddConsul(Configuration.GetSection("Uragano:Consul:Client"));
                config.AddClient<LoadBalancingWeightedPolling>();
                config.Options(Configuration.GetSection("Uragano:Options"));
            });
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
"Uragano": {
    "Server": {
      "ip": "{LocalIP}",
      "port": 5001,
      "certUrl": "",
      "certPwd": "",
      "weight": 1
    },
    "Consul": {
      "Client": {
        "Address": "http://192.168.1.133:8500",
        "Token": ""
      },
      "Service": {
        "Id": null,
        "Name": "RPC",
        "tags": null,
        "EnableTagOverride": false,
        "meta": null,
        "HealthCheckInterval": 10
      }
    },
    "Options": {
      "ThreadPool_MinThreads": 100,
      "DotNetty_Event_Loop_Count": 100
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

目前支持的参数有:
ThreadPool_MinThreads:线程池的最小线程数
ThreadPool_CompletionPortThreads:I/O线程的线程数
Client_Node_Status_Refresh_Interval:客户端更新节点状态的频率,单位毫秒
Server_DotNetty_Channel_SoBacklog:DotNetty的SoBacklog值
DotNetty_Connect_Timeout:DoNetty连接超时时间,单位毫秒
DotNetty_Enable_Libuv:是否启用Libuv
DotNetty_Event_Loop_Count:DotNetty的工作线程数
Remoting_Invoke_CancellationTokenSource_Timeout:客户端等待服务器端返回消息的等待时间,单位毫秒,默认值是10秒。你可能会对这个配置有疑问,熔断里不是已经有一个超时机制了吗?为什么还需要这个配置!我想说这两个是完全不一样的,这里就需要提到DotNetty的消息机制了,说得通俗点就是DotNetty是通过异步回调的形式回传消息,但是我们往往需要同步等待返回结果,这里就需要用到TaskCompletionSource,如果服务器端出现异常,无法响应消息,那么TaskCompletionSource就会一直等待,所以我们必须有一个超时机制。

结束语

先贴上源码地址:GitHub源码

Uragano目前只是一个初级产品,主要目的是提供一个简单高性能的RPC框架,不是微服务框架。如果你觉得不错请给我点个赞吧,你也可以参与进来一起完善此项目。

做微服务至少还需要网关,EventBus等等。。。。目前社区里的surging算是比较完善的一个框架,实现了网关,RPC,EventBus等。

目前社区里涉及到的网关有两种,一种就是纯网关,对外提供服务的,比如Ocelot;还有一种网关就是基于RPC服务扩展出来的,通过RPC服务路由自动注册http路由,以http方式请求服务接口,再通过代理调用远程服务。我个人觉得第一种更好,基于原生Web项目实现,扩展性更好。所以Uragano目前没有打算实现网关部分,我认为和Ocelot搭配起来用更好,EventBus可以用CAP呀。

以上纯属个人愚见,有什么不对之处请指出。

一个完善的微服务是有很多很多工作要做的,Uragano也还有很多工作需要做,这里先简单列一个TODO LIST:
1.单元测试,功能测试,集成测试。
2.编写文档。
3.完善负载算法。
4.增加对缓存的支持。
5.增加对请求监控的支持。
6.完善优化功能。

目前项目还没有做个完整的测试,请不要用于生产环境,可能有些API还会做调整,争取19年上半年发布一个正式版本。

最后,希望netcore越来越好,也希望自己能为社区做一点点微薄的贡献。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/847703
推荐阅读
相关标签
  

闽ICP备14008679号