当前位置:   article > 正文

黑马---Redis入门到实战【实战篇】_redis开发与实战 csdn

redis开发与实战 csdn

 一、短信登录

基于session实现短信登录的流程

实现发送短信验证码功能

 发送验证码功能:

  1. @Override
  2. public Result sendCode(String phone, HttpSession session) {
  3. //1.校验手机号
  4. if(RegexUtils.isPhoneInvalid(phone)){
  5. //2.如果不符合,返回错误信息
  6. return Result.fail("手机号格式错误!");
  7. }
  8. //3.符合,生成验证码
  9. String code = RandomUtil.randomNumbers(6);
  10. //4.保存验证码到session
  11. session.setAttribute("code",code);
  12. //5.发送验证码
  13. log.debug("发送短信验证码成功,验证码:{}"+code);
  14. //返回ok
  15. return Result.ok();
  16. }

登录功能:

登录表单的实体类:

  1. @Data
  2. public class LoginFormDTO {
  3. private String phone;
  4. private String code;
  5. private String password;
  6. }

登录逻辑代码实现:

  1. @Override
  2. public Result login(LoginFormDTO loginForm, HttpSession session) {
  3. //1.校验手机号
  4. String phone = loginForm.getPhone();
  5. if(RegexUtils.isPhoneInvalid(phone)){
  6. //如果不符合,返回错误信息
  7. return Result.fail("手机号格式错误!");
  8. }
  9. //2.校验验证码
  10. Object cacheCode = session.getAttribute("code");
  11. String code = loginForm.getCode();
  12. if(cacheCode==null ||! cacheCode.toString().equals(code)){
  13. //3.不一致,报错
  14. return Result.fail("验证码错误!");
  15. }
  16. //4.一致,根据手机号查询用户 select * from tb_user where phone = ?
  17. User user = query().eq("phone", phone).one();
  18. //5.判断用户是否存在
  19. if(user==null) {
  20. //6.不存在,创建新用户并保存
  21. user=createUsrWithPhone(phone);
  22. }
  23. //7.保存用户信息到session中
  24. session.setAttribute("user",user);
  25. return null;
  26. }
  27. private User createUsrWithPhone(String phone) {
  28. //1.创建用户
  29. User user = new User();
  30. user.setPhone(phone);
  31. user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
  32. //2.保存用户
  33. save(user);
  34. return user;
  35. }

实现登录校验拦截器

ThreadLocal 叫做本地线程变量,意思是说,ThreadLocal 中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,ThreadLocal 为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量。

注意,为了隐藏用户敏感信息,也为了节省ThreadLocal的空间,需要将User转为UserDTO返回给前端。

可以通过hutool工具类,在UserService里修改:

session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
  1. public class UserHolder {
  2. private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
  3. public static void saveUser(UserDTO user){
  4. tl.set(user);
  5. }
  6. public static UserDTO getUser(){
  7. return tl.get();
  8. }
  9. public static void removeUser(){
  10. tl.remove();
  11. }
  12. }

拦截器:

  1. public class LoginInterceptor implements HandlerInterceptor {
  2. @Override
  3. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  4. //1.获取session
  5. HttpSession session = request.getSession();
  6. //2.获取session中的用户
  7. Object user = session.getAttribute("user");
  8. //3.判断用户是否存在
  9. if(user==null) {
  10. //4.不存在,拦截,返回401状态码,代表未授权
  11. response.setStatus(401);
  12. return false;
  13. }
  14. //5.存在,保存用户信息到ThreadLocal
  15. UserHolder.saveUser((UserDTO) user);
  16. //6.放行
  17. return true;
  18. }
  19. @Override
  20. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  21. UserHolder.removeUser();
  22. }
  23. }

添加拦截器:

  1. @Configuration
  2. public class MvcConfig implements WebMvcConfigurer {
  3. @Override
  4. public void addInterceptors(InterceptorRegistry registry) {
  5. registry.addInterceptor(new LoginInterceptor())
  6. .excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/voucher/**","/upload/**");
  7. }
  8. }

登录并返回:

  1. @GetMapping("/me")
  2. public Result me(){
  3. // TODO 获取当前登录的用户并返回
  4. UserDTO user = UserHolder.getUser();
  5. return Result.ok(user);
  6. }

session共享的问题分析

session共享问题:多态Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题

session的替代方案应该满足:

        数据共享

        内存存储

        key、value结构

===>redis

Redis代替session的业务流程

基于Redis实现短信登录

拦截器修改:

  1. public class LoginInterceptor implements HandlerInterceptor {
  2. private StringRedisTemplate stringRedisTemplate;
  3. public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
  4. this.stringRedisTemplate=stringRedisTemplate;
  5. }
  6. @Override
  7. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  8. //1.获取请求头中的token
  9. String token = request.getHeader("authorization");
  10. //判断token是否为空
  11. if(StrUtil.isBlank(token)){
  12. response.setStatus(401);
  13. return false;
  14. }
  15. String key=LOGIN_USER_KEY+token;
  16. //2.基于token获取redis中的用户
  17. Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
  18. //3.判断用户是否存在
  19. if(userMap.isEmpty()){
  20. //不存在,拦截,返回401状态码
  21. response.setStatus(401);
  22. return false;
  23. }
  24. //5.存在,将查询到的Hash数据转为UserDTO对象
  25. UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
  26. //6.保存用户到ThreadLocal
  27. UserHolder.saveUser(userDTO);
  28. //7.刷新token有效期
  29. stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
  30. //8.放行
  31. return true;
  32. }
  33. @Override
  34. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  35. UserHolder.removeUser();
  36. }
  37. }

MvcConfig修改:

  1. @Configuration
  2. public class MvcConfig implements WebMvcConfigurer {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public void addInterceptors(InterceptorRegistry registry) {
  7. registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
  8. .excludePathPatterns(
  9. "/user/code",
  10. "/user/login",
  11. "/blog/hot",
  12. "/shop/**",
  13. "/shop-type/**",
  14. "/voucher/**",
  15. "/upload/**");
  16. }
  17. }

UserServiceImpl修改:

  1. @Service
  2. public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public Result sendCode(String phone, HttpSession session) {
  7. //1.校验手机号
  8. if(RegexUtils.isPhoneInvalid(phone)){
  9. //2.如果不符合,返回错误信息
  10. return Result.fail("手机号格式错误!");
  11. }
  12. //3.符合,生成验证码
  13. String code = RandomUtil.randomNumbers(6);
  14. //4.保存验证码到redis中
  15. stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
  16. //5.发送验证码
  17. log.debug("发送短信验证码成功,验证码:{}"+code);
  18. //返回ok
  19. return Result.ok();
  20. }
  21. @Override
  22. public Result login(LoginFormDTO loginForm, HttpSession session) {
  23. //1.校验手机号
  24. String phone = loginForm.getPhone();
  25. if(RegexUtils.isPhoneInvalid(phone)){
  26. //如果不符合,返回错误信息
  27. return Result.fail("手机号格式错误!");
  28. }
  29. //2.校验验证码
  30. String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
  31. String code = loginForm.getCode();
  32. if(cacheCode==null ||!cacheCode.equals(code)){
  33. //3.不一致,报错
  34. return Result.fail("验证码错误!");
  35. }
  36. //4.一致,根据手机号查询用户 select * from tb_user where phone = ?
  37. User user = query().eq("phone", phone).one();
  38. //5.判断用户是否存在
  39. if(user==null) {
  40. //6.不存在,创建新用户并保存
  41. user=createUsrWithPhone(phone);
  42. }
  43. //保存用户信息到redis中
  44. //1.随机生成token,作为登录令牌
  45. String token = UUID.randomUUID().toString(true); //true代表isSimple,即不带中划线
  46. //2.将User对象转为Hash存储
  47. UserDTO userDTO=BeanUtil.copyProperties(user,UserDTO.class);
  48. Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
  49. String tokenKey=LOGIN_USER_KEY+token;
  50. //7.存储
  51. stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
  52. //设置token有效期
  53. stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
  54. return Result.ok();
  55. }
  56. private User createUsrWithPhone(String phone) {
  57. //1.创建用户
  58. User user = new User();
  59. user.setPhone(phone);
  60. user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
  61. //2.保存用户
  62. save(user);
  63. return user;
  64. }
  65. }

Redis代替session需要考虑的问题:

选择合适的数据结构

选择合适的key

选择合适的存储粒度

解决登录状态刷新的问题

RefreshTokenInterceptor

  1. public class RefreshTokenInterceptor implements HandlerInterceptor {
  2. private StringRedisTemplate stringRedisTemplate;
  3. public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
  4. this.stringRedisTemplate=stringRedisTemplate;
  5. }
  6. @Override
  7. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  8. //1.获取请求头中的token
  9. String token = request.getHeader("authorization");
  10. //判断token是否为空
  11. if(StrUtil.isBlank(token)){
  12. return true;
  13. }
  14. String key=LOGIN_USER_KEY+token;
  15. //2.基于token获取redis中的用户
  16. Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
  17. //3.判断用户是否存在
  18. if(userMap.isEmpty()){
  19. return true;
  20. }
  21. //5.存在,将查询到的Hash数据转为UserDTO对象
  22. UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
  23. //6.保存用户到ThreadLocal
  24. UserHolder.saveUser(userDTO);
  25. //7.刷新token有效期
  26. stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
  27. //8.放行
  28. return true;
  29. }
  30. @Override
  31. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  32. UserHolder.removeUser();
  33. }
  34. }

LoginInterceptor

  1. public class LoginInterceptor implements HandlerInterceptor {
  2. @Override
  3. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  4. //判断是否需要拦截(ThreadLocal中是否有用户)
  5. if(UserHolder.getUser()==null){
  6. //没有,需要拦截,设置状态码
  7. response.setStatus(401);
  8. return false;
  9. }
  10. //有用户,则放行
  11. return true;
  12. }
  13. @Override
  14. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  15. UserHolder.removeUser();
  16. }
  17. }

配置添加拦截器

        注意通过order控制拦截器执行顺序,order越小越先执行

  1. @Configuration
  2. public class MvcConfig implements WebMvcConfigurer {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public void addInterceptors(InterceptorRegistry registry) {
  7. registry.addInterceptor(new LoginInterceptor())
  8. .excludePathPatterns(
  9. "/user/code",
  10. "/user/login",
  11. "/blog/hot",
  12. "/shop/**",
  13. "/shop-type/**",
  14. "/voucher/**",
  15. "/upload/**").order(1);
  16. registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
  17. }
  18. }

二、商户缓存查询

什么是缓存

缓存就是数据交换的缓冲区(称作Cache),是存储数据的临时地方,一般读写性能较高

添加Redis缓存

  1. /**
  2. * 根据id查询商铺信息
  3. * @param id 商铺id
  4. * @return 商铺详情数据
  5. */
  6. @GetMapping("/{id}")
  7. public Result queryShopById(@PathVariable("id") Long id) {
  8. return shopService.queryById(id);
  9. }

ShopServiceImpl:

  1. @Service
  2. public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public Result queryById(Long id) {
  7. String key=CACHE_SHOP_KEY+id;
  8. //1.从redis查询商铺缓存
  9. String shopJson = stringRedisTemplate.opsForValue().get(key);
  10. //2.判断是否存在
  11. if(StrUtil.isNotBlank(shopJson)) {
  12. //3.存在,直接返回
  13. Shop shop = JSONUtil.toBean(shopJson, Shop.class);
  14. return Result.ok(shop);
  15. }
  16. //4.不存在,根据id查询数据库
  17. Shop shop = getById(id);
  18. //5.不存在,返回错误
  19. if(shop==null){
  20. return Result.fail("店铺不存在!");
  21. }
  22. //6.存在,写入redis
  23. stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
  24. //7.返回
  25. return Result.ok(shop);
  26. }
  27. }

练习:给店铺类型查询业务添加缓存

  1. @Service
  2. public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
  3. @Autowired
  4. private ShopTypeMapper shopTypeMapper;
  5. @Autowired
  6. private StringRedisTemplate stringRedisTemplate;
  7. @Override
  8. public Result queryBatch() {
  9. String key="CACHE_SHOP_TYPE_KEY";
  10. String jsonType = stringRedisTemplate.opsForValue().get(key);
  11. if(StrUtil.isNotBlank(jsonType)){
  12. List<ShopType> shopTypes = JSONUtil.toList(jsonType, ShopType.class);
  13. return Result.ok(shopTypes);
  14. }
  15. List<ShopType> shopTypes = shopTypeMapper.selectList(new QueryWrapper<>());
  16. if(shopTypes.isEmpty()){
  17. return Result.fail("您查询的页面不存在!");
  18. }
  19. stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shopTypes));
  20. return Result.ok(shopTypes);
  21. }
  22. }

缓存更新策略

 业务场景:

低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存

高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存

主动更新策略:

 操作缓存和数据库时有三个问题需要考虑:

1.删除缓存还是更新缓存?

        更新缓存:每次更新都更新缓存,无效写操作较多

        删除缓存:更新数据库时让缓存失效,查询时再更新缓存(胜出)

 2.如何保证缓存与数据库的操作的同时成功或失败?

        单体系统:将缓存与数据库操作放在一个事务

        分布式系统:利用TCC等分布式事务方案

3.先操作缓存还是先操作数据库?

 总结:

     

实现缓存与数据库的双写一致

 案例:给查询商铺的缓存添加超时剔除和主动更新的策略

修改ShopController中的业务逻辑,满足下面的需求:

①根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间

②根据id修改店铺时,先修改数据库,再删除缓存

更新操作:

  1. @Override
  2. @Transactional
  3. public Result update(Shop shop) {
  4. Long id=shop.getId();
  5. if(id==null){
  6. return Result.fail("店铺id不能为空!");
  7. }
  8. //1.更新数据库
  9. updateById(shop);
  10. //2.删除缓存
  11. stringRedisTemplate.delete(CACHE_SHOP_KEY+shop.getId());
  12. return Result.ok();
  13. }

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

编码解决商铺查询的缓存穿透问题

  1. @Override
  2. public Result queryById(Long id) {
  3. String key=CACHE_SHOP_KEY+id;
  4. //1.从redis查询商铺缓存
  5. String shopJson = stringRedisTemplate.opsForValue().get(key);
  6. //2.判断是否存在
  7. if(StrUtil.isNotBlank(shopJson)) {
  8. //3.存在,直接返回
  9. Shop shop = JSONUtil.toBean(shopJson, Shop.class);
  10. return Result.ok(shop);
  11. }
  12. //命中的是否为空值
  13. if(shopJson!=null){
  14. return Result.fail("店铺信息不存在!");
  15. }
  16. //4.不存在,根据id查询数据库
  17. Shop shop = getById(id);
  18. //5.不存在,返回错误
  19. if(shop==null){
  20. //将空值写入redis
  21. stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
  22. return Result.fail("店铺信息不存在!");
  23. }
  24. //6.存在,写入redis
  25. stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
  26. //7.返回
  27. return Result.ok(shop);
  28. }

总结:

缓存穿透产生的原因是什么?

        用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些?

        缓存null值

        布隆过滤

        增强id的复杂度,避免被猜测id规律

        做好数据的基础格式校验

        加强用户权限校验

        做好热点参数的限流

缓存雪崩

缓存雪崩是指同一时刻大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

给不同的Key的TTL添加随机值

利用Redis集群提高服务的可用性

给缓存业务添加降级限流策略

给业务添加多级缓存

缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

 常见的解决方案有两种:

互斥锁

逻辑过期

互斥锁和逻辑过期对比:

 互斥锁和逻辑过期优缺点:

利用互斥锁解决缓存击穿问题

需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题

  1. public Shop queryWithMutex(Long id){
  2. String key = CACHE_SHOP_KEY+id;
  3. //从redis查询商铺缓存
  4. String shopJson = stringRedisTemplate.opsForValue().get(key);
  5. //判断缓存是否命中
  6. if(StrUtil.isNotBlank(shopJson)){
  7. //命中则直接返回数据
  8. return JSONUtil.toBean(shopJson, Shop.class);
  9. }
  10. //判断是否是缓存穿透
  11. if(shopJson!=null){
  12. return null;
  13. }
  14. //实现缓存重建
  15. //1.获取互斥锁
  16. String lockKey=LOCK_SHOP_KEY+id;
  17. try{
  18. boolean isLock = tryLock(lockKey);
  19. //2.判断是否获取成功
  20. if(!isLock) {
  21. //3.失败,则休眠并重试
  22. Thread.sleep(50);
  23. queryWithMutex(id);
  24. }
  25. //4.成功,根据id查询数据库
  26. Shop shop = getById(id);
  27. //5.不存在,返回错误
  28. if(shop==null){
  29. stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
  30. return null;
  31. }
  32. //6.存在,写入redis
  33. stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);
  34. return shop;
  35. }catch(InterruptedException e){
  36. throw new RuntimeException(e);
  37. }finally{
  38. unlock(lockKey);
  39. }
  40. }
  41. private boolean tryLock(String key){
  42. Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
  43. return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
  44. }
  45. private void unlock(String key){
  46. stringRedisTemplate.delete(key);
  47. }

基于逻辑过期方式解决缓存击穿问题

需求:修改根据id删除商铺的业务,基于逻辑过期的方式来解决缓存击穿问题

添加逻辑过期时间:

  1. @Data
  2. public class RedisData {
  3. private LocalDateTime expireTime;
  4. private Object data;
  5. }

整体实现逻辑:

  1. private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
  2. public Shop queryWithLogicalExpire(Long id){
  3. String key=CACHE_SHOP_KEY+id;
  4. String shopJson = stringRedisTemplate.opsForValue().get(key);
  5. //缓存未命中
  6. if(StrUtil.isBlank(shopJson)){
  7. return null;
  8. }
  9. //命中,需要先把json反序列化为对象
  10. RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
  11. Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
  12. LocalDateTime expireTime = redisData.getExpireTime();
  13. //判断是否过期
  14. if(expireTime.isAfter(LocalDateTime.now())) {
  15. //未过期,直接返回店铺信息
  16. return shop;
  17. }
  18. //已过期,需要缓存重建
  19. String lockKey=LOCK_SHOP_KEY+id;
  20. //获取互斥锁
  21. boolean isLock = tryLock(lockKey);
  22. //判断是否获取锁成功
  23. if(isLock) {
  24. //成功,开启独立线程,实现缓存重建
  25. CACHE_REBUILD_EXECUTOR.submit(()->{
  26. try {
  27. saveShop2Redis(id, CACHE_SHOP_TTL);
  28. }catch(Exception e){
  29. throw new RuntimeException(e);
  30. }finally{
  31. unlock(lockKey);
  32. }
  33. });
  34. }
  35. //返过期的商铺信息
  36. }
  37. private boolean tryLock(String key){
  38. Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
  39. return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
  40. }
  41. private void unlock(String key){
  42. stringRedisTemplate.delete(key);
  43. }
  44. public void saveShop2Redis(Long id,Long expireSeconds){
  45. //1.查询店铺数据
  46. Shop shop = getById(id);
  47. //2.封装逻辑过期时间
  48. RedisData redisData = new RedisData();
  49. redisData.setData(shop);
  50. redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
  51. //3.写入Redis
  52. stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
  53. }

封装Redis工具类

基于StringRedisTemplate封装一个缓存工具类,满足下列要求:

方法1:任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间

方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题

方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题

方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

  1. @Component
  2. public class CacheClient {
  3. private final StringRedisTemplate stringRedisTemplate;
  4. public CacheClient(StringRedisTemplate stringRedisTemplate){
  5. this.stringRedisTemplate=stringRedisTemplate;
  6. }
  7. public void set(String key, Object value, Long time, TimeUnit unit){
  8. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
  9. }
  10. public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
  11. //设置逻辑过期
  12. RedisData redisData = new RedisData();
  13. redisData.setData(value);
  14. redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
  15. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
  16. }
  17. public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit){
  18. String key=keyPrefix+id;
  19. String json = stringRedisTemplate.opsForValue().get(key);
  20. if(StrUtil.isNotBlank(json)){
  21. return JSONUtil.toBean(json,type);
  22. }
  23. //判断命中的是否为空值
  24. if(json!=null){
  25. return null;
  26. }
  27. R r = dbFallback.apply(id);
  28. if(r==null){
  29. stringRedisTemplate.opsForValue().set(key,"",time,unit);
  30. return null;
  31. }
  32. this.set(key,r,time,unit);
  33. return r;
  34. }
  35. private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
  36. public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){
  37. String key=keyPrefix+id;
  38. String json = stringRedisTemplate.opsForValue().get(key);
  39. if(StrUtil.isBlank(json)){
  40. return null;
  41. }
  42. RedisData redisData = JSONUtil.toBean(json, RedisData.class);
  43. R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
  44. LocalDateTime expireTime = redisData.getExpireTime();
  45. //判断是否过期
  46. if(expireTime.isAfter(LocalDateTime.now())){
  47. //未过期,直接返回店铺信息
  48. return r;
  49. }
  50. //已过期,需要缓存重建
  51. String lockKey=LOCK_SHOP_KEY+id;
  52. boolean isLock = tryLock(lockKey);
  53. if(isLock){
  54. CACHE_REBUILD_EXECUTOR.submit(()->{
  55. try{
  56. //重建缓存
  57. //1.查询数据库
  58. R apply = dbFallback.apply(id);
  59. //2.存入redis
  60. this.setWithLogicalExpire(key,apply,time,unit);
  61. }catch (Exception e){
  62. throw new RuntimeException(e);
  63. }finally{
  64. unlock(key);
  65. }
  66. });
  67. }
  68. return r;
  69. }
  70. private boolean tryLock(String key){
  71. Boolean flag=stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);
  72. return BooleanUtil.isTrue(flag);
  73. }
  74. private void unlock(String key){
  75. stringRedisTemplate.delete(key);
  76. }
  77. }

缓存总结

认识缓存

什么是缓存?

        一种具备高效读写能力的数据暂存区域

缓存的作用?

        降低后端负载

        提高读写响应速度

缓存的成本?

        开发成本

        运维成本

        一致性问题

缓存更新策略

三种策略:

        内存淘汰:redis自带的内存淘汰机制

        过期淘汰:利用expire命令给数据设置过期时间

        主动更新:主动完成数据库与缓存的同时更新

策略选择:

        低一致性需求:内存淘汰或过期淘汰

        高一致性需求:

                主动更新为主

                过期淘汰兜底

主动更新方案

        Cache Aside:缓存调用者在更新数据库的同时完成对缓存的更新

                一致性良好、实现难度一般

        Read/Write Through:缓存与数据库集成为一个服务,服务保证两者的一致性,对外暴露API接口,调用者调用API,无需知道自己操作的是数据库还是缓存,不关心一致性

                一致性优秀、实现复杂、性能一般

        Write Back:缓存调用者的CRUD都针对缓存完成。有独立线程异步将缓存数据写到数据库,实现最终一致

                一致性差、性能好、实现复杂

Cache Aside模式选择

更新缓存还是删除缓存?

        更新缓存会产生无效更新,并且存在较大的线程安全问题

        删除缓存本质是延迟更新,没有无效更新,线程安全问题相对较低

先操作数据库还是缓存?

        先更新数据,再删除缓存——在满足原子性的情况下,安全问题概率较低

        先删除缓存,再更新数据库——安全问题概率极高

如何确保数据库与缓存操作原子性?

        单体系统——利用事务机制

        分布式系统——利用分布式事务机制

最佳实践

查询数据时

1.先查询缓存

2.如果缓存命中,直接返回

3.如果缓存未命中,则查询数据库

4.将数据库数据写入缓存

5.返回结果

修改数据库时:

1.先修改数据库

2.然后删除缓存

===>确保两者的原子性

缓存穿透

产生原因:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

解决方案:

①缓存空对象:

        思路:对于不存在的数据也在redis建立缓存,值为空,并设置一个较短的TTL时间

        优点:实现简单,便于维护

        缺点:额外的内存消耗、短期的数据不一致问题

②布隆过滤:

        思路:利用布隆过滤算法,在请求进入Redis之前先判断是否存在,如果不存在则直接拒绝请求

        优点:内存占用少

        缺点:实现复杂、存在误判的可能性

③其他:

        做好数据的基础格式校验

        加强用户权限校验

        做好热点参数的限流

缓存雪崩

产生原因:在同一时段大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

        给不同的Key的TTL添加随机值

        利用Redis集群提高服务的可用性

        给缓存业务添加降级限流策略

        给业务添加多级缓存

缓存击穿(热点key)

产生原因:

        热点key:①在某一时段被高并发访问 ②缓存重建耗时较长

        热点key突然过期,因为缓存重建耗时长,在这段时间内大量请求落到数据库,带来巨大冲击

解决方案:

互斥锁:

        思路:给缓存重建过程加锁,确保重建过程只有一个线程执行,其他线程等待

        优点:①实现简单 ②没有额外内存消耗 ③一致性好

        缺点:①等待导致性能下降

        缺点:有死锁风险

逻辑过期:

        思路:

                ①热点key缓存永不过期,而是设置一个逻辑过期时间,查询到数据时通过对逻辑过期时间判断,来决定是否需要重建缓存

                ②重建缓存也通过互斥锁来保证单线程执行

                ③重建缓存利用独立线程异步执行

                ④其他线程无需等待,直接查询到旧数据即可

        优点:现成无需等待,性能较好

        缺点:

                ①不保证一致性

                ②有额外内存消耗

                ③实现复杂

三、优惠券秒杀

 全局ID生成器

每个店铺都可以发布优惠券:

 当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就会存在一些问题:

id的规律性太明显

受单表数据量的限制

全局ID生成器:

全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

 为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

 ID的组成部分:

符号位:1bit,永远为0

时间戳:31bit,以秒为单位,可以使用69年

序列号:32bit,秒内的计数器,可以支持每秒产生2^32个不同的ID

Redis实现全局唯一ID

  1. @Component
  2. public class RedisIdWorker {
  3. //开始时间戳 2023-01-01 00:00:00
  4. private static final long BEGIN_TIMESTAMP=1672531200L;
  5. private static final int COUNT_BITS=32;
  6. private StringRedisTemplate stringRedisTemplate;
  7. public RedisIdWorker(StringRedisTemplate stringRedisTemplate){
  8. this.stringRedisTemplate=stringRedisTemplate;
  9. }
  10. public long nextId(String keyPrefix){
  11. //1.生成时间戳
  12. LocalDateTime now = LocalDateTime.now();
  13. long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
  14. long timestamp=nowSecond-BEGIN_TIMESTAMP;
  15. //2.生成序列号
  16. //2.1 获取当前日期,精确到天
  17. String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
  18. //2.2 自增长
  19. long count=stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);
  20. //3.拼接并返回
  21. return timestamp<<COUNT_BITS | count;
  22. }
  23. }

测试代码:

  1. @Resource
  2. private RedisIdWorker redisIdWorker;
  3. private ExecutorService es= Executors.newFixedThreadPool(500);
  4. @Test
  5. void testIdWorker() throws InterruptedException {
  6. CountDownLatch latch = new CountDownLatch(300);
  7. Runnable task=()->{
  8. for (int i = 0; i < 100; i++) {
  9. long id = redisIdWorker.nextId("order");
  10. System.out.println("id="+id);
  11. }
  12. latch.countDown();
  13. };
  14. latch.countDown();
  15. long begin = System.currentTimeMillis();
  16. for (int i = 0; i < 300; i++) {
  17. es.submit(task);
  18. }
  19. latch.await();
  20. long end=System.currentTimeMillis();
  21. System.out.println("time="+(end-begin));
  22. }

总结:

全局唯一ID生成策略:

UUID

Redis自增

snowflake算法

数据库自增

Redis自增ID策略:

每天一个key,方便统计订单量

ID构造是 时间戳+计数器

添加优惠券

每个店铺都可以发布优惠券,分为评价券和特价劵。平价券可以任意购买,而特价券需要秒杀抢购:

表关系如下:

tb_voucher:优惠券的基本信息,优惠金额、使用规则等

tb_seckill_voucher:优惠券的库存、开始抢购时间、结束抢购时间。特价优惠券才需要填写这些信息。

通过postman添加优惠券:

  1. {
  2. "shopId":1,
  3. "title":"100元代金券",
  4. "subTitle":"周一至周五均可使用",
  5. "rules":"全场通用\\n无需预约\\n可无限叠加\\n不兑换、不找零\\n仅限堂食",
  6. "payValue":8000,
  7. "actualValue":10000,
  8. "type":1,
  9. "stock":100,
  10. "beginTime":"2023-04-29T10:09:17",
  11. "endTime":"2023-04-29T23:09:04"
  12. }

实现秒杀下单

下单时需要判断两点:

秒杀是否开始或结束,如果尚未开始或已经结束则无法下单

库存是否充足,不足则无法下单

  1. @Service
  2. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  3. @Resource
  4. private ISeckillVoucherService seckillVoucherService;
  5. @Resource
  6. private RedisIdWorker redisIdWorker;
  7. @Override
  8. @Transactional
  9. public Result seckillVoucher(Long voucherId) {
  10. //1.查询优惠券
  11. SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
  12. //2.判断秒杀是否开始
  13. if(voucher.getBeginTime().isAfter(LocalDateTime.now())) {
  14. //尚未开始
  15. return Result.fail("秒杀尚未开始!");
  16. }
  17. //3.判断秒杀是否结束
  18. if(voucher.getEndTime().isBefore(LocalDateTime.now())){
  19. //已经结束
  20. return Result.fail("秒杀已经结束!");
  21. }
  22. //4.判断库存是否充足
  23. if(voucher.getStock()<1){
  24. //库存不足
  25. return Result.fail("库存不足!");
  26. }
  27. //5.扣减库存
  28. boolean success=seckillVoucherService
  29. .update()
  30. .setSql("stock=stock-1")
  31. .eq("voucher_id",voucherId)
  32. .update();
  33. if(!success){
  34. //扣减失败
  35. return Result.fail("库存不足!");
  36. }
  37. //6.创建订单
  38. VoucherOrder voucherOrder = new VoucherOrder();
  39. //6.1 订单id
  40. long orderId = redisIdWorker.nextId("order");
  41. voucherOrder.setId(orderId);
  42. //6.2 用户id
  43. Long userId = UserHolder.getUser().getId();
  44. voucherOrder.setUserId(userId);
  45. //6.3 代金券id
  46. voucherOrder.setVoucherId(voucherId);
  47. save(voucherOrder);
  48. //7.返回订单id
  49. return Result.ok(orderId);
  50. }
  51. }

 

超卖问题

用jmeter模拟高并发:

 模拟结果:

 原因:

 超卖问题就是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:

        版本号法

        CAS法

 乐观锁解决超卖问题

  1. boolean success=seckillVoucherService
  2. .update()
  3. .setSql("stock=stock-1")
  4. .eq("voucher_id",voucherId).eq("stock",voucher.getStock())
  5. .update();

乐观锁问题:成功率太低

乐观锁优化:

  1. boolean success=seckillVoucherService
  2. .update()
  3. .setSql("stock=stock-1")
  4. .eq("voucher_id",voucherId).gt("stock",0)
  5. .update();

运行结果:50%的失败率,订单恰好没有超卖

超卖这样的线程安全问题,解决方案有哪些?

1.悲观锁:添加同步锁,让线程串行执行

        优点:简单粗暴

        缺点:性能一般

2.乐观锁:不加锁,在更新时判断是否有其他线程在修改

        优点:性能好

        缺点:存在成功率低的问题

实现一人一单功能

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

 代码实现:

  1. //一人一单
  2. Long userId = UserHolder.getUser().getId();
  3. Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  4. if(count>0){
  5. //用户已经存在
  6. return Result.fail("该用户已经购买过一次!");
  7. }

对于高并发,解决线程安全问题:

(先提交事务,再释放锁)

第一步:添加依赖

  1. <dependency>
  2. <groupId>org.aspectj</groupId>
  3. <artifactId>aspectjweaver</artifactId>
  4. </dependency>

第二步:暴露代理对象 @EnableAspectJAutoProxy(exposeProxy=true)

  1. @MapperScan("com.hmdp.mapper")
  2. @SpringBootApplication
  3. @EnableAspectJAutoProxy(exposeProxy = true)
  4. public class HmDianPingApplication {
  5. public static void main(String[] args) {
  6. SpringApplication.run(HmDianPingApplication.class, args);
  7. }
  8. }

第三步:对方法加锁

        如果用this.createVoucherOrder(voucherId); 首先我们要知道事务如果想生效,需要Spring对该类做动态代理,拿到了代理对象,来对事务进行处理

        这个this是没有事务功能的,因为拿到的目标对象(非代理对象)

        所以需要拿到事务代理对象AopContext.currentProxy()

  1. synchronized (userId.toString().intern()) {
  2. //获取代理对象(事务)
  3. IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
  4. return proxy.createVoucherOrder(voucherId);
  5. }
  1. @Transactional
  2. public Result createVoucherOrder(Long voucherId) {
  3. //一人一单
  4. Long userId = UserHolder.getUser().getId();
  5. Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  6. if (count > 0) {
  7. //用户已经存在
  8. return Result.fail("该用户已经购买过一次!");
  9. }
  10. //5.扣减库存
  11. boolean success = seckillVoucherService
  12. .update()
  13. .setSql("stock=stock-1")
  14. .eq("voucher_id", voucherId).gt("stock", 0)
  15. .update();
  16. if (!success) {
  17. //扣减失败
  18. return Result.fail("库存不足!");
  19. }
  20. //6.创建订单
  21. VoucherOrder voucherOrder = new VoucherOrder();
  22. //6.1 订单id
  23. long orderId = redisIdWorker.nextId("order");
  24. voucherOrder.setId(orderId);
  25. //6.2 用户id
  26. voucherOrder.setUserId(userId);
  27. //6.3 代金券id
  28. voucherOrder.setVoucherId(voucherId);
  29. save(voucherOrder);
  30. //7.返回订单id
  31. return Result.ok(orderId);
  32. }

压测结果:

集群模式下线程锁失效

因为集群模式下,每个JVM有各自的锁监视器

四、分布式锁

分布式锁基本原理

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁

 

分布式锁的实现:

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见有三种:

Redis分布式锁的基本实现

实现分布式锁时需要实现的两个基本方法:

获取锁:

        互斥:确保只能有一个线程获取锁

        #添加锁,利用setnx的互斥特性

        setnx lock thread1

        #添加锁过期时间,避免服务宕机引起的死锁

        expire lock 10

        非阻塞:尝试一次,成功返回true,失败返回false

释放锁:

        手动释放:del key

        超时释放:获取锁时添加一个超时时间

实现Redis分布式锁初级版本

需求:定义一个类,实现下面的接口,利用Redis实现分布式锁功能

  1. @AllArgsConstructor
  2. public class SimpleRedisLock implements ILock{
  3. private String name;
  4. private StringRedisTemplate stringRedisTemplate;
  5. private static final String KEY_PREFIX="lock:";
  6. @Override
  7. public boolean tryLock(long timeoutSec) {
  8. long threadId = Thread.currentThread().getId();
  9. Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
  10. return Boolean.TRUE.equals(success);
  11. }
  12. @Override
  13. public void unlock() {
  14. //释放锁
  15. stringRedisTemplate.delete(KEY_PREFIX+name);
  16. }
  17. }

 优化一人一单:

  1. Long userId=UserHolder.getUser().getId();
  2. //创建锁对象
  3. SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
  4. boolean isLock = lock.tryLock(1200);
  5. if(!isLock) {
  6. //获取锁失败,返回错误或重试
  7. return Result.fail("一个人只能下一单!");
  8. }
  9. try {
  10. //获取代理对象(事务)
  11. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  12. return proxy.createVoucherOrder(voucherId);
  13. }finally {
  14. lock.unlock();
  15. }

Redis分布式锁误删

 解决方案:在释放锁前判断这个锁是否还属于自己

 

改进Redis的分布式锁

需求:修改之前的分布式锁实现,满足:

1.在获取锁时存入线程标识(可以用UUID表示)

2.在释放锁时现货区锁中的线程标识,判断是否与当前线程标识一致

        如果一致则释放锁

        如果不一致则不释放锁

  1. @AllArgsConstructor
  2. public class SimpleRedisLock implements ILock{
  3. private String name;
  4. private StringRedisTemplate stringRedisTemplate;
  5. private static final String KEY_PREFIX="lock:";
  6. private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
  7. @Override
  8. public boolean tryLock(long timeoutSec) {
  9. String threadId =ID_PREFIX + Thread.currentThread().getId();
  10. Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS);
  11. return Boolean.TRUE.equals(success);
  12. }
  13. @Override
  14. public void unlock() {
  15. //获取线程标识
  16. String threadId=ID_PREFIX + Thread.currentThread().getId();
  17. //获取锁中的标识
  18. String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
  19. //判断标识是否一致
  20. if(threadId.equals(id)) {
  21. //释放锁
  22. stringRedisTemplate.delete(KEY_PREFIX + name);
  23. }
  24. }
  25. }

分布式锁的原子性问题

判断和释放锁是两个动作,这中间可能会发生并发问题

Lua脚本解决多条命令原子性问题

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言。

这里重点介绍Redis提供的调用函数,语法如下:

  1. #执行redis命令
  2. redis.call('命令名称','key','其它参数',...)
  3. #比如,我们要执行set name jack
  4. redis.call('set','name','jack')
  5. #如果我们要执行set name Rose,再执行get name,则脚本如下
  6. redis.call('set','name','Rose')
  7. local name = redis.call('get','name')
  8. return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本常见命令如下:

执行无参脚本:

执行有参脚本:

 如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入keys数组,其它参数会放入argv数组,在脚本中可以从keys和argv数组获取这些参数:

基于Redis的分布式锁

释放锁的业务流程:

        1.获取锁中的线程标识

        2.判断是否与指定的标识(当前线程标识)一致

        3.如果一致则释放锁(删除)

        4.如果不一致则什么都不做

  1. -- 锁的key
  2. local key = KEYS[1]
  3. -- 当前线程标识
  4. local threadId = ARGV[1]
  5. -- 获取锁中的线程标识 get key
  6. local id = redis.call('get',key)
  7. --比较线程标识与锁中的标识是否一致
  8. if(id == threadId) then
  9. -- 释放锁
  10. return redis.call('del',key)
  11. end
  12. return 0

脚本优化:

  1. if(redis.call('get',KEYS[1]==ARGV[1])) then
  2. return redis.call('del',KEYS[1])
  3. end
  4. return 0

再次改进Redis的分布式锁

需求:基于Lua脚本实现分布式锁的释放锁逻辑

提示:RedisTemplate调用Lua脚本的API如下:

  1. private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
  2. static {
  3. UNLOCK_SCRIPT=new DefaultRedisScript<>();
  4. UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
  5. UNLOCK_SCRIPT.setResultType(Long.class);
  6. }
  7. @Override
  8. public void unlock() {
  9. //调用lua脚本
  10. stringRedisTemplate.execute(UNLOCK_SCRIPT,
  11. Collections.singletonList(KEY_PREFIX+name),
  12. ID_PREFIX+Thread.currentThread().getId());
  13. }

基于Redis的分布式锁实现思路:

        利用setnx nx ex获取锁,并设置过期时间,保存线程标识

        释放锁时先判断线程标识与自己是否一致,一致则删除锁

特性:

        利用set nx满足互斥性

        利用set ex保证故障时依然能释放,避免死锁,提高安全性

        利用Redis集群保证高可用和高并发特性

Redisson功能介绍

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

Redisson快速入门

第一步:引入依赖

  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson</artifactId>
  4. <version>3.20.0</version>
  5. </dependency>

第二步:配置Redisson

  1. @Configuration
  2. public class RedissonConfig {
  3. @Bean
  4. public RedissonClient redissonClient(){
  5. //配置
  6. Config config = new Config();
  7. config.useSingleServer().setAddress("redis://192.168.202.128:6379").setPassword("123321");
  8. //创建RedissonClient对象
  9. return Redisson.create(config);
  10. }
  11. }

3.使用Redisson分布式锁

使用jmeter压测通过,一个用户只能下一单:

Redisson可重入锁原理

redis无法实现可重入锁的原因:

 Redisson可以实现可重入锁的原理:

 lua脚本实现:

 测试代码:

  1. @SpringBootTest
  2. @Slf4j
  3. public class RedissonTest {
  4. @Autowired
  5. private RedissonClient redissonClient;
  6. private RLock lock;
  7. @BeforeEach
  8. void setup(){
  9. lock=redissonClient.getLock("order");
  10. }
  11. @Test
  12. void method1(){
  13. boolean isLock = lock.tryLock();
  14. if(!isLock){
  15. log.error("获取锁失败……1");
  16. return;
  17. }
  18. try{
  19. log.info("获取锁成功……1");
  20. method2();
  21. log.info("开始执行业务……1");
  22. }finally{
  23. log.warn("准备释放锁……1");
  24. lock.unlock();
  25. }
  26. }
  27. void method2(){
  28. boolean isLock = lock.tryLock();
  29. if(!isLock){
  30. log.error("获取锁失败……2");
  31. return;
  32. }
  33. try{
  34. log.info("获取锁成功……2");
  35. log.info("开始执行业务……2");
  36. }finally{
  37. log.warn("准备释放锁……2");
  38. lock.unlock();
  39. }
  40. }
  41. }

运行截图:

 查看源码实现:

trylock:

 unlock:

Redisson的锁重试和WatchDog机制

 Redisson分布式锁原理:

可重入:利用hash结构记录线程id和重入次数

可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制

超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间

修改method1:

        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);

跟入源码:

注:redis.call('pttl',KEYS[1]);中的pttl返回以毫秒为单位,而ttl返回以秒为单位

如果该线程拿到锁,则返回nil;否则返回剩余有效期

代码精华部分:

  1. while (true) {
  2. long currentTime = System.currentTimeMillis();
  3. ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  4. // lock acquired
  5. if (ttl == null) {
  6. return true;
  7. }
  8. time -= System.currentTimeMillis() - currentTime;
  9. if (time <= 0) {
  10. acquireFailed(waitTime, unit, threadId);
  11. return false;
  12. }
  13. // waiting for message
  14. currentTime = System.currentTimeMillis();
  15. if (ttl >= 0 && ttl < time) {
  16. commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  17. } else {
  18. commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  19. }
  20. time -= System.currentTimeMillis() - currentTime;
  21. if (time <= 0) {
  22. acquireFailed(waitTime, unit, threadId);
  23. return false;
  24. }
  25. }
  26. } finally {
  27. unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
  28. }

怎么确保锁是因为业务结束而释放,而非阻塞导致的超时释放呢?

自动更新续期:

                    scheduleExpirationRenewal(threadId);

 renewExpiration:更新有效期

 实现永不过期的代码:

  1. Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
  2. @Override
  3. public void run(Timeout timeout) throws Exception {
  4. ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  5. if (ent == null) {
  6. return;
  7. }
  8. Long threadId = ent.getFirstThreadId();
  9. if (threadId == null) {
  10. return;
  11. }
  12. CompletionStage<Boolean> future = renewExpirationAsync(threadId);
  13. future.whenComplete((res, e) -> {
  14. if (e != null) {
  15. log.error("Can't update lock {} expiration", getRawName(), e);
  16. EXPIRATION_RENEWAL_MAP.remove(getEntryName());
  17. return;
  18. }
  19. if (res) {
  20. // reschedule itself
  21. renewExpiration();
  22. } else {
  23. cancelExpirationRenewal(null);
  24. }
  25. });
  26. }
  27. }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

那么,何时取消计时刷新呢?

在锁释放的时候结束计时

Redisson的multiLock问题

redisson分布式锁主从一致问题:如果主节点宕机,而未将数据同步给从节点,可能会导致并发问题

解决方案:所有节点都变成独立的redis节点

总结:

1)不可重入的Redis分布式锁:

原理:利用setnx的互斥性,利用ex避免死锁;释放锁时判断线程标识

缺陷:不可重入、无法重试、锁超时失败

2)可重入的Redis分布式锁:

原理:利用hash结果,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待

缺陷:redis宕机引起锁失效问题

3)Redisson的multiLock:

原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功

缺陷:运维成本高、实现复杂

五、秒杀优化

异步秒杀思路

 怎么保证一人一单呢?Set集合

 怎么确保代码执行的原子性?lua脚本

 基于Redis完成秒杀资格判断

需求:

①新增秒杀优惠券的同时,将优惠券的信息保存到Redis中

  1. @Override
  2. @Transactional
  3. public void addSeckillVoucher(Voucher voucher) {
  4. // 保存优惠券
  5. save(voucher);
  6. // 保存秒杀信息
  7. SeckillVoucher seckillVoucher = new SeckillVoucher();
  8. seckillVoucher.setVoucherId(voucher.getId());
  9. seckillVoucher.setStock(voucher.getStock());
  10. seckillVoucher.setBeginTime(voucher.getBeginTime());
  11. seckillVoucher.setEndTime(voucher.getEndTime());
  12. seckillVoucherService.save(seckillVoucher);
  13. //保存秒杀库存到Redis中
  14. stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
  15. }

新增优惠券:

②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

  1. -- 1.参数列表
  2. -- 1.1 优惠券id
  3. local voucherId = ARGV[1]
  4. --1.2 用户id
  5. local userId = ARGV[2]
  6. -- 2.数据key
  7. -- 2.1 库存key
  8. local stockKey = 'seckill:stock:' .. voucherId
  9. -- 2.2 订单key
  10. local orderKey = 'seckill:order:' .. voucherId
  11. -- 3.脚本业务
  12. -- 3.1 判断库存是否充足 get stockKey
  13. if(tonumber(redis.call('get',stockKey))<=0) then
  14. -- 3.2 库存不足,返回1
  15. return 1
  16. end
  17. -- 3.2 判断用户是否下单 SISMEMBER orderKey userId
  18. if(redis.call('sismember',orderKey,userId) == 1) then
  19. -- 3.3 存在,说明是重复下单,返回2
  20. return 2
  21. end
  22. -- 3.4 扣库存 incrby stockKey -1
  23. redis.call('incrby',stockKey,-1)
  24. -- 3.5 下单(保存用户)sadd orderKey userId
  25. redis.call('sadd',orderKey,userId)
  26. return 0

③如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

  1. @Resource
  2. private ISeckillVoucherService seckillVoucherService;
  3. @Resource
  4. private RedisIdWorker redisIdWorker;
  5. @Autowired
  6. private StringRedisTemplate stringRedisTemplate;
  7. private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
  8. static {
  9. SECKILL_SCRIPT = new DefaultRedisScript<>();
  10. SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
  11. SECKILL_SCRIPT.setResultType(Long.class);
  12. }
  13. @Override
  14. public Result seckillVoucher(Long voucherId) {
  15. // 取出用户
  16. Long userId = UserHolder.getUser().getId();
  17. //1. 执行lua脚本
  18. Long result = stringRedisTemplate.execute(
  19. SECKILL_SCRIPT,
  20. Collections.emptyList(),
  21. voucherId.toString(),userId.toString()
  22. );
  23. int r=result.intValue();
  24. //2. 判断结果是否为0
  25. if(r != 0) {
  26. //2.1 不为0,没有购买资格
  27. return Result.fail(r==1 ? "库存不足" : "不能重复下单");
  28. }
  29. //2.1 为0,有购买资格,把下单信息保存到阻塞队列
  30. long orderId = redisIdWorker.nextId("order");
  31. // TODO 保存阻塞队列
  32. //3.返回订单id
  33. return Result.ok(orderId);
  34. }

④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

基于阻塞队列实现秒杀业务

源代码有大改动,包括之前写好的createVoucherOrder,所以附上完整源码:

  1. @Service
  2. @Slf4j
  3. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  4. @Resource
  5. private ISeckillVoucherService seckillVoucherService;
  6. @Resource
  7. private RedisIdWorker redisIdWorker;
  8. @Resource
  9. private RedissonClient redissonClient;
  10. @Autowired
  11. private StringRedisTemplate stringRedisTemplate;
  12. private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024);
  13. private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
  14. private IVoucherOrderService proxy;
  15. @PostConstruct
  16. private void init(){
  17. SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
  18. }
  19. private class VoucherOrderHandler implements Runnable{
  20. @Override
  21. public void run() {
  22. while(true){
  23. try {
  24. //1.获取队列中的订单信息
  25. VoucherOrder voucherOrder = orderTasks.take();//没有任务时会自动阻塞
  26. //2.创建订单
  27. handleVoucherOrder(voucherOrder);
  28. } catch (InterruptedException e) {
  29. log.error("处理订单异常",e);
  30. }
  31. }
  32. }
  33. }
  34. private void handleVoucherOrder(VoucherOrder voucherOrder) {
  35. //1.获取用户
  36. Long userId = voucherOrder.getUserId();
  37. //2.创建锁对象
  38. RLock lock = redissonClient.getLock("lock:order:" + userId);
  39. //3.获取锁
  40. boolean isLock = lock.tryLock();
  41. //4.判断是否获取锁成功
  42. if(!isLock){
  43. log.error("不允许重复下单");
  44. return;
  45. }
  46. try{
  47. proxy.createVoucherOrder(voucherOrder);
  48. }finally{
  49. lock.unlock();
  50. }
  51. }
  52. private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
  53. static {
  54. SECKILL_SCRIPT = new DefaultRedisScript<>();
  55. SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
  56. SECKILL_SCRIPT.setResultType(Long.class);
  57. }
  58. @Override
  59. public Result seckillVoucher(Long voucherId) {
  60. // 取出用户
  61. Long userId = UserHolder.getUser().getId();
  62. //1. 执行lua脚本
  63. Long result = stringRedisTemplate.execute(
  64. SECKILL_SCRIPT,
  65. Collections.emptyList(),
  66. voucherId.toString(),userId.toString()
  67. );
  68. int r=result.intValue();
  69. //2. 判断结果是否为0
  70. if(r != 0) {
  71. //2.1 不为0,没有购买资格
  72. return Result.fail(r==1 ? "库存不足" : "不能重复下单");
  73. }
  74. //2.1 为0,有购买资格,把下单信息保存到阻塞队列
  75. long orderId = redisIdWorker.nextId("order");
  76. // TODO 保存阻塞队列
  77. VoucherOrder voucherOrder = new VoucherOrder();
  78. voucherOrder.setId(orderId);
  79. voucherOrder.setUserId(userId);
  80. voucherOrder.setVoucherId(voucherId);
  81. orderTasks.add(voucherOrder);
  82. //3.返回订单id
  83. return Result.ok(orderId);
  84. }
  85. @Transactional
  86. public void createVoucherOrder(VoucherOrder voucherOrder) {
  87. //一人一单
  88. Long userId = voucherOrder.getUserId();
  89. Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
  90. if (count > 0) {
  91. //用户已经存在
  92. log.error("用户已经购买过一次");
  93. return;
  94. }
  95. //5.扣减库存
  96. boolean success = seckillVoucherService
  97. .update()
  98. .setSql("stock=stock-1")
  99. .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
  100. .update();
  101. if (!success) {
  102. //扣减失败
  103. log.error("库存不足");
  104. }
  105. save(voucherOrder);
  106. }
  107. }

秒杀业务的优化思路是什么?

①先利用Redis完成库存余量、一人一单判断,完成抢单业务

②再将下单业务放入阻塞队列,利用独立线程异步下单

基于阻塞队列的异步秒杀存在哪些问题?

内存限制问题

数据安全问题

六、Redis消息队列

认识消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

消息队列:存储和管理消息,也被称为消息代理(Message Broker)

生产者:发送消息到消息队列

消费者:从消息队列获取消息并处理消息

Redis提供了三种不同的方式来实现消息队列:

list结构:基于List结构模拟消息队列

PubSub:基本的点对点消息模型

Stream:比较完善的消息队列模型

基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果

队列的入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用的是BRPOP或者BLPOP来实现阻塞效果。

示例:

基于List的消息队列有哪些优缺点?

优点:

        利用Redis存储,不受限于JVM内存上限

        基于Redis的持久化机制,数据安全性有保证

        可以满足消息有序性

缺点:

        无法避免消息丢失

        只支持单消费者

基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息

SUBSCRIBE channel [channel] :订阅一个或多个频道

PUBLISH channel msg:向一个频道发送消息

PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道

        pattern:

        ?代表一个字符

        * 代表零个或多个字符

        [ae] 代表可以是a也可以是e

示例:

 

基于PubSub的消息队列有哪些优缺点?

优点:

        采用发布订阅模型,支持多生产、多消费

缺点:

        不支持数据持久化

        无法避免消息丢失

        消息堆积有上限,超出时数据丢失

 基于Stream的消息队列

Stream是Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列

示例1:

 

===>结论 :基于stream的消息队列,消息会被持久化存储,可以被多个消费者读取,也可以被一个消费者读取多次

示例2:阻塞等待,block 0 代表一直阻塞直到有新消息的到来

 

消费者应用:

 bug:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

消息漏读:

STREAM类型消息队列的XREAD命令特点:

消息可回溯

一个消息可以被多个消费者读取

可以阻塞读取

有消息漏读的风险

基于Stream的消费队列---消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

 

 ==>队列中的消息如果还想用,ID从0开始;如果不想用了,ID从$开始

示例:

Java代码实现:

STREAM类型消息队列的XREADGROUP命令特点:

消息可回溯

可以多消费者争抢消息,加快消费速度

可以阻塞读取

没有消息漏读的风险

有消息确认机制,保证消息至少被消费一次

总结:

基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:

①创建一个Stream类型的消息队列,名为stream.orders

参数MKSTREAM,在创建组的时候,如果消息队列不存在,则自动创建组和队列

②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId

添加orderId:

  1. --1.3 订单id
  2. local orderId = ARGV[3]

发送消息到队列中

  1. -- 3.6 发送消息到队列中
  2. redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)

改造Java代码:

  1. @Override
  2. public Result seckillVoucher(Long voucherId) {
  3. // 取出用户
  4. Long userId = UserHolder.getUser().getId();
  5. long orderId = redisIdWorker.nextId("order");
  6. //1. 执行lua脚本
  7. Long result = stringRedisTemplate.execute(
  8. SECKILL_SCRIPT,
  9. Collections.emptyList(),
  10. voucherId.toString(),userId.toString(),String.valueOf(orderId)
  11. );
  12. int r=result.intValue();
  13. //2. 判断结果是否为0
  14. if(r != 0) {
  15. //2.1 不为0,没有购买资格
  16. return Result.fail(r==1 ? "库存不足" : "不能重复下单");
  17. }
  18. proxy=(IVoucherOrderService) AopContext.currentProxy();
  19. //3.返回订单id
  20. return Result.ok(orderId);
  21. }

③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

  1. private class VoucherOrderHandler implements Runnable{
  2. String queueName = "stream.orders";
  3. @Override
  4. public void run() {
  5. while(true) {
  6. try {
  7. //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
  8. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  9. Consumer.from("g1", "c1"),
  10. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
  11. StreamOffset.create(queueName, ReadOffset.lastConsumed())
  12. );
  13. //2.判断消息获取是否成功
  14. if (list == null || list.isEmpty()) {
  15. // 如果获取失败,说明没有消息,继续下一次循环
  16. continue;
  17. }
  18. // 解析消息中的订单信息
  19. MapRecord<String, Object, Object> record = list.get(0);
  20. Map<Object, Object> values = record.getValue();
  21. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
  22. //3.创建订单
  23. handleVoucherOrder(voucherOrder);
  24. //4.ACK确认 SACK stream.orders g1 id
  25. stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
  26. } catch (Exception e) {
  27. log.error("处理订单异常", e);
  28. handlePendingList();
  29. }
  30. }
  31. }
  32. private void handlePendingList(){
  33. while(true){
  34. try{
  35. //1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
  36. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  37. Consumer.from("g1", "c1"),
  38. StreamReadOptions.empty().count(1),
  39. StreamOffset.create(queueName, ReadOffset.from("0"))
  40. );
  41. //2.判断消息是否获取成功
  42. if(list == null || list.isEmpty()){
  43. //没有消息,说明pending-list中没有异常消息,结束循环
  44. break;
  45. }
  46. //3.解析消息中的订单信息
  47. MapRecord<String, Object, Object> record = list.get(0);
  48. Map<Object, Object> values = record.getValue();
  49. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
  50. //4.如果获取成功,可以下单
  51. handleVoucherOrder(voucherOrder);
  52. //5.ACK确认
  53. stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
  54. }catch(Exception e){
  55. log.error("处理pending-list订单异常",e);
  56. try {
  57. Thread.sleep(20);
  58. } catch (InterruptedException ex) {
  59. ex.printStackTrace();
  60. }
  61. }
  62. }
  63. }
  64. }

七、达人探店

发布探店笔记

探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:

tb_blog:探店笔记表,包含笔记中的标题、文字、图片等

tb_blog_comments:其他用户对探店笔记的评价

第一步:将SystemConstants的常量改为部署在nginx项目下的imgs目录

    public static final String IMAGE_UPLOAD_DIR = "D:\\lesson\\nginx-1.18.0\\html\\hmdp\\imgs\\";

测试效果:

 

实现查看发布探店笔记的接口

Controller接口:

  1. @GetMapping("/{id}")
  2. public Result queryBlogById(@PathVariable("id") Long id){
  3. return blogService.queryBlogById();
  4. }

Service实现:

  1. @Override
  2. public Result queryBlogById(Long id) {
  3. //1.查询blog
  4. Blog blog = getById(id);
  5. if(blog == null){
  6. return Result.fail("笔记不存在!");
  7. }
  8. //2.查询blog有关的用户
  9. queryBlogUser(blog);
  10. return Result.ok(blog);
  11. }
  12. private void queryBlogUser(Blog blog) {
  13. Long userId = blog.getUserId();
  14. User user = userService.getById(userId);
  15. blog.setName(user.getNickName());
  16. blog.setIcon(user.getIcon());
  17. }

点赞

需求:

同一个用户只能点赞一次,再次点击则取消点赞

如果当前用户已经点赞,则点赞按钮高亮显示(前段已实现,判断字段Blog类的isLike属性)

实现步骤:

①给Blog类中添加一个isLike字段,标识是否被当前用户点赞

  1. /**
  2. * 是否点赞过了
  3. */
  4. @TableField(exist = false)
  5. private Boolean isLike;

②修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1

③修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值給isLike字段

④修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段

  1. @PutMapping("/like/{id}")
  2. public Result likeBlog(@PathVariable("id") Long id) {
  3. return blogService.likeBlog(id);
  4. }
  1. @Override
  2. public Result queryHotBlog(Integer current) {
  3. // 根据用户查询
  4. Page<Blog> page = query()
  5. .orderByDesc("liked")
  6. .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
  7. // 获取当前页数据
  8. List<Blog> records = page.getRecords();
  9. // 查询用户
  10. records.forEach(blog->{
  11. this.queryBlogUser(blog);
  12. this.isBlogLiked(blog);
  13. });
  14. return Result.ok(records);
  15. }
  16. @Override
  17. public Result queryBlogById(Long id) {
  18. //1.查询blog
  19. Blog blog = getById(id);
  20. if(blog == null){
  21. return Result.fail("笔记不存在!");
  22. }
  23. //2.查询blog有关的用户
  24. queryBlogUser(blog);
  25. //3.查询blog是否被点赞
  26. isBlogLiked(blog);
  27. return Result.ok(blog);
  28. }
  29. private void isBlogLiked(Blog blog) {
  30. Long userId = UserHolder.getUser().getId();
  31. String key = "blog:liked:" + blog.getId();
  32. Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
  33. blog.setIsLike(BooleanUtil.isTrue(isMember));
  34. }
  35. @Override
  36. public Result likeBlog(Long id) {
  37. //1. 获取登录用户
  38. Long userId = UserHolder.getUser().getId();
  39. //2. 判断当前登录用户是否已经点赞
  40. String key = "blog:liked:" + id;
  41. Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
  42. if(BooleanUtil.isFalse(isMember)) {
  43. //3. 如果未点赞,可以点赞
  44. //3.1 数据库点赞数+1
  45. boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
  46. //3.2 保存用户到Redis的set集合
  47. if(isSuccess){
  48. stringRedisTemplate.opsForSet().add(key,userId.toString());
  49. }
  50. }else {
  51. //4. 如果已经点赞,则取消点赞
  52. //4.1 数据库点赞数-1
  53. boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
  54. //4.2 把用户从Redis的set集合移除
  55. if (isSuccess) {
  56. stringRedisTemplate.opsForSet().remove(key, userId.toString());
  57. }
  58. }
  59. return Result.ok();
  60. }


点赞排行榜

需求:按照点赞时间先后排序,返回Top5的用户

一人只能点赞一次功能实现:

因为SortedSet中没有isMember的判断,所以在添加元素的时候加上时间戳;查询的时候如果对应的元素没有时间戳,则代表集合中没有这个元素

  1. private void isBlogLiked(Blog blog) {
  2. UserDTO user = UserHolder.getUser();
  3. if(user==null){
  4. return;
  5. }
  6. Long userId = UserHolder.getUser().getId();
  7. String key = BLOG_LIKED_KEY + blog.getId();
  8. Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
  9. blog.setIsLike(score!=null);
  10. }
  11. @Override
  12. public Result likeBlog(Long id) {
  13. //1. 获取登录用户
  14. Long userId = UserHolder.getUser().getId();
  15. //2. 判断当前登录用户是否已经点赞
  16. String key = BLOG_LIKED_KEY + id;
  17. Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
  18. if(score==null) {
  19. //3. 如果未点赞,可以点赞
  20. //3.1 数据库点赞数+1
  21. boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
  22. //3.2 保存用户到Redis的set集合 zadd key value score
  23. if(isSuccess){
  24. stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
  25. }
  26. }else {
  27. //4. 如果已经点赞,则取消点赞
  28. //4.1 数据库点赞数-1
  29. boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
  30. //4.2 把用户从Redis的set集合移除
  31. if (isSuccess) {
  32. stringRedisTemplate.opsForZSet().remove(key, userId.toString());
  33. }
  34. }
  35. return Result.ok();
  36. }

查询点赞排行前五:

  1. @Override
  2. public Result queryBlogLikes(Long id) {
  3. String key=BLOG_LIKED_KEY+id;
  4. //1. 查询top5的点赞用户 zrange key 0 4
  5. Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
  6. if(top5 == null || top5.isEmpty()){
  7. return Result.ok(Collections.emptyList());
  8. }
  9. //2. 解析出其中的用户id
  10. List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
  11. String idStr = StrUtil.join(",", ids);
  12. //3. 根据用户id查询用户
  13. List<UserDTO> userDTOs = userService
  14. .query()
  15. .in("id",ids)
  16. .last("order by field(id,"+idStr+")")
  17. .list()
  18. .stream()
  19. .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
  20. .collect(Collectors.toList());
  21. //4. 返回
  22. return Result.ok(userDTOs);
  23. }

八、好友关注

关注和取关

 需求:基于该表数据结构,实现两个接口:

①关注和取关接口

②判断是否关注的接口

Controller实现:

  1. @RestController
  2. @RequestMapping("/follow")
  3. public class FollowController {
  4. @Resource
  5. private IFollowService followService;
  6. @PutMapping("/{id}/{isFollow}")
  7. public Result follow(@PathVariable("id")Long followUserId,@PathVariable("isFollow")Boolean isFollow){
  8. return followService.follow(followUserId,isFollow);
  9. }
  10. @GetMapping("/or/not/{id}")
  11. public Result follow(@PathVariable("id")Long followUserId){
  12. return followService.isFollow(followUserId);
  13. }
  14. }

ServiceImpl具体实现:

  1. @Service
  2. public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
  3. @Override
  4. public Result follow(Long followUserId, Boolean isFollow) {
  5. // 获取登录用户
  6. Long userId = UserHolder.getUser().getId();
  7. //1.判断到底是关注还是取关
  8. if (isFollow) {
  9. //2.关注,新增数据
  10. Follow follow = new Follow();
  11. follow.setUserId(userId);
  12. follow.setFollowUserId(followUserId);
  13. save(follow);
  14. } else {
  15. //3.取关,删除
  16. remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
  17. }
  18. return Result.ok();
  19. }
  20. @Override
  21. public Result isFollow(Long followUserId) {
  22. //1.获取登录用户
  23. Long userId = UserHolder.getUser().getId();
  24. //2.查询是否关注
  25. Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
  26. //3.判断
  27. return Result.ok(count>0);
  28. }
  29. }

共同关注

将下面的代码插入到UserController:

  1. @GetMapping("/{id}")
  2. public Result queryUserById(@PathVariable("id")Long userId){
  3. User user = userService.getById(userId);
  4. if(user == null){
  5. return Result.ok();
  6. }
  7. UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
  8. return Result.ok(userDTO);
  9. }

将下面的代码插入到BlogController

  1. @GetMapping("/of/user")
  2. public Result queryBlogByUserId(
  3. @RequestParam(value="current",defaultValue = "1")Integer current,
  4. @RequestParam("id")Long id){
  5. Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
  6. List<Blog> records = page.getRecords();
  7. return Result.ok(records);
  8. }

要实现共同关注的查找,可以考虑集合set的交集

需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友

①修改follow代码,将用户和关注的用户加入集合

  1. @Override
  2. public Result follow(Long followUserId, Boolean isFollow) {
  3. // 获取登录用户
  4. Long userId = UserHolder.getUser().getId();
  5. String key="follows:"+userId;
  6. //1.判断到底是关注还是取关
  7. if (isFollow) {
  8. //2.关注,新增数据
  9. Follow follow = new Follow();
  10. follow.setUserId(userId);
  11. follow.setFollowUserId(followUserId);
  12. boolean isSuccess = save(follow);
  13. //判断是否关注成功
  14. if(isSuccess){
  15. //如果关注成功,把关注用户的id放入redis的set集合
  16. stringRedisTemplate.opsForSet().add(key,followUserId.toString());
  17. }
  18. } else {
  19. //3.取关,删除
  20. boolean isSuccess = remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", followUserId));
  21. if (isSuccess) {
  22. stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
  23. }
  24. }
  25. return Result.ok();
  26. }

②求共同关注

controller:

  1. @GetMapping("/commons/{id}")
  2. public Result followCommons(@PathVariable("id") Long id){
  3. return followService.followCommons(id);
  4. }

具体实现:

  1. @Resource
  2. private IUserService userService;
  3. @Override
  4. public Result followCommons(Long id) {
  5. //1.获取当前用户
  6. Long userId = UserHolder.getUser().getId();
  7. String key="follows:"+userId;
  8. //2.求交集
  9. String key2="follows:"+id;
  10. Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
  11. if(intersect==null || intersect.isEmpty()){
  12. //无交集
  13. return Result.ok(Collections.emptyList());
  14. }
  15. //3.解析id集合
  16. List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
  17. //4.查询用户
  18. List<UserDTO> users = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
  19. return Result.ok(users);
  20. }

运行效果:

关注推送

Feed流实践方案分析

关注推送也叫作Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无线下拉刷新获取新的信息。

Feed流产品有两种常见的模式:

Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈。

        优点:信息全面,不会有缺失。并且实现也相对简单

        缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低

智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息来吸引用户

        优点:投喂用户感兴趣的信息,用户粘度很高,容易沉迷

        缺点:如果算法不精准,可能起到反作用

本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:

①拉模式

②推模式

③推拉结合

拉模式:也叫读扩散

每次发送消息的时候都会加上一个时间戳,当用户打开个人页面,就会去关注的人的发件箱里拉取消息并按时间戳排序。缺点是耗时较长,优点是节省内存。

推模式:也叫作写扩散。

关注的博主发消息的时候会直接把消息推送到个人主页,并排好序。在本人阅读个人主页的时候,无需等待拉取信息等。优点是延时低,缺点是消耗内存。

推拉结合模式:也叫读写混合,兼具推和拉两种模式的优点。

对于普通人,发送的消息可以直接推送到粉丝的收件箱。

对于大V,发送的消息,对于活跃粉丝使用推模式,对于普通粉丝使用拉模式。

总结:

 基于推模式实现关注推送功能

需求:

①修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱

②收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现

③查询收件箱数据时,可以实现分页查询

Feed流的滚动分页:

Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。

接口:

  1. @PostMapping
  2. public Result saveBlog(@RequestBody Blog blog) {
  3. return blogService.saveBlog(blog);
  4. }

具体实现:

  1. @Resource
  2. private IFollowService followService;
  3. @Override
  4. public Result saveBlog(Blog blog) {
  5. //1.获取登录用户
  6. UserDTO user = UserHolder.getUser();
  7. blog.setUserId(user.getId());
  8. //2.保存探店笔记
  9. boolean isSuccess = save(blog);
  10. if(!isSuccess){
  11. return Result.fail("新增笔记失败!");
  12. }
  13. //3.查询笔记作者的所有粉丝
  14. List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
  15. //4.返回id
  16. for(Follow follow:follows){
  17. //4.1 获取粉丝id
  18. Long userId = follow.getUserId();
  19. //4.2 推送
  20. String key="feed:"+userId;
  21. stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
  22. }
  23. return Result.ok(blog.getId());
  24. }

实现关注推送页面的分页查询

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:

以z1为例复习相应命令:

①倒序按范围查询

 ②混乱插入(此时插入一条数据,继续查询)

        可以发现按角标查询,会出现内容重复

 ③解决方法:可以按照分数查询

zrevrangebyscore key max min withscores limit offsest count

其中offset中0代表小于等于max的第一条,如果要实现小于max的第一条,则应将offset置1

count代表一次查询多少条(数量)

 ④存在的问题:如果value不一样,但是分数值一样,可能会查询到重复的数据

将m7的分数值改为6:

 此时再按照记忆的最后一条分数去做查询,会发现数据重复:

 ⑤总结:

滚动分页查询参数:

max:当前时间戳或者上一次查询的最小时间戳

min:0

offset:0 或者 在上一次的结果中,与最小值一样的元素的个数

count:3(与前端约定好)

 

实现滚动查询

①定义滚动查询结果的实体类

  1. @Data
  2. public class ScrollResult {
  3. private List<?> list;
  4. private Long minTime;
  5. private Integer offset;
  6. }

②定义接口

  1. @GetMapping("/of/follow")
  2. public Result queryBlogOfFollow(@RequestParam("lastId")Long max,@RequestParam(value = "offset",defaultValue = "0")Integer offset){
  3. return blogService.queryBlogOfFollow(max,offset);
  4. }

③具体实现

  1. @Override
  2. public Result queryBlogOfFollow(Long max, Integer offset) {
  3. //1.获取当前用户
  4. Long userId = UserHolder.getUser().getId();
  5. //2.查询收件箱
  6. String key=FEED_KEY+userId;
  7. Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
  8. //3.非空判断
  9. if(typedTuples==null || typedTuples.isEmpty()){
  10. return Result.ok();
  11. }
  12. //4.解析数据:blogId,minTime(时间戳),offset
  13. ArrayList<Object> ids = new ArrayList<>(typedTuples.size());
  14. long minTime=0;
  15. int os=1;
  16. for(ZSetOperations.TypedTuple<String> tuple:typedTuples) {
  17. //4.1 获取id
  18. ids.add(Long.valueOf(tuple.getValue()));
  19. //4.2 获取分数(时间戳)
  20. long time = tuple.getScore().longValue();
  21. if (time == minTime) {
  22. os++;
  23. } else {
  24. minTime = time;
  25. os = 1;
  26. }
  27. }
  28. //5.根据id查询blog
  29. String idStr = StrUtil.join(",", ids);
  30. List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
  31. for (Blog blog : blogs) {
  32. //5.1 查询blog有关的用户
  33. queryBlogUser(blog);
  34. //5.2 查询blog是否被点赞
  35. isBlogLiked(blog);
  36. }
  37. //6.封装并返回
  38. ScrollResult r = new ScrollResult();
  39. r.setList(blogs);
  40. r.setOffset(os);
  41. r.setMinTime(minTime);
  42. return Result.ok(r);
  43. }

九、附近商户

 GEO数据结构

 案例:练习Redis的GEO功能

需求:

1.添加下面几条数据:

北京南站(116.378248 39.865275)

北京站(116.42803 39.903738)

北京西站(116.322287 39.893729)

 查看redis客户端:

==>geo底层采用ZSET实现

2.计算北京西站到北京南站的距离

==>默认单位是米

3.搜索天安门(116.397904 39.909005)附近10km内的所有火车站,并按照距离升序排序

附近商户搜索

导入店铺数据到GEO

 按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可

 导入数据(通过单元测试):

  1. @Test
  2. void loadShopData(){
  3. //1.查询店铺信息
  4. List<Shop> list = shopService.list();
  5. //2.把店铺按照typeId分组,id一致的放到一个集合
  6. Map<Long,List<Shop>> map=list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
  7. //3.分批完成写入Redis
  8. for(Map.Entry<Long,List<Shop>> entry:map.entrySet()){
  9. //3.1 获取类型id
  10. Long typeId = entry.getKey();
  11. String key="shop:geo:"+typeId;
  12. //3.2 获取同类型的店铺集合
  13. List<Shop> value = entry.getValue();
  14. //3.3 写入redis
  15. List<RedisGeoCommands.GeoLocation<String>> locations=new ArrayList<>(value.size());
  16. for(Shop shop:value){
  17. locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(),shop.getY())));
  18. }
  19. stringRedisTemplate.opsForGeo().add(key,locations);
  20. }
  21. }

实现附近商户功能

首先,项目中redis相关的依赖版本太低,不支持GEOSEARCH方法,需要更换版本

我们将5.3.7和1.3.9版本排除,观察代码:

 然后手动引入新版本:

  1. <dependency>
  2. <groupId>org.springframework.data</groupId>
  3. <artifactId>spring-data-redis</artifactId>
  4. <version>2.6.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>io.lettuce</groupId>
  8. <artifactId>lettuce-core</artifactId>
  9. <version>6.1.6.RELEASE</version>
  10. </dependency>

接口:

  1. /**
  2. * 根据商铺类型分页查询商铺信息
  3. * @param typeId 商铺类型
  4. * @param current 页码
  5. * @return 商铺列表
  6. */
  7. @GetMapping("/of/type")
  8. public Result queryShopByType(
  9. @RequestParam("typeId") Integer typeId,
  10. @RequestParam(value = "current", defaultValue = "1") Integer current,
  11. @RequestParam(value = "x",required = false) Double x,
  12. @RequestParam(value = "y",required = false) Double y
  13. ) {
  14. return shopService.queryShopByType(typeId,current,x,y);
  15. }

具体实现:

  1. @Override
  2. public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
  3. //1.判断是否需要根据坐标查询
  4. if(x==null || y==null){
  5. Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, DEFAULT_PAGE_SIZE));
  6. return Result.ok(page.getRecords());
  7. }
  8. //2.计算分页参数
  9. int from=(current-1)*DEFAULT_PAGE_SIZE;
  10. int end=current*DEFAULT_PAGE_SIZE;
  11. //3.查询redis、按照距离排序、分页 结果:shopId、distance
  12. String key=SHOP_GEO_KEY+typeId;
  13. GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
  14. .search(
  15. key,
  16. GeoReference.fromCoordinate(x, y),
  17. new Distance(5000),
  18. RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
  19. );
  20. //4.解析出id
  21. if(results==null){
  22. return Result.ok(Collections.emptyList());
  23. }
  24. List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
  25. if(list.size()<=from){
  26. return Result.ok(Collections.emptyList());
  27. }
  28. //4.1 截取from ~ end的那部分
  29. List<Long> ids=new ArrayList<>(list.size());
  30. Map<String,Distance> distanceMap=new HashMap<>(list.size());
  31. list.stream().skip(from).forEach(result->{
  32. //4.2 获取店铺id
  33. String shopIdStr = result.getContent().getName();
  34. ids.add(Long.valueOf(shopIdStr));
  35. //4.3 获取距离
  36. Distance distance = result.getDistance();
  37. distanceMap.put(shopIdStr,distance);
  38. });
  39. //5.根据id查询店铺Shop
  40. String idStr = StrUtil.join(",", ids);
  41. List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
  42. for(Shop shop:shops){
  43. shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
  44. }
  45. //6.返回
  46. return Result.ok(shops);
  47. }

效果:

 注意,在stream流中跳过了一部分商铺,很有可能导致跳过以后没有商铺可查,从而出现问题,所以需要加上如下判断:

  1. if(list.size()<=from){
  2. return Result.ok(Collections.emptyList());
  3. }

十、用户签到

 BitMap用法

我们按月来统计用户签到信息,签到记录为1,未签到则记录为0

 把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)

Redis中就是利用String类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位。

 练习:

BITFIELD bm1 GET u2 0含义:读取bm1,以无符号十进制的方式读取两位(u2),从第一位开始读取。返回值3代表读取的比特位是11

BITPOS bm1 0 :代表查找第一个0出现的位置

 

 签到功能

需求:实现签到接口,将当前用户当天签到信息保存到Redis中

 提示:因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了。(Redis的字符串)

接口:

  1. @PostMapping("/sign")
  2. public Result sign(){
  3. return userService.sign();
  4. }

具体实现:

  1. @Override
  2. public Result sign() {
  3. //1.获取当前登录的用户
  4. Long userId = UserHolder.getUser().getId();
  5. //2.获取日期
  6. LocalDateTime now = LocalDateTime.now();
  7. //3.拼接key
  8. String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
  9. String key=USER_SIGN_KEY+userId+keySuffix;
  10. //4.获取今天是本月的第几天
  11. int dayOfMonth=now.getDayOfMonth();
  12. //5.写入Redis
  13. stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
  14. return Result.ok();
  15. }

测试:

 

签到统计

Q1:什么叫做连续签到天数?

A1:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。

Q2:如何得到本月到今天为止的所有签到数据?

BITFIELD key GET u[dayOfMonth] 0

Q3:如何从后向前遍历每个bit位?

与1做与运算,就能得到最后一个bit位

随后右移1位,下一个bit位就成为了最后一个bit位

案例:实现签到统计功能

需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数

 接口:

  1. @GetMapping("/sign/count")
  2. public Result signCount(){
  3. return userService.signCount();
  4. }

具体实现:

  1. @Override
  2. public Result signCount() {
  3. //1.获取当前登录用户
  4. Long userId = UserHolder.getUser().getId();
  5. //2.获取日期
  6. LocalDateTime now = LocalDateTime.now();
  7. //3.拼接key
  8. String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
  9. String key=USER_SIGN_KEY+userId+keySuffix;
  10. //4.获取今天是本月的第几天
  11. int dayOfMonth = now.getDayOfMonth();
  12. //5.获取本月截止今天为止的所有签到记录,返回的是一个十进制的数字
  13. List<Long> result = stringRedisTemplate.opsForValue().bitField(key,
  14. BitFieldSubCommands
  15. .create()
  16. .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth))
  17. .valueAt(0));
  18. if(result==null || result.isEmpty()){
  19. //没有任何签到结果
  20. return Result.ok(0);
  21. }
  22. Long num = result.get(0);
  23. if(num==null || num==0){
  24. return Result.ok(0);
  25. }
  26. //6.循环遍历
  27. int count=0;
  28. while(true){
  29. //6.1 让这个数字与1做与运算,得到数字的最后一个bit位
  30. if((num&1)==0){
  31. //如果为0,说明未签到,结束
  32. break;
  33. }else{
  34. //如果不为0,说明已签到,计数器+1
  35. count++;
  36. }
  37. num >>>= 1; // >>>代表无符号右移
  38. }
  39. return Result.ok(count);
  40. }

测试:返回值为3

 实际上也确实是3个连续的1:

 测试通过!

十一、UV统计

HyperLogLog用法

UV:全称Unique Visitor,也叫独立访问量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次

PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

UV统计在服务端做会比较麻烦,因为要判断用户是否已经统计过了,要将统计过的用户信息保存,但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。

 命令练习:

 

实现UV统计

我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何:

  1. @Test
  2. void testHyperLogLog(){
  3. String[] values=new String[1000];
  4. int j=0;
  5. for (int i = 0; i < 1000000; i++) {
  6. j=i%1000;
  7. values[j]="user_"+i;
  8. if(j==999){
  9. //发送到Redis
  10. stringRedisTemplate.opsForHyperLogLog().add("hl2",values);
  11. }
  12. }
  13. //统计数量
  14. Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
  15. System.out.println("count="+count);
  16. }

运行结果:

内存消耗:

运行前内存:

 运行后内存:

1929640-1505552= 424088

424088/1024/1024=0.4M

 


Redis实战篇完结,恭喜大家~~❀

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/682905
推荐阅读
相关标签
  

闽ICP备14008679号