当前位置:   article > 正文

Redis学习笔记 --------黑马点评项目_黑马点评项目介绍

黑马点评项目介绍

黑马点评项目

1. 项目功能介绍

ad868080c5634f29e26c3566bed56c79

2. 短信登录

2.1 使用session实现登录注册功能

2.1.1 发送验证码

参数:手机号
方式:get

生成验证码,并保存到session中

发送短信验证码

2.1.2 登录注册

参数:手机号,验证码

方式:post

校验验证码

判断用户是否存在

存在,登录

不存在,注册

保存到session

2.1.3 校验登录状态

拦截器 LoginInterceptor

参数:request

从session中读取数据,判断是否存在

不存在,重定向到login

存在,保存用户到ThreadLocal中,并放行

2.2 使用session登录的问题

多台Tomcat中的session数据不共享,切换不同的Tomcat数据丢失

2.3 基于Redis实现共享session登录

2.3.1 Redis 的key要满足唯一性和方便取出
2.3.2 发送验证码功能-- 存储验证码到Redis

结果:以电话为key,valus 的类型为String

思路:每个电话号码都具有唯一性,login()的参数中含有phone和验证码,方便拿到

2.3.3 登录功能---- 存储user对象到Redis

结果:以随机字符为key,valus 的类型为Hash

实现:

后端:

1.从Redis中取出code,校验

2.生成随机的token字符串,作为存储的key

2.把user对象转成HaspMap,存放到Redis中,value的类型是Hash,并设置失效时间为30分钟

3.把token,并返回给前端,

前端:

1.把返回的token 使用浏览器的sessionStorage保存

2.使用request拦截器,在每个请求发送之前,将用户的token放入请求头中


2.3.4 验登录状态 ---- 从Redis中取出user对象,并刷新存储的user信息的失效时间

1.从请求头中获取token的值

2.从redis中取出usermap,判空(true,返回false),转userDTO(脱敏后的user对象)

3.刷新user信息的失效时间,模仿session的失效,在登录时设置user信息的失效时间是30min,当user访问其他(发送请求时),刷新user的失效时间(还为30min)

4.放行

2.4 拦截器的优化

问题:

LoginInterceptor 做了两件事,设置哪些页面需要登录(权限),== 刷新用户在Redis中的有效期==,即LoginInterceptor 最终的功能是只有需要用户登录权限的页面才能刷新失效时间,当用户一直访问首页时,访问时间大于30分钟,则用户信息失效,需要重新登录,非常不友好

解决:

在LoginInterceptor 拦截器之前再做一个拦截器RefreshTokenInterceptor,

RefreshTokenInterceptor:拦截所有路径,更新失效时间

LoginInterceptor:只用来判断用户是否登录,是否放行

3. 商品查询缓存

3.1 什么是缓存

缓存,数据交换的缓冲区

优势:减低后端负载,提高读写效率

劣势:数据一致性,代码维护成本,运维成本

3.2 添加Redis缓存

3.2.1 缓存作用模型

3.2.2 根据商铺id并添加缓存(实例)

参数:long id

请求:get

返回结果:shop对象

redis中缓存的商铺信息:key : id , valus : string或者Hash 类型存储

实现思路:

1.根据id从redis中取出shop信息

2.判断是否存在shop信息, 存在,直接返回

3.不存在,根据id从数据库查询shop信息

4.数据库中不存在,报错,404

5.存在, a.往redis添加该信息 b.返回该数据

3.3 缓存更新策略

3.3.1 三种缓存策略

超时剔除策略可以当做其他缓存策略兜底方案

3.3.2 缓存与数据不一致问题

使用人工编码的方式,需要考虑3个问题


3.3.3 给查询商铺的缓存添加超时剔除和主动更新的策略

需求:

1.根据id查询商铺时,未命中,则查询数据库,将数据库中的数据写入缓存并设置失效时间

2.根据id修改数据库中的商铺信息时,先修改数据库,再删除缓存

需求2 的实现:

方法:update

参数: shop对象

方式:put

实现思路:

1根据shop对象更改数据库

2.根据shop的id删除redis中的信息

3.4 缓存穿透

3.4.1 什么是缓存穿透

浏览器发送的请求的数据,在redis和数据库中都未命中,即不存在这个数据,这些数据都会一直访问数据库(可能会存在恶意攻击的情况)

3.4.2 解决方案

**缓存空对象 **:数据库不存在这个数据,就返回一个空对象给redis,并设置较短的失效时间

布隆过滤:根据算法,把数据库中的数据转成btye[],布隆过滤器说不存在一定 不存在,说存在但 不一定 存在

增强id的复杂度,避免被猜测id规律

做好数据的基础格式校验

加强用户校验权限

做好热点参数的限流

3.5 缓存雪崩

3.5.1 什么是缓存雪崩

产生原因:(满足其一)

  • 同一时间大量缓存的key失效(批量添加数据…)
  • Redis宕机
3.5.2 解决方案

3.6 缓存击穿

3.6.1 什么是缓存击穿

产生的条件: 一个高访问并且==缓存重建业务比较复杂(时间相对较久)==的key失效了

结果:无数的请求会在重建缓存时,给数据库带来巨大的压力

3.6.2 解决方案
  • 互斥锁
  • 逻辑过期
3.6.3 互斥锁

**原理:**当线程1构建缓存数据之前,先拿到一把锁,完成缓存的构建后,释放锁,在构建缓存数据之间,其他的线程获取锁失败,则无法写入缓存,休眠重试,一直等待,直到线程1完成,才命中缓存

**结果:**数据一致性,但性能较低(程序的可用性较低)

3.6.4 逻辑过期

原理:给数据添加一个失效字段expire,不真正的设置失效时间,即永远有效,当线程1发现逻辑失效时间已经过期,拿到互斥锁,让一个新的线程去构建缓存数据,自己返回旧的数据,在构建缓存数据期间,其他线程发现逻辑时间过期,拿锁失败=发现有其他线程正在构建,则直接返回旧的数据

**结果:**程序的性能比较好,但是数据有较短的不一致性

3.6.5 两种方案的比较

3.6.6 互斥锁实例

3.6.7 逻辑过期实例

4. 优惠券秒杀

4.1 全局唯一ID(ID生成器方法)

4.1.1 什么是全局唯一ID

是一种在分布式系统下用来生成全局唯一ID的工具

4.1.2 特点
  • 唯一性 :redis中就一张表
  • 高可用性 :哨兵等还没学…
  • 高性能: redis本身的特点,从内存中读取数据…
  • 递增性 : 使用string存储中的命令。。
  • 安全性 : 使用符号位+时间戳+序列号的方式
4.1.3 如何实现安全性

4.1.4 Redis自增实现全局唯一ID实例

方法名 nextId()

参数:String keyPrefix ,keyPrefix 其实是业务名,根据业务名的不同,在Redis中创建不同的key,能对不同的ID进项自增

具体实现:

  1. 符号位:永远是0,可以不写

  2. 生成时间戳(使用时间戳是为了id的安全性,为了防止用户读懂订单id)

    • 需要定义一个初始时间,两者相减,称为时间戳
  3. 生成序列号 (这里使用了redis string类型中incr的命令,每次自增1,即每个ID的序列号都不同,即使在时间戳相同(下单时间相同,仍能保证该方法生成的唯一id))

    • 获取当前年月日
    • 将 “icr” + 参数keyPrefix + “:”+当前日期作为key,在redis中进行自增1,得到序列号
  4. 拼接两者,称为一个long类型的数字

    • 时间戳向右移32位,使用 | 运算拼接序列号(时间戳右移后,后32位都是零,使用或运算,从而填充序列号)
  5. 返回 long数字

  6. 代码

    @Component
    public class RedisIdWorker {
        /**
         * 起始时间
         */
        public static final long BEGIN_TIMESTAMP = 1672531200L;
        //序列号的位置长度
        public static final int COUNT_BITS = 32;
        @Resource
        private StringRedisTemplate stringRedisTemplate;
        public long nextId(String keyPrefix){
            //1. 生成时间戳
            LocalDateTime now = LocalDateTime.now();
            long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
            long timeStamp = nowSecond - BEGIN_TIMESTAMP;
            //2. 生成序列号
            //2.1 生成当前年月日
            String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
            //2.2 自增长
            Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
            //拼接返回
            return timeStamp << COUNT_BITS | count;
        }
    
    /*    public static void main(String[] args) {
            LocalDateTime time = LocalDateTime.of(2023, 1, 1, 0, 0, 0);
            long second = time.toEpochSecond(ZoneOffset.UTC);
            System.out.println("second = " + second);
        }*/
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
4.1.5 汇总全局唯一ID生成策略
  • UUID 生成的是String类型的16位数字
  • Redis 使用时间戳+计时器的方式 ,方便统计每天每月每年的订单数量
  • snowflake算法(雪花算法)尚硅谷springboot2中有介绍,忘了,列入TODO计划
  • 数据库自增 数据库另起一个表格用于自增ID的存储 不了解

4.2 优惠券秒杀下单实例

方法: seckillVoucher()

参数:long voucherId 优惠券id

实现:

  1. 获取优惠券信息
  2. 判断当前时间是否在优惠券抢购时间内
  3. 判断库存是否充足
  4. 扣减库存
  5. 创建订单
  6. 返回订单id
  7. 代码实现
@Override
public Result seckillVoucher(Long voucherId) {
    // 1.查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    // 2.判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        // 尚未开始
        return Result.fail("秒杀尚未开始!");
    }
    // 3.判断秒杀是否已经结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        // 尚未开始
        return Result.fail("秒杀已经结束!");
    }
    // 4.判断库存是否充足
    if (voucher.getStock() < 1) {
        // 库存不足
        return Result.fail("库存不足!");
    }
    //5,扣减库存
    boolean success = seckillVoucherService.update()
            .setSql("stock= stock -1")
            .eq("voucher_id", voucherId).update();
    if (!success) {
        //扣减库存
        return Result.fail("库存不足!");
    }
    //6.创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    // 6.1.订单id
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    // 6.2.用户id
    Long userId = UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);
    // 6.3.代金券id
    voucherOrder.setVoucherId(voucherId);
    save(voucherOrder);

    return Result.ok(orderId);

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

4.3 超卖问题

4.3.1 悲观锁

悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获得锁,使线程串行执行

实例:==Synchronized、Lock(互斥锁)==都属于悲观锁

插入:Synchronized、Lock(互斥锁)的区别

  • 身份:synchronized是Java语言的关键字,因此是内置特性,Lock是一个类,通过这个类可以实现同步访问,当一个线程被Synchronized标记了,这个线程就拥有了锁
  • 释放锁:Synchronized不用特意的去释放锁,但Lock需要调用者去释放锁,一直不释放会造成死锁
4.3.2 乐观锁

乐观锁:认为线程安全不一定会发生(小概率会发生),因此不需要加锁,只是在更新的数据时,去判断数据是否被修改(根据版本号或者更新的数据(如库存)),被修改则异常或重试

判断数据是否被修改的两种方法:

版本号法:添加一个新的字段版本号version,查询数据时并查询版本号,更新数据并对版本号+1再以查询的版本号作为条件之一进行数据更新操作

CAS法(Compare And Set )

相当于简化的版本号法,用数据本身是否发生变化来判断线程是否安全

4.3.3 乐观锁成功率太低

原因:初始库存100,有100个线程同时购买商品,查询的库存数量都是100,当线程1完成并更改库存数量为99,其他99个线程全部失败

解决方案:

  • 数值型的数据,如库存,不再比较原来查询的数据和数据库中的数据是否一致,而是比较是否 > 0
  • 只能根据数据是否变化来判断线程是否安全,可以采用分段锁的方式,把资源放在多个表中,线程访问不同的表,10个表,各放10份资源,当100个线程同时访问时,至少会有10个线程成功,成功率提高了10倍
4.3.4 代码更改
//5,扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId).gt("stock",0)
                .update();
  • 1
  • 2
  • 3
  • 4
  • 5
4.3.5 悲观锁和乐观锁的比较

4.4 一人购买一单功能

4.4.1 修改购买方法

方法: seckillVoucher()

参数:long voucherId 优惠券id

实现:

  1. 获取优惠券信息
  2. 判断当前时间是否在优惠券抢购时间内
  3. 判断库存是否充足
  4. 以用户id和优惠券的id为条件,查询订单是否存在
  5. 扣减库存
  6. 创建订单
  7. 返回订单id
  8. 代码实现

出现的问题:

  • 线程不安全,当100个线程同时执行时,会有多个线程同时查到该用户没有订单,从而创建多个订单

解决:加悲观锁,因为乐观锁是通过判断数据是否被修改来判断线程是否安全,查询订单是否存在没有修改数据,无法使用乐观锁

**悲观锁加在哪里: ** 先看4.4.3 synchronized的使用讲解

目标用户相同的线程 只能创建一个订单

  • 加在创建订单的方法上 ❌ 不行,因为同一时间只能有一个线程访问该方法,即用户a创建订单时,其他用户不能创建订单,不符合需求❌
  • 加在代码块上 ,以用户的id作为控制锁的条件是 userId.toString().intern() ✔️
    • 使用toString()是希望userId的值一样,但是toString内部会new String(),所以还要使用intern()去从常量池中寻找相同字符的地址,并返回,所以,当userId的值一样,返回的地址一样
4.4.2 代码实现
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀尚未开始!");
        }
        // 3.判断秒杀是否已经结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀已经结束!");
        }
        // 4.判断库存是否充足
        if (voucher.getStock() < 1) {
            // 库存不足
            return Result.fail("库存不足!");
        }
        Long userId = UserHolder.getUser().getId();
        synchronized (userId.toString().intern()){
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); //代理类,防止事务失效
            return proxy.createVoucherOrder(voucherId);
        }

    }
    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        // 5.一人一单逻辑
        // 5.1.用户id
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            return Result.fail("用户已经购买过一次!");
        }
        //5,扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId).gt("stock",0)
                .update();
        if (!success) {
            //扣减库存
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 6.1.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 6.2.用户id
        voucherOrder.setUserId(userId);
        // 6.3.代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        return Result.ok(orderId);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

还需要添加依赖

在主程序暴露代理

4.4.3 synchronized的使用

synchronized是Java中用于实现线程同步的关键字。当多个线程同时访问共享资源时,使用synchronized关键字可以确保线程的安全性,避免数据不一致或竞态条件的问题。

synchronized可以用在方法或代码块中,以下是synchronized的使用方法讲解:

  1. 同步方法:在方法声明中使用synchronized关键字,表示该方法是一个同步方法。同一时间只能有一个线程访问该方法。
public synchronized void synchronizedMethod() {
    // 同步方法的代码块
}
  • 1
  • 2
  • 3
  1. 同步代码块:在代码块中使用synchronized关键字,对代码块进行同步。在同一时间只有一个线程可以进入同步代码块执行。
public void method() {
    // 非同步代码块
    
    synchronized (this) {
        // 同步代码块
    }
    
    // 非同步代码块
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

可以使用synchronized关键字后面的对象作为锁来控制线程的同步。常见的对象锁包括当前对象(this)、类锁(ClassName.class)、任意对象等。

需要注意的是,如果多个线程使用相同的锁对象,那么只有一个线程能够进入同步代码块进行执行,其他线程将进入阻塞状态等待锁释放。

synchronized的使用可以确保线程同步,但过多的同步块或方法可能会导致性能下降。因此,在使用synchronized时应注意选择合适的同步粒度,避免不必要的同步操作。

同时,Java还提供了更高级的并发工具,如Lock接口和ReentrantLock类,可以提供更灵活的控制和更细粒度的锁定,可以根据需求选择合适的方式进行线程同步。

4.4.4 一人一单的并发安全问题(集群模式)

问题:当在集群模式或者分布式系统中,相同的id可能会访问不同的Tomact,而多个Tomcat会有多个JVM–>多个常量池—>都获得锁—>从而有多个订单

结论synchronized的锁 在分布式系统或者集群中失效,从而继续学习分布式锁

4.5 分布式锁

4.5.1 什么是分布式锁

条件:在分布式系统或集群模式下,多线程可见并互斥的锁

特点

  • 互斥
  • 多进程可见
  • 高可用
  • 高性能
  • 安全性
4.5.2 分布式锁的三种实现方式
  • MySQL 中的事务,就体现了互斥锁机制,当操作失败后,事务回滚并释放锁
  • Redis 同过setnx(只有新建key才会成功),setnx创建成功等于拿到锁 ,删除key 等于 释放锁 多个进程都只有一个Redis数据库,所以多线程可见,使用setnx 使多线程互斥
  • Zookeeper (好像学过忘了,TODO)

4.5.3 使用Redis分布式锁实例(版本1)

接口

方法 tryLock(Time timeoutSec) 返回 boolean 尝试获取锁

方法 unlock() 释放锁

接口实现类

类名simpleRedisLock

属性

  • name 是key
  • stringRedisTemplate

方法

  • tryLock (timeoutSec) 使用setnx命令,key是name ,value是线程名,并设置超时时间,当忘记或出现异常无法释放锁时,可以自动释放 ,设置线程名
  • unlock() 使用del 的命令删除锁

代码实现

  • 接口
package com.hmdp.lock;

/**
 * @author ttsin
 */
public interface ILock {
    /**
     * 尝试获取锁
     * @param timeoutSec 锁持有的超时时间,过期后自动释放锁
     * @return true 代表获取锁成功,false代表获取锁失败
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unlock();

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 接口实现类
package com.hmdp.lock;

import cn.hutool.core.util.BooleanUtil;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

/**
 * @author ttsin
 */
public class SimpleRedisLock implements ILock {
    private StringRedisTemplate stringRedisTemplate;
    private String name;
    public static final String KEY_PREFIX = "lock:";

    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        //获取当前线程id
        long threadId = Thread.currentThread().getId();
        //获取锁
        Boolean result = stringRedisTemplate.opsForValue().
                setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(result);
    }

    @Override
    public void unlock() {
        stringRedisTemplate.delete(KEY_PREFIX + name);

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 一人一单功能的修改VoucherOrderServiceImpl

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀尚未开始!");
        }
        // 3.判断秒杀是否已经结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀已经结束!");
        }
        // 4.判断库存是否充足
        if (voucher.getStock() < 1) {
            // 库存不足
            return Result.fail("库存不足!");
        }
        Long userId = UserHolder.getUser().getId();
        
      /*  synchronized (userId.toString().intern()){
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); //代理类,防止事务失效
            return proxy.createVoucherOrder(voucherId);
        }*/
        
        //获取锁对象
        SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
        //拿锁
        boolean lock = simpleRedisLock.tryLock(5L);
        //失败
        if(!lock){
            return Result.fail("不可重复下单!");
        }
        //成功
        try {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); //代理类,防止事务失效
            return proxy.createVoucherOrder(voucherId);
        } finally {
            //释放
            simpleRedisLock.unlock();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
4.5.4 分布式锁误删锁 — 无线程标识

分析版本1的问题

  • 当线程1获取锁后,某种原因线程1的业务功能阻塞,锁已超时,锁自动释放;
  • 于是线程2 可以拿到锁,线程2在执行自己的业务=时,==线程1苏醒,完成业务后,释放锁,==此时它释放的是线程2的锁
  • 锁被释放,于是此线程3可以获取锁,执行自己的业务…出现多线程并行问题

解决方案

  • 在获取锁时,存入线程标识使用UUID拼接线程id
    • 之前,我们把当前线程的id存入value中,线程的id是由JVM自增生成,在集群的模式下,不同线程会出现相同线程id的情况
  • 判读锁标识是否是自己的,再释放锁
4.5.5 解决误删问题代码实现(版本2)

代码实现:修改了simpleRedisLock中的tryLock()和unlock()方法

package com.hmdp.lock;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;

/**
 * @author ttsin
 */
public class SimpleRedisLock implements ILock {
    private StringRedisTemplate stringRedisTemplate;
    private String name;
    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true);


    @Override
    public boolean tryLock(long timeoutSec) {
        //获取当前线程id
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        //获取锁
        Boolean result = stringRedisTemplate.opsForValue().
                setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(result);
    }

    @Override
    public void unlock() {
        //当前线程的id
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        //锁中存放的id
        String lockID = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        //判断两者是否相同
        if(threadId.equals(lockID)){
            //相同,释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
4.5.6 分布式锁误删锁 — 无原子性

分析版本2的问题

  • 当线程1获得锁,成功判断锁是自己的,突然阻塞(可能由JVM垃圾回收机制引起),导致锁超时,自动释放
  • 线程2获取锁,实现自己的业务时,线程1苏醒,上一步已经判断成功,锁是自己的,于是删除锁
  • 线程3获取锁成功,执行自己的业务,于是线程2和线程3并向

解决方案

  • 判断锁 和 释放锁 要具有原子性
  • 使用Lua语言编写脚本,Redis可以执行Lua脚本中多条Redis的命令,从而达到原子性
  • Lua语言教程 Lua 教程 | 菜鸟教程 (runoob.com)

Lua脚本的编写

释放锁的业务流程

​ 1、获取锁中的线程标示

​ 2、判断是否与指定的标示(当前线程标示)一致

​ 3、如果一致则释放锁(删除)

​ 4、如果不一致则什么都不做

Lua脚本实现逻辑

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
  -- 一致,则删除锁
  return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
4.5.7 解决锁误删除-代码实现

对simpleRedisLock类进行修改

package com.hmdp.lock;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * @author ttsin
 */
public class SimpleRedisLock implements ILock {
    private StringRedisTemplate stringRedisTemplate;
    private String name;
    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true);
    public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        //获取当前线程id
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        //获取锁
        Boolean result = stringRedisTemplate.opsForValue().
                setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(result);
    }

    @Override
    public void unlock() {
        //调用Lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
4.5.8 Redis 分布式锁小结
  • 利用set nx ex 获取锁,nx(新增,实现互斥),ex设置过期时间,保存线程标识
  • 释放锁时先判断线程标识是否与自己设置的一致,一致则删除锁
  • 对于删除的动作,需要使用Lua脚本来保证原子性,防止出现误删的情况

4.6 分布式锁Redisson

4.6.1 Redisson 功能介绍

基于setnx实现的分布式锁存在下面的问题

  • 重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的

  • 不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

  • **超时释放:**我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

  • 主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

什么是Redisson

它是一个在Redis基础上实现了分布式工具的集合,分布式锁只是它的功能之一,它包含了各种分布式锁的实现

4.6.2 redisson 快速入门
  1. 添加依赖
 <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.6</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 配置Redisson客户端
package com.hmdp.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author ttsin
 */
@Configuration
public class RedisConfig {
    @Bean
    public RedissonClient redissonClient()  {
        //配置类
        Config config = new Config();
        //添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
        config.useSingleServer()
                .setAddress("redis://ip地址:6379").setPassword("Redis密码");
        //创建客户端
        return Redisson.create(config);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  1. 使用Redisson的分布式锁

4.6.3 Redisson可重入锁原理

情景:一个线程在执行方法A()时,使用set nx ex 取了锁,A()中调用了B(),B()也需要锁才能执行业务因为是同一个线程,

A()获得了锁,B()会获取锁失败,A()无法执行完毕,A()不释放锁,从而死锁

解决方案:Redssion中的可重入锁

期望

  • 获取锁时,根据锁标识判断是否是自己
  • 有一个变量a,获取一次锁a++,释放一次锁a–,当a 为零时,真正的释放锁
  • 有两个属性(变量a和锁标识)都要存放到锁中,string类型不支持存放两个属性,所以使用Hash类型
  • 逻辑比较复杂,需要保证操作的原子性,所以把逻辑编写到Lua脚本中

原理

  • (Hash 类型无 set nx ex 的命令)判断锁是否存在

    • 不存在,则获取锁并添加线程标识,和设置有效期(以上步骤等同于 set nx ex 命令)和 锁计数 + 1

    • 存在,则判断锁标识是否是自己

      • 否,获取锁失败
      • 是,锁计数 +1,并设置有效期
  • 当获取锁成功后,执行业务

  • 判断锁是否是自己的(防止误删锁)

    • 否,锁不是自己的,可能锁超时自动释放,不处理不操作
    • 是,锁技术 - 1并判断锁计数是否为0
      • 是,释放锁
      • 否,重置有效期

流程图和存储图

image-20230820193237502

Lua获取锁和释放锁的脚本代码

4.6.4 Redisson主从一致性问题

问题情景

主从模式,主节点指向关于写的操作,从节点执行关于读的操作,主节点同步到从节点上,保持数据的一致性

当java应用发起一个请求,获取锁 set lock thread1 NX EX 10 ,主节点设置锁成功,在同步到从节点之前,主节点宕机,会默认选出一个从节点成为主节点,但当前主节点上没有锁的设置,其他线程可以获取锁,从而造成线程同步的问题

图示

解决方案

  • 使用联锁multiLock

  • 设置相互独立的多个主节点,多个主节点都获取锁成功才算获取锁成功

  • 当某个主节点 在于 从节点进行数据同步之前,突然宕机,它的从节点成为主节点,从节点中没有锁的存储,其他线程获取当前节点锁成功,但其他节点的锁获取失败,于是,获取锁失败

图示

4.7 对分布式锁的小结

4.8 Redis秒杀优化

4.8.1 回顾Redis秒杀的逻辑

4.8.2 对Redis秒杀问题性能分析解决

性能问题

  • 秒杀功能都是串行执行,秒杀功能的耗时时间是各个逻辑耗时之和,查询优惠券、查询订单、减库存、创建订单 都是数据库写操作,又比较耗时,所以总体上,秒杀功能耗时较久

解决方案

  • 把校验库存和校验一人一单的操作放到Redis中判断,判断用户是否有资格进行下单,有资格就返回用户的id和订单编号
  • 并使用异步线程根据用户id 和订单编号进行数据库写操作
  • 我们需要在Redis中判断库存是否充足和用户是否下过单,所以判断库存需要拿到优惠券的信息
  • 判断用户是否下过单,需要记录已下单的用户id,因为一人一单,其中用户id不能重复,所以使用set类型进行存储

流程图

4.8.3 改进秒杀业务,提高并发性能

需求

需求1

  • 在VoucherServiceImpl类addSeckillVoucher()新增优惠券业务中将优惠券信息保存到Redis中
 @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        //保存优惠券信息到Redis中
        StringRedisTemplate.opsForValue()
            .set(SECKILL_STOCK_KEY_PREFIX+voucher.getId(),voucher.getStock().toString());

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

需求2

编写Lua脚本,判断秒杀库存和一人一单,决定用户是否抢购成功

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

需求3

根据Lua返回的结果,对订单进行处理seckillVoucher()

  @Override
    public Result seckillVoucher(Long voucherId) {
        //获得用户信息
        UserDTO user = UserHolder.getUser();
        //执行Lua脚本
        Long result = stringRedisTemplate
                .execute(SECKILL_ORDER_SCRIPT, Collections.emptyList(), voucherId, user.getId());
        //判断结果是否为0
        assert result != null;
        int res = result.intValue();
        if(res != 0){
            return Result.fail(res == 1 ? "库存不足":"重复下单");
        }
        //订单id
        long orderId = redisIdWorker.nextId("order");
        //TODO 阻塞线程
        //返回订单id
        return Result.ok(orderId);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

需求4

//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
   SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 用于线程池处理的任务
// 当初始化完毕后,就会去从对列中去拿信息
 private class VoucherOrderHandler implements Runnable{

        @Override
        public void run() {
            while (true){
                try {
                    // 1.获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    // 2.创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                }
          	 }
        }
     
       private void handleVoucherOrder(VoucherOrder voucherOrder) {
            //1.获取用户
            Long userId = voucherOrder.getUserId();
            // 2.创建锁对象
            RLock redisLock = redissonClient.getLock("lock:order:" + userId);
            // 3.尝试获取锁
            boolean isLock = redisLock.lock();
            // 4.判断是否获得锁成功
            if (!isLock) {
                // 获取锁失败,直接返回失败或者重试
                log.error("不允许重复下单!");
                return;
            }
            try {
				//注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
                proxy.createVoucherOrder(voucherOrder);
            } finally {
                // 释放锁
                redisLock.unlock();
            }
    }
     //a
	private BlockingQueue<VoucherOrder> orderTasks =new  ArrayBlockingQueue<>(1024 * 1024);

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        // 1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
        int r = result.intValue();
        // 2.判断结果是否为0
        if (r != 0) {
            // 2.1.不为0 ,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        VoucherOrder voucherOrder = new VoucherOrder();
        // 2.3.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 2.4.用户id
        voucherOrder.setUserId(userId);
        // 2.5.代金券id
        voucherOrder.setVoucherId(voucherId);
        // 2.6.放入阻塞队列
        orderTasks.add(voucherOrder);
        //3.获取代理对象
         proxy = (IVoucherOrderService)AopContext.currentProxy();
        //4.返回订单id
        return Result.ok(orderId);
    }
     
      @Transactional
    public  void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
           log.error("用户已经购买过了");
           return ;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            log.error("库存不足");
            return ;
        }
        save(voucherOrder);
 
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108

小总结:

秒杀业务的优化思路是什么?

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单
  • 基于阻塞队列的异步秒杀存在哪些问题?
    • 内存限制问题
    • 数据安全问题

4.9 Redis消息队列实现异步秒杀

4.9.1 什么是消息队列

消息队列字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色

  • 消息队列 : 存储和管理消息,也被称为消息代理 (Message Broker)
  • 生产者: 发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

4.9.2 Redis提供的消息队列
  • list 结构:基于list结构模拟消息队列
  • Pubsub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型
4.9.3 基于List结构模拟消息队列
  • Redis的list数据结构是一个双向链表,很容易模拟出队列效果
  • 队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现
  • 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息
  • 因此这里应该使用BRPOP或者BLPOP来实现阻塞效果

1653575176451

基于List的消息队列有哪些优缺点
优点

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证(因为list列表本身就是为了存储数据,具有数据持久性,当前只是模拟消息队列
  • 可以满足消息有序性

缺点

  • 无法避免消息丢失(POP操作是取出数据并删除数据,当从消息队列中取出数据后,此时突然宕机,会导致消息丢失)
  • 只支持单消费者
4.9.4 基于Pubsub的消息队列
  • **PubSub(发布订阅)**是Redis2.0版本引入的消息传递模型
  • 消费者可以订阅一个或多个channel频道
  • 生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

语法使用

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PUBLISH channel msg :向一个频道发送消息
  • PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
    • ?号 任意一个单个字符
    • *号 0个或多个字符
    • [ a , b] a或者b 中的其中一个

基于PubSub的消息队列有哪些优缺点
优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失(频道发送大量的消息,但处理消息很慢,造成消息堆积,超出时,数据读丢失)
4.9.5 基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令

1653577301737

例如:

1653577349691

读取消息的方式之一:XREAD

1653577445413

例如,使用XREAD读取第一个消息:

1653577643629

XREAD阻塞方式,读取最新的消息:

1653577659166

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

1653577689129

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险
4.9.6 基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列

特点

创建消费者组
1653577984924
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
其它常见命令:

删除指定的消费者组

XGROUP DESTORY key groupName
  • 1

给指定的消费者组添加消费者

XGROUP CREATECONSUMER key groupname consumername
  • 1

删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername
  • 1

从消费者组读取消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • 1
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:
    • “>”:从下一个未消费的消息开始
    • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路(伪代码)

STREAM类型消息队列的XREADGROUP命令特点

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次
4.9.7 总结对比Redis中的消息队列

4.9.8 基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求

需求1

XGROUP CREATE stream.orders g1 0 MKSTREAM
  • 1

MKSTREAM:队列不存在时自动创建队列,所以这条命令创建了消费者组g1和队列 stream.orders

需求2 :Lua脚本的修改

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

需求3:获取stream.orders 消息队列中的 消息,进行下单修改代码在run()

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RedissonClient redissonClient;
    public static final DefaultRedisScript<Long> SECKILL_ORDER_SCRIPT;

    static {
        SECKILL_ORDER_SCRIPT = new DefaultRedisScript<>();
        SECKILL_ORDER_SCRIPT.setLocation(new ClassPathResource("seckill_order.lua"));
        SECKILL_ORDER_SCRIPT.setResultType(Long.class);
    }
    //异步处理线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    // 用于线程池处理的任务
    // 当初始化完毕后,就会去从对列中去拿信息
    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while(true) {
                try {
                    // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    createVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK
                    stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    //处理异常消息
                    handlePendingList();
                }
            }
        }

        private void handlePendingList() {
            while (true) {
                try {
                    // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create("stream.orders", ReadOffset.from("0"))
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明没有异常消息,结束循环
                        break;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    createVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK
                    stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理pendding订单异常", e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }


    @Override
    public Result seckillVoucher(Long voucherId) {
        //获得用户信息
        Long userId = UserHolder.getUser().getId();
        //订单id
        long orderId = redisIdWorker.nextId("order");
        //执行Lua脚本
        Long result = stringRedisTemplate
                .execute(SECKILL_ORDER_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(),String.valueOf(orderId));
        //判断结果是否为0
        assert result != null;
        int res = result.intValue();
        if (res != 0) {
            return Result.fail(res == 1 ? "库存不足" : "重复下单");
        }

        //返回订单id
        return Result.ok(orderId);

    }
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            log.error("用户已经购买过了");
            return;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            log.error("库存不足");
            return;
        }
        save(voucherOrder);

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141

5. 达人探店

5.1 发布探店笔记

  • 上传图片和发布博客不是同一个方法
  • 上传图片完成就上传到图片服务云平台,并返回图片的地址
  • 博客存放图片的地址

5.2 查看探店笔记

方法:qureyBlogById

请求方式:get

请求参数:id 博客id

返回值:Blog 对象,包含用户信息

代码实现

@Override
    public Result queryBlogById(Long id) {
        //查询当前Blog
        Blog blog = getById(id);
        if(blog == null){
            return Result.fail("笔记不存在!");
        }
        //查询blog有关的用户
        queryBlogUser(blog);
        return Result.ok(blog);
    }
    public void queryBlogUser(Blog blog) {
        Long userId = blog.getUserId();
        com.hmdp.entity.User user = userService.getById(userId);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

5.2 点赞

5.3 点赞排行榜

6. 好友关注

7. 附近的商户

8. 用户签到

9. UV统计

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/964908
推荐阅读
相关标签
  

闽ICP备14008679号