赞
踩
参数:手机号
方式:get
生成验证码,并保存到session中
发送短信验证码
参数:手机号,验证码
方式:post
校验验证码
判断用户是否存在
存在,登录
不存在,注册
保存到session
拦截器 LoginInterceptor
参数:request
从session中读取数据,判断是否存在
不存在,重定向到login
存在,保存用户到ThreadLocal中,并放行
多台Tomcat中的session数据不共享,切换不同的Tomcat数据丢失
结果:以电话为key,valus 的类型为String
思路:每个电话号码都具有唯一性,login()的参数中含有phone和验证码,方便拿到
结果:以随机字符为key,valus 的类型为Hash
实现:
后端:
1.从Redis中取出code,校验
2.生成随机的token字符串,作为存储的key
2.把user对象转成HaspMap,存放到Redis中,value的类型是Hash,并设置失效时间为30分钟
3.把token,并返回给前端,
前端:
1.把返回的token 使用浏览器的sessionStorage保存
2.使用request拦截器,在每个请求发送之前,将用户的token放入请求头中
1.从请求头中获取token的值
2.从redis中取出usermap,判空(true,返回false),转userDTO(脱敏后的user对象)
3.刷新user信息的失效时间,模仿session的失效,在登录时设置user信息的失效时间是30min,当user访问其他(发送请求时),刷新user的失效时间(还为30min)
4.放行
LoginInterceptor 做了两件事,设置哪些页面需要登录(权限),== 刷新用户在Redis中的有效期==,即LoginInterceptor 最终的功能是只有需要用户登录权限的页面才能刷新失效时间,当用户一直访问首页时,访问时间大于30分钟,则用户信息失效,需要重新登录,非常不友好
在LoginInterceptor 拦截器之前再做一个拦截器RefreshTokenInterceptor,
RefreshTokenInterceptor:拦截所有路径,更新失效时间
LoginInterceptor:只用来判断用户是否登录,是否放行
缓存,数据交换的缓冲区
优势:减低后端负载,提高读写效率
劣势:数据一致性,代码维护成本,运维成本
参数:long id
请求:get
返回结果:shop对象
redis中缓存的商铺信息:key : id , valus : string或者Hash 类型存储
实现思路:
1.根据id从redis中取出shop信息
2.判断是否存在shop信息, 存在,直接返回
3.不存在,根据id从数据库查询shop信息
4.数据库中不存在,报错,404
5.存在, a.往redis添加该信息 b.返回该数据
超时剔除策略可以当做其他缓存策略的兜底方案
使用人工编码的方式,需要考虑3个问题
需求:
1.根据id查询商铺时,未命中,则查询数据库,将数据库中的数据写入缓存并设置失效时间
2.根据id修改数据库中的商铺信息时,先修改数据库,再删除缓存
需求2 的实现:
方法:update
参数: shop对象
方式:put
实现思路:
1根据shop对象更改数据库
2.根据shop的id删除redis中的信息
浏览器发送的请求的数据,在redis和数据库中都未命中,即不存在这个数据,这些数据都会一直访问数据库(可能会存在恶意攻击的情况)
**缓存空对象 **:数据库不存在这个数据,就返回一个空对象给redis,并设置较短的失效时间
布隆过滤:根据算法,把数据库中的数据转成btye[],布隆过滤器说不存在一定 不存在,说存在但 不一定 存在
增强id的复杂度,避免被猜测id规律
做好数据的基础格式校验
加强用户校验权限
做好热点参数的限流
产生原因:(满足其一)
产生的条件: 一个高访问并且==缓存重建业务比较复杂(时间相对较久)==的key失效了
结果:无数的请求会在重建缓存时,给数据库带来巨大的压力
**原理:**当线程1构建缓存数据之前,先拿到一把锁,完成缓存的构建后,释放锁,在构建缓存数据之间,其他的线程获取锁失败,则无法写入缓存,休眠重试,一直等待,直到线程1完成,才命中缓存
**结果:**数据一致性,但性能较低(程序的可用性较低)
原理:给数据添加一个失效字段expire,不真正的设置失效时间,即永远有效,当线程1发现逻辑失效时间已经过期,拿到互斥锁,让一个新的线程去构建缓存数据,自己返回旧的数据,在构建缓存数据期间,其他线程发现逻辑时间过期,拿锁失败=发现有其他线程正在构建,则直接返回旧的数据
**结果:**程序的性能比较好,但是数据有较短的不一致性
是一种在分布式系统下用来生成全局唯一ID的工具
方法名 nextId()
参数:String keyPrefix ,keyPrefix 其实是业务名,根据业务名的不同,在Redis中创建不同的key,能对不同的ID进项自增
具体实现:
符号位:永远是0,可以不写
生成时间戳(使用时间戳是为了id的安全性,为了防止用户读懂订单id)
生成序列号 (这里使用了redis string类型中incr的命令,每次自增1,即每个ID的序列号都不同,即使在时间戳相同(下单时间相同,仍能保证该方法生成的唯一id))
拼接两者,称为一个long类型的数字
返回 long数字
代码
@Component public class RedisIdWorker { /** * 起始时间 */ public static final long BEGIN_TIMESTAMP = 1672531200L; //序列号的位置长度 public static final int COUNT_BITS = 32; @Resource private StringRedisTemplate stringRedisTemplate; public long nextId(String keyPrefix){ //1. 生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timeStamp = nowSecond - BEGIN_TIMESTAMP; //2. 生成序列号 //2.1 生成当前年月日 String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); //2.2 自增长 Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); //拼接返回 return timeStamp << COUNT_BITS | count; } /* public static void main(String[] args) { LocalDateTime time = LocalDateTime.of(2023, 1, 1, 0, 0, 0); long second = time.toEpochSecond(ZoneOffset.UTC); System.out.println("second = " + second); }*/ }
方法: seckillVoucher()
参数:long voucherId 优惠券id
实现:
@Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } //5,扣减库存 boolean success = seckillVoucherService.update() .setSql("stock= stock -1") .eq("voucher_id", voucherId).update(); if (!success) { //扣减库存 return Result.fail("库存不足!"); } //6.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 6.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 6.2.用户id Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); // 6.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获得锁,使线程串行执行
实例:==Synchronized、Lock(互斥锁)==都属于悲观锁
插入:Synchronized、Lock(互斥锁)的区别
乐观锁:认为线程安全不一定会发生(小概率会发生),因此不需要加锁,只是在更新的数据时,去判断数据是否被修改(根据版本号或者更新的数据(如库存)),被修改则异常或重试
判断数据是否被修改的两种方法:
版本号法:添加一个新的字段版本号version,查询数据时并查询版本号,更新数据并对版本号+1,再以查询的版本号作为条件之一进行数据更新操作
CAS法(Compare And Set )
相当于简化的版本号法,用数据本身是否发生变化来判断线程是否安全
原因:初始库存100,有100个线程同时购买商品,查询的库存数量都是100,当线程1完成并更改库存数量为99,其他99个线程全部失败
解决方案:
//5,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
方法: seckillVoucher()
参数:long voucherId 优惠券id
实现:
出现的问题:
解决:加悲观锁,因为乐观锁是通过判断数据是否被修改来判断线程是否安全,查询订单是否存在没有修改数据,无法使用乐观锁
**悲观锁加在哪里: ** 先看4.4.3 synchronized的使用讲解
目标:用户相同的线程 只能创建一个订单
userId.toString().intern()
✔️
toString()
是希望userId的值一样,但是toString内部会new String()
,所以还要使用intern()
去从常量池中寻找相同字符的地址,并返回,所以,当userId的值一样,返回的地址一样@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()){ IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); //代理类,防止事务失效 return proxy.createVoucherOrder(voucherId); } } @Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); // 5.一人一单逻辑 // 5.1.用户id int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 return Result.fail("用户已经购买过一次!"); } //5,扣减库存 boolean success = seckillVoucherService.update() .setSql("stock= stock -1") .eq("voucher_id", voucherId).gt("stock",0) .update(); if (!success) { //扣减库存 return Result.fail("库存不足!"); } //6.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 6.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 6.2.用户id voucherOrder.setUserId(userId); // 6.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); } }
还需要添加依赖
在主程序暴露代理
synchronized
是Java中用于实现线程同步的关键字。当多个线程同时访问共享资源时,使用synchronized
关键字可以确保线程的安全性,避免数据不一致或竞态条件的问题。
synchronized
可以用在方法或代码块中,以下是synchronized
的使用方法讲解:
synchronized
关键字,表示该方法是一个同步方法。同一时间只能有一个线程访问该方法。public synchronized void synchronizedMethod() {
// 同步方法的代码块
}
synchronized
关键字,对代码块进行同步。在同一时间只有一个线程可以进入同步代码块执行。public void method() {
// 非同步代码块
synchronized (this) {
// 同步代码块
}
// 非同步代码块
}
可以使用synchronized
关键字后面的对象作为锁来控制线程的同步。常见的对象锁包括当前对象(this
)、类锁(ClassName.class
)、任意对象等。
需要注意的是,如果多个线程使用相同的锁对象,那么只有一个线程能够进入同步代码块进行执行,其他线程将进入阻塞状态等待锁释放。
synchronized
的使用可以确保线程同步,但过多的同步块或方法可能会导致性能下降。因此,在使用synchronized
时应注意选择合适的同步粒度,避免不必要的同步操作。
同时,Java还提供了更高级的并发工具,如Lock
接口和ReentrantLock
类,可以提供更灵活的控制和更细粒度的锁定,可以根据需求选择合适的方式进行线程同步。
问题:当在集群模式或者分布式系统中,相同的id可能会访问不同的Tomact,而多个Tomcat会有多个JVM–>多个常量池—>都获得锁—>从而有多个订单
结论:synchronized的锁 在分布式系统或者集群中失效,从而继续学习分布式锁
条件:在分布式系统或集群模式下,多线程可见并互斥的锁
特点:
接口
方法 tryLock(Time timeoutSec) 返回 boolean 尝试获取锁
方法 unlock() 释放锁
接口实现类
类名simpleRedisLock
属性
方法
代码实现:
package com.hmdp.lock; /** * @author ttsin */ public interface ILock { /** * 尝试获取锁 * @param timeoutSec 锁持有的超时时间,过期后自动释放锁 * @return true 代表获取锁成功,false代表获取锁失败 */ boolean tryLock(long timeoutSec); /** * 释放锁 */ void unlock(); }
package com.hmdp.lock; import cn.hutool.core.util.BooleanUtil; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; /** * @author ttsin */ public class SimpleRedisLock implements ILock { private StringRedisTemplate stringRedisTemplate; private String name; public static final String KEY_PREFIX = "lock:"; public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) { this.stringRedisTemplate = stringRedisTemplate; this.name = name; } @Override public boolean tryLock(long timeoutSec) { //获取当前线程id long threadId = Thread.currentThread().getId(); //获取锁 Boolean result = stringRedisTemplate.opsForValue(). setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS); return BooleanUtil.isTrue(result); } @Override public void unlock() { stringRedisTemplate.delete(KEY_PREFIX + name); } }
@Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); /* synchronized (userId.toString().intern()){ IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); //代理类,防止事务失效 return proxy.createVoucherOrder(voucherId); }*/ //获取锁对象 SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId); //拿锁 boolean lock = simpleRedisLock.tryLock(5L); //失败 if(!lock){ return Result.fail("不可重复下单!"); } //成功 try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); //代理类,防止事务失效 return proxy.createVoucherOrder(voucherId); } finally { //释放 simpleRedisLock.unlock(); }
分析版本1的问题
解决方案:
代码实现:修改了simpleRedisLock中的tryLock()和unlock()方法
package com.hmdp.lock; import cn.hutool.core.lang.UUID; import cn.hutool.core.util.BooleanUtil; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; /** * @author ttsin */ public class SimpleRedisLock implements ILock { private StringRedisTemplate stringRedisTemplate; private String name; private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true); @Override public boolean tryLock(long timeoutSec) { //获取当前线程id String threadId = ID_PREFIX + Thread.currentThread().getId(); //获取锁 Boolean result = stringRedisTemplate.opsForValue(). setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS); return BooleanUtil.isTrue(result); } @Override public void unlock() { //当前线程的id String threadId = ID_PREFIX + Thread.currentThread().getId(); //锁中存放的id String lockID = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); //判断两者是否相同 if(threadId.equals(lockID)){ //相同,释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } } }
分析版本2的问题
解决方案
Lua脚本的编写
释放锁的业务流程
1、获取锁中的线程标示
2、判断是否与指定的标示(当前线程标示)一致
3、如果一致则释放锁(删除)
4、如果不一致则什么都不做
Lua脚本实现逻辑
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
对simpleRedisLock类进行修改
package com.hmdp.lock; import cn.hutool.core.lang.UUID; import cn.hutool.core.util.BooleanUtil; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import java.util.Collections; import java.util.concurrent.TimeUnit; /** * @author ttsin */ public class SimpleRedisLock implements ILock { private StringRedisTemplate stringRedisTemplate; private String name; private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true); public static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) { this.stringRedisTemplate = stringRedisTemplate; this.name = name; } @Override public boolean tryLock(long timeoutSec) { //获取当前线程id String threadId = ID_PREFIX + Thread.currentThread().getId(); //获取锁 Boolean result = stringRedisTemplate.opsForValue(). setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS); return BooleanUtil.isTrue(result); } @Override public void unlock() { //调用Lua脚本 stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); } }
基于setnx实现的分布式锁存在下面的问题:
重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
**超时释放:**我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
什么是Redisson
它是一个在Redis基础上实现了分布式工具的集合,分布式锁只是它的功能之一,它包含了各种分布式锁的实现
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
package com.hmdp.config; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author ttsin */ @Configuration public class RedisConfig { @Bean public RedissonClient redissonClient() { //配置类 Config config = new Config(); //添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址 config.useSingleServer() .setAddress("redis://ip地址:6379").setPassword("Redis密码"); //创建客户端 return Redisson.create(config); } }
情景:一个线程在执行方法A()时,使用set nx ex 取了锁,A()中调用了B(),B()也需要锁才能执行业务,因为是同一个线程,
A()获得了锁,B()会获取锁失败,A()无法执行完毕,A()不释放锁,从而死锁
解决方案:Redssion中的可重入锁
期望:
原理:
(Hash 类型无 set nx ex 的命令)判断锁是否存在
不存在,则获取锁并添加线程标识,和设置有效期(以上步骤等同于 set nx ex 命令)和 锁计数 + 1
存在,则判断锁标识是否是自己
当获取锁成功后,执行业务
判断锁是否是自己的(防止误删锁)
流程图和存储图
Lua获取锁和释放锁的脚本代码
问题情景
主从模式,主节点指向关于写的操作,从节点执行关于读的操作,主节点同步到从节点上,保持数据的一致性
当java应用发起一个请求,获取锁 set lock thread1 NX EX 10 ,主节点设置锁成功,在同步到从节点之前,主节点宕机,会默认选出一个从节点成为主节点,但当前主节点上没有锁的设置,其他线程可以获取锁,从而造成线程同步的问题
图示
解决方案
使用联锁multiLock
设置相互独立的多个主节点,多个主节点都获取锁成功才算获取锁成功
当某个主节点 在于 从节点进行数据同步之前,突然宕机,它的从节点成为主节点,从节点中没有锁的存储,其他线程获取当前节点锁成功,但其他节点的锁获取失败,于是,获取锁失败
图示
性能问题
解决方案
流程图
需求
需求1
@Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存优惠券 save(voucher); // 保存秒杀信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); //保存优惠券信息到Redis中 StringRedisTemplate.opsForValue() .set(SECKILL_STOCK_KEY_PREFIX+voucher.getId(),voucher.getStock().toString()); }
需求2
编写Lua脚本,判断秒杀库存和一人一单,决定用户是否抢购成功
-- 1.参数列表 -- 1.1.优惠券id local voucherId = ARGV[1] -- 1.2.用户id local userId = ARGV[2] -- 2.数据key -- 2.1.库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2.订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1.判断库存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 3.2.判断用户是否下单 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,说明是重复下单,返回2 return 2 end -- 3.4.扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下单(保存用户)sadd orderKey userId redis.call('sadd', orderKey, userId) return 0
需求3
根据Lua返回的结果,对订单进行处理seckillVoucher()
@Override public Result seckillVoucher(Long voucherId) { //获得用户信息 UserDTO user = UserHolder.getUser(); //执行Lua脚本 Long result = stringRedisTemplate .execute(SECKILL_ORDER_SCRIPT, Collections.emptyList(), voucherId, user.getId()); //判断结果是否为0 assert result != null; int res = result.intValue(); if(res != 0){ return Result.fail(res == 1 ? "库存不足":"重复下单"); } //订单id long orderId = redisIdWorker.nextId("order"); //TODO 阻塞线程 //返回订单id return Result.ok(orderId); }
需求4
//异步处理线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的 @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } // 用于线程池处理的任务 // 当初始化完毕后,就会去从对列中去拿信息 private class VoucherOrderHandler implements Runnable{ @Override public void run() { while (true){ try { // 1.获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take(); // 2.创建订单 handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常", e); } } } private void handleVoucherOrder(VoucherOrder voucherOrder) { //1.获取用户 Long userId = voucherOrder.getUserId(); // 2.创建锁对象 RLock redisLock = redissonClient.getLock("lock:order:" + userId); // 3.尝试获取锁 boolean isLock = redisLock.lock(); // 4.判断是否获得锁成功 if (!isLock) { // 获取锁失败,直接返回失败或者重试 log.error("不允许重复下单!"); return; } try { //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效 proxy.createVoucherOrder(voucherOrder); } finally { // 释放锁 redisLock.unlock(); } } //a private BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024 * 1024); @Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } VoucherOrder voucherOrder = new VoucherOrder(); // 2.3.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 2.4.用户id voucherOrder.setUserId(userId); // 2.5.代金券id voucherOrder.setVoucherId(voucherId); // 2.6.放入阻塞队列 orderTasks.add(voucherOrder); //3.获取代理对象 proxy = (IVoucherOrderService)AopContext.currentProxy(); //4.返回订单id return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 log.error("用户已经购买过了"); return ; } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 log.error("库存不足"); return ; } save(voucherOrder); }
小总结:
秒杀业务的优化思路是什么?
消息队列字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色
基于List的消息队列有哪些优缺点?
优点:
缺点:
语法使用:
基于PubSub的消息队列有哪些优缺点?
优点:
缺点:
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:
例如:
读取消息的方式之一:XREAD
例如,使用XREAD读取第一个消息:
XREAD阻塞方式,读取最新的消息:
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下
注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点:
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列
特点:
创建消费者组:
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
其它常见命令:
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
消费者监听消息的基本思路(伪代码):
STREAM类型消息队列的XREADGROUP命令特点:
需求
需求1:
XGROUP CREATE stream.orders g1 0 MKSTREAM
MKSTREAM:队列不存在时自动创建队列,所以这条命令创建了消费者组g1和队列 stream.orders
需求2 :Lua脚本的修改
-- 1.参数列表 -- 1.1.优惠券id local voucherId = ARGV[1] -- 1.2.用户id local userId = ARGV[2] -- 1.3.订单id local orderId = ARGV[3] -- 2.数据key -- 2.1.库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2.订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1.判断库存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 3.2.判断用户是否下单 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,说明是重复下单,返回2 return 2 end -- 3.4.扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下单(保存用户)sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0
需求3:获取stream.orders 消息队列中的 消息,进行下单修改代码在run()
@Slf4j @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; public static final DefaultRedisScript<Long> SECKILL_ORDER_SCRIPT; static { SECKILL_ORDER_SCRIPT = new DefaultRedisScript<>(); SECKILL_ORDER_SCRIPT.setLocation(new ClassPathResource("seckill_order.lua")); SECKILL_ORDER_SCRIPT.setResultType(Long.class); } //异步处理线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的 @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } // 用于线程池处理的任务 // 当初始化完毕后,就会去从对列中去拿信息 private class VoucherOrderHandler implements Runnable { @Override public void run() { while(true) { try { // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 > List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create("stream.orders", ReadOffset.lastConsumed()) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有消息,继续下一次循环 continue; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); //处理异常消息 handlePendingList(); } } } private void handlePendingList() { while (true) { try { // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create("stream.orders", ReadOffset.from("0")) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有异常消息,结束循环 break; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理pendding订单异常", e); try { Thread.sleep(20); } catch (InterruptedException ex) { ex.printStackTrace(); } } } } } @Override public Result seckillVoucher(Long voucherId) { //获得用户信息 Long userId = UserHolder.getUser().getId(); //订单id long orderId = redisIdWorker.nextId("order"); //执行Lua脚本 Long result = stringRedisTemplate .execute(SECKILL_ORDER_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(),String.valueOf(orderId)); //判断结果是否为0 assert result != null; int res = result.intValue(); if (res != 0) { return Result.fail(res == 1 ? "库存不足" : "重复下单"); } //返回订单id return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 log.error("用户已经购买过了"); return; } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 log.error("库存不足"); return; } save(voucherOrder); } }
方法:qureyBlogById
请求方式:get
请求参数:id 博客id
返回值:Blog 对象,包含用户信息
代码实现
@Override public Result queryBlogById(Long id) { //查询当前Blog Blog blog = getById(id); if(blog == null){ return Result.fail("笔记不存在!"); } //查询blog有关的用户 queryBlogUser(blog); return Result.ok(blog); } public void queryBlogUser(Blog blog) { Long userId = blog.getUserId(); com.hmdp.entity.User user = userService.getById(userId); blog.setName(user.getNickName()); blog.setIcon(user.getIcon()); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。