当前位置:   article > 正文

黑马---Redis入门到实战【实战篇】_黑马redis实战篇

黑马redis实战篇

文章目录


在这里插入图片描述

一、短信登录

基于session实现短信登录的流程

在这里插入图片描述

实现发送短信验证码功能

在这里插入图片描述

发送验证码功能:

@Override
public Result sendCode(String phone, HttpSession session) {
    //1.校验手机号
    if(RegexUtils.isPhoneInvalid(phone)){
        //2.如果不符合,返回错误信息
        return Result.fail("手机号格式错误!");
    }
    //3.符合,生成验证码
    String code = RandomUtil.randomNumbers(6);
    //4.保存验证码到session
    session.setAttribute("code",code);
    //5.发送验证码
    log.debug("发送短信验证码成功,验证码:{}"+code);
    //返回ok
    return Result.ok();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

在这里插入图片描述

登录功能:

登录表单的实体类

@Data
public class LoginFormDTO {
    private String phone;
    private String code;
    private String password;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

登录逻辑代码实现:

    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        //1.校验手机号
        String phone = loginForm.getPhone();
        if(RegexUtils.isPhoneInvalid(phone)){
            //如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        //2.校验验证码
        Object cacheCode = session.getAttribute("code");
        String code = loginForm.getCode();
        if(cacheCode==null ||! cacheCode.toString().equals(code)){
            //3.不一致,报错
            return Result.fail("验证码错误!");
        }
        //4.一致,根据手机号查询用户 select * from tb_user where phone = ?
        User user = query().eq("phone", phone).one();
        //5.判断用户是否存在
        if(user==null) {
            //6.不存在,创建新用户并保存
            user=createUsrWithPhone(phone);
        }
        //7.保存用户信息到session中
        session.setAttribute("user",user);
        return null;   
    }
 
    private User createUsrWithPhone(String phone) {
        //1.创建用户
        User user = new User();
        user.setPhone(phone);
        user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
        //2.保存用户
        save(user);
        return user;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

实现登录校验拦截器

ThreadLocal 叫做本地线程变量,意思是说,ThreadLocal 中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,ThreadLocal 为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量。

注意,为了隐藏用户敏感信息,也为了节省ThreadLocal的空间,需要将User转为UserDTO返回给前端。

可以通过hutool工具类,在UserService里修改:

session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
  • 1
public class UserHolder {
    private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
    public static void saveUser(UserDTO user){
        tl.set(user);
    } 
    public static UserDTO getUser(){
        return tl.get();
    }
    public static void removeUser(){
        tl.remove();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

拦截器:

public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取session
        HttpSession session = request.getSession();
        //2.获取session中的用户
        Object user = session.getAttribute("user");
        //3.判断用户是否存在
        if(user==null) {
            //4.不存在,拦截,返回401状态码,代表未授权
            response.setStatus(401);
            return false;
        }
        //5.存在,保存用户信息到ThreadLocal
        UserHolder.saveUser((UserDTO) user);
        //6.放行
        return true;
    }
 
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

添加拦截器:

@Configuration
public class MvcConfig implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/voucher/**","/upload/**");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

session共享的问题分析

session共享问题:多态Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题
在这里插入图片描述

session的替代方案应该满足:

数据共享
内存存储
key、value结构
===>redis

Redis代替session的业务流程

在这里插入图片描述

在这里插入图片描述

基于Redis实现短信登录

拦截器修改:

public class LoginInterceptor implements HandlerInterceptor {
 
    private StringRedisTemplate stringRedisTemplate;
 
    public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate=stringRedisTemplate;
    }
 
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取请求头中的token
        String token = request.getHeader("authorization");
        //判断token是否为空
        if(StrUtil.isBlank(token)){
            response.setStatus(401);
            return false;
        }
        String key=LOGIN_USER_KEY+token;
        //2.基于token获取redis中的用户
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        //3.判断用户是否存在
        if(userMap.isEmpty()){
            //不存在,拦截,返回401状态码
            response.setStatus(401);
            return false;
        }
        //5.存在,将查询到的Hash数据转为UserDTO对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        //6.保存用户到ThreadLocal
        UserHolder.saveUser(userDTO);
        //7.刷新token有效期
        stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
        //8.放行
        return true;
    }
 
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

MvcConfig修改:

@Configuration
public class MvcConfig implements WebMvcConfigurer {
 
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/hot",
                        "/shop/**",
                        "/shop-type/**",
                        "/voucher/**",
                        "/upload/**");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

UserServiceImpl修改:

@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
 
    @Resource
    private StringRedisTemplate stringRedisTemplate;
 
    @Override
    public Result sendCode(String phone, HttpSession session) {
        //1.校验手机号
        if(RegexUtils.isPhoneInvalid(phone)){
            //2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        //3.符合,生成验证码
        String code = RandomUtil.randomNumbers(6);
        //4.保存验证码到redis中
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
        //5.发送验证码
        log.debug("发送短信验证码成功,验证码:{}"+code);
        //返回ok
        return Result.ok();
    }
 
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        //1.校验手机号
        String phone = loginForm.getPhone();
        if(RegexUtils.isPhoneInvalid(phone)){
            //如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        //2.校验验证码
        String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
        String code = loginForm.getCode();
        if(cacheCode==null ||!cacheCode.equals(code)){
            //3.不一致,报错
            return Result.fail("验证码错误!");
        }
        //4.一致,根据手机号查询用户 select * from tb_user where phone = ?
        User user = query().eq("phone", phone).one();
        //5.判断用户是否存在
        if(user==null) {
            //6.不存在,创建新用户并保存
            user=createUsrWithPhone(phone);
        }
        //保存用户信息到redis中
        //1.随机生成token,作为登录令牌
        String token = UUID.randomUUID().toString(true);  //true代表isSimple,即不带中划线
        //2.将User对象转为Hash存储
        UserDTO userDTO=BeanUtil.copyProperties(user,UserDTO.class);
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
        String tokenKey=LOGIN_USER_KEY+token;
        //7.存储
        stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
        //设置token有效期
        stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
        return Result.ok();
    }
 
    private User createUsrWithPhone(String phone) {
        //1.创建用户
        User user = new User();
        user.setPhone(phone);
        user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
        //2.保存用户
        save(user);
        return user;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

Redis代替session需要考虑的问题:

选择合适的数据结构

选择合适的key

选择合适的存储粒度

解决登录状态刷新的问题

在这里插入图片描述
RefreshTokenInterceptor

public class RefreshTokenInterceptor implements HandlerInterceptor {
 
    private  StringRedisTemplate stringRedisTemplate;
 
    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate=stringRedisTemplate;
    }
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
 
        //1.获取请求头中的token
        String token = request.getHeader("authorization");
        //判断token是否为空
        if(StrUtil.isBlank(token)){
            return true;
        }
        String key=LOGIN_USER_KEY+token;
        //2.基于token获取redis中的用户
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        //3.判断用户是否存在
        if(userMap.isEmpty()){
            return true;
        }
        //5.存在,将查询到的Hash数据转为UserDTO对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        //6.保存用户到ThreadLocal
        UserHolder.saveUser(userDTO);
        //7.刷新token有效期
        stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
        //8.放行
        return true;
    }
 
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

LoginInterceptor

public class LoginInterceptor implements HandlerInterceptor {
 
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //判断是否需要拦截(ThreadLocal中是否有用户)
        if(UserHolder.getUser()==null){
            //没有,需要拦截,设置状态码
            response.setStatus(401);
            return false;
        }
        //有用户,则放行
        return true;
    }
 
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

配置添加拦截器
注意通过order控制拦截器执行顺序,order越小越先执行

@Configuration
public class MvcConfig implements WebMvcConfigurer {
 
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/hot",
                        "/shop/**",
                        "/shop-type/**",
                        "/voucher/**",
                        "/upload/**").order(1);
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

二、商户缓存查询

在这里插入图片描述

什么是缓存

缓存就是数据交换的缓冲区(称作Cache),是存储数据的临时地方,一般读写性能较高
在这里插入图片描述

添加Redis缓存

在这里插入图片描述

    /**
     * 根据id查询商铺信息
     * @param id 商铺id
     * @return 商铺详情数据
     */
    @GetMapping("/{id}")
    public Result queryShopById(@PathVariable("id") Long id) {
        return shopService.queryById(id);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

ShopServiceImpl:

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
 
    @Resource
    private StringRedisTemplate stringRedisTemplate;
 
    @Override
    public Result queryById(Long id) {
        String key=CACHE_SHOP_KEY+id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if(StrUtil.isNotBlank(shopJson)) {
            //3.存在,直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        //4.不存在,根据id查询数据库
        Shop shop = getById(id);
        //5.不存在,返回错误
        if(shop==null){
            return Result.fail("店铺不存在!");
        }
        //6.存在,写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
        //7.返回
        return Result.ok(shop);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

练习:给店铺类型查询业务添加缓存

在这里插入图片描述

@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
 
    @Autowired
    private ShopTypeMapper shopTypeMapper;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public Result queryBatch() {
        String key="CACHE_SHOP_TYPE_KEY";
        String jsonType = stringRedisTemplate.opsForValue().get(key);
        if(StrUtil.isNotBlank(jsonType)){
            List<ShopType> shopTypes = JSONUtil.toList(jsonType, ShopType.class);
            return Result.ok(shopTypes);
        }
        List<ShopType> shopTypes = shopTypeMapper.selectList(new QueryWrapper<>());
        if(shopTypes.isEmpty()){
            return Result.fail("您查询的页面不存在!");
        }
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shopTypes));
        return Result.ok(shopTypes);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

缓存更新策略

在这里插入图片描述

业务场景:

低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存

主动更新策略:
在这里插入图片描述
操作缓存和数据库时有三个问题需要考虑:

1.删除缓存还是更新缓存?

更新缓存:每次更新都更新缓存,无效写操作较多

删除缓存:更新数据库时让缓存失效,查询时再更新缓存(胜出)

2.如何保证缓存与数据库的操作的同时成功或失败?

单体系统:将缓存与数据库操作放在一个事务

分布式系统:利用TCC等分布式事务方案

3.先操作缓存还是先操作数据库?
在这里插入图片描述
总结:
在这里插入图片描述

实现缓存与数据库的双写一致

案例:给查询商铺的缓存添加超时剔除和主动更新的策略

修改ShopController中的业务逻辑,满足下面的需求:

①根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间

②根据id修改店铺时,先修改数据库,再删除缓存

更新操作:

@Override
@Transactional
public Result update(Shop shop) {
    Long id=shop.getId();
    if(id==null){
        return Result.fail("店铺id不能为空!");
    }
    //1.更新数据库
    updateById(shop);
    //2.删除缓存
    stringRedisTemplate.delete(CACHE_SHOP_KEY+shop.getId());
    return Result.ok();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

在这里插入图片描述
编码解决商铺查询的缓存穿透问题
在这里插入图片描述

    @Override
    public Result queryById(Long id) {
        String key=CACHE_SHOP_KEY+id;
        //1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if(StrUtil.isNotBlank(shopJson)) {
            //3.存在,直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        //命中的是否为空值
        if(shopJson!=null){
            return Result.fail("店铺信息不存在!");
        }
        //4.不存在,根据id查询数据库
        Shop shop = getById(id);
        //5.不存在,返回错误
        if(shop==null){
            //将空值写入redis
            stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
            return Result.fail("店铺信息不存在!");
        }
        //6.存在,写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
        //7.返回
        return Result.ok(shop);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

总结:

缓存穿透产生的原因是什么?

用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些?

缓存null值

布隆过滤

增强id的复杂度,避免被猜测id规律

做好数据的基础格式校验

加强用户权限校验

做好热点参数的限流

缓存雪崩

缓存雪崩是指同一时刻大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

给不同的Key的TTL添加随机值

利用Redis集群提高服务的可用性

给缓存业务添加降级限流策略

给业务添加多级缓存

在这里插入图片描述

缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

在这里插入图片描述

常见的解决方案有两种:

互斥锁

逻辑过期

互斥锁和逻辑过期对比:
在这里插入图片描述
互斥锁和逻辑过期优缺点:

在这里插入图片描述

利用互斥锁解决缓存击穿问题

需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
在这里插入图片描述

    public Shop queryWithMutex(Long id){
        String key = CACHE_SHOP_KEY+id;
        //从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //判断缓存是否命中
        if(StrUtil.isNotBlank(shopJson)){
            //命中则直接返回数据
            return JSONUtil.toBean(shopJson, Shop.class);
        }
        //判断是否是缓存穿透
        if(shopJson!=null){
            return null;
        }
        //实现缓存重建
        //1.获取互斥锁
        String lockKey=LOCK_SHOP_KEY+id;
       try{
           boolean isLock = tryLock(lockKey);
           //2.判断是否获取成功
           if(!isLock) {
               //3.失败,则休眠并重试
               Thread.sleep(50);
               queryWithMutex(id);
           }
           //4.成功,根据id查询数据库
           Shop shop = getById(id);
           //5.不存在,返回错误
           if(shop==null){
               stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
               return null;
           }
           //6.存在,写入redis
           stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);
           return shop;
       }catch(InterruptedException e){
           throw new RuntimeException(e);
       }finally{
           unlock(lockKey);
       }
    }
 
    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
        return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
    }
 
    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

基于逻辑过期方式解决缓存击穿问题

需求:修改根据id删除商铺的业务,基于逻辑过期的方式来解决缓存击穿问题

在这里插入图片描述
添加逻辑过期时间:

@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}
  • 1
  • 2
  • 3
  • 4
  • 5

整体实现逻辑:

   private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
 
    public Shop queryWithLogicalExpire(Long id){
        String key=CACHE_SHOP_KEY+id;
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //缓存未命中
        if(StrUtil.isBlank(shopJson)){
            return null;
        }
        //命中,需要先把json反序列化为对象
        RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
        Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断是否过期
        if(expireTime.isAfter(LocalDateTime.now())) {
            //未过期,直接返回店铺信息
            return shop;
        }
        //已过期,需要缓存重建
        String lockKey=LOCK_SHOP_KEY+id;
        //获取互斥锁
        boolean isLock = tryLock(lockKey);
        //判断是否获取锁成功
        if(isLock) {
            //成功,开启独立线程,实现缓存重建
            CACHE_REBUILD_EXECUTOR.submit(()->{
                try {
                    saveShop2Redis(id, CACHE_SHOP_TTL);
                }catch(Exception e){
                    throw new RuntimeException(e);
                }finally{
                    unlock(lockKey);
                }
            });
        }
        //返过期的商铺信息
    }
 
    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
        return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
    }
 
    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }
 
    public void saveShop2Redis(Long id,Long expireSeconds){
        //1.查询店铺数据
        Shop shop = getById(id);
        //2.封装逻辑过期时间
        RedisData redisData = new RedisData();
        redisData.setData(shop);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
        //3.写入Redis
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

封装Redis工具类

基于StringRedisTemplate封装一个缓存工具类,满足下列要求:

方法1:任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

@Component
public class CacheClient {
    private final StringRedisTemplate stringRedisTemplate;
 
    public CacheClient(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate=stringRedisTemplate;
    }
 
    public void set(String key, Object value, Long time, TimeUnit unit){
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
    }
 
    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
        //设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }
 
    public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit){
        String key=keyPrefix+id;
        String json = stringRedisTemplate.opsForValue().get(key);
        if(StrUtil.isNotBlank(json)){
            return JSONUtil.toBean(json,type);
        }
        //判断命中的是否为空值
        if(json!=null){
            return null;
        }
        R r = dbFallback.apply(id);
        if(r==null){
            stringRedisTemplate.opsForValue().set(key,"",time,unit);
            return null;
        }
        this.set(key,r,time,unit);
        return r;
    }
 
    private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
 
    public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){
        String key=keyPrefix+id;
        String json = stringRedisTemplate.opsForValue().get(key);
        if(StrUtil.isBlank(json)){
            return null;
        }
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断是否过期
        if(expireTime.isAfter(LocalDateTime.now())){
            //未过期,直接返回店铺信息
            return r;
        }
        //已过期,需要缓存重建
        String lockKey=LOCK_SHOP_KEY+id;
        boolean isLock = tryLock(lockKey);
        if(isLock){
            CACHE_REBUILD_EXECUTOR.submit(()->{
                try{
                    //重建缓存
                    //1.查询数据库
                    R apply = dbFallback.apply(id);
                    //2.存入redis
                    this.setWithLogicalExpire(key,apply,time,unit);
                }catch (Exception e){
                    throw new RuntimeException(e);
                }finally{
                    unlock(key);
                }
            });
        }
        return r;
    }
 
    private boolean tryLock(String key){
        Boolean flag=stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }
 
    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

缓存总结

认识缓存

什么是缓存?

  • 一种具备高效读写能力的数据暂存区域

缓存的作用?

  • 降低后端负载

  • 提高读写响应速度

缓存的成本?

  • 开发成本
  • 运维成本
  • 一致性问题

缓存更新策略

三种策略:

  • 内存淘汰:redis自带的内存淘汰机制

  • 过期淘汰:利用expire命令给数据设置过期时间

  • 主动更新:主动完成数据库与缓存的同时更新

策略选择:

  • 低一致性需求:内存淘汰或过期淘汰

  • 高一致性需求:

    • 主动更新为主

    • 过期淘汰兜底

主动更新方案

Cache Aside:缓存调用者在更新数据库的同时完成对缓存的更新

一致性良好、实现难度一般

Read/Write Through:缓存与数据库集成为一个服务,服务保证两者的一致性,对外暴露API接口,调用者调用API,无需知道自己操作的是数据库还是缓存,不关心一致性

一致性优秀、实现复杂、性能一般

Write Back:缓存调用者的CRUD都针对缓存完成。有独立线程异步将缓存数据写到数据库,实现最终一致

一致性差、性能好、实现复杂

Cache Aside模式选择

更新缓存还是删除缓存?

  • 更新缓存会产生无效更新,并且存在较大的线程安全问题

  • 删除缓存本质是延迟更新,没有无效更新,线程安全问题相对较低

先操作数据库还是缓存?

  • 先更新数据,再删除缓存——在满足原子性的情况下,安全问题概率较低

  • 先删除缓存,再更新数据库——安全问题概率极高

如何确保数据库与缓存操作原子性?

  • 单体系统——利用事务机制

  • 分布式系统——利用分布式事务机制

最佳实践

查询数据时

1.先查询缓存

2.如果缓存命中,直接返回

3.如果缓存未命中,则查询数据库

4.将数据库数据写入缓存

5.返回结果

修改数据库时:

1.先修改数据库

2.然后删除缓存

===>确保两者的原子性

缓存穿透

产生原因:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

解决方案:

①缓存空对象:
思路:对于不存在的数据也在redis建立缓存,值为空,并设置一个较短的TTL时间
优点:实现简单,便于维护
缺点:额外的内存消耗、短期的数据不一致问题

②布隆过滤:
思路:利用布隆过滤算法,在请求进入Redis之前先判断是否存在,如果不存在则直接拒绝请求
优点:内存占用少
缺点:实现复杂、存在误判的可能性

③其他:
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流

缓存雪崩

产生原因:在同一时段大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

    给不同的Key的TTL添加随机值

    利用Redis集群提高服务的可用性

    给缓存业务添加降级限流策略

    给业务添加多级缓存
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

缓存击穿(热点key)

产生原因:
热点key:①在某一时段被高并发访问 ②缓存重建耗时较长
热点key突然过期,因为缓存重建耗时长,在这段时间内大量请求落到数据库,带来巨大冲击

解决方案:

互斥锁:
思路:给缓存重建过程加锁,确保重建过程只有一个线程执行,其他线程等待
优点:①实现简单 ②没有额外内存消耗 ③一致性好
缺点:①等待导致性能下降
缺点:有死锁风险

逻辑过期:
思路:
①热点key缓存永不过期,而是设置一个逻辑过期时间,查询到数据时通过对逻辑过期时间判断,来决定是否需要重建缓存
②重建缓存也通过互斥锁来保证单线程执行
③重建缓存利用独立线程异步执行
④其他线程无需等待,直接查询到旧数据即可
优点:现成无需等待,性能较好
缺点:
①不保证一致性
②有额外内存消耗
③实现复杂

三、优惠券秒杀

在这里插入图片描述

全局ID生成器

每个店铺都可以发布优惠券:
在这里插入图片描述
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就会存在一些问题:

id的规律性太明显

受单表数据量的限制

全局ID生成器:

全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

在这里插入图片描述
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息

在这里插入图片描述
ID的组成部分:

符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,可以支持每秒产生2^32个不同的ID

Redis实现全局唯一ID

@Component
public class RedisIdWorker {
    //开始时间戳 2023-01-01 00:00:00
    private static final long BEGIN_TIMESTAMP=1672531200L;
    private static final int COUNT_BITS=32;
 
    private StringRedisTemplate stringRedisTemplate;
 
    public RedisIdWorker(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate=stringRedisTemplate;
    }
 
    public long nextId(String keyPrefix){
        //1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp=nowSecond-BEGIN_TIMESTAMP;
        //2.生成序列号
        //2.1 获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        //2.2 自增长
        long count=stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);
        //3.拼接并返回
        return timestamp<<COUNT_BITS | count;
    }
 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

测试代码:

    @Resource
    private RedisIdWorker redisIdWorker;
 
    private ExecutorService es= Executors.newFixedThreadPool(500);
    @Test
    void testIdWorker() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(300);
        Runnable task=()->{
            for (int i = 0; i < 100; i++) {
                long id = redisIdWorker.nextId("order");
                System.out.println("id="+id);
            }
            latch.countDown();
        };
        latch.countDown();
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 300; i++) {
            es.submit(task);
        }
        latch.await();
        long end=System.currentTimeMillis();
        System.out.println("time="+(end-begin));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

总结:

全局唯一ID生成策略:

  • UUID

  • Redis自增

  • snowflake算法

  • 数据库自增

Redis自增ID策略:

  • 每天一个key,方便统计订单量

  • ID构造是 时间戳+计数器

添加优惠券

每个店铺都可以发布优惠券,分为评价券和特价劵。平价券可以任意购买,而特价券需要秒杀抢购:
在这里插入图片描述
表关系如下:

tb_voucher:优惠券的基本信息,优惠金额、使用规则等

tb_seckill_voucher:优惠券的库存、开始抢购时间、结束抢购时间。特价优惠券才需要填写这些信息。

通过postman添加优惠券:

{
    "shopId":1,
    "title":"100元代金券",
    "subTitle":"周一至周五均可使用",
    "rules":"全场通用\\n无需预约\\n可无限叠加\\n不兑换、不找零\\n仅限堂食",
    "payValue":8000,
    "actualValue":10000,
    "type":1,
    "stock":100,
    "beginTime":"2023-04-29T10:09:17",
    "endTime":"2023-04-29T23:09:04"
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

实现秒杀下单

下单时需要判断两点:

秒杀是否开始或结束,如果尚未开始或已经结束则无法下单

库存是否充足,不足则无法下单

在这里插入图片描述

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
 
    @Resource
    private ISeckillVoucherService seckillVoucherService;
 
    @Resource
    private RedisIdWorker redisIdWorker;
 
    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        //2.判断秒杀是否开始
        if(voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            //尚未开始
            return Result.fail("秒杀尚未开始!");
        }
        //3.判断秒杀是否结束
        if(voucher.getEndTime().isBefore(LocalDateTime.now())){
            //已经结束
            return Result.fail("秒杀已经结束!");
        }
        //4.判断库存是否充足
        if(voucher.getStock()<1){
            //库存不足
            return Result.fail("库存不足!");
        }
        //5.扣减库存
        boolean success=seckillVoucherService
                .update()
                .setSql("stock=stock-1")
                .eq("voucher_id",voucherId)
                .update();
        if(!success){
            //扣减失败
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1 订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2 用户id
        Long userId = UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        //6.3 代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        //7.返回订单id
        return Result.ok(orderId);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

在这里插入图片描述

超卖问题

用jmeter模拟高并发:

在这里插入图片描述
在这里插入图片描述
模拟结果:
在这里插入图片描述

原因:
在这里插入图片描述
超卖问题就是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:

在这里插入图片描述
乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:

版本号法

CAS法

在这里插入图片描述
在这里插入图片描述

乐观锁解决超卖问题

boolean success=seckillVoucherService
   .update()
   .setSql("stock=stock-1")
   .eq("voucher_id",voucherId).eq("stock",voucher.getStock())
   .update();
  • 1
  • 2
  • 3
  • 4
  • 5

乐观锁问题:成功率太低
在这里插入图片描述
乐观锁优化:

boolean success=seckillVoucherService
    .update()
    .setSql("stock=stock-1")
    .eq("voucher_id",voucherId).gt("stock",0)
    .update();
  • 1
  • 2
  • 3
  • 4
  • 5

运行结果:50%的失败率,订单恰好没有超卖

在这里插入图片描述
超卖这样的线程安全问题,解决方案有哪些?

1.悲观锁:添加同步锁,让线程串行执行
优点:简单粗暴
缺点:性能一般

2.乐观锁:不加锁,在更新时判断是否有其他线程在修改
优点:性能好
缺点:存在成功率低的问题

实现一人一单功能

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

在这里插入图片描述
代码实现:

  //一人一单
  Long userId = UserHolder.getUser().getId();
  Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  if(count>0){
      //用户已经存在
      return Result.fail("该用户已经购买过一次!");
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

对于高并发,解决线程安全问题:
(先提交事务,再释放锁)
第一步:添加依赖

<dependency>
     <groupId>org.aspectj</groupId>
     <artifactId>aspectjweaver</artifactId>
 </dependency>
  • 1
  • 2
  • 3
  • 4

第二步:暴露代理对象 @EnableAspectJAutoProxy(exposeProxy=true)

@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(HmDianPingApplication.class, args);
    }
 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

第三步:对方法加锁

如果用this.createVoucherOrder(voucherId); 首先我们要知道事务如果想生效,需要Spring对该类做动态代理,拿到了代理对象,来对事务进行处理

这个this是没有事务功能的,因为拿到的目标对象(非代理对象)

所以需要拿到事务代理对象AopContext.currentProxy()

synchronized (userId.toString().intern()) {
    //获取代理对象(事务)
    IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
}
  • 1
  • 2
  • 3
  • 4
  • 5
    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        //一人一单
        Long userId = UserHolder.getUser().getId();
        Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count > 0) {
            //用户已经存在
            return Result.fail("该用户已经购买过一次!");
        }
        //5.扣减库存
        boolean success = seckillVoucherService
                .update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherId).gt("stock", 0)
                .update();
        if (!success) {
            //扣减失败
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1 订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2 用户id
        voucherOrder.setUserId(userId);
        //6.3 代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        //7.返回订单id
        return Result.ok(orderId);
 
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

压测结果:
在这里插入图片描述

集群模式下线程锁失效

因为集群模式下,每个JVM有各自的锁监视器
在这里插入图片描述

四、分布式锁

分布式锁基本原理

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁

在这里插入图片描述
在这里插入图片描述
分布式锁的实现:

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见有三种:

在这里插入图片描述

Redis分布式锁的基本实现

实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁

#添加锁,利用setnx的互斥特性
setnx lock thread1

#添加锁过期时间,避免服务宕机引起的死锁
expire lock 10
  • 1
  • 2
  • 3
  • 4
  • 5

非阻塞:尝试一次,成功返回true,失败返回false
在这里插入图片描述

释放锁:

手动释放:del key
超时释放:获取锁时添加一个超时时间
在这里插入图片描述

实现Redis分布式锁初级版本

需求:定义一个类,实现下面的接口,利用Redis实现分布式锁功能
在这里插入图片描述

@AllArgsConstructor
public class SimpleRedisLock implements ILock{
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX="lock:";
 
    @Override
    public boolean tryLock(long timeoutSec) {
        long threadId = Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }
 
    @Override
    public void unlock() {
        //释放锁
        stringRedisTemplate.delete(KEY_PREFIX+name);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

优化一人一单:

Long userId=UserHolder.getUser().getId();
 //创建锁对象
 SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
 boolean isLock = lock.tryLock(1200);
 if(!isLock) {
     //获取锁失败,返回错误或重试
     return Result.fail("一个人只能下一单!");
 }
 try {
     //获取代理对象(事务)
     IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
     return proxy.createVoucherOrder(voucherId);
 }finally {
     lock.unlock();
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Redis分布式锁误删

在这里插入图片描述
解决方案:在释放锁前判断这个锁是否还属于自己
在这里插入图片描述
在这里插入图片描述

改进Redis的分布式锁

需求:修改之前的分布式锁实现,满足:

1.在获取锁时存入线程标识(可以用UUID表示)
2.在释放锁时现货区锁中的线程标识,判断是否与当前线程标识一致

  • 如果一致则释放锁
  • 如果不一致则不释放锁
@AllArgsConstructor
public class SimpleRedisLock implements ILock{
 
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX="lock:";
    private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
 
    @Override
    public boolean tryLock(long timeoutSec) {
        String threadId =ID_PREFIX + Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }
 
    @Override
    public void unlock() {
        //获取线程标识
        String threadId=ID_PREFIX + Thread.currentThread().getId();
        //获取锁中的标识
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        //判断标识是否一致
        if(threadId.equals(id)) {
            //释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

分布式锁的原子性问题

判断和释放锁是两个动作,这中间可能会发生并发问题
在这里插入图片描述

Lua脚本解决多条命令原子性问题

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言。

这里重点介绍Redis提供的调用函数,语法如下:

#执行redis命令
redis.call('命令名称','key','其它参数',...)
#比如,我们要执行set name jack
redis.call('set','name','jack')
 
#如果我们要执行set name Rose,再执行get name,则脚本如下
redis.call('set','name','Rose')
local name = redis.call('get','name')
return name
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

写好脚本以后,需要用Redis命令来调用脚本,调用脚本常见命令如下:

执行无参脚本:
在这里插入图片描述
执行有参脚本:

在这里插入图片描述
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入keys数组,其它参数会放入argv数组,在脚本中可以从keys和argv数组获取这些参数:
在这里插入图片描述

在这里插入图片描述
基于Redis的分布式锁

释放锁的业务流程:

1.获取锁中的线程标识
2.判断是否与指定的标识(当前线程标识)一致
3.如果一致则释放锁(删除)
4.如果不一致则什么都不做

-- 锁的key
local key = KEYS[1]
-- 当前线程标识
local threadId = ARGV[1]
 
-- 获取锁中的线程标识 get key
local id = redis.call('get',key)
--比较线程标识与锁中的标识是否一致
if(id == threadId) then
    -- 释放锁
   return redis.call('del',key)
end
return 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

脚本优化:


if(redis.call('get',KEYS[1])==ARGV[1]) then
    return redis.call('del',KEYS[1])
end
return 0
  • 1
  • 2
  • 3
  • 4
  • 5

再次改进Redis的分布式锁

需求:基于Lua脚本实现分布式锁的释放锁逻辑

提示:RedisTemplate调用Lua脚本的API如下:

在这里插入图片描述
在这里插入图片描述

    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT=new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
    @Override
    public void unlock() {
        //调用lua脚本
        stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

基于Redis的分布式锁实现思路:

利用setnx nx ex获取锁,并设置过期时间,保存线程标识

  • 释放锁时先判断线程标识与自己是否一致,一致则删除锁

特性:

  • 利用set nx满足互斥性
  • 利用set ex保证故障时依然能释放,避免死锁,提高安全性
  • 利用Redis集群保证高可用和高并发特性

Redisson功能介绍

在这里插入图片描述
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

在这里插入图片描述

Redisson快速入门

第一步:引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.20.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

第二步:配置Redisson

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        //配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.202.128:6379").setPassword("123321");
        //创建RedissonClient对象
        return Redisson.create(config);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3.使用Redisson分布式锁
在这里插入图片描述
使用jmeter压测通过,一个用户只能下一单:

在这里插入图片描述

Redisson可重入锁原理

redis无法实现可重入锁的原因:
在这里插入图片描述
Redisson可以实现可重入锁的原理:
在这里插入图片描述
lua脚本实现:
在这里插入图片描述
在这里插入图片描述
测试代码:

@SpringBootTest
@Slf4j
public class RedissonTest {
 
    @Autowired
    private RedissonClient redissonClient;
 
    private RLock lock;
 
    @BeforeEach
    void setup(){
        lock=redissonClient.getLock("order");
    }
 
    @Test
    void method1(){
        boolean isLock = lock.tryLock();
        if(!isLock){
            log.error("获取锁失败……1");
            return;
        }
        try{
            log.info("获取锁成功……1");
            method2();
            log.info("开始执行业务……1");
        }finally{
            log.warn("准备释放锁……1");
            lock.unlock();
        }
    }
 
    void method2(){
        boolean isLock = lock.tryLock();
        if(!isLock){
            log.error("获取锁失败……2");
            return;
        }
        try{
            log.info("获取锁成功……2");
            log.info("开始执行业务……2");
        }finally{
            log.warn("准备释放锁……2");
            lock.unlock();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

运行截图:

在这里插入图片描述
查看源码实现:

trylock:

在这里插入图片描述
unlock:
在这里插入图片描述

Redisson的锁重试和WatchDog机制

在这里插入图片描述
Redisson分布式锁原理:

可重入:利用hash结构记录线程id和重入次数
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间

修改method1:

 boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
  • 1

跟入源码:

注:redis.call('pttl',KEYS[1]);中的pttl返回以毫秒为单位,而ttl返回以秒为单位

如果该线程拿到锁,则返回nil;否则返回剩余有效期

在这里插入图片描述
代码精华部分:

            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
 
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
 
                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
 
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

怎么确保锁是因为业务结束而释放,而非阻塞导致的超时释放呢?

自动更新续期:

scheduleExpirationRenewal(threadId);

在这里插入图片描述
renewExpiration:更新有效期

在这里插入图片描述
实现永不过期的代码:

        Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock {} expiration", getRawName(), e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

那么,何时取消计时刷新呢?

在锁释放的时候结束计时
在这里插入图片描述

Redisson的multiLock问题

redisson分布式锁主从一致问题:如果主节点宕机,而未将数据同步给从节点,可能会导致并发问题
在这里插入图片描述
解决方案:所有节点都变成独立的redis节点

在这里插入图片描述
总结:

1)不可重入的Redis分布式锁:

原理:利用setnx的互斥性,利用ex避免死锁;释放锁时判断线程标识
缺陷:不可重入、无法重试、锁超时失败

2)可重入的Redis分布式锁:

原理:利用hash结果,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷:redis宕机引起锁失效问题

3)Redisson的multiLock:

原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂

五、秒杀优化

异步秒杀思路

在这里插入图片描述
怎么保证一人一单呢?Set集合

怎么确保代码执行的原子性?lua脚本

在这里插入图片描述

基于Redis完成秒杀资格判断

需求:

①新增秒杀优惠券的同时,将优惠券的信息保存到Redis中

 
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        //保存秒杀库存到Redis中
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
 
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

新增优惠券:
在这里插入图片描述
在这里插入图片描述
②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
--1.2 用户id
local userId = ARGV[2]
 
-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
 
-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey))<=0) then
    -- 3.2 库存不足,返回1
    return 1
end
-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then
    -- 3.3 存在,说明是重复下单,返回2
    return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5 下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

③如果抢购成功,将优惠券id和用户id封装后存入阻塞队列


@Resource
private ISeckillVoucherService seckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

@Autowired
private StringRedisTemplate stringRedisTemplate;

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
    // 取出用户
    Long userId = UserHolder.getUser().getId();
    //1. 执行lua脚本
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(),userId.toString()
    );
    int r=result.intValue();
    //2. 判断结果是否为0
    if(r != 0) {
        //2.1 不为0,没有购买资格
        return Result.fail(r==1 ? "库存不足" : "不能重复下单");
    }
    //2.1 为0,有购买资格,把下单信息保存到阻塞队列
    long orderId = redisIdWorker.nextId("order");
    // TODO 保存阻塞队列
    //3.返回订单id
    return Result.ok(orderId);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

基于阻塞队列实现秒杀业务

源代码有大改动,包括之前写好的createVoucherOrder,所以附上完整源码:

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
 
    @Resource
    private ISeckillVoucherService seckillVoucherService;
 
    @Resource
    private RedisIdWorker redisIdWorker;
 
    @Resource
    private RedissonClient redissonClient;
 
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
 
    private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024);
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
   private IVoucherOrderService proxy;
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    private class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
            while(true){
                try {
                    //1.获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();//没有任务时会自动阻塞
                    //2.创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (InterruptedException e) {
                    log.error("处理订单异常",e);
                }
            }
        }
    }
 
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //1.获取用户
        Long userId=voucherOrder.getUserId();
        //2.创建锁对象
        RLock lock=redissonClient.getLock("lock:order:" + userId);
        //3.获取锁
        boolean isLock = lock.tryLock();
        //4.判断是否获取锁成功
        if(!isLock){
            log.error("不允许重复下单");
            return;
        }
        try{
            proxy.createVoucherOrder(voucherOrder);
        }finally{
            lock.unlock();
        }
 
    }
 
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 取出用户
        Long userId = UserHolder.getUser().getId();
        //1. 执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),userId.toString()
        );
        int r=result.intValue();
        //2. 判断结果是否为0
        if(r != 0) {
            //2.1 不为0,没有购买资格
            return Result.fail(r==1 ? "库存不足" : "不能重复下单");
        }
        //2.1 为0,有购买资格,把下单信息保存到阻塞队列
        long orderId = redisIdWorker.nextId("order");
        // TODO 保存阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        orderTasks.add(voucherOrder);
        //3.返回订单id
        return Result.ok(orderId);
 
    }
 
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        //一人一单
        Long userId = voucherOrder.getUserId();
        Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        if (count > 0) {
            //用户已经存在
            log.error("用户已经购买过一次");
            return;
        }
        //5.扣减库存
        boolean success = seckillVoucherService
                .update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
                .update();
        if (!success) {
            //扣减失败
            log.error("库存不足");
        }
        save(voucherOrder);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117

秒杀业务的优化思路是什么?

①先利用Redis完成库存余量、一人一单判断,完成抢单业务

②再将下单业务放入阻塞队列,利用独立线程异步下单

基于阻塞队列的异步秒杀存在哪些问题?

  • 内存限制问题
  • 数据安全问题

六、Redis消息队列

认识消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

在这里插入图片描述

基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果

队列的入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用的是BRPOP或者BLPOP来实现阻塞效果。
在这里插入图片描述

示例:
在这里插入图片描述
在这里插入图片描述
基于List的消息队列有哪些优缺点?

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息

SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg:向一个频道发送消息
PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道

pattern:

?代表一个字符

  • 代表零个或多个字符

[ae] 代表可以是a也可以是e

在这里插入图片描述
示例:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

基于PubSub的消息队列有哪些优缺点?

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化

  • 无法避免消息丢失

  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

Stream是Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列

在这里插入图片描述
在这里插入图片描述
示例1:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
===>结论 :基于stream的消息队列,消息会被持久化存储,可以被多个消费者读取,也可以被一个消费者读取多次

示例2:阻塞等待,block 0 代表一直阻塞直到有新消息的到来
在这里插入图片描述
在这里插入图片描述

消费者应用:

在这里插入图片描述
bug:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
消息漏读:

在这里插入图片描述
在这里插入图片描述
STREAM类型消息队列的XREAD命令特点:

消息可回溯
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险

基于Stream的消费队列—消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

在这里插入图片描述
在这里插入图片描述
==>队列中的消息如果还想用,ID从0开始;如果不想用了,ID从$开始
在这里插入图片描述
在这里插入图片描述
示例:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
Java代码实现:

在这里插入图片描述
STREAM类型消息队列的XREADGROUP命令特点:

消息可回溯

可以多消费者争抢消息,加快消费速度

可以阻塞读取

没有消息漏读的风险

有消息确认机制,保证消息至少被消费一次

总结:

在这里插入图片描述

基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:

①创建一个Stream类型的消息队列,名为stream.orders

参数MKSTREAM,在创建组的时候,如果消息队列不存在,则自动创建组和队列
在这里插入图片描述
②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId

添加orderId:

--1.3 订单id
local orderId = ARGV[3]
  • 1
  • 2

发送消息到队列中

-- 3.6 发送消息到队列中
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
  • 1
  • 2

改造Java代码:

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 取出用户
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        //1. 执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),userId.toString(),String.valueOf(orderId)
        );
        int r=result.intValue();
        //2. 判断结果是否为0
        if(r != 0) {
            //2.1 不为0,没有购买资格
            return Result.fail(r==1 ? "库存不足" : "不能重复下单");
        }
       proxy=(IVoucherOrderService) AopContext.currentProxy();
        //3.返回订单id
        return Result.ok(orderId);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

    private class VoucherOrderHandler implements Runnable{
        String queueName = "stream.orders";
        @Override
        public void run() {
            while(true) {
                try {
                    //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //2.判断消息获取是否成功
                    if (list == null || list.isEmpty()) {
                        // 如果获取失败,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //3.创建订单
                    handleVoucherOrder(voucherOrder);
                    //4.ACK确认 SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    handlePendingList();
                }
            }
        }
 
        private void handlePendingList(){
            while(true){
                try{
                    //1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //2.判断消息是否获取成功
                    if(list == null || list.isEmpty()){
                        //没有消息,说明pending-list中没有异常消息,结束循环
                        break;
                    }
                    //3.解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //4.如果获取成功,可以下单
                    handleVoucherOrder(voucherOrder);
                    //5.ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                }catch(Exception e){
                    log.error("处理pending-list订单异常",e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

七、达人探店

在这里插入图片描述

发布探店笔记

探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:

tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
tb_blog_comments:其他用户对探店笔记的评价
在这里插入图片描述
第一步:将SystemConstants的常量改为部署在nginx项目下的imgs目录

public static final String IMAGE_UPLOAD_DIR = "D:\\lesson\\nginx-1.18.0\\html\\hmdp\\imgs\\";
  • 1

在这里插入图片描述
在这里插入图片描述

实现查看发布探店笔记的接口

Controller接口:

    @GetMapping("/{id}")
    public Result queryBlogById(@PathVariable("id") Long id){
        return blogService.queryBlogById();
    }
  • 1
  • 2
  • 3
  • 4

Service实现:

    @Override
    public Result queryBlogById(Long id) {
        //1.查询blog
        Blog blog = getById(id);
        if(blog == null){
            return Result.fail("笔记不存在!");
        }
        //2.查询blog有关的用户
        queryBlogUser(blog);
        return Result.ok(blog);
    }
 
    private void queryBlogUser(Blog blog) {
        Long userId = blog.getUserId();
        User user = userService.getById(userId);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

点赞

需求:

同一个用户只能点赞一次,再次点击则取消点赞

如果当前用户已经点赞,则点赞按钮高亮显示(前段已实现,判断字段Blog类的isLike属性)

实现步骤:

①给Blog类中添加一个isLike字段,标识是否被当前用户点赞

    /**
     * 是否点赞过了
     */
    @TableField(exist = false)
    private Boolean isLike;
  • 1
  • 2
  • 3
  • 4
  • 5

②修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
③修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值給isLike字段
④修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段

    @PutMapping("/like/{id}")
    public Result likeBlog(@PathVariable("id") Long id) {
        return blogService.likeBlog(id);
    }
  • 1
  • 2
  • 3
  • 4
    @Override
    public Result queryHotBlog(Integer current) {
        // 根据用户查询
        Page<Blog> page = query()
                .orderByDesc("liked")
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        // 查询用户
        records.forEach(blog->{
            this.queryBlogUser(blog);
            this.isBlogLiked(blog);
        });
        return Result.ok(records);
    }
 
    @Override
    public Result queryBlogById(Long id) {
        //1.查询blog
        Blog blog = getById(id);
        if(blog == null){
            return Result.fail("笔记不存在!");
        }
        //2.查询blog有关的用户
        queryBlogUser(blog);
        //3.查询blog是否被点赞
        isBlogLiked(blog);
        return Result.ok(blog);
    }
 
    private void isBlogLiked(Blog blog) {
        Long userId = UserHolder.getUser().getId();
        String key = "blog:liked:" + blog.getId();
        Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
        blog.setIsLike(BooleanUtil.isTrue(isMember));
    }
 
    @Override
    public Result likeBlog(Long id) {
        //1. 获取登录用户
        Long userId = UserHolder.getUser().getId();
        //2. 判断当前登录用户是否已经点赞
        String key = "blog:liked:" + id;
        Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
        if(BooleanUtil.isFalse(isMember)) {
            //3. 如果未点赞,可以点赞
            //3.1 数据库点赞数+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 保存用户到Redis的set集合
            if(isSuccess){
                stringRedisTemplate.opsForSet().add(key,userId.toString());
            }
        }else {
            //4. 如果已经点赞,则取消点赞
            //4.1 数据库点赞数-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 把用户从Redis的set集合移除
            if (isSuccess) {
                stringRedisTemplate.opsForSet().remove(key, userId.toString());
            }
        }
        return Result.ok();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

点赞排行榜

在这里插入图片描述

需求:按照点赞时间先后排序,返回Top5的用户
在这里插入图片描述
一人只能点赞一次功能实现:

因为SortedSet中没有isMember的判断,所以在添加元素的时候加上时间戳;查询的时候如果对应的元素没有时间戳,则代表集合中没有这个元素

    private void isBlogLiked(Blog blog) {
        UserDTO user = UserHolder.getUser();
        if(user==null){
            return;
        }
        Long userId = UserHolder.getUser().getId();
        String key = BLOG_LIKED_KEY + blog.getId();
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        blog.setIsLike(score!=null);
    }
 
    @Override
    public Result likeBlog(Long id) {
        //1. 获取登录用户
        Long userId = UserHolder.getUser().getId();
        //2. 判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + id;
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        if(score==null) {
            //3. 如果未点赞,可以点赞
            //3.1 数据库点赞数+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 保存用户到Redis的set集合 zadd key value score
            if(isSuccess){
                stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
            }
        }else {
            //4. 如果已经点赞,则取消点赞
            //4.1 数据库点赞数-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 把用户从Redis的set集合移除
            if (isSuccess) {
                stringRedisTemplate.opsForZSet().remove(key, userId.toString());
            }
        }
        return Result.ok();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

查询点赞排行前五:

    @Override
    public Result queryBlogLikes(Long id) {
        String key=BLOG_LIKED_KEY+id;
        //1. 查询top5的点赞用户 zrange key 0 4
        Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
        if(top5 == null || top5.isEmpty()){
            return Result.ok(Collections.emptyList());
        }
        //2. 解析出其中的用户id
        List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
        String idStr = StrUtil.join(",", ids);
        //3. 根据用户id查询用户
        List<UserDTO> userDTOs = userService
                .query()
                .in("id",ids)
                .last("order by field(id,"+idStr+")")
                .list()
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        //4. 返回
        return Result.ok(userDTOs);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

八、好友关注

在这里插入图片描述

关注和取关

在这里插入图片描述

需求:基于该表数据结构,实现两个接口:
①关注和取关接口
②判断是否关注的接口

Controller实现:

@RestController
@RequestMapping("/follow")
public class FollowController {
    @Resource
    private IFollowService followService;
 
    @PutMapping("/{id}/{isFollow}")
    public Result follow(@PathVariable("id")Long followUserId,@PathVariable("isFollow")Boolean isFollow){
        return followService.follow(followUserId,isFollow);
    }
    @GetMapping("/or/not/{id}")
    public Result follow(@PathVariable("id")Long followUserId){
        return followService.isFollow(followUserId);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

ServiceImpl具体实现:

@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
 
    @Override
    public Result follow(Long followUserId, Boolean isFollow) {
        // 获取登录用户
        Long userId = UserHolder.getUser().getId();
        //1.判断到底是关注还是取关
        if (isFollow) {
            //2.关注,新增数据
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            save(follow);
        } else {
            //3.取关,删除
            remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
        }
        return Result.ok();
    }
 
    @Override
    public Result isFollow(Long followUserId) {
        //1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        //2.查询是否关注
        Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
        //3.判断
        return Result.ok(count>0);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

共同关注

将下面的代码插入到UserController:

    @GetMapping("/{id}")
    public Result queryUserById(@PathVariable("id")Long userId){
        User user = userService.getById(userId);
        if(user == null){
            return Result.ok();
        }
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        return Result.ok(userDTO);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

将下面的代码插入到BlogController

    @GetMapping("/of/user")
    public Result queryBlogByUserId(
            @RequestParam(value="current",defaultValue = "1")Integer current,
            @RequestParam("id")Long id){
        Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        List<Blog> records = page.getRecords();
        return Result.ok(records);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

要实现共同关注的查找,可以考虑集合set的交集

在这里插入图片描述
需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友

①修改follow代码,将用户和关注的用户加入集合

    @Override
    public Result follow(Long followUserId, Boolean isFollow) {
        // 获取登录用户
        Long userId = UserHolder.getUser().getId();
        String key="follows:"+userId;
        //1.判断到底是关注还是取关
        if (isFollow) {
            //2.关注,新增数据
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            boolean isSuccess = save(follow);
            //判断是否关注成功
            if(isSuccess){
                //如果关注成功,把关注用户的id放入redis的set集合
                stringRedisTemplate.opsForSet().add(key,followUserId.toString());
            }
 
        } else {
            //3.取关,删除
            boolean isSuccess = remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", followUserId));
            if (isSuccess) {
                stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
            }
        }
        return Result.ok();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

②求共同关注

controller:

    @GetMapping("/commons/{id}")
    public Result followCommons(@PathVariable("id") Long id){
        return followService.followCommons(id);
    }
  • 1
  • 2
  • 3
  • 4

具体实现:

    @Resource
    private IUserService userService;
    @Override
    public Result followCommons(Long id) {
        //1.获取当前用户
        Long userId = UserHolder.getUser().getId();
        String key="follows:"+userId;
        //2.求交集
        String key2="follows:"+id;
        Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
        if(intersect==null || intersect.isEmpty()){
            //无交集
            return Result.ok(Collections.emptyList());
        }
        //3.解析id集合
        List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
        //4.查询用户
        List<UserDTO> users = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
        return Result.ok(users);
 
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

运行效果:
在这里插入图片描述

关注推送

Feed流实践方案分析

关注推送也叫作Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无线下拉刷新获取新的信息。
在这里插入图片描述

Feed流产品有两种常见的模式:

Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈。

  • 优点:信息全面,不会有缺失。并且实现也相对简单
  • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低

智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息来吸引用户

  • 优点:投喂用户感兴趣的信息,用户粘度很高,容易沉迷
  • 缺点:如果算法不精准,可能起到反作用

本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:
①拉模式
②推模式
③推拉结合

拉模式:也叫读扩散

每次发送消息的时候都会加上一个时间戳,当用户打开个人页面,就会去关注的人的发件箱里拉取消息并按时间戳排序。缺点是耗时较长,优点是节省内存。
在这里插入图片描述

推模式:也叫作写扩散。

关注的博主发消息的时候会直接把消息推送到个人主页,并排好序。在本人阅读个人主页的时候,无需等待拉取信息等。优点是延时低,缺点是消耗内存。

在这里插入图片描述
推拉结合模式:也叫读写混合,兼具推和拉两种模式的优点。
对于普通人,发送的消息可以直接推送到粉丝的收件箱。
对于大V,发送的消息,对于活跃粉丝使用推模式,对于普通粉丝使用拉模式。

在这里插入图片描述
总结:
在这里插入图片描述

基于推模式实现关注推送功能

需求:

①修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱

②收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现

③查询收件箱数据时,可以实现分页查询

Feed流的滚动分页:

Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。

在这里插入图片描述

接口:

    @PostMapping
    public Result saveBlog(@RequestBody Blog blog) {
        return blogService.saveBlog(blog);
    }
  • 1
  • 2
  • 3
  • 4

具体实现:

    @Resource
    private IFollowService followService;
    @Override
    public Result saveBlog(Blog blog) {
        //1.获取登录用户
        UserDTO user = UserHolder.getUser();
        blog.setUserId(user.getId());
        //2.保存探店笔记
        boolean isSuccess = save(blog);
        if(!isSuccess){
            return Result.fail("新增笔记失败!");
        }
        //3.查询笔记作者的所有粉丝
        List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
        //4.返回id
        for(Follow follow:follows){
            //4.1 获取粉丝id
            Long userId = follow.getUserId();
            //4.2 推送
            String key="feed:"+userId;
            stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
        }
        return Result.ok(blog.getId());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

实现关注推送页面的分页查询

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:
在这里插入图片描述
以z1为例复习相应命令:
在这里插入图片描述
①倒序按范围查询
在这里插入图片描述
②混乱插入(此时插入一条数据,继续查询)
在这里插入图片描述
③解决方法:可以按照分数查询

zrevrangebyscore key max min withscores limit offsest count

其中offset中0代表小于等于max的第一条,如果要实现小于max的第一条,则应将offset置1

count代表一次查询多少条(数量)

在这里插入图片描述
④存在的问题:如果value不一样,但是分数值一样,可能会查询到重复的数据

将m7的分数值改为6:
在这里插入图片描述
此时再按照记忆的最后一条分数去做查询,会发现数据重复:
在这里插入图片描述

⑤总结:

滚动分页查询参数:
max:当前时间戳或者上一次查询的最小时间戳
min:0
offset:0 或者 在上一次的结果中,与最小值一样的元素的个数
count:3(与前端约定好)

在这里插入图片描述

实现滚动查询

①定义滚动查询结果的实体类

@Data
public class ScrollResult {
    private List<?> list;
    private Long minTime;
    private Integer offset;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

②定义接口

    @GetMapping("/of/follow")
    public Result queryBlogOfFollow(@RequestParam("lastId")Long max,@RequestParam(value = "offset",defaultValue = "0")Integer offset){
        return blogService.queryBlogOfFollow(max,offset);
    }
  • 1
  • 2
  • 3
  • 4

③具体实现

    @Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
        //1.获取当前用户
        Long userId = UserHolder.getUser().getId();
        //2.查询收件箱
        String key=FEED_KEY+userId;
        Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
        //3.非空判断
        if(typedTuples==null || typedTuples.isEmpty()){
            return Result.ok();
        }
        //4.解析数据:blogId,minTime(时间戳),offset
        ArrayList<Object> ids = new ArrayList<>(typedTuples.size());
        long minTime=0;
        int os=1;
        for(ZSetOperations.TypedTuple<String> tuple:typedTuples) {
            //4.1 获取id
            ids.add(Long.valueOf(tuple.getValue()));
            //4.2 获取分数(时间戳)
            long time = tuple.getScore().longValue();
            if (time == minTime) {
                os++;
            } else {
                minTime = time;
                os = 1;
            }
        }
        //5.根据id查询blog
        String idStr = StrUtil.join(",", ids);
        List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
        for (Blog blog : blogs) {
            //5.1 查询blog有关的用户
            queryBlogUser(blog);
            //5.2 查询blog是否被点赞
            isBlogLiked(blog);
        }
        //6.封装并返回
        ScrollResult r = new ScrollResult();
        r.setList(blogs);
        r.setOffset(os);
        r.setMinTime(minTime);
        return Result.ok(r);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

九、附近商户

在这里插入图片描述

GEO数据结构

在这里插入图片描述
案例:练习Redis的GEO功能

需求:

1.添加下面几条数据:
北京南站(116.378248 39.865275)
北京站(116.42803 39.903738)
北京西站(116.322287 39.893729)

在这里插入图片描述
查看redis客户端:
在这里插入图片描述
==>geo底层采用ZSET实现

2.计算北京西站到北京南站的距离
在这里插入图片描述

==>默认单位是米

3.搜索天安门(116.397904 39.909005)附近10km内的所有火车站,并按照距离升序排序
在这里插入图片描述

附近商户搜索

导入店铺数据到GEO

在这里插入图片描述

按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可

在这里插入图片描述
导入数据(通过单元测试):

    @Test
    void loadShopData(){
        //1.查询店铺信息
        List<Shop> list = shopService.list();
        //2.把店铺按照typeId分组,id一致的放到一个集合
        Map<Long,List<Shop>> map=list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
        //3.分批完成写入Redis
        for(Map.Entry<Long,List<Shop>> entry:map.entrySet()){
            //3.1 获取类型id
            Long typeId = entry.getKey();
            String key="shop:geo:"+typeId;
            //3.2 获取同类型的店铺集合
            List<Shop> value = entry.getValue();
            //3.3 写入redis
            List<RedisGeoCommands.GeoLocation<String>> locations=new ArrayList<>(value.size());
            for(Shop shop:value){
                locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(),shop.getY())));
            }
            stringRedisTemplate.opsForGeo().add(key,locations);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

实现附近商户功能

首先,项目中redis相关的依赖版本太低,不支持GEOSEARCH方法,需要更换版本


我们将5.3.7和1.3.9版本排除,观察代码:

然后手动引入新版本:

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>6.1.6.RELEASE</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

接口:

    /**
     * 根据商铺类型分页查询商铺信息
     * @param typeId 商铺类型
     * @param current 页码
     * @return 商铺列表
     */
    @GetMapping("/of/type")
    public Result queryShopByType(
            @RequestParam("typeId") Integer typeId,
            @RequestParam(value = "current", defaultValue = "1") Integer current,
            @RequestParam(value = "x",required = false) Double x,
            @RequestParam(value = "y",required = false) Double y
    ) {
       return shopService.queryShopByType(typeId,current,x,y);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

具体实现:

    @Override
    public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
        //1.判断是否需要根据坐标查询
        if(x==null || y==null){
            Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, DEFAULT_PAGE_SIZE));
            return Result.ok(page.getRecords());
        }
        //2.计算分页参数
        int from=(current-1)*DEFAULT_PAGE_SIZE;
        int end=current*DEFAULT_PAGE_SIZE;
        //3.查询redis、按照距离排序、分页 结果:shopId、distance
        String key=SHOP_GEO_KEY+typeId;
        GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
                .search(
                        key,
                        GeoReference.fromCoordinate(x, y),
                        new Distance(5000),
                        RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
                );
        //4.解析出id
        if(results==null){
            return Result.ok(Collections.emptyList());
        }
        List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
        if(list.size()<=from){
            return Result.ok(Collections.emptyList());
        }
        //4.1 截取from ~ end的那部分
        List<Long> ids=new ArrayList<>(list.size());
        Map<String,Distance> distanceMap=new HashMap<>(list.size());
        list.stream().skip(from).forEach(result->{
            //4.2 获取店铺id
            String shopIdStr = result.getContent().getName();
            ids.add(Long.valueOf(shopIdStr));
            //4.3 获取距离
            Distance distance = result.getDistance();
            distanceMap.put(shopIdStr,distance);
        });
        //5.根据id查询店铺Shop
        String idStr = StrUtil.join(",", ids);
        List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
        for(Shop shop:shops){
            shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
        }
        //6.返回
        return Result.ok(shops);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

效果:
在这里插入图片描述
注意,在stream流中跳过了一部分商铺,很有可能导致跳过以后没有商铺可查,从而出现问题,所以需要加上如下判断:

if(list.size()<=from){
    return Result.ok(Collections.emptyList());
}
  • 1
  • 2
  • 3

十、用户签到

在这里插入图片描述

BitMap用法

在这里插入图片描述
我们按月来统计用户签到信息,签到记录为1,未签到则记录为0
在这里插入图片描述

把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)

Redis中就是利用String类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位。

在这里插入图片描述
练习:

BITFIELD bm1 GET u2 0含义:读取bm1,以无符号十进制的方式读取两位(u2),从第一位开始读取。返回值3代表读取的比特位是11

BITPOS bm1 0 :代表查找第一个0出现的位置

在这里插入图片描述

签到功能

需求:实现签到接口,将当前用户当天签到信息保存到Redis中

在这里插入图片描述
提示:因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了。(Redis的字符串)

接口:

    @PostMapping("/sign")
    public Result sign(){
        return userService.sign();
    }
  • 1
  • 2
  • 3
  • 4

具体实现:

    @Override
    public Result sign() {
        //1.获取当前登录的用户
        Long userId = UserHolder.getUser().getId();
        //2.获取日期
        LocalDateTime now = LocalDateTime.now();
        //3.拼接key
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key=USER_SIGN_KEY+userId+keySuffix;
        //4.获取今天是本月的第几天
        int dayOfMonth=now.getDayOfMonth();
        //5.写入Redis
        stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
        return Result.ok();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

测试:
在这里插入图片描述

签到统计

Q1:什么叫做连续签到天数?
A1:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。

Q2:如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0

Q3:如何从后向前遍历每个bit位?
与1做与运算,就能得到最后一个bit位
随后右移1位,下一个bit位就成为了最后一个bit位

案例:实现签到统计功能
需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数

在这里插入图片描述
接口:

    @GetMapping("/sign/count")
    public Result signCount(){
        return userService.signCount();
    }
  • 1
  • 2
  • 3
  • 4

具体实现:

    @Override
    public Result signCount() {
        //1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        //2.获取日期
        LocalDateTime now = LocalDateTime.now();
        //3.拼接key
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key=USER_SIGN_KEY+userId+keySuffix;
        //4.获取今天是本月的第几天
        int dayOfMonth = now.getDayOfMonth();
        //5.获取本月截止今天为止的所有签到记录,返回的是一个十进制的数字
        List<Long> result = stringRedisTemplate.opsForValue().bitField(key,
                BitFieldSubCommands
                        .create()
                        .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth))
                        .valueAt(0));
        if(result==null || result.isEmpty()){
            //没有任何签到结果
            return Result.ok(0);
        }
        Long num = result.get(0);
        if(num==null || num==0){
            return Result.ok(0);
        }
        //6.循环遍历
        int count=0;
        while(true){
            //6.1 让这个数字与1做与运算,得到数字的最后一个bit位
            if((num&1)==0){
                //如果为0,说明未签到,结束
                break;
            }else{
                //如果不为0,说明已签到,计数器+1
                count++;
            }
            num >>>= 1; // >>>代表无符号右移
        }
        return Result.ok(count);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

测试:返回值为3

在这里插入图片描述
实际上也确实是3个连续的1:

在这里插入图片描述
测试通过!

十一、UV统计

在这里插入图片描述

HyperLogLog用法

UV:全称Unique Visitor,也叫独立访问量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次

PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

UV统计在服务端做会比较麻烦,因为要判断用户是否已经统计过了,要将统计过的用户信息保存,但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。

在这里插入图片描述
命令练习:
在这里插入图片描述

实现UV统计

我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何:

@Test
void testHyperLogLog(){
    String[] values=new String[1000];
    int j=0;
    for (int i = 0; i < 1000000; i++) {
        j=i%1000;
        values[j]="user_"+i;
        if(j==999){
            //发送到Redis
            stringRedisTemplate.opsForHyperLogLog().add("hl2",values);
        }
    }
    //统计数量
    Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
    System.out.println("count="+count);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

运行结果:
在这里插入图片描述
内存消耗:

运行前内存:

在这里插入图片描述
运行后内存:
在这里插入图片描述
1929640-1505552= 424088

424088/1024/1024=0.4M

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/682868
推荐阅读
相关标签
  

闽ICP备14008679号