赞
踩
五、Rabbit事件总线写法: 引用CAP框架,使用RabbitMQ来管理消息队列替代redis,采用SQLServer进行本地消息表的存储进行发布和订阅
RabbitMQ安装方法:https://blog.csdn.net/zhm3023/article/details/82217222
压力测试:
线程数为10,100,1000三种情况进行测试,Ramp-Up时间空,循环次数为1
首先到NuGet包管理器中安装DotNetCore.CAP、DotNetCore.CAP.InMemoryStorage、DotNetCore.CAP.RabbitMQ、DotNetCore.CAP.SqlServer、Savorboard.CAP.InMemoryMessageQueue
Luas文件夹,新建RaSeckillLua.lua和RaSeckillLuaCallback.lua。
RaSeckillLua.lua:
--[[本脚本主要整合:单品限流、购买的商品数量限制、方法幂等、扣减库存的业务]] --[[ 一. 方法声明 ]]-- --[[ --1. 单品限流--存在缓存覆盖问题 local function seckillLimit() --(1).获取相关参数 -- 限制请求数量 local tLimits=tonumber(ARGV[1]); -- 限制秒数 local tSeconds =tonumber(ARGV[2]); -- 受限商品key local limitKey = ARGV[3]; --(2).执行判断业务 local myLimitCount = redis.call('INCR',limitKey); if myLimitCount > tLimits then return 0; --失败 else redis.call('expire',limitKey,tSeconds) return 1; --成功 end; --对应的是if的结束 end; --对应的是整个代码块的结束 ]]-- --1. 单品限流--解决缓存覆盖问题 local function seckillLimit() --(1).获取相关参数 -- 限制请求数量 local tLimits=tonumber(ARGV[1]); -- 限制秒数 local tSeconds =tonumber(ARGV[2]); -- 受限商品key local limitKey = ARGV[3]; --(2).执行判断业务 local myLimitCount = redis.call('INCR',limitKey); -- 仅当第一个请求进来设置过期时间 if (myLimitCount ==1) then redis.call('expire',limitKey,tSeconds) --设置缓存过期 end; --对应的是if的结束 -- 超过限制数量,返回失败 if (myLimitCount > tLimits) then return 0; --失败 end; --对应的是if的结束 end; --对应的是整个代码块的结束 --2. 限制一个用户商品购买数量(这里假设一次购买一件,后续改造) local function userBuyLimit() --(1).获取相关参数 local tGoodBuyLimits = tonumber(ARGV[5]); local userBuyGoodLimitKey = ARGV[6]; --(2).执行判断业务 local myLimitCount = redis.call('INCR',userBuyGoodLimitKey); if (myLimitCount > tGoodBuyLimits) then return 0; --失败 else redis.call('expire',userBuyGoodLimitKey,600) --10min过期 return 1; --成功 end; end; --对应的是整个代码块的结束 --3. 方法幂等(防止网络延迟多次下单) local function recordOrderSn() --(1).获取相关参数 local requestId = ARGV[7]; --请求ID --(2).执行判断业务 local requestIdNum = redis.call('INCR',requestId); --表示第一次请求 if (requestIdNum==1) then redis.call('expire',requestId,600) --10min过期 return 1; --成功 end; --第二次及第二次以后的请求 if (requestIdNum>1) then return 0; --失败 end; end; --对应的是整个代码块的结束 --4、扣减库存 local function subtractSeckillStock() --(1) 获取相关参数 --local key =KEYS[1]; --传过来的是ypf12345没有什么用处 --local arg1 = tonumber(ARGV[1]);--购买的商品数量 -- (2).扣减库存 -- local lastNum = redis.call('DECR',"sCount"); local lastNum = redis.call('DECRBY',ARGV[8],tonumber(ARGV[4])); --string类型的自减 -- (3).判断库存是否完成 if lastNum < 0 then return 0; --失败 else return 1; --成功 end end --[[ 二. 方法调用 返回值1代表成功,返回:0,2,3,4 代表不同类型的失败 ]]-- --[[ --1. 单品限流调用 local status1 = seckillLimit(); if status1 == 0 then return 2; --失败 end ]]-- --[[ --2. 限制购买数量 local status2 = userBuyLimit(); if status2 == 0 then return 3; --失败 end ]]-- --3. 方法幂等 --[[ local status3 = recordOrderSn(); if status3 == 0 then return 4; --失败 end ]]-- --4.扣减秒杀库存 local status4 = subtractSeckillStock(); if status4 == 0 then return 0; --失败 end return 1; --成功
RaSeckillLuaCallback.lua:
--[[本脚本主要整合:单品限流、购买的商品数量限制、方法幂等、扣减库存的业务的回滚操作]] --[[ 一. 方法声明 ]]-- --1.单品限流恢复 local function RecoverSeckillLimit() local limitKey = ARGV[1];-- 受限商品key redis.call('INCR',limitKey); end; --2.恢复用户购买数量 local function RecoverUserBuyNum() local userBuyGoodLimitKey = ARGV[2]; local goodNum = tonumber(ARGV[5]); --商品数量 redis.call("DECRBY",userBuyGoodLimitKey,goodNum); end --3.删除方法幂等存储的记录 local function DelRequestId() local userRequestId = ARGV[3]; --请求ID redis.call('DEL',userRequestId); end; --4. 恢复订单原库存 local function RecoverOrderStock() local stockKey = ARGV[4]; --库存中的key local goodNum = tonumber(ARGV[5]); --商品数量 redis.call("INCRBY",stockKey,goodNum); end; --[[ 二. 方法调用 ]]-- RecoverSeckillLimit(); RecoverUserBuyNum(); DelRequestId(); RecoverOrderStock();
在项目的Common文件夹(不是Common类库)新建RaCacheBackService.cs、RaLuasLoadService.cs
RaCacheBackService.cs:
public class RaCacheBackService:BackgroundService { private readonly IConfiguration _configuration; public RaCacheBackService(IConfiguration configuration) { _configuration = configuration; } protected async override Task ExecuteAsync(CancellationToken stoppingToken) { // EFCore的上下文默认注入的请求内单例的,而CacheBackService要注册成全局单例的 // 由于二者的生命周期不同,所以不能相互注入调用,这里手动new一个EF上下文 var optionsBuilder = new DbContextOptionsBuilder<DbHelperContext>(); optionsBuilder.UseSqlServer(_configuration[string.Join(":", new string[] { "DBConnection", "ConnectionStrings", "Dbconn" })]); DbHelperContext dbHelper = new DbHelperContext(optionsBuilder.Options); var data = await dbHelper.SeckillProduct.Where(x => x.id == "21e86c6cc32b4e7bb80f96c98e4e8000").FirstOrDefaultAsync(); RedisHelper.Set($"{data.productId}-sCount", data.productStockNum); } }
RaLuasLoadService.cs:
public class RaLuasLoadService: BackgroundService { private readonly IMemoryCache _cache; public RaLuasLoadService(IMemoryCache cache) { _cache = cache; } protected override Task ExecuteAsync(CancellationToken stoppingToken) { FileStream fileStream1 = new FileStream(@"Luas/RaSeckillLua.lua", FileMode.Open); using (StreamReader reader = new StreamReader(fileStream1)) { string line = reader.ReadToEnd(); string luaSha = RedisHelper.ScriptLoad(line); //保存到缓存中 _cache.Set<string>("SeckillLua1", luaSha); } FileStream fileStream2 = new FileStream(@"Luas/RaSeckillLuaCallback.lua", FileMode.Open); using (StreamReader reader = new StreamReader(fileStream2)) { string line = reader.ReadToEnd(); string luaSha = RedisHelper.ScriptLoad(line); //保存到缓存中 _cache.Set<string>("SeckillLuaCallback1", luaSha); } return Task.CompletedTask; } }
Startup.cs中注册Cap和后台任务
services.AddCap(x => { x.UseInMemoryStorage(); //内存存储 x.UseInMemoryMessageQueue(); //内存消息队列 x.UseEntityFramework<DbHelperContext>(); //x.UseSqlServer(Configuration[string.Join(":", new string[] { "DBConnection", "ConnectionStrings", "Dbconn" })]); x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; //rb.QueueMessageExpires = 24 * 3600 * 10; //队列中消息自动删除时间(默认10天) }); x.ConsumerThreadCount = 1; }); //注册后台任务 services.AddHostedService<RCacheBackService>(); services.AddHostedService<RCustomerService>(); services.AddHostedService<CacheBackService>(); services.AddHostedService<CustomerService>(); services.AddHostedService<LCacheBackService>(); services.AddHostedService<LCustomerService>(); services.AddHostedService<LuasLoadService>();
控制器新建SetOrderRabbit和CreateOrder
[HttpGet] [Route("[action]")] public string SetOrderRabbit([FromServices] ICapPublisher _capBus, string userId, string proId, string requestId = "125643") { int tLimits = 1000; //限制请求数量 int tSeconds = 2; //限制秒数 int goodNum = 1; //用户购买的商品数量 string limitKey = $"LimitRequest{proId}";//受限商品ID int tGoodBuyLimits = 1000; //用户单个商品可以购买的数量 string userBuyGoodLimitKey = $"userBuyGoodLimitKey-{userId}-{proId}"; //用户单个商品的限制key string userRequestId = requestId; //用户下单页面的请求ID string proKey = $"{proId}-sCount"; //该商品库存keyint try { //调用lua脚本 var result = RedisHelper.EvalSHA(cache.Get<string>("SeckillLua1"), "ypf12345", tLimits, tSeconds, limitKey, goodNum, tGoodBuyLimits, userBuyGoodLimitKey, userRequestId, proKey); if (result.ToString() == "1") { //2. 将下单信息存到消息队列中 var orderNum = Guid.NewGuid().ToString("N"); _capBus.Publish("seckillGoods", $"{userId}-{proId}-{orderNum}"); //3. 把部分订单信息返回给前端 return $"下单成功,订单信息为:userId={userId},arcId={proId},orderNum={orderNum}"; } else { return "卖完了"; } } catch (Exception ex) { //lua回滚 RedisHelper.EvalSHA(cache.Get<string>("SeckillLuaCallback1"), "ypf12345", limitKey, userBuyGoodLimitKey, userRequestId, proKey, goodNum); throw new Exception(ex.Message); } } [NonAction] [CapSubscribe("seckillGoods")] public void CreateOrder(string orderInfor) { try { //1扣除库存 var product = dbHelper.SeckillProduct.Where(x => x.id == "21e86c6cc32b4e7bb80f96c98e4e8000").FirstOrDefault(); product.productStockNum = product.productStockNum - 1; //2提交订单 var tempData = orderInfor.Split("-").ToList(); Models.Order tOrder = new Models.Order(); tOrder.id = Guid.NewGuid().ToString("N"); tOrder.userId = tempData[0]; tOrder.orderNum = Guid.NewGuid().ToString("N"); tOrder.productId = tempData[1]; tOrder.orderTotal = product.productPrice; tOrder.addTime = DateTime.Now; tOrder.orderStatus = 0; tOrder.orderPhone = "1565555555"; tOrder.orderAddress = "test"; tOrder.delFlag = 0; dbHelper.Add(tOrder); var count= dbHelper.SaveChanges(); Console.WriteLine($"执行成功,条数为:{count}"); } catch (Exception ex) { throw new Exception($"订阅业务执行失败:{ex.Message}"); } }
测试结果:
源代码分享
链接:https://pan.baidu.com/s/1hjYyQJBM0C0qU65JXrOZ6A
提取码:k365
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。