赞
踩
当电商项目中出现秒杀功能,同一时间多个线程发起请求,如何保证多个线程的安全呢,这就是分布式锁需要实现的功能
,分布式锁一个较为完美的框架就不得不提redission了
<--引入redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redisson分布式锁依赖-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.6</version>
</dependency>
2.配置链接池
3.创建redission配置类注册bean
@Configuration
public class RedissonConfig {
// redission客户端配置 几个节点就配置几个方法名(也就是bean名)不一样即可
@Bean
public RedissonClient redissonClient(){
// 1. 配置redisseon 配置类
Config config = new Config();
// 1.2配置地址和密码(单节点模式) 多节点模式 使用 useclusterServer
config.useSingleServer().setAddress("redis://IP地址:6379").setPassword("密码");
// 创建
return Redisson.create(config);
}
}
4.使用方式
RLock rlock = redissonClient.getLock("order" + userid);//参数name为锁名字通常业务
boolean islock = rlock.tryLock();//超时时间应该是根业务有
if(islock){//获取到锁
try {
//p.5获取锁功能 该接下来的操作是串行
// 执行逻辑代码
}finally {
//释放锁
rlock.unlock();
}}
好了一个分布式锁的功能就如此完成了 ,用代码redis实现
在黑马点评项目的实现秒杀功能中出现出现的问题
虽然分布式锁框架有redisson 但是这个案列把原理讲得挺明白的
接口地址
点击跳过我的笔记看原理
@RestController
@RequestMapping("/voucher-order")
public class
VoucherOrderController {
@Autowired
IVoucherOrderService voucherService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
// 下单 -校验是否生效-生成订单-优惠卷库存处理-
return voucherService.seckillorder(voucherId);
}
}
service
public interface IVoucherOrderService extends IService<VoucherOrder> {
Result seckillorder(Long voucherid);
Result createvoucher(Long voucherid);
}
实现
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
StringRedisTemplate stringRedisTemplate;
//唯一id生成器
@Autowired
RedisIdWorker redisIdWorker;
// 优惠券实现类
@Autowired
ISeckillVoucherService sekillservice;
//处理订货单
@Override
public Result seckillorder(Long voucherid) {
// 1.根据秒杀卷id 查询该优惠卷
SeckillVoucher seckillvoucher = sekillservice.getById(voucherid);
// 2.判断秒杀是否开始
if (seckillvoucher.getBeginTime().isAfter(LocalDateTime.now())){//结束时间在当前时间之后
return Result.fail("秒杀尚未开始");
}
// 3.判断时间是否结束
if (seckillvoucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前
return Result.fail("秒杀已经结束");
}
// 4.判断库存是否充足
if (seckillvoucher.getStock()<1){
return Result.fail("优惠券已经抢光");
}
/**
* 悲观·锁 互斥锁对象 synchronized
*
*/
// 5 校验该用户是否有过订单
// synchronized (userid.toString().intern()) { //用id做线程锁的标识存放在jvm中 线程同步的锁方法比避免用户刷单 多个线程同时查到用户首单的情况
//todo 优化之前用的默认jdk的同步方法 现在使用自己的封装类·获取锁
Long userid = UserHolder.getUser().getId();
//p.1创建锁对象 因为是秒杀订单 创建锁这里我们使用 业务+用户id 就可以知道是哪个用户创建的
SimpleredisLock lock = new SimpleredisLock("order" + userid, stringRedisTemplate);
//p.2获取锁
boolean islock = lock.trylock(5);//超时时间应该是根业务有关
//p.3判断是否获取成功
if(!islock){
//p.4 失败要么重试 递归 要么 返回错误
/**
* 这个业务是用户秒杀订单索取锁失败(要求一个用户一单) 代表同时点击多次 或者用脚本刷 所以应该直接返回error
*/
return Result.fail("一个用户只可享受一次优惠");
}
try {
//p.5获取锁功能 该接下来的操作是串行
//todo synchronized 的缺点是只能完成单个jvm的互斥
//6. 我们把创建订单封装未来一个事务方法 本类调用本类的事务方法只能从代理对象中获取 获取代理对象事务
// 6.1获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//spring中是通过但代理对象管理bean 的这个方法 可以获取到当前的代理对象需要在启动类开启暴露
// 7.返回用户id
return proxy.createvoucher(voucherid); //如果直接调用 是调用的this 为了事务生效管理需要 使用代理对象
}finally {
lock.nulock();
}
}
//实现秒杀功能代码封装
@Transactional//多表操作标志为事务 回滚
public Result createvoucher(Long voucherid){
/**
* 进行优化 ,秒杀优惠卷应该是一人一单 在进行操作的时候 联合用户id 和 和优惠卷id 查询是否纯在订单记录
*/
//todo 但是秒杀功能和是多线程的 往往在查询出现用户id和订单的时候出现线程安全 比如多个线程查到count不大于0
//P.1 查询订单
Long userid = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userid).eq("voucher_id", voucherid).count();
if (count>0){
return Result.fail("每个用户只可享受一次");
}
// 5.扣减库存
// 5.1 用乐观锁进行优化(更新数据) 这里不对比版本号 就对比库存量即可
boolean success= sekillservice
.update().setSql("stock=stock-1")
.eq("voucher_id", voucherid)
.gt("stock",0)
// 库存要求和最开始当前库存一致才可以进行数据库操作 类似版本号
// .eq("stock",seckillvoucher.getStock()) //避免数据不一致出现线程安全问题
// 当同时多个线程并行 这样的效率很低 (统一时间版本号发送改变 不执行数据库操作) 但是这样效率很低 为了避免这种情况 这里用得版本号=库存>0即可执行
.update();
if (!success){
return Result.fail("优惠卷不足");
}
// 6.创建订单 返回的主要是三个信息
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id
long nextId = redisIdWorker.nextId("order");//利用的redis自增和时间戳 每次创建一个id 会记录一次自增 以便到达统计的效果
voucherOrder.setId(nextId);
//6.2用户id 从校验登录凭证容器 userholder取 这里是保存线程 也可以保存到redis
voucherOrder.setUserId(userid);
// 6.3代金券id
voucherOrder.setVoucherId(voucherid);
//7.返回订单id
save(voucherOrder);
return Result.ok(nextId);
}
}
代码逻辑:
根据优惠卷id查询优惠券对应属性,判断时间不过期,库存等因素,以及用户订单记录为零(一人·一单)既可以创建订单 ,并且返回订单号给前端
上述代码中有俩个地方涉及到了锁的应用:
1.高并发下当多个线程同时查询订单记录,线程出现并发安全问题,由于多个线程同时查询到用户订单记录是0,同时进行sql修改,便会使库存出现负的情况.解决办法 使用乐观锁(每次进行数据库读写的时候对比版本号在操作) 这里用当前优惠券数量不为零·作为版本号
boolean success= sekillservice
.update().setSql("stock=stock-1")
.eq("voucher_id", voucherid)
.gt("stock",0)
// 库存要求和最开始当前库存一致才可以进行数据库操作 类似版本号
// .eq(“stock”,seckillvoucher.getStock()) //避免数据不一致出现线程安全问题
// 当同时多个线程并行 这样的效率很低 (统一时间版本号发送改变 不执行数据库操作) 但是这样效率很低 为了避免这种情况 这里用得版本号=库存>0即可执行
2.当服务器端是集群的时候之前是把锁存放到 线程里,当不是单体项目时,由于各个主机都能从各自线程里获取到锁所以需要redis实现分布式锁
/**
* 自定义分布式锁接口
*/
public interface ILock {
/**
* 尝试获取锁
* @param timeout 定时销毁时间
* @return
*/
boolean trylock(long timeout);
/**
* 尝试销毁锁
*/
void nulock();
}
SimpleredisLock
/**
* 使用redis实现功能
*/
public class SimpleredisLock implements ILock {
private final String KEY_LOCK = "LOCK:";
private String name;//业务的名称
/**
* 定义构造函数 让调用的时候申明申明业务的锁
* @param name 业务名字
* @param stringRedisTemplate
*/
public SimpleredisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private final String ThID_Prefix= UUID.randomUUID().toString(true)+"-";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public boolean trylock(long timeout) {
//获取当前线程
String currentthreadid = ThID_Prefix+Thread.currentThread().getId();
// 获取锁 k 是业务名字 value是 线程标识 后续释放锁的时候按照通过该标识 来删除 避免删除该用户其他业务的锁
Boolean islock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_LOCK + name, currentthreadid, timeout, TimeUnit.SECONDS);
return Boolean.TRUE.equals(islock);//封装的Boolean类型方法起始内部都会拆箱 未来避免空指针使用比较进行判断
}
//
// @Override
// public void nulock() {
// //todo 进行优化 由于之前删除锁锁的key是根据执行的
// String Threadvalue=ThID_Prefix+ java.lang.Thread.currentThread().getId();
// String redisthreadvalue= stringRedisTemplate.opsForValue().get(KEY_LOCK + name);
进行比较
// if (Threadvalue.equals(redisthreadvalue)){
确定是当前线程拥有的锁
// //释放锁
// stringRedisTemplate.delete(KEY_LOCK+name);
// }
// }
/**
* 使用lua 脚本 unlock 写在静态代码块里面 该类一加载就初始化好了 无需进行io流
*/
private static final DefaultRedisScript<Long> unlock_SCRIPT;
static{
// 脚本先初始化
unlock_SCRIPT=new DefaultRedisScript<>();
// 指定脚本资源
unlock_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 指定返回值类型
unlock_SCRIPT.setResultType(Long.class);
}
@Override
public void nulock (){
//调用脚本
// List<String> list = Collections.singletonList(KEY_LOCK + name);//将传入数据封装为一个list
// String currentthreadid = ThID_Prefix+Thread.currentThread().getId();
// 这样相较于之前只有一行代码 使用lua 脚本具有原子性 避免锁超时误删的情况
stringRedisTemplate.execute(unlock_SCRIPT,Collections.singletonList(KEY_LOCK + name),ThID_Prefix+Thread.currentThread().getId());
}
}
代码逻辑:
利用redis的setnx功能获取锁,然后为锁的值设置当前线程的标识值,在释放锁的时候对锁的值和当前线程的标识值对比在进行删除
,避免误删其他线程的锁 unlockl 注释掉的unlock方法优化后使用lua 脚本一行代码完成 (保证事务的原子性)
将unlock的判断逻辑封装为lua 脚本
--比较线程标志和锁中的标志是否相同 如果传入的key 的value 和value相等 则删除
if (redis.call('get',KEYS[1])==ARGV[1]) then
--执行释放锁 del key
return redis.call('del',KEYS[1])
end
这样一看分布式锁是不是感觉也挺简单的,这就是redisson的大致源码逻辑了,
和上面自定义的代码逻辑类似,使用lua脚本保持原子性 通过redis 的exsist key
判断锁是否存在,redisson使用的的hash结构(为了是锁的可复用),hash结构一列存储标识值,一列存储线程号(采用自增1,获取一个锁-1,释放锁-1).然后设置到期时间,传参reylock方法的时候默认不传便是-1,底层设置30s到期时间(看门狗机制)
用redis除了可以实现分布式锁以外 还可以用作消息队列
主要途径有三种
利用stream数据结构来做消息队列 ,
stream 结构的基本操作
快速了解stream数据结构
之前的秒杀功能呢虽然晚上,但是数度还是觉得有限,如果还想要提升运行速度,将原来的功能分布异步请求完成,1.redis秒杀卷缓存,库存量,以及对应下单用户id 2. lua脚本判断是否符合购买条件,并保存信息发送消息队列3.数据库操作由异步完成 接受消息完成操作
新增秒杀卷时 把库存信设置缓存
@Override
@Transactional
public void addSeckillVoucher( Voucher voucher) {
//1. 保存优惠券
save(voucher); //优惠卷实体字段字段包含秒杀券实体字段
System.out.println(voucher);
// 2. 保存秒杀卷信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
//begin 和endtime 目前没法传递参数
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 3.秒杀卷信息存入缓存 秒杀卷设置库存
stringRedisTemplate.opsForValue().set(HmConstants.SECKILL_STOCK+voucher.getId(),voucher.getStock().toString());
}
lua脚本
--- DateTime: 2022/12/27 21:47
---根据redis 的key 判断库存是否充足 在判断用户是否下过单 在扣减库存,把当前下单用户id存入set集合
-----1.定义变量
---1.1优惠券id
local voucherId = ARGV[1] --定义一个变量 调用时传入 优惠卷id 判断库存
--1.2用户id
local userId = ARGV[2] --用户id 判断是否下过单
--1.3 订单id 异步请求发送消息
local orderId=ARGV[3]
--1.4.库存key 和存储时候对应
local stockey ='seckill:stock:' .. voucherId --lua 中字符的拼接是..
--1.5.订单key
local orderkey='seckill:order:' .. voucherId
--2.业务执行
-- 2.1判断库存
if(tonumber(redis.call('get',stockey))<=0)then -- 我们用String序列化的记得转数字
-- 库存不足 返回 1 失败 0成功
return 1
end
--2.2todo 判断用户是否下过单 使用redis set集合 sismember 命令
if ((redis.call('sismember',orderkey,userId))==1) then
--存在说明已经下过单 不具有秒杀资格 用状态码 2 标识
return 2
end
--3.1 用户有资格下单 执行下单逻辑 进行库存扣减 incrby 库存 -1
redis.call('incrby',stockey,-1)
--3.2存入用户id 存入set结构无序不重复
redis.call('sadd',orderkey,userId)
--3.3发送到消息队列 xadd stream.orders * k1 v1 k2 v2 .....
redis.cal('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'Id',orderId)
return 0
优化后的实现异步秒杀
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// redisson分布式锁
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate stringRedisTemplate;
//唯一id生成器
@Autowired
private RedisIdWorker redisIdWorker;
// 优惠券实现类
@Autowired
private ISeckillVoucherService sekillservice;
private static final DefaultRedisScript<Long> Seckill_script;//定义脚本
static {
// 使用静态代码块初始脚本
Seckill_script=new DefaultRedisScript<>();
Seckill_script.setLocation(new ClassPathResource("seckill.lua"));
Seckill_script.setResultType(Long.class);
}
// 创建线程池 (单线程)
private static ExecutorService seckill_executor = Executors.newSingleThreadExecutor();
// 创建成员变量 用来防止代理对象 子线程无法获取父线程中出现的代理
private IVoucherOrderService proxy;
// 创建线程类任务
private class vouvherorderhandler implements Runnable{
// 定义队列名称 (实际项目在yaml里面配置)
String quequename="stream.orders";
@Override
public void run() {
while (true){
try {
// 1.获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 stream.orders
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(quequename, ReadOffset.lastConsumed())//">" 最新消息
);
// 2.判断消息是否获取成功
if(list==null || list.isEmpty()){
// 2.1 如果获取失败 进行下一个循环
continue;
}
// 3.如果获取成功说名有消息需要处理 可以下单
// 3.1解析消息
MapRecord<String, Object, Object> record = list.get(0);
// 3.2 获取lua脚本存放的数值信息
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.4 完成下单
handleVoucherOrder(voucherOrder);
// 4. ack 确认消息队列 ack 队列名 组名 消费者名
stringRedisTemplate.opsForStream().acknowledge(quequename,"c1",record.getId());
} catch (Exception e) {
log.error("订单出现异常",e);
//5.出现异常的话消息存储在pendinglist (消息没被处理默认保存)中
handleoendinglist();
}
}
}
/**
* 处理消息没有被响应时的方法
*/
private void handleoendinglist() {
while (true){
try {
// 1.获取pendinglist中的订单信息 xreadgroup group g1 c1 count 1 stream.orders 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1),
StreamOffset.create(quequename, ReadOffset.from("0"))// lastConsumed=">" 最新消息
);
// 2.判断消息是否获取成功
if(list==null || list.isEmpty()){
// 2.1 如果获取失败 说明pendinglist 没有异常消息 跳出·循环
break;
}
// 3.如果获取成功说名有消息需要处理 可以下单
// 3.1解析消息
MapRecord<String, Object, Object> record = list.get(0);
// 3.2 获取lua脚本存放的数值信息
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.4 完成下单
handleVoucherOrder(voucherOrder);
// 4. ack 确认消息队列 ack 队列名 组名 消费者名
stringRedisTemplate.opsForStream().acknowledge(quequename,"c1",record.getId());
} catch (Exception e) {
// 5. 无需递归 本身就是while 循环
log.error("订单出现异常",e);
}
}
}
}
// . 创建订单方法在外部创建 避免冗杂
void handleVoucherOrder(VoucherOrder voucherOrder){
//p.5获取锁功能 该接下来的操作是串行
RLock lock= redissonClient.getLock("lock:order:"+voucherOrder.getUserId());
boolean tryLock = lock.tryLock();
if (tryLock) {
try { //进行创建订单
proxy= (IVoucherOrderService) AopContext.currentProxy();
proxy.createvoucher(voucherOrder) ;
} finally {
lock.unlock();
}
}
}
@PostConstruct //本类构建后自动执行
private void init(){
seckill_executor.submit( new vouvherorderhandler()); //通过线程池处理 创建出线程执行线程任务
}
//异步优化后的秒杀方法2.0
@Override
public Result seckillorder(Long voucherid) {
// 1 获取用户id
Long id = UserHolder.getUser().getId();
// 2.获取订单id
long orderId = redisIdWorker.nextId("order");
//3执行lua 脚本
Long r = stringRedisTemplate.execute(
Seckill_script,
Collections.emptyList(),//脚本的key 由我们传入参数(优惠券id决定 所以无需要传输key)
voucherid.toString(),
id.toString(),String.valueOf(orderId)
);
int zgr = r.intValue(); //转为int
// 2.判断结果不是0 不具有购买资格
if (zgr!=0){
return Result.fail(zgr==1?"库存不足":"用户不能重复下单");//三目运算 对应的俩种清的情况
}
//3.判断成功 说明 lua 脚本发送异步消息 已经执行 了数据操作
return Result.ok(orderId);
}
//实现秒杀功能代码封装
@Transactional//多表操作标志为事务 回滚
public void createvoucher(VoucherOrder voucherorder){
/*
进行优化 ,秒杀优惠卷应该是一人一单 在进行操作的时候 联合用户id 和 和优惠卷id 查询是否纯在订单记录
*/
// //但是秒杀功能和是多线程的 往往在查询出现用户id和订单的时候出现线程安全 比如多个线程查到count不大于0
// //P.1 查询订单
// Long userid = UserHolder.getUser().getId();
Integer count = query().eq("user_id", voucherorder.getVoucherId()).eq("voucher_id",voucherorder.getVoucherId()).count();
if (count>0){
log.error("每个用户只能购买一次");
}
// 5.数据库扣减库存 优化时候 redis已经做出了判断
// 5.1 用乐观锁进行优化(更新数据) 这里不对比版本号 就对比 库存量即可
boolean success= sekillservice
.update().setSql("stock=stock-1")
.eq("voucher_id", voucherorder.getVoucherId())
.gt("stock",0)
// 库存要求和最开始当前库存一致才可以进行数据库操作 类似版本号
// .eq("stock",seckillvoucher.getStock()) //避免数据不一致出现线程安全问题
// 当同时多个线程并行 这样的效率很低 (统一时间版本号发送改变 不执行数据库操作) 但是这样效率很低 为了避免这种情况 这里用得版本号=库存>0即可执行
.update();
if (!success){
log.error("库存不足");
}
//
save(voucherorder);
// 无需返回值了 优化后变成异步执行的了
}
@PostConstruct spring中的注解 由它初始化一个线程任务lua脚本资格判断成功发送消息后,执行数据库操作,完成俩个功能异步实行,在原来并发安全的情况下,提升效率
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。