当前位置:   article > 正文

《黑马点评》简版笔记

《黑马点评》简版笔记

本套笔记基于黑马程序员《黑马点评》项目,供大家参考

目录

短信登入- Redis代替session的业务流程

Controller接口

Service层

拦截器

缓存

  Controller接口

缓存更新策略 

 Service层之更新

缓存穿透

 缓存雪崩 ​编辑

 缓存击穿

利用互斥锁解决缓存击穿问题

利用逻辑过期解决缓存击穿问题

优惠卷秒杀

 Redis实现全局唯一ID

 库存超卖问题

乐观锁

 一人一单问题

悲观锁

集群下的线程并发安全问题

分布式锁

 锁误删操作

锁释放过程发生阻塞

基于setnx所存在的问题

 基于Redission实现的分布式锁

基于Redis的秒杀优化

秒杀业务完整代码

达人探店

 点赞

好友关注

关注功能实现

共同关注

Feed流实现方案

​编辑

附件商铺

 导入店铺数据到GEO

实现附件商铺功能

用户签到

实现签到功能

实现统计连续签到功能

UV统计

短信登入- Redis代替session的业务流程

验证码我们使用手机号作为Key,value数据结构使用String;用户信息我们以随机token作为Key,Hash作为value的数据结构,并将随机token存储在浏览器前端每次请求都会携带这个随机的token.

Controller接口

主要实现客户向后端发送验证码,登入,根据/me接口查看当前用户是否有权限,进入界面

  1. /**
  2. * 发送手机验证码
  3. */
  4. @PostMapping("/code")
  5. public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
  6. // 发送短信验证码并保存验证码
  7. return userService.sendCode(phone, session);
  8. }
  9. /**
  10. * 登录功能
  11. * @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
  12. */
  13. @PostMapping("/login")
  14. public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
  15. // 实现登录功能
  16. return userService.login(loginForm, session);
  17. }
  18. /**
  19. * 登入校验功能
  20. * @return 返回当前登入的用户
  21. */
  22. @GetMapping("/me")
  23. public Result me(){
  24. // 获取当前登录的用户并返回
  25. return Result.ok(ThreadLocalUtls.getUser());
  26. }

Service层

验证码匹配

  1. @Override
  2. public Result sendCode(String phone, HttpSession session) {
  3. // 1、判断手机号是否合法
  4. if (RegexUtils.isPhoneInvalid(phone)) {
  5. return Result.fail("手机号格式不正确");
  6. }
  7. // 2、手机号合法,生成验证码,并保存到Redis中
  8. String code = RandomUtil.randomNumbers(6);
  9. stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code,
  10. RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);
  11. // 3、发送验证码
  12. log.info("验证码:{}", code);
  13. return Result.ok();
  14. }

实现用户登入功能:将UserDto对象存储在Redis中的时候,我们将对象转化为hash,其中hash的key和value值都需要是String类型,我们需要对对象的字段值进行处理转为String类型

设置Redis过期时间,我们需要完成在客户没有交互后的30钟内将其过期,而不是用户登入后30就将其强制过期

  1. /**
  2. * 用户登录
  3. *
  4. * @param loginForm
  5. * @param session
  6. * @return
  7. */
  8. @Override
  9. public Result login(LoginFormDTO loginForm, HttpSession session) {
  10. String phone = loginForm.getPhone();
  11. String code = loginForm.getCode();
  12. // 1、判断手机号是否合法
  13. if (RegexUtils.isPhoneInvalid(phone)) {
  14. return Result.fail("手机号格式不正确");
  15. }
  16. // 2、判断验证码是否正确
  17. String redisCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
  18. if (code == null || !code.equals(redisCode)) {
  19. return Result.fail("验证码不正确");
  20. }
  21. // 3、判断手机号是否是已存在的用户
  22. User user = this.getOne(new LambdaQueryWrapper<User>()
  23. .eq(User::getPhone, phone));
  24. if (Objects.isNull(user)) {
  25. // 用户不存在,需要注册
  26. user = createUserWithPhone(phone);
  27. }
  28. // 4、保存用户信息到Redis中,便于后面逻辑的判断(比如登录判断、随时取用户信息,减少对数据库的查询)
  29. UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
  30. // 将对象中字段全部转成string类型,StringRedisTemplate只能存字符串类型的数据
  31. Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
  32. CopyOptions.create().setIgnoreNullValue(true).
  33. setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
  34. String token = UUID.randomUUID().toString(true);
  35. String tokenKey = LOGIN_USER_KEY + token;
  36. stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
  37. log.info(tokenKey);
  38. stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
  39. // 登录成功,将用户信息保存到ThreadLocal中,方便后续登录校验
  40. //将用户信息保存到UserDTO,抹去了密码,实现了信息的安全
  41. ThreadLocalUtls.saveUser(BeanUtil.copyProperties(userMap, UserDTO.class));
  42. return Result.ok(token);
  43. }
  44. /**
  45. * 根据手机号创建用户并保存
  46. *
  47. * @param phone
  48. * @return
  49. */
  50. private User createUserWithPhone(String phone) {
  51. User user = new User();
  52. user.setPhone(phone);
  53. user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
  54. this.save(user);
  55. return user;
  56. }

拦截器

第一个拦截器,我们获取当前用户放在ThreadLocal,如果没有就放行,并刷新有效期,对所有路径有效。

第二个拦截器,我们只判断当前用户是否登入,ThreadLocal是否有该用户

判断当前用户是否已登入

  1. @Override
  2. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  3. // 判断当前用户是否已登录
  4. if (ThreadLocalUtls.getUser() == null){
  5. // 当前用户未登录,直接拦截
  6. response.setStatus(HttpStatus.HTTP_UNAUTHORIZED);
  7. return false;
  8. }
  9. // 用户存在,直接放行
  10. return true;
  11. }

刷新Redis中当前用户的有效时间,拦截器没有被注解标识,因此RedisTemplate需要通过构造器的方法进行注入

  1. public class RefreshTokenInterceptor implements HandlerInterceptor {
  2. private StringRedisTemplate stringRedisTemplate;
  3. public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
  4. this.stringRedisTemplate = stringRedisTemplate;
  5. }
  6. @Override
  7. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  8. // 1、获取token,并判断token是否存在
  9. String token = request.getHeader("authorization");
  10. if (StrUtil.isBlank(token)){
  11. // token不存在,说明当前用户未登录,不需要刷新直接放行
  12. return true;
  13. }
  14. // 2、判断用户是否存在
  15. String tokenKey = LOGIN_USER_KEY + token;
  16. Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);
  17. if (userMap.isEmpty()){
  18. // 用户不存在,说明当前用户未登录,不需要刷新直接放行
  19. return true;
  20. }
  21. // 3、用户存在,则将用户信息保存到ThreadLocal中,方便后续逻辑处理,比如:方便获取和使用用户信息,Redis获取用户信息是具有侵入性的
  22. UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
  23. ThreadLocalUtls.saveUser(BeanUtil.copyProperties(userMap, UserDTO.class));
  24. // 4、刷新token有效期
  25. stringRedisTemplate.expire(token, LOGIN_USER_TTL, TimeUnit.MINUTES);
  26. return true;
  27. }
  28. }

 将拦截器进行配置,order值越小越先执行

  1. @Configuration
  2. public class WebMvcConfig implements WebMvcConfigurer {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public void addInterceptors(InterceptorRegistry registry) {
  7. // 添加登录拦截器
  8. registry.addInterceptor(new LoginInterceptor())
  9. // 设置放行请求
  10. .excludePathPatterns(
  11. "/user/code",
  12. "/user/login",
  13. "/blog/hot",
  14. "/shop/**",
  15. "/shop-type/**",
  16. "/upload/**",
  17. "/voucher/**",
  18. // knife4j接口文档请求
  19. "/doc.html",
  20. "/webjars/**",
  21. "/swagger-resources",
  22. "/v2/api-docs"
  23. ).order(1); // 优先级默认都是0,值越大优先级越低
  24. // 添加刷新token的拦截器
  25. registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
  26. }
  27. }

缓存

我们将商品信息直接添加在Redis中,当我们在Redis中找不到的时候再将数据从数据库中更新到Redis中去,提高查询速度,但是会出现数据库和Redis的一致性问题

  Controller接口

  1. /**
  2. * 根据id查询商铺信息
  3. *
  4. * @param id 商铺id
  5. * @return 商铺详情数据
  6. */
  7. @GetMapping("/{id}")
  8. public Result queryShopById(@PathVariable("id") Long id) {
  9. return shopService.queryById(id);
  10. }
  11. /**
  12. * 更新商铺信息
  13. *
  14. * @param shop 商铺数据
  15. * @return 无
  16. */
  17. @PutMapping
  18. public Result updateShop(@RequestBody Shop shop) {
  19. return shopService.updateShop(shop);
  20. }

缓存更新策略 

 我们先更新数据后,删除缓存;当查询缓存的时候未命中,我们查询数据库,将数据写入缓存;我们一定是先更新数据库后,删除缓存,而不是删除缓存后更新数据库,降低数据的不一致性。并设置超时时间,保证数据的强一致性

最佳实践方案:

 Service层之更新

  1. @Transactional
  2. @Override
  3. public Result updateShop(Shop shop) {
  4. // 参数校验, 略
  5. // 1、更新数据库中的店铺数据
  6. boolean f = this.updateById(shop);
  7. if (!f) {
  8. // 缓存更新失败,抛出异常,事务回滚
  9. throw new RuntimeException("数据库更新失败");
  10. }
  11. // 2、删除缓存
  12. f = stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());
  13. if (!f) {
  14. // 缓存删除失败,抛出异常,事务回滚
  15. throw new RuntimeException("缓存删除失败");
  16. }
  17. return Result.ok();
  18. }

缓存穿透

当数据库和Redis中都没有数据的时候,这样缓存就不会生效,请求将直接打到数据库中

 我们修改Service层queryById(Long id)的处理逻辑:

  • 当我们从数据库查询对象为null的时候,我们向redis中放空字符串
  • 当我们从Redis中查询的时候,不为空说明是null或者是空字符串,如果是空字符串我们直接返回“商铺信息为空”,如果是null则从数据库中查询数据
  1. /**
  2. * 根据id查询数据(处理缓存穿透)
  3. *
  4. * @param keyPrefix key前缀
  5. * @param id 查询id
  6. * @param type 查询的数据类型
  7. * @param dbFallback 根据id查询数据的函数
  8. * @param timeout 有效期
  9. * @param unit 有效期的时间单位
  10. * @param <T>
  11. * @param <ID>
  12. * @return
  13. */
  14. public <T, ID> T handleCachePenetration(String keyPrefix, ID id, Class<T> type,
  15. Function<ID, T> dbFallback, Long timeout, TimeUnit unit) {
  16. String key = keyPrefix + id;
  17. // 1、从Redis中查询店铺数据
  18. String jsonStr = stringRedisTemplate.opsForValue().get(key);
  19. T t = null;
  20. // 2、判断缓存是否命中
  21. if (StrUtil.isNotBlank(jsonStr)) {
  22. // 2.1 缓存命中,直接返回店铺数据
  23. t = JSONUtil.toBean(jsonStr, type);
  24. return t;
  25. }
  26. // 2.2 缓存未命中,判断缓存中查询的数据是否是空字符串(isNotBlank把null和空字符串给排除了)
  27. if (Objects.nonNull(jsonStr)) {
  28. // 2.2.1 当前数据是空字符串(说明该数据是之前缓存的空对象),直接返回失败信息
  29. return null;
  30. }
  31. // 2.2.2 当前数据是null,则从数据库中查询店铺数据
  32. t = dbFallback.apply(id);
  33. // 4、判断数据库是否存在店铺数据
  34. if (Objects.isNull(t)) {
  35. // 4.1 数据库中不存在,缓存空对象(解决缓存穿透),返回失败信息
  36. this.set(key, "", CACHE_NULL_TTL, TimeUnit.SECONDS);
  37. return null;
  38. }
  39. // 4.2 数据库中存在,重建缓存,并返回店铺数据
  40. this.set(key, t, timeout, unit);
  41. return t;
  42. }

 缓存雪崩 

 缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

 互斥锁:就是当线程1获得锁后,查询数据库重建缓冲数据,但是再这个时间内,其他线程需要处于一个等待的时间

逻辑过期:线程1获得锁后,委托另一个线程2完成重建缓冲数据和释放锁的工作,同时线程1和其他没有获得锁的线程,返回一个过期的数据。(过期时间作为value值放在Redis)

互斥锁方案更加强调数据的一致性,逻辑过期更加强调数据的可用性

利用互斥锁解决缓存击穿问题

我们使用Redis中的setnx lock 1,作为锁,当数据库中不存在的时候结果返回1,数据库已经存在则会返回0;缓存数据后,我们释放锁del lock

利用逻辑过期解决缓存击穿问题

这里我们先不考虑缓存穿透的问题,即未命中的时候是null还是空字符串

 我们首先需要创建如下的数据结构,记录过期时间

  1. @Data
  2. public class RedisData {
  3. /**
  4. * 过期时间
  5. */
  6. private LocalDateTime expireTime;
  7. /**
  8. * 缓存数据
  9. */
  10. private Object data;
  11. }
  1. /**
  2. * 根据id查询数据(处理缓存击穿)
  3. *
  4. * @param keyPrefix key前缀
  5. * @param id 查询id
  6. * @param type 查询的数据类型
  7. * @param dbFallback 根据id查询数据的函数
  8. * @param timeout 有效期
  9. * @param unit 有效期的时间单位
  10. * @param <T>
  11. * @param <ID>
  12. * @return
  13. */
  14. public <T, ID> T handleCacheBreakdown(String keyPrefix, ID id, Class<T> type,
  15. Function<ID, T> dbFallback, Long timeout, TimeUnit unit) {
  16. String key = keyPrefix + id;
  17. // 1、从Redis中查询店铺数据,并判断缓存是否命中
  18. String jsonStr = stringRedisTemplate.opsForValue().get(key);
  19. if (StrUtil.isBlank(jsonStr)) {
  20. // 1.1 缓存未命中,直接返回失败信息
  21. return null;
  22. }
  23. // 1.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期
  24. RedisData redisData = JSONUtil.toBean(jsonStr, RedisData.class);
  25. // 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段
  26. JSONObject data = (JSONObject) redisData.getData();
  27. T t = JSONUtil.toBean(data, type);
  28. LocalDateTime expireTime = redisData.getExpireTime();
  29. if (expireTime.isAfter(LocalDateTime.now())) {
  30. // 当前缓存数据未过期,直接返回
  31. return t;
  32. }
  33. // 2、缓存数据已过期,获取互斥锁,并且重建缓存
  34. String lockKey = LOCK_SHOP_KEY + id;
  35. boolean isLock = tryLock(lockKey);
  36. if (isLock) {
  37. // 获取锁成功,开启一个子线程去重建缓存
  38. CACHE_REBUILD_EXECUTOR.submit(() -> {
  39. try {
  40. // 查询数据库
  41. T t1 = dbFallback.apply(id);
  42. // 将查询到的数据保存到Redis
  43. this.setWithLogicalExpire(key, t1, timeout, unit);
  44. } finally {
  45. unlock(lockKey);
  46. }
  47. });
  48. }
  49. // 3、获取锁失败,再次查询缓存,判断缓存是否重建(这里双检是有必要的)
  50. jsonStr = stringRedisTemplate.opsForValue().get(key);
  51. if (StrUtil.isBlank(jsonStr)) {
  52. // 3.1 缓存未命中,直接返回失败信息
  53. return null;
  54. }
  55. // 3.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期
  56. redisData = JSONUtil.toBean(jsonStr, RedisData.class);
  57. // 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段
  58. data = (JSONObject) redisData.getData();
  59. t = JSONUtil.toBean(data, type);
  60. expireTime = redisData.getExpireTime();
  61. if (expireTime.isAfter(LocalDateTime.now())) {
  62. // 当前缓存数据未过期,直接返回
  63. return t;
  64. }
  65. // 4、返回过期数据
  66. return t;
  67. }

 获取锁和释放锁

  1. /**
  2. * 获取锁
  3. *
  4. * @param key
  5. * @return
  6. */
  7. private boolean tryLock(String key) {
  8. Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
  9. // 拆箱要判空,防止NPE
  10. return BooleanUtil.isTrue(flag);
  11. }
  12. /**
  13. * 释放锁
  14. *
  15. * @param key
  16. */
  17. private void unlock(String key) {
  18. stringRedisTemplate.delete(key);
  19. }

优惠卷秒杀

 Redis实现全局唯一ID

实现全局唯一ID我们希望满足以下条件:

唯一性、递增性、安全性、高可用、高性能

实现全局唯一ID,我们使用时间戳+序列号,序列号我们取Redis自增长的自增长,但是他的key我们采用业务名称+时间的方式,防止超过范围

  1. /**
  2. * @author ghp
  3. * @title 分布式ID生成器
  4. * @description
  5. */
  6. @Component
  7. public class RedisIdWorker {
  8. @Resource
  9. private StringRedisTemplate stringRedisTemplate;
  10. /**
  11. * 开始时间戳
  12. */
  13. private static final long BEGIN_TIMESTAMP = 1640995200;
  14. /**
  15. * 序列化位数
  16. */
  17. private static final int COUNT_BITS = 32;
  18. /**
  19. * 生成分布式ID
  20. * @param keyPrefix
  21. * @return
  22. */
  23. public long nextId(String keyPrefix){
  24. // 1、生成时间戳
  25. LocalDateTime now = LocalDateTime.now();
  26. long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
  27. long timestamp = nowSecond - BEGIN_TIMESTAMP;
  28. // 2、生成序列号
  29. // 以当天的时间戳为key,防止一直自增下去导致超时,这样每天的极限都是 2^{31}
  30. String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
  31. Long count = stringRedisTemplate.opsForValue().increment(ID_PREFIX + keyPrefix + ":" + date);
  32. // 3、拼接并返回,时间戳向左移动32
  33. return timestamp << COUNT_BITS | count;
  34. }
  35. public static void main(String[] args) {
  36. LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
  37. long second = time.toEpochSecond(ZoneOffset.UTC);
  38. System.out.println("second = " + second);
  39. }
  40. }

 库存超卖问题

乐观锁

通过判断是否和之前的版本是否一致的乐观锁方法,成功率太低,我们需要对其进行改进,改为库存大于0的情况

set stock = stock - 1 where id = 10 and stock > 1

 一人一单问题

业务上,我们要加上一步,判断是否已经购买过优惠卷

 这样简单的逻辑判断还不行,一个用户可能忽然有多个线程同时进行访问,仍然可能会出现一个用户购买多张优惠卷的问题,这个时候我们要加上锁,保证一个用户只有一个线程在判断是否已经购买。又因为这里是插入操作,我们使用悲观锁(更新操作我们可以使用乐观锁)

悲观锁

  • 我们使用锁synchronized去控制transactional,而不是将synchronized放在transactional中,这是因为锁结束后事务仍未提交,会有线程认为没有订单而造成问题
  • 这里我们使用userId.toString()每次都会new一个新的String对象,因此,我们需要使用userId.toString().intern()返回字符串池中已有的字符串,没有再加入字符串池
  • 非事务的方法调用事务的方法,我们需要使用代理对象,添加代理对象如下

添加依赖 

  1. <dependency>
  2. <groupId>org.aspectj</groupId>
  3. <artifactId>aspectjweaver</artifactId>
  4. </dependency>

 在启动类上暴露代理对象

EnableAspectJAutoProxy(exposeProxy = true)

集群下的线程并发安全问题

 在一个JVM中会有一个锁监视器,保证只有一个线程,但是在集群情况下多个JVM就无法做到锁了,这个时候就会出现问题,这个时候我们就需要考虑分布式锁了

分布式锁

什么是分布式锁

 锁误删操作

 线程1在执行任务阻塞的时候,锁被超时误删;如果线程二拿到了线程,线程1执行完任务后就会删除锁,这个时候线程3又进来了,造成了线程2和线程3冲突了

解决:Redis的值设置为UUID+线程id,删除的时候判断是不是自己的线程

锁释放过程发生阻塞

线程在已经判断是自己的锁之后,在释放锁的过程中发生阻塞,这样,又会释放其他线程的锁 

 解决:这就要求我们将判断锁和释放锁封装在一起,使其具有原子性

在Resource目录下添加Lua脚本

编写Lua脚本

  1. ---
  2. --- Generated by EmmyLua(https://github.com/EmmyLua)
  3. --- Created by ghp.
  4. --- DateTime: 2023/7/13 16:19
  5. --- Description 释放锁(简单锁)
  6. ---
  7. -- 比较缓存中的线程标识与当前线程标识是否一致
  8. if (redis.call('get', KEYS[1]) == ARGV[1]) then
  9. -- 一致,直接删除
  10. return redis.call('del', KEYS[1])
  11. end
  12. -- 不一致,返回0
  13. return 0

加载Lua脚本

  1. /**
  2. * 加载Lua脚本
  3. */
  4. private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
  5. static {
  6. UNLOCK_SCRIPT = new DefaultRedisScript<>();
  7. UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
  8. UNLOCK_SCRIPT.setResultType(Long.class);
  9. }

完整分布式锁如下(我们通过StringRedisTemplate和name初始化锁,就可以获取锁和释放锁):

  1. public class SimpleRedisLock implements Lock {
  2. /**
  3. * RedisTemplate
  4. */
  5. private StringRedisTemplate stringRedisTemplate;
  6. /**
  7. * 锁的名称
  8. */
  9. private String name;
  10. /**
  11. * key前缀
  12. */
  13. private static final String KEY_PREFIX = "lock:";
  14. /**
  15. * ID前缀
  16. */
  17. private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
  18. /**
  19. * 创建锁对象
  20. */
  21. public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
  22. this.stringRedisTemplate = stringRedisTemplate;
  23. this.name = name;
  24. }
  25. /**
  26. * 获取锁
  27. *
  28. * @param timeoutSec 超时时间
  29. * @return
  30. */
  31. @Override
  32. public boolean tryLock(long timeoutSec) {
  33. String threadId = ID_PREFIX + Thread.currentThread().getId() + "";
  34. // SET lock:name id EX timeoutSec NX
  35. Boolean result = stringRedisTemplate.opsForValue()
  36. .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
  37. return Boolean.TRUE.equals(result);
  38. }
  39. /**
  40. * 加载Lua脚本
  41. */
  42. private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
  43. static {
  44. UNLOCK_SCRIPT = new DefaultRedisScript<>();
  45. UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
  46. UNLOCK_SCRIPT.setResultType(Long.class);
  47. }
  48. /**
  49. * 释放锁
  50. */
  51. @Override
  52. public void unlock() {
  53. // 执行lua脚本
  54. stringRedisTemplate.execute(
  55. UNLOCK_SCRIPT,
  56. Collections.singletonList(KEY_PREFIX + name),
  57. ID_PREFIX + Thread.currentThread().getId()
  58. );
  59. }
  60. }

基于setnx所存在的问题

 解决不可重入问题的办法很简单:

在value中增加一个字段count,当同一个线程重新获取锁的时候增加count;同一个线程释放锁的时候减count,count等于0的时候在Redis中释放锁

 基于Redission实现的分布式锁

添加依赖

  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson-spring-boot-starter</artifactId>
  4. <version>3.11.5</version>
  5. </dependency>

主要步骤:1.创建锁 2.获取锁 3.释放锁

基于分布式锁创建订单如下

  1. /**
  2. * 创建订单
  3. *
  4. * @param voucherOrder
  5. */
  6. private void handleVoucherOrder(VoucherOrder voucherOrder) {
  7. Long userId = voucherOrder.getUserId();
  8. //1.声明一个锁
  9. RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
  10. //2.获取锁
  11. //参数1:最大等待时间(解决了不可重试的问题),锁自动释放时间,单位
  12. boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
  13. if (!isLock) {
  14. // 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)
  15. log.error("一人只能下一单");
  16. return;
  17. }
  18. try {
  19. // 创建订单(使用代理对象调用,是为了确保事务生效)
  20. proxy.createVoucherOrder(voucherOrder);
  21. } finally {
  22. //3.释放锁
  23. lock.unlock();
  24. }
  25. }

具体创建订单过程仍然使用事务控制

  1. /**
  2. * 创建订单
  3. *
  4. * @param voucherOrder
  5. * @return
  6. */
  7. @Transactional
  8. @Override
  9. public void createVoucherOrder(VoucherOrder voucherOrder) {
  10. Long userId = voucherOrder.getUserId();
  11. Long voucherId = voucherOrder.getVoucherId();
  12. // 1、判断当前用户是否是第一单
  13. int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
  14. .eq(VoucherOrder::getUserId, userId));
  15. if (count >= 1) {
  16. // 当前用户不是第一单
  17. log.error("当前用户不是第一单");
  18. return;
  19. }
  20. // 2、用户是第一单,可以下单,秒杀券库存数量减一
  21. boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
  22. .eq(SeckillVoucher::getVoucherId, voucherId)
  23. .gt(SeckillVoucher::getStock, 0)
  24. .setSql("stock = stock -1"));
  25. if (!flag) {
  26. throw new RuntimeException("秒杀券扣减失败");
  27. }
  28. // 3、将订单保存到数据库
  29. flag = this.save(voucherOrder);
  30. if (!flag) {
  31. throw new RuntimeException("创建秒杀券订单失败");
  32. }
  33. }

基于Redis的秒杀优化

之前,我们我们资格判断和创建订单操作通过一条线程来完成,多次和数据库发生交换,这就导致我们任务的处理时间过程较长,这里我们主线程从Redis中获取信息判断资格,后面Mysql中数据用操作让另一条线程来完成(相当于服务员在外面帮客户点单,厨师在后面炒菜)。不过这里资格判断仍需要通过Lua脚本完成原子性操作。 

在VoucherServiceImp中实现添加优惠卷信息

  1. /**
  2. * 新增秒杀券
  3. * @param voucher
  4. */
  5. @Override
  6. @Transactional
  7. public void addSeckillVoucher(Voucher voucher) {
  8. // 1、保存优惠券
  9. boolean result = save(voucher);
  10. if (!result){
  11. throw new RuntimeException("优惠券保存失败");
  12. }
  13. // 2、保存秒杀优惠券信息
  14. SeckillVoucher seckillVoucher = new SeckillVoucher();
  15. seckillVoucher.setVoucherId(voucher.getId());
  16. seckillVoucher.setStock(voucher.getStock());
  17. seckillVoucher.setBeginTime(voucher.getBeginTime());
  18. seckillVoucher.setEndTime(voucher.getEndTime());
  19. result = seckillVoucherService.save(seckillVoucher);
  20. if (!result){
  21. throw new RuntimeException("秒杀优惠券保存失败");
  22. }
  23. // 3、将秒杀券库存保存到Redis中(这给信息是可以持久化保存到Redis中,不想要的时候就进行手动删除)
  24. stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
  25. }

编写Lua脚本stream-seckill.lua

  1. ---
  2. --- Generated by EmmyLua(https://github.com/EmmyLua)
  3. --- Created by ghp.
  4. --- DateTime: 2023/7/16 18:22
  5. --- Description 判断库存是否充足 && 判断用户是否已下单 (以stream作为消息队列)
  6. ---
  7. -- 优惠券id
  8. local voucherId = ARGV[1];
  9. -- 用户id
  10. local userId = ARGV[2];
  11. -- 订单id
  12. local orderId = ARGV[3]
  13. -- 库存的key
  14. local stockKey = 'seckill:stock:' .. voucherId;
  15. -- 订单key
  16. local orderKey = 'seckill:order:' .. voucherId;
  17. -- 判断库存是否充足 get stockKey > 0 ?
  18. local stock = redis.call('GET', stockKey);
  19. if (tonumber(stock) <= 0) then
  20. -- 库存不足
  21. return 1;
  22. end
  23. -- 库存充足,判断用户是否已经下过单 SISMEMBER orderKey userId
  24. if (redis.call('SISMEMBER', orderKey, userId) == 1) then
  25. -- 用户下过单
  26. return 2;
  27. end
  28. -- 库存充足,没有下过单,扣库存、下单
  29. redis.call('INCRBY', stockKey, -1);
  30. redis.call('SADD', orderKey, userId);
  31. -- 发送消息到队列中,XDD stream.orders * key1 value1 key2 value2...
  32. redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);
  33. -- 返回0,表示下单成功
  34. return 0;

加载Lua脚本

  1. /**
  2. * 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本
  3. */
  4. private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
  5. static {
  6. SECKILL_SCRIPT = new DefaultRedisScript<>();
  7. SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));
  8. SECKILL_SCRIPT.setResultType(Long.class);
  9. }

异步处理订单问题

基于JVM的异步处理有一个很大的问题,那就是当线程数量变多的时候会出现内存溢出的情况,所以这里,我们使用Redis的消息队列完成异步处理。(基于Redis的单消费者模式最大问题是,当消息数量较多的时候会出现漏读消息的问题)

Stream的消费者组模式有以下的优点

消息可回溯是指在不同消费组都可以收到消息,但是同一个消费者组中都是竞争关系,在同一个消费组里只会被消费一次。总结:独立与JVM以外,消息持久化,消息的确认机制

在Redis中添加队列和消费者组,stream.orders队列,g1消费者组

XGROUP CREATE stream.orders g1 0 MKSTREAM

编写Lua脚本,我们在判断购买资格的时候直接将UserId,VoucherID,OrderId写入消费者组中,并将Lua脚本加载,这一步和上面是一样的,只不过我们增加将消息加入消费组这一步。

秒杀业务完整代码

第一步:seckillVoucher(Long voucherId)在Redis中完成对用户的秒杀,并将信息加入到Redis的队列中去。

第二步:VoucherOrderHandler()异步去队列接受消息,handlePendingList()处理对异常信息的处理,正常接收消息调用handleVoucherOrder()完成我们的订单任务,其会获取锁后调用代理对象通过createVoucherOrder()正真完成我们订单的创建。

基于分布式锁和异步处理订单,最终完成我们的秒杀业务,完整代码如下:

  1. package com.hmdp.service.impl;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  4. import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
  5. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  6. import com.hmdp.constants.RedisConstants;
  7. import com.hmdp.mapper.VoucherOrderMapper;
  8. import com.hmdp.model.dto.Result;
  9. import com.hmdp.model.entity.SeckillVoucher;
  10. import com.hmdp.model.entity.VoucherOrder;
  11. import com.hmdp.service.ISeckillVoucherService;
  12. import com.hmdp.service.IVoucherOrderService;
  13. import com.hmdp.utils.RedisIdWorker;
  14. import com.hmdp.utils.ThreadLocalUtls;
  15. import org.redisson.api.RLock;
  16. import org.redisson.api.RedissonClient;
  17. import org.springframework.aop.framework.AopContext;
  18. import org.springframework.core.io.ClassPathResource;
  19. import org.springframework.data.redis.connection.RedisConnection;
  20. import org.springframework.data.redis.connection.stream.*;
  21. import org.springframework.data.redis.core.StringRedisTemplate;
  22. import org.springframework.data.redis.core.script.DefaultRedisScript;
  23. import org.springframework.data.redis.core.script.RedisScript;
  24. import org.springframework.stereotype.Service;
  25. import org.springframework.transaction.annotation.Transactional;
  26. import javax.annotation.PostConstruct;
  27. import javax.annotation.Resource;
  28. import java.time.Duration;
  29. import java.util.*;
  30. import java.util.concurrent.ExecutorService;
  31. import java.util.concurrent.Executors;
  32. import java.util.concurrent.TimeUnit;
  33. import static com.hmdp.constants.RedisConstants.*;
  34. /**
  35. * <p>
  36. * 服务实现类
  37. * </p>
  38. *
  39. * @author ghp
  40. * @since 2021-12-22
  41. */
  42. @Service
  43. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  44. @Resource
  45. private ISeckillVoucherService seckillVoucherService;
  46. @Resource
  47. private RedisIdWorker redisIdWorker;
  48. @Resource
  49. private StringRedisTemplate stringRedisTemplate;
  50. @Resource
  51. private RedissonClient redissonClient;
  52. /**
  53. * VoucherOrderServiceImpl类的代理对象
  54. * 将代理对象的作用域进行提升,方面子线程取用
  55. */
  56. private IVoucherOrderService proxy;
  57. /**
  58. * 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本
  59. */
  60. private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
  61. static {
  62. SECKILL_SCRIPT = new DefaultRedisScript<>();
  63. SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));
  64. SECKILL_SCRIPT.setResultType(Long.class);
  65. }
  66. //===========================执行秒杀===========================================
  67. /**
  68. * 抢购秒杀券,实现对Redis中对资格判断,购买优惠卷的功能,并发送消息,通知去完成对数据库的操作
  69. *
  70. * @param voucherId
  71. * @return
  72. */
  73. @Transactional
  74. @Override
  75. public Result seckillVoucher(Long voucherId) {
  76. Long userId = ThreadLocalUtls.getUser().getId();
  77. long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
  78. // 1、执行Lua脚本,判断用户是否具有秒杀资格
  79. Long result = null;
  80. try {
  81. result = stringRedisTemplate.execute(
  82. SECKILL_SCRIPT,
  83. Collections.emptyList(),
  84. voucherId.toString(),
  85. userId.toString(),
  86. String.valueOf(orderId)
  87. );
  88. } catch (Exception e) {
  89. log.error("Lua脚本执行失败");
  90. throw new RuntimeException(e);
  91. }
  92. if (result != null && !result.equals(0L)) {
  93. // result为1表示库存不足,result为2表示用户已下单
  94. int r = result.intValue();
  95. return Result.fail(r == 2 ? "不能重复下单" : "库存不足");
  96. }
  97. // 2、result为0,下单成功,直接返回ok
  98. // 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效
  99. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  100. this.proxy = proxy;
  101. return Result.ok();
  102. }
  103. //===================创建线程完成在Mysql中订单的创建=======================
  104. /**
  105. * 当前类初始化完毕就立马执行该方法在Redis中创建我们的队列
  106. */
  107. @PostConstruct
  108. private void init() {
  109. // 创建消息队列
  110. DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
  111. mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
  112. mqScript.setResultType(Long.class);
  113. Long result = null;
  114. try {
  115. result = stringRedisTemplate.execute(mqScript,
  116. Collections.emptyList(),
  117. QUEUE_NAME,
  118. GROUP_NAME);
  119. } catch (Exception e) {
  120. log.error("队列创建失败", e);
  121. return;
  122. }
  123. int r = result.intValue();
  124. String info = r == 1 ? "队列创建成功" : "队列已存在";
  125. log.debug(info);
  126. // 执行线程任务
  127. SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
  128. }
  129. /**
  130. * 线程池
  131. */
  132. private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
  133. //============================从阻塞队列中获取订单信息进行处理=====================================
  134. /**
  135. * 线程任务: 不断从阻塞队列中获取订单信息,使用handleVoucherOrder(voucherOrder)处理订单消息
  136. * 当出现异常的时候,我们使用handlePendingList()方法从pendinglist中获取消息进行处理
  137. */
  138. private class VoucherOrderHandler implements Runnable {
  139. @Override
  140. public void run() {
  141. while (true) {
  142. try {
  143. // 1、从消息队列中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS stream.orders >
  144. List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
  145. Consumer.from("g1","c1"),//g1消费组的名称,c1消费者名称
  146. //Consumer.from(GROUP_NAME, "c1"),
  147. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),//读取一个,阻塞1秒钟
  148. StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())//读取队列stream.orders,ReadOffset.lastConsumed()读取最新未读取的队列
  149. );
  150. // 2、判断消息获取是否成功
  151. if (messageList == null || messageList.isEmpty()) {
  152. // 2.1 消息获取失败,说明没有消息,进入下一次循环获取消息
  153. continue;
  154. }
  155. // 3、消息获取成功,可以下单
  156. // 将消息转成VoucherOrder对象
  157. MapRecord<String, Object, Object> record = messageList.get(0);//获取的是消息id,字段名称,字段值
  158. Map<Object, Object> messageMap = record.getValue();//获取字段名称和字段值
  159. //将结果转为voucher对象,注意:之前我们在Lua脚本中定义的字段名称和voucher字段名称刚好对应
  160. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
  161. handleVoucherOrder(voucherOrder);
  162. // 4、ACK确认 SACK stream.orders g1 id(队列名称,消费组,消息id)
  163. stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
  164. } catch (Exception e) {
  165. log.error("处理订单异常", e);
  166. //如果处理异常,我们就要尝试去PendingList中将消息取出来进行处理
  167. // 处理异常消息
  168. handlePendingList();
  169. }
  170. }
  171. }
  172. }
  173. //==========================使用 handlePendingList对未处理成功的信息进行处理=================================
  174. private void handlePendingList() {
  175. while (true) {
  176. try {
  177. // 1、从pendingList中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0
  178. List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
  179. Consumer.from(GROUP_NAME, "c1"),
  180. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
  181. StreamOffset.create(QUEUE_NAME, ReadOffset.from("0"))
  182. );
  183. // 2、判断pendingList中是否有效性
  184. if (messageList == null || messageList.isEmpty()) {
  185. // 2.1 pendingList中没有消息,直接结束循环
  186. break;
  187. }
  188. // 3、pendingList中有消息
  189. // 将消息转成VoucherOrder对象
  190. MapRecord<String, Object, Object> record = messageList.get(0);
  191. Map<Object, Object> messageMap = record.getValue();
  192. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
  193. handleVoucherOrder(voucherOrder);
  194. // 4、ACK确认 SACK stream.orders g1 id
  195. stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
  196. } catch (Exception e) {
  197. log.error("处理订单异常", e);
  198. // 这里不用调自己,直接就进入下一次循环,再从pendingList中取,这里只需要休眠一下,防止获取消息太频繁
  199. try {
  200. Thread.sleep(20);
  201. } catch (InterruptedException ex) {
  202. log.error("线程休眠异常", ex);
  203. }
  204. }
  205. }
  206. }
  207. //========================获取锁并调用createVoucherOrder()完成对订单的创建===========================
  208. /**
  209. * 创建订单
  210. *
  211. * @param voucherOrder
  212. */
  213. private void handleVoucherOrder(VoucherOrder voucherOrder) {
  214. Long userId = voucherOrder.getUserId();
  215. //1.声明一个锁
  216. RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
  217. //2.获取锁
  218. //参数1:最大等待时间(解决了不可重试的问题),锁自动释放时间,单位
  219. boolean isLock = lock.tryLock();
  220. if (!isLock) {
  221. // 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)
  222. log.error("一人只能下一单");
  223. return;
  224. }
  225. try {
  226. // 创建订单(使用代理对象调用,是为了确保事务生效)
  227. proxy.createVoucherOrder(voucherOrder);
  228. } finally {
  229. //3.释放锁
  230. lock.unlock();
  231. }
  232. }
  233. /**
  234. * 创建订单
  235. *
  236. * @param voucherOrder
  237. * @return
  238. */
  239. @Transactional
  240. @Override
  241. public void createVoucherOrder(VoucherOrder voucherOrder) {
  242. Long userId = voucherOrder.getUserId();
  243. Long voucherId = voucherOrder.getVoucherId();
  244. // 1、判断当前用户是否是第一单
  245. int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
  246. .eq(VoucherOrder::getUserId, userId));
  247. if (count >= 1) {
  248. // 当前用户不是第一单
  249. log.error("当前用户不是第一单");
  250. return;
  251. }
  252. // 2、用户是第一单,可以下单,秒杀券库存数量减一
  253. boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
  254. .eq(SeckillVoucher::getVoucherId, voucherId)
  255. .gt(SeckillVoucher::getStock, 0)
  256. .setSql("stock = stock -1"));
  257. if (!flag) {
  258. throw new RuntimeException("秒杀券扣减失败");
  259. }
  260. // 3、将订单保存到数据库
  261. flag = this.save(voucherOrder);
  262. if (!flag) {
  263. throw new RuntimeException("创建秒杀券订单失败");
  264. }
  265. }
  266. }

达人探店

@TableField表示该字段是后面添加进去的

 点赞

为了同时满足判断元素是否在集合内,以及元素的排序,我们在Redis中使用scored_set类型

  1. ZADD z1 1 m1 2 m2 3 m3 //m1 m2 m3 元素的值分别为1 2 3
  2. ZSCORE z1 m1 // 得到结果1
  3. ZRANGE z1 0 4 //查前5

分析:1.我们使用scored_set类型,按用户的点赞时间作为分数进行排序 2.用户每次点赞我们在数据库设置blog的liked加1 3.在blog实体类中增加字段isLiked表示当前用户是否点赞了博客

判断当前blog是否被点赞,如果被点赞在blog实体类标识被点赞

  1. /**
  2. * 判断当前用户是否点赞该博客
  3. */
  4. private void isBlogLiked(Blog blog) {
  5. UserDTO user = ThreadLocalUtls.getUser();
  6. if (Objects.isNull(user)) {
  7. // 当前用户未登录,无需查询点赞
  8. return;
  9. }
  10. Long userId = user.getId();
  11. String key = BLOG_LIKED_KEY + blog.getId();
  12. Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
  13. //在blog中标识被当前用户已经点赞
  14. blog.setIsLike(Objects.nonNull(score));
  15. }

实现点赞功能,用户没点赞过则点赞,用户点赞了则取消点赞

  1. /**
  2. * 点赞
  3. *
  4. * @param id
  5. * @return
  6. */
  7. @Override
  8. public Result likeBlog(Long id) {
  9. // 1、判断用户是否点赞
  10. Long userId = ThreadLocalUtls.getUser().getId();
  11. String key = BLOG_LIKED_KEY + id;
  12. // 在score_set中查用户的分数,如果查到了说明存在
  13. Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
  14. boolean result;
  15. if (score == null) {
  16. // 1.1 用户未点赞,点赞数+1
  17. result = this.update(new LambdaUpdateWrapper<Blog>()
  18. .eq(Blog::getId, id)
  19. .setSql("liked = liked + 1"));
  20. if (result) {
  21. // 数据库更新成功,更新缓存 zadd key value score,分数我们按时间戳去存
  22. stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
  23. }
  24. } else {
  25. // 1.2 用户已点赞,点赞数-1
  26. result = this.update(new LambdaUpdateWrapper<Blog>()
  27. .eq(Blog::getId, id)
  28. .setSql("liked = liked - 1"));
  29. if (result) {
  30. // 数据更新成功,更新缓存 zrem key value
  31. stringRedisTemplate.opsForZSet().remove(key, userId.toString());
  32. }
  33. }
  34. return Result.ok();
  35. }

查询点赞前5的用户,我们先从Redis中查询前5的用户id,再用id从Mysql中查询,注意id的顺序必须是降序

  1. /**
  2. * 查询Top5的点赞用户 zrange key 0 4
  3. * @param id
  4. * @return
  5. */
  6. @Override
  7. public Result queryBlogLikes(Long id) {
  8. // 查询Top5的点赞用户 zrange key 0 4
  9. String key = BLOG_LIKED_KEY + id;
  10. Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
  11. if (top5 == null || top5.isEmpty()) {
  12. return Result.ok(Collections.emptyList());
  13. }
  14. List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
  15. String idStr = StrUtil.join(",", ids);
  16. // 根据id降序排序 select * from tb_user where id in(1,5) order by field(id, 1, 5)
  17. List<UserDTO> userDTOList = userService.list(new LambdaQueryWrapper<User>()
  18. .in(User::getId, ids)
  19. .last("order by field (id," + idStr + ")"))
  20. .stream()
  21. .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
  22. .collect(Collectors.toList());
  23. return Result.ok(userDTOList);
  24. }

好友关注

关注功能实现

用户和博主属于多对多的关系,因此我们建立一张多对多的关系表

关注和取消关注,我们同时在Redis和关系表中取消增加用户信息

  1. /**
  2. * 关注用户
  3. *
  4. * @param followUserId 关注用户的id
  5. * @param isFollow 是否已关注
  6. * @return
  7. */
  8. @Override
  9. public Result follow(Long followUserId, Boolean isFollow) {
  10. Long userId = ThreadLocalUtls.getUser().getId();
  11. String key = FOLLOW_KEY + userId;
  12. if (isFollow) {
  13. // 用户未关注,则关注
  14. Follow follow = new Follow();
  15. follow.setUserId(userId);
  16. follow.setFollowUserId(followUserId);
  17. boolean isSuccess = this.save(follow);
  18. if (isSuccess) {
  19. // 用户关注信息保存成功,把关注的用户id放入Redis的Set集合中,
  20. stringRedisTemplate.opsForSet().add(key, followUserId.toString());
  21. }
  22. } else {
  23. // 用户已关注,删除关注信息
  24. boolean isSuccess = this.remove(new LambdaQueryWrapper<Follow>()
  25. .eq(Follow::getUserId, userId)
  26. .eq(Follow::getFollowUserId, followUserId));
  27. if (isSuccess) {
  28. stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
  29. }
  30. }
  31. return Result.ok();
  32. }

直接从数据中判断用户是否关注博主

  1. /**
  2. * 是否关注用户
  3. *
  4. * @param followUserId 关注用户的id
  5. * @return
  6. */
  7. @Override
  8. public Result isFollow(Long followUserId) {
  9. Long userId = ThreadLocalUtls.getUser().getId();
  10. int count = this.count(new LambdaQueryWrapper<Follow>()
  11. .eq(Follow::getUserId, userId)
  12. .eq(Follow::getFollowUserId, followUserId));
  13. return Result.ok(count > 0);
  14. }

共同关注

利用Redis中SINTER实现共同关注

  1. SADD s1 m1 m2
  2. SADD s2 m2 m3
  3. SINTER s1 s2 //返回m2

  1. /**
  2. * 查询共同关注
  3. *
  4. * @param id
  5. * @return
  6. */
  7. @Override
  8. public Result followCommons(Long id) {
  9. Long userId = ThreadLocalUtls.getUser().getId();
  10. String key1 = FOLLOW_KEY + userId;
  11. String key2 = FOLLOW_KEY + id;
  12. // 查询当前用户与目标用户的共同关注对象
  13. Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
  14. if (Objects.isNull(intersect) || intersect.isEmpty()) {
  15. return Result.ok(Collections.emptyList());
  16. }
  17. List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
  18. // 查询共同关注的用户信息
  19. List<UserDTO> userDTOList = userService.listByIds(ids).stream()
  20. .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
  21. .collect(Collectors.toList());
  22. return Result.ok(userDTOList);
  23. }

Feed流实现方案

大V对普通粉丝使用拉的方式先发到发件箱里,他们需要的时候再将消息推送给普通用户,对于活跃用户使用推的方式,直接将消息推送到他们的收件箱;对于普通人,直接使用推的方式,直接将消息发送到他们的收件箱 

 使用sorted_set和list都可以实现上面的需求但是,但是这里我们需要按滚动分页查询,即使有用户发送了新的消息,我们都从上一次分页的最后一条开始查询,所以这里我们直接采用sorted_set实现该功能

  1. /**
  2. * 保存探店笔记
  3. *
  4. * @param blog
  5. * @return
  6. */
  7. @Override
  8. public Result saveBlog(Blog blog) {
  9. Long userId = ThreadLocalUtls.getUser().getId();
  10. blog.setUserId(userId);
  11. // 保存探店笔记
  12. boolean isSuccess = this.save(blog);
  13. if (!isSuccess) {
  14. return Result.fail("笔记保存失败");
  15. }
  16. // 查询笔记作者的所有粉丝,找到FollowId与等于作者UserId的所有用户就可以了
  17. List<Follow> follows = followService.list(new LambdaQueryWrapper<Follow>()
  18. .eq(Follow::getFollowUserId, userId));
  19. // 将笔记推送给所有的粉丝
  20. for (Follow follow : follows) {
  21. // 获取粉丝的id
  22. Long id = follow.getUserId();
  23. // 推送笔记
  24. String key = FEED_KEY + id;
  25. //将blogId推送到每一个粉丝的收件箱,粉丝收件箱的key=FEED_KEY + id
  26. stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
  27. }
  28. return Result.ok(blog.getId());
  29. }

滚动分页查询的实现

参数:5~0表示范围,1表示小于等于5的偏移量,3表示输出3个

这样输出有一个问题如果sorted_set末尾有两个相同的数,偏移量仍是1的话就会出现问题

可以看到offset仍是1的话,6会重复出现,所以offset应该是上一次查询最后一个数的个数

  1. /**
  2. * 关注推送页面的分页查询
  3. */
  4. @GetMapping("/of/follow")
  5. public Result queryBlogOfFollow(@RequestParam("lastId") Long max,
  6. @RequestParam(value = "offset", defaultValue = "0") Integer offset) {
  7. return blogService.queryBlogOfFollow(max, offset);
  8. }

 注意:这里使用blogId查询blog,我们需要使用自己的顺序

  1. /**
  2. * 关注推送页面的笔记分页
  3. *
  4. * @param max
  5. * @param offset
  6. * @return
  7. */
  8. @Override
  9. public Result queryBlogOfFollow(Long max, Integer offset) {
  10. // 1、查询收件箱
  11. Long userId = ThreadLocalUtls.getUser().getId();
  12. String key = FEED_KEY + userId;
  13. // ZREVRANGEBYSCORE key Max Min LIMIT offset count
  14. Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
  15. .reverseRangeByScoreWithScores(key, 0, max, offset, 2);
  16. // 2、判断收件箱中是否有数据
  17. if (typedTuples == null || typedTuples.isEmpty()) {
  18. return Result.ok();
  19. }
  20. // 3、收件箱中有数据,则解析数据: blogId、minTime(时间戳)、offset
  21. List<Long> ids = new ArrayList<>(typedTuples.size());
  22. long minTime = 0; // 记录当前最小值
  23. int os = 1; // 偏移量offset,用来计数
  24. for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2
  25. // 获取id
  26. ids.add(Long.valueOf(tuple.getValue()));
  27. // 获取分数(时间戳)
  28. long time = tuple.getScore().longValue();
  29. if (time == minTime) {
  30. // 当前时间等于最小时间,偏移量+1
  31. os++;
  32. } else {
  33. // 当前时间不等于最小时间,重置
  34. minTime = time;
  35. os = 1;
  36. }
  37. }
  38. // 4、根据id查询blog(使用in查询的数据是默认按照id升序排序的,这里需要使用我们自己指定的顺序排序)
  39. String idStr = StrUtil.join(",", ids);
  40. List<Blog> blogs = this.list(new LambdaQueryWrapper<Blog>().in(Blog::getId, ids)
  41. .last("ORDER BY FIELD(id," + idStr + ")"));
  42. // 设置blog相关的用户数据,是否被点赞等属性值
  43. for (Blog blog : blogs) {
  44. // 查询blog有关的用户
  45. queryUserByBlog(blog);
  46. // 查询blog是否被点赞
  47. isBlogLiked(blog);
  48. }
  49. // 5、封装并返回
  50. ScrollResult scrollResult = new ScrollResult();
  51. scrollResult.setList(blogs);
  52. scrollResult.setOffset(os);
  53. scrollResult.setMinTime(minTime);
  54. return Result.ok(scrollResult);
  55. }

附件商铺

1.添加数据,并计算距离

 2.搜索附件10KM火车站

 导入店铺数据到GEO

我们按店铺类型将店铺的坐标信息导入Redis中,我们将SHOP_GEO_KEY+店铺类型id作为我们的key,店铺的坐标+id作为我们的value

  1. /**
  2. * 预热店铺数据,按照typeId进行分组,用于实现附近商户搜索功能
  3. */
  4. @Test
  5. public void loadShopListToCache() {
  6. // 1、获取店铺数据
  7. List<Shop> shopList = shopService.list();
  8. // 2、根据 typeId 进行分类
  9. // Map<Long, List<Shop>> shopMap = new HashMap<>();
  10. // for (Shop shop : shopList) {
  11. // Long shopId = shop.getId();
  12. // if (shopMap.containsKey(shopId)){
  13. // // 已存在,添加到已有的集合中
  14. // shopMap.get(shopId).add(shop);
  15. // }else{
  16. // // 不存在,直接添加
  17. // shopMap.put(shopId, Arrays.asList(shop));
  18. // }
  19. // }
  20. // 使用 Lambda 表达式,更加优雅(优雅永不过时)
  21. Map<Long, List<Shop>> shopMap = shopList.stream()
  22. .collect(Collectors.groupingBy(Shop::getTypeId));
  23. // 3、将分好类的店铺数据写入redis
  24. for (Map.Entry<Long, List<Shop>> shopMapEntry : shopMap.entrySet()) {
  25. // 3.1 获取 typeId
  26. Long typeId = shopMapEntry.getKey();
  27. List<Shop> values = shopMapEntry.getValue();
  28. // 3.2 将同类型的店铺的写入同一个GEO ( GEOADD key 经度 维度 member )
  29. String key = SHOP_GEO_KEY + typeId;
  30. // 方式一:单个写入(这种方式,一个请求一个请求的发送,十分耗费资源,我们可以进行批量操作)
  31. // for (Shop shop : values) {
  32. // stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()),
  33. // shop.getId().toString());
  34. // }
  35. // 方式二:批量写入
  36. List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>();
  37. for (Shop shop : values) {
  38. locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),
  39. new Point(shop.getX(), shop.getY())));
  40. }
  41. stringRedisTemplate.opsForGeo().add(key, locations);
  42. }
  43. }

实现附件商铺功能

 接口

  1. /**
  2. * 根据商铺类型分页查询商铺信息
  3. *
  4. * @param typeId 商铺类型
  5. * @param current 页码
  6. * @return 商铺列表
  7. */
  8. @GetMapping("/of/type")
  9. public Result queryShopByType(
  10. @RequestParam("typeId") Integer typeId,
  11. @RequestParam(value = "current", defaultValue = "1") Integer current,
  12. @RequestParam(value = "x", required = false) Double x,
  13. @RequestParam(value = "y", required = false) Double y) {
  14. return shopService.queryShopByType(typeId, current, x, y);
  15. }

实现附件商铺

当没有传入x y的时候,我们直接从数据库中查询数据,当有x y传入的时候我们再从Redis中查询数据,但是Redis只会返回0~end的数据,我们需要手动截取from~end的数据

  1. /**
  2. * 分页查询店铺数据
  3. *
  4. * @param typeId 店铺类型ID
  5. * @param current 页码
  6. * @param x 坐标x轴
  7. * @param y 坐标y轴
  8. * @return
  9. */
  10. @Override
  11. public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
  12. // 1、判断是否需要根据坐标查询
  13. if (x == null || y == null) {
  14. // 不需要坐标查询,按数据库查询
  15. Page<Shop> page = this.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE),
  16. new LambdaQueryWrapper<Shop>().eq(Shop::getTypeId, typeId));
  17. // 返回数据
  18. return Result.ok(page.getRecords());
  19. }
  20. // 2、需要查询坐标,则需要到Redis中进行查询
  21. // 2.1 计算分页参数
  22. int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
  23. int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
  24. // 2.2 根据经纬度从redis中查询店铺数据,并按照距离排序、分页
  25. String key = SHOP_GEO_KEY + typeId;
  26. // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE 结果: shopId、distance
  27. GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate
  28. .opsForGeo().search(
  29. key,
  30. GeoReference.fromCoordinate(x, y),
  31. // 默认搜索范围是5km,默认单位m
  32. new Distance(5000),
  33. // 查询从0end,所以后面还需要截取fromend之间的数据
  34. RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));
  35. // 4、解析出店铺id
  36. // 4.1 健壮性判断,防止skip出现NPE
  37. if (results == null) {
  38. // 缓存中不存在店铺数据
  39. return Result.ok(Collections.emptyList());
  40. }
  41. // 4.2 缓存中存在店铺数据,则需要截取 from ~ end的部分,需要判断fromend之间的数据是否存在
  42. List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
  43. if (list.size() <= from) {
  44. // 当前数据比起始索引还要小,说明没有我们要查询页的数据
  45. return Result.ok(Collections.emptyList());
  46. }
  47. // 4.3 fromend之间的数据存在,则解析出店铺id
  48. List<Long> ids = new ArrayList<>(list.size());
  49. Map<String, Distance> distanceMap = new HashMap<>(list.size());
  50. // skip表示直接从第from个数据开始遍历
  51. list.stream().skip(from).forEach(result -> {
  52. // 获取店铺id
  53. String shopIdStr = result.getContent().getName();
  54. ids.add(Long.valueOf(shopIdStr));
  55. // 获取店铺距离
  56. Distance distance = result.getDistance();
  57. distanceMap.put(shopIdStr, distance);
  58. });
  59. // 5、根据店铺ids查询出店铺数据
  60. String idStr = StrUtil.join(",", ids);
  61. // 5.1 查寻出所有符合条件的店铺数据(这里需要利用ORDER BY FIELD确保id的有序性)
  62. List<Shop> shopList = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
  63. // List<Shop> shopList = this.list(new LambdaQueryWrapper<Shop>()
  64. // .in(Shop::getId, ids)
  65. // .last("ORDER BY FIELD(id," + idStr + ")"));
  66. // 5.2 为店铺的距离属性进行赋值
  67. for (Shop shop : shopList) {
  68. shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
  69. }
  70. // 6、返回
  71. return Result.ok(shopList);
  72. }

用户签到

使用Redis中的BitMap类型统计签到

 

 BITFIELD bm1 GET u2 0  u表示按无符号 2表示读取2个数 0表示从0位开始读 返回10进制数对应的二进制就是11

BITPOS bm1 0 表示第一次出现0的位置 BITPOS bm1 1 表示第一次出现1的位置

实现签到功能

  1. /**
  2. * 用户签到
  3. *
  4. * @return
  5. */
  6. @Override
  7. public Result sign() {
  8. // 获取当前登录用户
  9. Long userId = ThreadLocalUtls.getUser().getId();
  10. // 获取日期
  11. LocalDateTime now = LocalDateTime.now();
  12. // 拼接key
  13. String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
  14. String key = USER_SIGN_KEY + userId + keySuffix;
  15. // 获取今天是本月的第几天
  16. int dayOfMonth = now.getDayOfMonth();
  17. // 写入Redis SETBIT key offset 1
  18. stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
  19. return Result.ok();
  20. }

实现统计连续签到功能

  1. /**
  2. * 记录连续签到的天数
  3. *
  4. * @return
  5. */
  6. @Override
  7. public Result signCount() {
  8. // 1、获取签到记录
  9. // 获取当前登录用户
  10. Long userId = ThreadLocalUtls.getUser().getId();
  11. // 获取日期
  12. LocalDateTime now = LocalDateTime.now();
  13. // 拼接key
  14. String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
  15. String key = USER_SIGN_KEY + userId + keySuffix;
  16. // 获取今天是本月的第几天
  17. int dayOfMonth = now.getDayOfMonth();
  18. // 获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
  19. List<Long> result = stringRedisTemplate.opsForValue().bitField(
  20. key,
  21. BitFieldSubCommands.create()
  22. .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
  23. );
  24. // 2、判断签到记录是否存在
  25. if (result == null || result.isEmpty()) {
  26. // 没有任何签到结果
  27. return Result.ok(0);
  28. }
  29. // 3、获取本月的签到数(List<Long>是因为BitFieldSubCommands是一个子命令,可能存在多个返回结果,这里我们知识使用了Get
  30. // 可以明确只有一个返回结果,即为本月的签到数,所以这里就可以直接通过get(0)来获取)
  31. Long num = result.get(0);
  32. if (num == null || num == 0) {
  33. // 二次判断签到结果是否存在,让代码更加健壮
  34. return Result.ok(0);
  35. }
  36. // 4、循环遍历,获取连续签到的天数(从当前天起始)
  37. int count = 0;
  38. while (true) {
  39. // 让这个数字与1做与运算,得到数字的最后一个bit位,并且判断这个bit位是否为0
  40. if ((num & 1) == 0) {
  41. // 如果为0,说明未签到,结束
  42. break;
  43. } else {
  44. // 如果不为0,说明已签到,计数器+1
  45. count++;
  46. }
  47. // 把数字右移一位,抛弃最后一个bit位,继续下一个bit
  48. num >>>= 1;
  49. }
  50. return Result.ok(count);
  51. }

UV统计

 

使用HLL不可以统计重复的数据,相同的数据只会被当作一次统计

 最后返回结果997593,误差为百分之一

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号