赞
踩
项目是b站黑马程序员的redis教程中的案例,建议所有java程序员做一下!
这篇博客会从最简单的实现优惠卷秒杀到加分布式锁、对秒杀优化、使用消息队列异步下单做详细介绍!
@Override @Transactional public Result seckillVoucher(Long voucherId) { /** * 秒杀基本实现一: * 1.查询优惠卷 * 2.判断秒杀是否开始 * 3.判断是否结束 * 4.判断库存是否充足 * 5.扣减库存 * 6.创建订单 */ SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { //尚未开始 return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).update(); if (!success) { return Result.fail("库存不足!"); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
超卖情况
加锁解决超卖问题
乐观锁
两种方式是心爱乐观锁:
实现方式:
每当数据做一次修改,版本号加1,所以判断一个数据有没有被修改过就看它的版本有没有变化过
CAS法:
Compare and Swap,即比较再交换。
也就是我不在判断库存有没有被修改过了,我每次都去比较看库存是否小于0
乐观锁解决超卖问题
乐观锁更新操作的时候使用
@Override @Transactional public Result seckillVoucher(Long voucherId) { /** * 秒杀基本实现二: * 1.查询优惠卷 * 2.判断秒杀是否开始 * 3.判断是否结束 * 4.判断库存是否充足 * 5.扣减库存(乐观锁解决超卖问题) * 6.创建订单 */ SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { //尚未开始 return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).gt("stock",0) .update(); if (!success) { return Result.fail("库存不足!"); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
重复下单情况
解决思路
@Override public Result seckillVoucher(Long voucherId) { /** * 秒杀基本实现三: * 悲观锁实现一人一单 */ SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { //尚未开始 return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()){ //获取事务代理对象 IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } } @Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("您已经购买过一次了!"); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).gt("stock", 0) .update(); if (!success) { return Result.fail("库存不足!"); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
防止事务失效
synchronized (userId.toString().intern()){
//获取事务代理对象
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
事务的生效其实是Spring拿到当前对象的代理对象,这里如果直接调用就不是Spring的代理对象了事务就会失效,所以要获取当前对象的代理对象
通过AopContext.currentProxy()
API去获取
为什么把生成订单单独提取出来?
因为只有生成订单才会对数据库有插入操作,这个时候才需要事务。
需要事务的同时一人一单也需要加锁(悲观锁)
而且要在事务提交之后在释放锁!
测试
集群下的线程并发安全问题
加锁的原理就是在JVM内部维护了一个锁监视器
,如果是集群模式下的话那就是多个JVM,悲观锁就失效了
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
模拟集群环境:
Vm option
里添加-Dserver.port=8082
此时测试,还是会发现会出现一人多单的情况
什么是分布式锁?
满足分布式系统
下或者集群模式
下的多线程可见并且互斥的锁
分布式锁的实现方式
redis分布式锁测试
# 实现分布式锁时需要实现的两个基本方法
# 1.添加锁,利用setnx的互斥特性
SETNX lock thread1
# 2.添加锁过期时间,避免服务宕机引起的死锁
EXPIRE lock 10
# 释放锁,删除即可
127.0.0.1:6379> DEL lock
(integer) 1
如果下订单的线程在redis中加了锁,这时如果redis宕机了,那么其他线程就会一直处于等待状态,这时就出现了死锁的现象。
如何解决?
利用redis中key过期时间,自动释放锁能避免服务宕机引起的死锁
如果服务在加锁和过期释放期间宕机怎么办?
保证加锁和过期释放的原子性!
redis的set命令可以跟上很多参数,可以同时保证加锁和设置过期时间
因为 SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果,所以将来的 Redis 版本可能会废弃并最终移除 SETNX 、 SETEX 和 PSETEX 这三个命令。
127.0.0.1:6379> SET lock thread1 NX EX 10
OK
127.0.0.1:6379> ttl lock
(integer) 5
127.0.0.1:6379> ttl lock
(integer) -2
实现分布式锁的流程
impl
实现分布式锁的思路就是通过redis中的setnx(如果不存在就创建key,存在就不创建)这样的互斥命令
每当有线程过来抢购的时候,首先会获取锁,也就是执行redis中的setnx命令。如果再有线程过来抢购那么就会被阻塞,只有等该锁被释放其他线程才能再次获取
@Override public Result seckillVoucher(Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { //尚未开始 return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //生成分布式锁对象,传入当前用户id和stringRedisTemplate对象 SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); // boolean isLock = lock.tryLock(1200); if (!isLock){ // 获取锁失败,返回错误或重试 return Result.fail("不允许重复下单!"); } try { //获取事务代理对象 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } } public class SimpleRedisLock implements ILock { //不同的业务有不同的名称 private String name; private StringRedisTemplate stringRedisTemplate; //对锁定义一个统一的前缀 private static final String KEY_PREFIX = "lock:"; //锁的名称要求用户传递给我们,所以这里我们定义一个构造函数 public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean tryLock(long timeoutSec) { /** * 版本一: * 基础实现 * key就是固定前缀+锁的名称,value就是线程标识 * SET lock thread1 NX EX 10 */ String threadId = Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock() { stringRedisTemplate.delete(ID_PREFIX + name); }
分布式锁误删情况说明
获取锁之后,线程A的业务出现了阻塞,直到锁到了超时时间被自动释放,业务还在处于阻塞状态。
这时线程B获取锁开始执行自己的业务,此时线程A阻塞的业务完成后,会把锁给删掉!这样就是分布式锁误删的情况。
所以我们在删除锁的时候需要进行一个判断,看看删除的是不是当前线程所持有的锁
@Override public Result seckillVoucher(Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { //尚未开始 return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //生成分布式锁对象,传入当前用户id和stringRedisTemplate对象 SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); // boolean isLock = lock.tryLock(1200); if (!isLock){ // 获取锁失败,返回错误或重试 return Result.fail("不允许重复下单!"); } try { //获取事务代理对象 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } } public class SimpleRedisLock implements ILock { //不同的业务有不同的名称 private String name; private StringRedisTemplate stringRedisTemplate; //对锁定义一个统一的前缀 private static final String KEY_PREFIX = "lock:"; //锁的名称要求用户传递给我们,所以这里我们定义一个构造函数 public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; @Override public boolean tryLock(long timeoutSec) { /** * 版本一: * 基础实现 * key就是固定前缀+锁的名称,value就是线程标识 * SET lock thread1 NX EX 10 */ String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock() { /** * 版本二: * 释放锁的时候判断是不是当前线程的锁 */ //获取线程id String threadId = ID_PREFIX + Thread.currentThread().getId(); //获取key String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)) { stringRedisTemplate.delete(KEY_PREFIX + name); } }
情况说明
如果JVM发送FULL GC时会阻塞所有的代码,因为判断标识是否一致和释放锁是两步
,所以在判断成功之后如果发生FUll GC那么其他线程再次获取锁的时候,还是可能发生误删的情况
为了避免这种情况的发生,我们必须保证判断锁标识的动作和释放锁的动作是原子性的!
这就是下面我们要学习的lua脚本
什么是lua脚本?
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
lua脚本的基本使用
在lua脚本中调用函数如下:
redis.call('命令名称', 'key', '其它参数', ...)
例如,我们要执行set name jack,则脚本是这样:
# 执行 set name jack
redis.call('set', 'name', 'jack')
例如,我们要先执行set name Rose,再执行get name,则脚本如下
# 先执行 set name jack
redis.call('set', 'name', 'jack')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
127.0.0.1:6379> help @scripting
EVAL script numkeys [key [key ...]] [arg [arg ...]]
summary: Execute a Lua script server side
since: 2.6.0
例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:
127.0.0.1:6379> EVAL "return redis.call('set','name','Jack')" 0
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数
127.0.0.1:6379> EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name Tom
OK
127.0.0.1:6379> get name
"Tom"
Java调用lua脚本改造分布式锁
释放锁的业务流程是这样的:
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by qiang.
--- DateTime: 2023/1/8 21:24
---
if (redis.call('get', KEYS[1]) == ARGV[1]) then
--释放锁
return redis.call('del', KEYS[1])
end
--如果不匹配
return 0
RedisTemplate调用Lua脚本的API如下:
@Override public Result seckillVoucher(Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { //尚未开始 return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //生成分布式锁对象,传入当前用户id和stringRedisTemplate对象 SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); // boolean isLock = lock.tryLock(1200); if (!isLock){ // 获取锁失败,返回错误或重试 return Result.fail("不允许重复下单!"); } try { //获取事务代理对象 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } } public class SimpleRedisLock implements ILock { //不同的业务有不同的名称 private String name; private StringRedisTemplate stringRedisTemplate; //对锁定义一个统一的前缀 private static final String KEY_PREFIX = "lock:"; //锁的名称要求用户传递给我们,所以这里我们定义一个构造函数 public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public boolean tryLock(long timeoutSec) { /** * 版本一: * 基础实现 * key就是固定前缀+锁的名称,value就是线程标识 * SET lock thread1 NX EX 10 */ String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock() { /** * 版本三 * 通过lua脚本来释放锁 */ stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); } } }
setnx实现的分布式锁存在的问题
什么是Redisson?
Redisson基于redis实现了一套分布式工具的集合
官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson
Redisson快速入门
一、引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.1</version>
</dependency>
二、配置Redisson客户端
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://172.20.10.2:6379");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
三、使用Redisson的分布式锁
@Override public Result seckillVoucher(Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { //尚未开始 return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); RLock lock = redissonClient.getLock("lock:order:" + userId); /** * tryLock参数说明: * long waitTime 超时等待时间 默认是-1,也就是不等待,获取不到就直接返回false * long leaseTime 超时释放时间 默认是30s,如果该锁超过30s会自动释放 */ boolean isLock = lock.tryLock(); if (!isLock) { // 获取锁失败,返回错误或重试 return Result.fail("不允许重复下单!"); } try { //获取事务代理对象 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } }
将原先逻辑的串行执行改为异步执行,也就是将判断用户有没有秒杀资格交给redis去做
,如果有那就把用户信息
和订单信息
保存到阻塞队列
中交给其他线程去执行
redis数据结构的选择
判断库存是否充足:可以选用String结构,key是优惠卷的信息,value是库存的数量
判断一人一单:可以选用set结构,将下过单的用户保存到set集合当中
流程如下:
实现
1.新增秒杀优惠券的同时,将优惠券信息保存到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
...
...
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
2.基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by qiang. --- DateTime: 2023/1/9 14:41 --- -- 1.参数列表 --判断库存是否充足需要去redis中去查,所以需要知道优惠卷id -- 1.1优惠卷id local voucherId = ARGV[1] --判断一人一单需要知道用户id -- 1.2用户id local userId = ARGV[2] -- 1.3.订单id --local orderId = ARGV[3] -- 2.数据库key -- 2.1库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1判断库存是否大于0 get stock if (tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 判断用户是否下单 SISMEMBER orderKey userId --127.0.0.1:6379> sadd setCollection1 1 2 3 --(integer) 3 --127.0.0.1:6379> SISMEMBER setCollection1 2 --(integer) 1 --127.0.0.1:6379> SISMEMBER setCollection1 0 --(integer) 0 if (redis.call('SISMEMBER', orderKey, userId) == 1) then --存在,说明是重复下单 return 2 end -- 3.4扣库存 incrby stockKey - 1 redis.call('incrby', stockKey, -1) -- 3.5下单 redis.call('sadd', orderKey, userId) return 0
3.java代码如下
PS:秒杀卷必须已经被保存到redis当中
因为lua脚本在执行的时候会去redis中读取优惠卷的库存,不然会出现如下错误:
user_script:26: attempt to compare nil with number
@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private RedissonClient redissonClient; @Resource private StringRedisTemplate stringRedisTemplate; //加载seckill.lua文件 private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } @Override public Result seckillVoucher(Long voucherId) { /** * 秒杀实现六 * 通过redis的lua脚本对秒杀进行优化 */ Long userId = UserHolder.getUser().getId(); // 执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); // 判断结果是否为0 int r = result.intValue(); if (r != 0){ // 不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } // 为0,有购买资格,把下单信息保存到阻塞队列 long orderId = redisIdWorker.nextId("order"); // 返回订单id return Result.ok(orderId); } }
防止事务失效
事务的生效其实是Spring拿到当前对象的代理对象,这里如果直接调用(直接创建订单)就不是Spring的代理对象了事务就会失效,所以要获取当前对象的代理对象通过AopContext.currentProxy()
API去获取
@Slf4j @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private RedissonClient redissonClient; @Resource private StringRedisTemplate stringRedisTemplate; //加载seckill.lua文件 private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } //阻塞队列 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); //线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //在类初始化之后就执行init方法 //init方法会去执行创建订单线程VoucherOrderHandler(新的线程,通过实现Runnable接口创建) @PostConstruct private void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } //VoucherOrderHandler为线程任务 //该线程会去读阻塞队列中的订单信息,然后再去调用创建订单方法,完成异步下单 private class VoucherOrderHandler implements Runnable{ @Override public void run() { while (true){ try { // 获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take(); // 创建订单 handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常",e); } } } } /** * 异步创建订单 */ private void handleVoucherOrder(VoucherOrder voucherOrder) { //这里创建订单的线程不是主线程,所以不能从userHolder里获取用户,只能从订单对象中获取用户id Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock("lock:order:" + userId); /** * tryLock参数说明: * long waitTime 超时等待时间 默认是-1,也就是不等待,获取不到就直接返回false * long leaseTime 超时释放时间 默认是30s,如果该锁超过30s会自动释放 */ boolean isLock = lock.tryLock(); if (!isLock) { //兜底方案,其实不用再去获取锁,因为在lua脚本中已经判断过一人一单 log.error("不允许重复下单!"); return; } try { //获取事务代理对象 //这里不能通过AopContext.currentProxy()去获取代理对象,因为创建优惠卷订单(createVoucherOrder)是在主线程执行的 //而当前方法是新的线程执行的代码,我们必须用主线程才能防止创建优惠卷订单(createVoucherOrder)事务失效 proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { /** * 秒杀实现六 * 通过redis的lua脚本对秒杀进行优化 */ Long userId = UserHolder.getUser().getId(); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); // 2 判断结果是否为0 int r = result.intValue(); if (r != 0) { // 2.1 不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } // 2.2 为0,有购买资格,把下单信息保存到阻塞队列 VoucherOrder voucherOrder = new VoucherOrder(); // 2.3 订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 2.4 用户id voucherOrder.setUserId(userId); // 2.5 代金劵id voucherOrder.setVoucherId(voucherId); // 2.6 放入阻塞队列 orderTasks.add(voucherOrder); // 返回订单id return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder).count(); if (count > 0) { log.error("您已经购买过一次了!"); return; } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) .update(); if (!success) { log.error("库存不足!"); return; } save(voucherOrder); } }
IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { /** * 秒杀实现六 * 通过redis的lua脚本对秒杀进行优化 */ Long userId = UserHolder.getUser().getId(); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); // 2 判断结果是否为0 int r = result.intValue(); if (r != 0) { // 2.1 不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } // 2.2 为0,有购买资格,把下单信息保存到阻塞队列 VoucherOrder voucherOrder = new VoucherOrder(); // 2.3 订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 2.4 用户id voucherOrder.setUserId(userId); // 2.5 代金劵id voucherOrder.setVoucherId(voucherId); // 2.6 放入阻塞队列 orderTasks.add(voucherOrder); // 返回订单id return Result.ok(orderId); }
//加载seckill.lua文件 private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } //阻塞队列 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); //线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //在类初始化之后就执行init方法 //init方法会去执行创建订单线程VoucherOrderHandler(新的线程,通过实现Runnable接口创建) @PostConstruct private void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } //VoucherOrderHandler为线程任务 //该线程会去读阻塞队列中的订单信息,然后再去调用创建订单方法,完成异步下单 private class VoucherOrderHandler implements Runnable{ @Override public void run() { while (true){ try { // 获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take(); // 创建订单 handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常",e); } } } } /** * 异步创建订单 */ private void handleVoucherOrder(VoucherOrder voucherOrder) { //这里创建订单的线程不是主线程,所以不能从userHolder里获取用户,只能从订单对象中获取用户id Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock("lock:order:" + userId); /** * tryLock参数说明: * long waitTime 超时等待时间 默认是-1,也就是不等待,获取不到就直接返回false * long leaseTime 超时释放时间 默认是30s,如果该锁超过30s会自动释放 */ boolean isLock = lock.tryLock(); if (!isLock) { //兜底方案,其实不用再去获取锁,因为在lua脚本中已经判断过一人一单 log.error("不允许重复下单!"); return; } try { //获取事务代理对象 //这里不能通过AopContext.currentProxy()去获取代理对象,因为创建优惠卷订单(createVoucherOrder)是在主线程执行的 //而当前方法是新的线程执行的代码,我们必须用主线程才能防止创建优惠卷订单(createVoucherOrder)事务失效 proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } }
把订单信息放到阻塞队列的缺点是什么?
那为什么消息队列能解决这些问题?
现在有哪些消息队列可以用呢?
Kafka、RabbitMQ、RocketMQ…
但是我们都不用这些,因为我们项目规模比较小,我们用reids去实现消息队列就可以胜任
Redis提供了三种不同的方式来实现消息队列:
# lpush命令在l1队列中放入两个元素 127.0.0.1:6379> LPUSH l1 element1 element2 (integer) 2 # brpop命令在l1队列右侧取出第一个元素,等待时间20s 127.0.0.1:6379> BRPOP l1 20 1) "l1" 2) "element1" # brpop命令在l1队列右侧取出第二个元素,等待时间20s 127.0.0.1:6379> BRPOP l1 20 1) "l1" 2) "element2" # brpop命令在l1队列右侧取出第三个元素,等待时间20s,元素被取完,命令处于阻塞状态 127.0.0.1:6379> BRPOP l1 20
基于List的消息队列有哪些优缺点?
优点:
缺点:
# 向频道发送消息 127.0.0.1:6379> PUBLISH order.q1 hello (integer) 2 127.0.0.1:6379> PUBLISH order.q2 hello (integer) 1 # 订阅一个或多个频道 127.0.0.1:6379> SUBSCRIBE order.q1 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "order.q1" 3) (integer) 1 1) "message" 2) "order.q1" 3) "hello" # 订阅与pattern格式匹配的所有频道 127.0.0.1:6379> PSUBSCRIBE order.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "order.*" 3) (integer) 1 1) "pmessage" 2) "order.*" 3) "order.q1" 4) "hello" 1) "pmessage" 2) "order.*" 3) "order.q2" 4) "hello"
基于PubSub的消息队列有哪些优缺点
优点:
缺点:
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息
读取消息的方式之一:XREAD
如果不指定阻塞时长
测试
# 写消息 127.0.0.1:6379> XADD users * name zs age 12 "1673337915109-0" # 读消息 127.0.0.1:6379> XREAD count 1 streams user 0 (nil) 127.0.0.1:6379> XREAD count 1 streams users 0 1) 1) "users" 2) 1) 1) "1673337915109-0" 2) 1) "name" 2) "zs" 3) "age" 4) "12" # 消息可重复读 127.0.0.1:6379> XREAD count 1 streams users 0 1) 1) "users" 2) 1) 1) "1673337915109-0" 2) 1) "name" 2) "zs" 3) "age" 4) "12" # 再次发送消息 127.0.0.1:6379> XADD users * name ls age 21 "1673338159927-0" # 阻塞等待消息 127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS users $ 1) 1) "users" 2) 1) 1) "1673338159927-0" 2) 1) "name" 2) "ls" 3) "age" 4) "21" (13.00s)
消息漏读情况
当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
创建消费者组
从消费者组读取消息
测试
创建消费者组
127.0.0.1:6379> XGROUP create s1 g1 0
OK
读取消费者组
127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >
1) 1) "s1"
2) 1) 1) "1673336869091-0"
2) 1) "k1"
2) "v1"
在s1队列中添加多条消息
127.0.0.1:6379> XADD s1 * k2 v2
"1673354707296-0"
127.0.0.1:6379> XADD s1 * k3 v3
"1673354711159-0"
读取消费者组中的数据
>
号获取没有被确认的消息127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >
1) 1) "s1"
2) 1) 1) "1673354707296-0"
2) 1) "k2"
2) "v2"
读取消费者组中的数据
>
号获取没有被确认的消息127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >
1) 1) "s1"
2) 1) 1) "1673354711159-0"
2) 1) "k3"
2) "v3"
查看消息队列中那一条消息没有被处理
-
,+
号代表id范围,表示所有id127.0.0.1:6379> XPENDING s1 g1 - + 10
1) 1) "1673336869091-0"
2) "c1"
3) (integer) 1113736
4) (integer) 1
读取pending-list
中没有被确认的消息
0
,代表读取pending-list
中第一条没有被确认的消息127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 0
1) 1) "s1"
2) 1) 1) "1673336869091-0"
2) 1) "k1"
2) "v1"
确认消息
127.0.0.1:6379> XACK s1 g1 1673336869091-0
(integer) 1
再次查看pending-list中没有被处理的消息
127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 0
1) 1) "s1"
2) (empty array)
stream消息队列业务流程处理思路:
/** * @author qiang * @since 2022-12-27 */ @Slf4j @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private RedissonClient redissonClient; @Resource private StringRedisTemplate stringRedisTemplate; //加载seckill.lua文件 private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } //线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //在类初始化之后就执行init方法 //init方法会去执行创建订单线程VoucherOrderHandler(新的线程,通过实现Runnable接口创建) @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } //VoucherOrderHandler为线程任务 //该线程会去读消息队列中的订单信息,然后再去调用创建订单方法,完成异步下单 private class VoucherOrderHandler implements Runnable { private final String queueName = "stream.orders"; @Override public void run() { while (true) { try { // 0.初始化stream initStream(); // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 > List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有消息,继续下一次循环 continue; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 handleVoucherOrder(voucherOrder); // 4.确认消息 XACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); handlePendingList(); } } } public void initStream() { Boolean exists = stringRedisTemplate.hasKey(queueName); if (BooleanUtil.isFalse(exists)) { log.info("stream不存在,开始创建stream"); // 不存在,需要创建 stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1"); log.info("stream和group创建完毕"); return; } // stream存在,判断group是否存在 StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(queueName); if (groups.isEmpty()) { log.info("group不存在,开始创建group"); // group不存在,创建group stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1"); log.info("group创建完毕"); } } private void handlePendingList() { while (true) { try { // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(queueName, ReadOffset.from("0")) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有消息,继续下一次循环 break; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 handleVoucherOrder(voucherOrder); // 4.确认消息 XACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); } } } } /** * 异步创建订单 */ private void handleVoucherOrder(VoucherOrder voucherOrder) { //这里创建订单的线程不是主线程,所以不能从userHolder里获取用户,只能从订单对象中获取用户id Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock("lock:order:" + userId); /** * tryLock参数说明: * long waitTime 超时等待时间 默认是-1,也就是不等待,获取不到就直接返回false * long leaseTime 超时释放时间 默认是30s,如果该锁超过30s会自动释放 */ boolean isLock = lock.tryLock(); if (!isLock) { //兜底方案,其实不用再去获取锁,因为在lua脚本中已经判断过一人一单 log.error("不允许重复下单!"); return; } try { //获取事务代理对象 //这里不能通过AopContext.currentProxy()去获取代理对象,因为创建优惠卷订单(createVoucherOrder)是在主线程执行的 //而当前方法是新的线程执行的代码,我们必须用主线程才能防止创建优惠卷订单(createVoucherOrder)事务失效 proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } //通过主线程获取代理对象 private IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { /** * 秒杀实现七: * 使用redis提供的Stream消息队列优化秒杀 */ Long userId = UserHolder.getUser().getId(); //获取订单id long orderId = redisIdWorker.nextId("order"); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); // 2 判断结果是否为0 int r = result.intValue(); if (r != 0) { // 2.1 不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } proxy = (IVoucherOrderService) AopContext.currentProxy(); // 返回订单id return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder).count(); if (count > 0) { log.error("您已经购买过一次了!"); return; } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) .update(); if (!success) { log.error("库存不足!"); return; } save(voucherOrder); } }
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by qiang. --- DateTime: 2023/1/9 14:41 --- -- 1.参数列表 --判断库存是否充足需要去redis中去查,所以需要知道优惠卷id -- 1.1优惠卷id local voucherId = ARGV[1] --判断一人一单需要知道用户id -- 1.2用户id local userId = ARGV[2] -- 1.3.订单id local orderId = ARGV[3] -- 2.数据库key -- 2.1库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1判断库存是否大于0 get stock if (tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 判断用户是否下单 SISMEMBER orderKey userId --127.0.0.1:6379> sadd setCollection1 1 2 3 --(integer) 3 --127.0.0.1:6379> SISMEMBER setCollection1 2 --(integer) 1 --127.0.0.1:6379> SISMEMBER setCollection1 0 --(integer) 0 if (redis.call('SISMEMBER', orderKey, userId) == 1) then --存在,说明是重复下单 return 2 end -- 3.4扣库存 incrby stockKey - 1 redis.call('incrby', stockKey, -1) -- 3.5下单 redis.call('sadd', orderKey, userId) -- 3.6发送消息到队列中,XADD stream.orders * k1 v1 k2 v2 ... -- *代表消息的id自动生成,orderId的key为id是因为订单实体类中订单id的字段属性为id redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0
总结
STREAM类型消息队列的XREADGROUP命令特点:
总结
STREAM类型消息队列的XREADGROUP命令特点:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。