赞
踩
由上一章内容可知,基于阻塞队列的异步秒杀还存在2个问题:
这一章通过Redis的消息队列进行优化
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
Redis提供了三种不同的方式来实现消息队列:
基于List的消息队列有哪些优缺点?
优点:
缺点:
PubSub(发布订阅) 是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
基于PubSub的消息队列有哪些优缺点?
优点:
缺点:
发送消息的命令:
例如:
## 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
读取消息的方式之一:XREAD
例如,使用XREAD读取第一个消息:
XREAD阻塞方式,读取最新的消息:
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
STREAM类型消息队列的XREAD命令特点:
创建消费者组:
# key:队列名称
# groupName:消费者组名称
# ID:起始ID标识,$代表队列中最后一个消息,0则代表队列中第一个消息
# MKSTREAM:队列不存在时自动创建队列
XGROUP CREATE key groupName ID [MKSTREAM]
其它常见命令:
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息:
# group:消费组名称
# consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
# count:本次查询的最大数量
# BLOCK milliseconds:当没有消息时最长等待时间
# NOACK:无需手动ACK,获取到消息后自动确认,自动ACK可能会出现消息丢失,所以一般需要手动ACK
# STREAMS key:指定队列名称
# ID:获取消息的起始ID:
">":从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始,消费完并确认后,则会从pending-list中移除
正常情况通过">"读取消息,出现异常情况后,再从pending-list中读取
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
队列中有4个消息k1、k2、k3、k4
消息确认命令:
127.0.0.1:6379> help XACK
XACK key group ID [ID ...]
summary: Marks a pending message as correctly processed, effectively removing it from the pending entries list of the consumer group. Return value of the command is the number of messages successfully acknowledged, that is, the IDs we were actually able to resolve in the PEL.
since: 5.0.0
group: stream
一次性确认消费者组g1中的消息:
127.0.0.1:6379> XACK s1 g1 1646339018049-0 1646339342815-0 1646339529899-0 1646339537593-0
(integer)4
127.0.0.1:6379>
从pending-list中读取消息的命令:
127.0.0.1:6379> help XPENDING
# [IDLE min-idle-time]表示 获取消息以后 到 确认消息之前 的这段空闲时间
# start end 表示从start 到 end 之间的id 消息
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
summary: Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.
since: 5.0.0
group: stream
消费者监听消息的基本思路:
STREAM类型消息队列的XREADGROUP命令特点:
需求:
先提前创建stream.orders队列:
127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAM
OK
127.0.0.1:6379>
优化后的Lua脚本:
-- 1.参数列表 -- 1.1.优惠券id local voucherId = ARGV[1] -- 1.2.用户id local userId = ARGV[2] -- 1.3.订单id local orderId = ARGV[3] -- 2.数据key -- 2.1.库存key,2个。。表示拼接字符串 local stockKey = 'seckill:stock:' .. voucherId -- 2.2.订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1.判断库存是否充足 get stockKey -- tonumber是将字符串转为数字 if(tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 3.2.判断用户是否下单 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,说明是重复下单,返回2 return 2 end -- 3.4.扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下单(保存用户)sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ... -- 'id', orderId,用id对应orderId,因为VoucherOrder实体类中是用id表示的orderId,这里也用id表示的orderId,解析Redis对象后可以少一次转换 redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0
优化后的代码:
@Slf4j @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } // 创建阻塞队列 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 用线程池创建独立线程 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); // 将代理对象作为全局变量,供所有线程使用 private IVoucherOrderService proxy; // 项目启动后,就应该开启线程,异步从阻塞队列中获取信息 @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } private class VoucherOrderHandler implements Runnable{ private final String queueName = "stream.orders"; @Override public void run() { while (true) { try { // 0.初始化stream initStream(); // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders > List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( // GROUP g1 c1 Consumer.from("g1", "c1"), // BLOCK 2000 StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), // STREAMS stream.orders > StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有消息,继续下一次循环 continue; } // 3.解析消息中的订单信息 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); // 将Redis对象转为VoucherOrder对象 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 4.读取消息成功后,创建订单 handleVoucherOrder(voucherOrder); // 5.确认消息 XACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); // 出现异常后,从PendingList中读取消息 handlePendingList(); } } } public void initStream(){ Boolean exists = stringRedisTemplate.hasKey(queueName); if (BooleanUtil.isFalse(exists)) { log.info("stream不存在,开始创建stream"); // 不存在,需要创建 stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1"); log.info("stream和group创建完毕"); return; } // stream存在,判断group是否存在 StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(queueName); if(groups.isEmpty()){ log.info("group不存在,开始创建group"); // group不存在,创建group stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1"); log.info("group创建完毕"); } } private void handlePendingList() { while (true) { try { // 1.获取PendingList中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), // 不需要BLOCK StreamOffset.create(queueName, ReadOffset.from("0")) // 从PendingList中0开始读取 ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明PendingList没有消息,这里是结束循环,不再继续 break; } // 3.解析消息中的订单信息 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); // 将Redis对象转为VoucherOrder对象 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 4.读取消息成功后,创建订单 handleVoucherOrder(voucherOrder); // 5.确认消息 XACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("处理PendingList中的订单异常", e); } } } } private void handleVoucherOrder(VoucherOrder voucherOrder) { // 1.获取用户,注意,这里是单独的线程,所以不能从主线程的ThreadLocal获取userId Long userId = voucherOrder.getUserId(); // 2.创建锁对象 RLock redisLock = redissonClient.getLock("lock:order:" + userId); // 3.尝试获取锁,这里加锁是兜底方案,可以不用再加锁,因为前面执行过lua脚本校验过一人一单 boolean isLock = redisLock.tryLock(); // 4.判断是否获得锁成功 if (!isLock) { // 获取锁失败,直接返回失败或者重试 log.error("不允许重复下单!"); return; } try { // 注意:不能通过 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); 获取代理对象 // 因为AopContext.currentProxy();底层内部也有个ThreadLocal,但是此时的线程是新开启的线程,所以不能获取不到主线程中的代理对象 // 所以需要在主线程中先获取到代理对象,保存到全局变量供所有线程使用 proxy.createVoucherOrder(voucherOrder); } finally { // 释放锁 redisLock.unlock(); } } @Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } // 3.获取代理对象 proxy = (IVoucherOrderService) AopContext.currentProxy(); // 4.返回订单id return Result.ok(orderId); } @Transactional @Override public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 log.error("用户已经购买过了"); return; } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 log.error("库存不足"); return; } // 7.创建订单 save(voucherOrder); } }
测试结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。