当前位置:   article > 正文

《黑马点评》Redis高并发项目实战笔记【完结】P1~P72_黑马点评笔记

黑马点评笔记

花费4周敲完《黑马点评》的课程,做了详细的笔记,感觉受益匪浅,一直一直都在不停成长着。

突然想起《苍穹外卖》系列至今已收获200+个赞,500+个收藏,好评颇多,私信我的人不计其数,在此谢谢大家。

下一篇开始学习12306订票系统项目,大家敬请期待吧。

如有问题欢迎加文末微信,商业订单、项目需求都欢迎前来咨询。想进粉丝群的朋友们见文末。

 P1 Redis企业实战课程介绍

P2 短信登录 导入黑马点评项目

首先在数据库连接下新建一个数据库hmdp,然后右键hmdp下的表,选择运行SQL文件,然后指定运行文件hmdp.sql即可(建议MySQL的版本在5.7及以上):

下面这个hm-dianping文件是项目源码。在IDEA中打开。

记得要修改数据库连接和Redis连接的密码:

运行程序后尝试访问:localhost:8081/shop-type/list 进行简单测试:

将nginx文件复制到一个没有中文路径的目录,然后点击nginx.exe运行:

在nginx所在目录打开CMD窗口,输入命令:start nginx.exe

访问:localhost:8080,选择用手机模式看,可以看到具体的页面:

P3 短信登录 基于session实现短信登录的流程

点击发送验证码可以看到验证码发送成功:

P4 短信登录 实现发送短信验证码功能

 controller/UserController中写入如下代码:

  1. @PostMapping("code")
  2. public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
  3. //发送短信验证码并保存验证码
  4. return userService.sendCode(phone,session);
  5. }

service/IUserService中写入如下代码:

  1. public interface IUserService extends IService<User> {
  2. Result sendCode(String phone, HttpSession session);
  3. }

service/impl/UserServiceImpl中写入如下代码:

  1. @Service
  2. public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
  3. @Override
  4. public Result sendCode(String phone, HttpSession session) {
  5. //校验手机号
  6. if(RegexUtils.isPhoneInvalid(phone)){
  7. //不符合
  8. return Result.fail("手机号格式错误");
  9. }
  10. //生成验证码
  11. String code = RandomUtil.randomNumbers(6);
  12. //保存验证码到session
  13. session.setAttribute("code",code);
  14. //发送验证码
  15. log.debug("发送短信验证码成功,验证码:"+code);
  16. return Result.ok();
  17. }
  18. }

P5 短信登录 实现短信验证码登录和注册功能

service/impl/UserServiceImpl的UserServiceImpl中写入如下代码:

  1. @Override
  2. public Result login(LoginFormDTO loginForm, HttpSession session) {
  3. String phone = loginForm.getPhone();
  4. //校验手机
  5. if(RegexUtils.isPhoneInvalid(phone)){
  6. return Result.fail("手机号格式错误");
  7. }
  8. //校验验证码
  9. Object cacheCode = session.getAttribute("code");
  10. String code = loginForm.getCode();
  11. if(cacheCode==null || !cacheCode.toString().equals(code)){
  12. //不一致,报错
  13. return Result.fail("验证码错误");
  14. }
  15. //一致根据手机号查用户
  16. User user = query().eq("phone", phone).one();
  17. //判断用户是否存在
  18. if(user==null){
  19. //不存在,创建用户并保存
  20. user = createUserWithPhone(loginForm.getPhone());
  21. }
  22. //保存用户信息到session
  23. session.setAttribute("user",user);
  24. return null;
  25. }
  26. private User createUserWithPhone(String phone){
  27. //1.创建用户
  28. User user = new User();
  29. user.setPhone(phone);
  30. user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
  31. //2。保存用户
  32. save(user);
  33. return user;
  34. }

前端点击发送验证码,后端直接把验证码摘抄后输入:

 

勾选协议然后确定登录,出现如下代码:

然后看到数据库后台记录已更新:

P6 短信登录 实现登录校验拦截器

preHandle前置拦截:

postHandle后置拦截:

afterCompletion视图渲染之后返回给用户之前:

在utils下面编写一个LoginInterceptor类,实现preHandle和afterCompletion这两个方法(这里User和UserDto的问题,我推荐的是统一使用UserDto,采用BeanUtils里的copy方法即可):

  1. public class LoginInterceptor implements HandlerInterceptor {
  2. @Override
  3. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  4. //获取session
  5. HttpSession session = request.getSession();
  6. //获取用户
  7. User user = (User) session.getAttribute("user");
  8. //判断用户是否存在
  9. if(user==null){
  10. response.setStatus(401);
  11. return false;
  12. }
  13. UserDTO userDTO = new UserDTO();
  14. BeanUtils.copyProperties(user,userDTO);
  15. //存在,保存用户信息的ThreadLocal
  16. UserHolder.saveUser(userDTO);
  17. //放行
  18. return true;
  19. }
  20. @Override
  21. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  22. //移除用户
  23. UserHolder.removeUser();
  24. }
  25. }

在config下面创建一个MvcConfig类:

通过addInterceptors方法来添加拦截器,registry是拦截器的注册器。

用.excludePathPatterns来排除不需要拦截的路径。在这里code、login、bloghot、shop、shopType、upload和voucher等都不需要拦截。

  1. @Configuration
  2. public class MvcConfig implements WebMvcConfigurer {
  3. @Override
  4. public void addInterceptors(InterceptorRegistry registry){
  5. registry.addInterceptor(new LoginInterceptor())
  6. .excludePathPatterns(
  7. "/user/code",
  8. "/user/login",
  9. "/upload/**",
  10. "/blog/hot",
  11. "/shop/**",
  12. "/shop-type/**",
  13. "/voucher/**"
  14. );
  15. }
  16. }

输入手机号码点击获取验证码,写入返回后端的验证码,勾选协议之后,登录会直接返回首页,此时看我的个人主页没问题:

P7 短信登录 隐藏用户敏感信息

在P6已将User转为UserDTO返回给前端。

P8 短信登录 session共享的问题分析

多台Tomcat并不共享session存储空间,当请求切换不同Tomcat服务器时会导致数据丢失的问题。

session的替代方案应该满足:1.数据共享。2.内存存储。3.key、value结构。

P9 短信登录 Redis代替session的业务流程

想要保存用户的登录信息有2种方法:1.用String类型。2.用Hash类型。

String类型是以JSON字符串格式来保存,比较简单直观,但是占用内存比较多(因为有name和age这类的json格式):

Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用更少:

以随机的token作为key来存储用户的数据,token是用一个随机的字符串。

P10 短信登录 基于Redis实现短信登录

在UserServiceImpl中写入如下代码(调用StringRedisTemplate中的set方法进行数据插入,最好在key的前面加入业务前缀以示区分,形成区分):

  1. @Resource
  2. private StringRedisTemplate stringRedisTemplate;

在sendCode这个方法里将保存验证码的代码替换为下面:

  1. //保存验证码到redis
  2. stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);

在login这个方法里进行如下2处修改:

 首先是校验验证码:

  1. //校验验证码
  2. String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);

然后是添加把用户信息添加到Redis的逻辑:

  1. //7.保存用户信息到redis----------------
  2. //7.1 随机生成Token作为登录令牌
  3. String token = UUID.randomUUID().toString(true);
  4. //7.2 将User对象转为Hash存储
  5. UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
  6. Map<String, Object> userMap = BeanUtil.beanToMap(userDTO);
  7. //7.3 存储
  8. stringRedisTemplate.opsForHash().putAll("login:token:"+token,userMap);
  9. //7.4设置token有效期
  10. String tokenKey = LOGIN_USER_KEY+token;
  11. stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
  12. return Result.ok(token);

在MvcConfig类上有@Configuration注解,说明是由Spring来负责依赖注入。 

在MvcConfig类中要编写如下的代码:

  1. @Configuration
  2. public class MvcConfig implements WebMvcConfigurer {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public void addInterceptors(InterceptorRegistry registry){
  7. registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
  8. .excludePathPatterns(
  9. "/user/code",
  10. "/user/login",
  11. "/upload/**",
  12. "/blog/hot",
  13. "/shop/**",
  14. "/shop-type/**",
  15. "/voucher/**"
  16. );
  17. }
  18. }

 在utils下的LoginInterceptor中写入如下代码:

  1. public class LoginInterceptor implements HandlerInterceptor {
  2. @Resource
  3. private StringRedisTemplate stringRedisTemplate;
  4. public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
  5. this.stringRedisTemplate = stringRedisTemplate;
  6. }
  7. @Override
  8. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  9. //TODO;1.获取请求头中的token
  10. String token = request.getHeader("authorization");
  11. if(StrUtil.isBlank(token)){
  12. //不存在,拦截,返回401状态码
  13. response.setStatus(401);
  14. return false;
  15. }
  16. //TODO:2.基于TOKEN获取redis的用户
  17. Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
  18. //判断用户是否存在
  19. if(userMap.isEmpty()){
  20. //不存在,拦截,返回401状态码
  21. response.setStatus(401);
  22. return false;
  23. }
  24. //TODO:3.将查询到的Hash数据转化为UserDTO对象
  25. UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
  26. //TODO:4.存在,保存用户信息的ThreadLocal
  27. UserHolder.saveUser(userDTO);
  28. //TODO:5.刷新token有效期
  29. stringRedisTemplate.expire(LOGIN_USER_KEY + token,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
  30. //放行
  31. return true;
  32. }
  33. @Override
  34. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  35. //移除用户
  36. UserHolder.removeUser();
  37. }
  38. }

测试:首先把Redis和数据库都启动。 原始的项目的Redis的服务器ID需要更改为自己的。点击发送验证码,redis中有记录,没问题:

但点击登录的时候会报一个无法将Long转String的错误。因为用的是stringRedisTemplate要求所有的字段都是string类型的。

需要对UserServiceImpl中如下的位置进行修改:

  1. Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
  2. CopyOptions.create()
  3. .setIgnoreNullValue(true)
  4. .setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));

效果如下:

P11 短信登录 解决状态登录刷新问题

现在只有在用户访问拦截器拦截的页面才会刷新页面,假如用户访问的是不需要拦截的页面则不会导致页面的刷新。

现在的解决思路是:新增一个拦截器,拦截一切路径。

复制LoginInterceptor变成一份新的RefreshTokenInterceptor,把下面几处地方改为return true即可:

LoginInterceptor的代码变成如下:

  1. public class LoginInterceptor implements HandlerInterceptor {
  2. @Override
  3. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  4. //1.判断是否需要拦截(ThreadLocal中是否有用户)
  5. if(UserHolder.getUser()==null){
  6. //没有,需要拦截,设置状态码
  7. response.setStatus(401);
  8. //拦截
  9. return false;
  10. }
  11. //放行
  12. return true;
  13. }
  14. @Override
  15. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  16. //移除用户
  17. UserHolder.removeUser();
  18. }
  19. }

现在还需要在MvcConfig里面对拦截器进行更新配置,需要(用order)调整拦截器的执行顺序: 

  1. @Configuration
  2. public class MvcConfig implements WebMvcConfigurer {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public void addInterceptors(InterceptorRegistry registry){
  7. registry.addInterceptor(new LoginInterceptor())
  8. .excludePathPatterns(
  9. "/user/code",
  10. "/user/login",
  11. "/upload/**",
  12. "/blog/hot",
  13. "/shop/**",
  14. "/shop-type/**",
  15. "/voucher/**"
  16. ).order(1);
  17. registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate))
  18. .addPathPatterns("/**").order(0);
  19. }
  20. }

P12 什么是缓存

缓存就是数据交换的缓冲区,是存储数据的临时地方,一般读写性能较高。

缓存作用:降低后端负载;提高读写的效率,降低响应时间。

缓存成本:数据一致性成本(数据库里的数据如果发生变化,容易与缓存中的数据形成不一致)。代码维护成本高(搭建集群)。运营成本高。

P13 添加商户缓存

在ShopController类的queryShopById方法中:

  1. @GetMapping("/{id}")
  2. public Result queryShopById(@PathVariable("id") Long id) {
  3. return Result.ok(shopService.queryById(id));
  4. }

在IShopService接口中编写如下代码:

  1. public interface IShopService extends IService<Shop> {
  2. Object queryById(Long id);
  3. }

在ShopServiceImpl类的queryById方法中编写具体代码:

  1. @Service
  2. public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public Object queryById(Long id) {
  7. String key = CACHE_SHOP_KEY + id;
  8. //1.从Redis查询缓存
  9. String shopJson = stringRedisTemplate.opsForValue().get(key);
  10. //2.判断是否存在
  11. if(StrUtil.isNotBlank(shopJson)){
  12. //3.存在,直接返回
  13. Shop shop = JSONUtil.toBean(shopJson, Shop.class);
  14. return Result.ok(shop);
  15. }
  16. //4.不存在,根据id查询数据库
  17. Shop shop = getById(id);
  18. //5.不存在,返回错误
  19. if(shop==null){
  20. return Result.fail("店铺不存在!");
  21. }
  22. //6.存在,写入Redis
  23. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
  24. return Result.ok(shop);
  25. }
  26. }

 核心是通过调用hutool工具包中的JSONUtil类来实现对象转JSON(方法:toJsonStr(对象))和JSON转对象(方法:toBean(json,Bean的类型))。

P14 缓存练习题分析

TODO:对分类进行缓存。

P15 缓存更新策略

主动更新:编写业务逻辑,在修改数据库的同时,更新缓存。

适用于高一致性的需求:主动更新,以超时剔除作为兜底方案。

主动更新策略:

1.由缓存的调用者,在更新数据库的同时更新缓存。(一般情况下使用该种方案)

2.缓存与数据库聚合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存的一致性问题。

3.调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致。

对1进行分析:

1.选择删除缓存还是更新缓存?如果是更新缓存:每次更新数据库都会更新缓存,无效的写操作比较多。删除缓存:更新数据库时让缓存失效,查询时再更新缓存。

2.如何保证缓存与数据库的操作的同时成功或失败?

单体系统:将缓存与数据库操作放在一个事务。

分布式系统:利用TCC等分布式事务方案。

3.先操作缓存还是先操作数据库?

先删缓存,再操作(写)数据库:

先操作(写)数据库,再删除缓存(出现的概率比较低)

要求线程1来查询的时候缓存恰好失效了->在写入缓存的时候突然来了线程2,对数据库的数据进行了修改->此时线程1写回缓存的是旧数据。

P16 实现商铺缓存与数据库的双写一致

给查询商铺的缓存添加超时剔除和主动更新的策略。

修改ShopController中的业务逻辑,满足下面要求:

1.根据id查询商铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间。

2.根据id修改店铺时,先修改数据库,再删除缓存。

首先修改ShopServiceImpl的redis过期时间:

stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);

修改ShopController中的updateShop方法:

  1. @PutMapping
  2. public Result updateShop(@RequestBody Shop shop) {
  3. // 写入数据库
  4. return Result.ok(shopService.update(shop));
  5. }

向IShopService接口中添加update方法:

Object update(Shop shop);

向ShopServiceImpl类中添加update方法:

  1. @Override
  2. public Object update(Shop shop) {
  3. Long id = shop.getId();
  4. if(id == null){
  5. return Result.fail("商铺id不存在");
  6. }
  7. updateById(shop);
  8. stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
  9. return Result.ok();
  10. }

首先删除缓存中的数据,然后看SQL语句是否执行,是否加上了TTL过期时间。

在PostMan中访问http://localhost:8081/shop,然后修改101茶餐厅为102茶餐厅:

 注意要发送的是PUT请求,请求的内容如下:

  1. {
  2. "area": "大关",
  3. "openHours": "10:00-22:00",
  4. "sold": 4215,
  5. "address": "金华路锦昌文华苑29号",
  6. "comments": 3035,
  7. "avgPrice": 80,
  8. "score": 37,
  9. "name": "102茶餐厅",
  10. "typeId": 1,
  11. "id": 1
  12. }

然后去数据库看是否名称更新为102茶餐厅,然后看缓存中的数据是否被删除,用户刷新页面看到102茶餐厅,缓存中会有最新的数据。

P17 缓存穿透的解决思路

缓存穿透指的是客户端请求的数据在缓存中和数据库中都不存在,使得缓存永远不会生效,请求都会打到数据库。

2种解决方法:

1.缓存空对象。优点:实现简单,维护方便。缺点:额外的内存消耗。可能造成短期的不一致(可以设置TTL)。

2.布隆过滤。在客户端和Redis之间加个布隆过滤器(存在不一定存在,不存在一定不存在,有5%的错误率)。

优点:内存占用较少,没有多余key。缺点:实现复杂,存在误判可能。

P18 编码解决商铺查询的缓存穿透问题

下图是原始的:

下面是更改后的:

在ShopServiceImpl类里对queryById方法进行修改:

  1. @Override
  2. public Object queryById(Long id) {
  3. String key = CACHE_SHOP_KEY + id;
  4. //1.从Redis查询缓存
  5. String shopJson = stringRedisTemplate.opsForValue().get(key);
  6. //2.判断是否存在
  7. if(StrUtil.isNotBlank(shopJson)){
  8. //3.存在,直接返回
  9. Shop shop = JSONUtil.toBean(shopJson, Shop.class);
  10. return Result.ok(shop);
  11. }
  12. //上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null
  13. if(shopJson != null){
  14. return Result.fail("店铺信息不存在!");
  15. }
  16. //4.不存在,根据id查询数据库
  17. Shop shop = getById(id);
  18. //5.不存在,返回错误
  19. if(shop==null){
  20. stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
  21. return Result.fail("店铺不存在!");
  22. }
  23. //6.存在,写入Redis
  24. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
  25. return Result.ok(shop);
  26. }

测试:

localhost:8080/api/shop/1此时是命中数据。

localhost:8080/api/shop/0此时未命中数据。打开缓存可以看到缓存的是空,并且TTL是200秒。

总结缓存穿透:用户请求的数据在缓存中和数据库中都不存在,不断发起请求,会给数据库造成巨大压力。

缓存穿透:缓存null值和布隆过滤器。还可以增强id的复杂度,避免被猜测id规律。做好数据的基础格式校验。加强用户权限校验。做好热点参数的限流。

P19 缓存雪崩问题及解决思路

缓存雪崩:是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求打到数据库,带来巨大的压力。

解决方案:

1.(解决大量缓存key同时失效)给不同Key的TTL添加随机值。

2.(解决Redis宕机)利用Redis集群提高服务的可用性。

3.给缓存业务添加降级限流策略。

4.给业务添加多级缓存(浏览器可以有缓存,nginx可以有缓存,redis可以有缓存,数据库可以有缓存)。

P20 缓存击穿问题及解决方案

缓存击穿问题:也叫热点key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然消失了,无数的请求访问在瞬间给数据库带来巨大的冲击。

解决方案:

1.互斥锁。由获取互斥锁成功的线程来查询数据库重建缓存数据。缺点:未获得互斥锁的线程需要等待,性能略差。

2.逻辑过期。设置一个逻辑时间字段,查询缓存的时候检查逻辑时间看是否已过期。如果某个线程获取到互斥锁就开启新线程,由新线程查询数据库重建缓存数据。

其它线程在获取互斥锁失败后不会等待,而是直接返回过期的数据。只有当缓存重建完毕之后释放锁,新线程才会读到最新的数据。

互斥锁优点:

互斥锁没有额外的内存消耗:因为逻辑过期需要维护一个逻辑过期的字段,有额外内存消耗。

互斥锁可以保证强一致性,所有线程拿到的是最新数据。实现也很简单。

互斥锁缺点:

线程需要等待,性能受到影响。可能会有死锁的风险。

逻辑过期优点:

线程无需等待,性能较好。

逻辑过期缺点:

不保证一致性。有额外内存消耗。实现复杂。

P21 利用互斥锁解决缓存击穿问题

在ShopServiceImpl类中定义一个tryLock方法(在Redis中的setnx相当于setIfAbsent方法。)

  1. public boolean tryLock(String key){
  2. Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
  3. return BooleanUtil.isTrue(flag);
  4. }

在ShopServiceImpl类中定义一个unLock方法用于解锁。

  1. public void unLock(String key){
  2. stringRedisTemplate.delete(key);
  3. }

在ShopServiceImpl类中定义一个queryWithPassThrough方法。

  1. public Shop queryWithPassThrough(Long id){
  2. String key = CACHE_SHOP_KEY + id;
  3. //1.从Redis查询缓存
  4. String shopJson = stringRedisTemplate.opsForValue().get(key);
  5. //2.判断是否存在
  6. if(StrUtil.isNotBlank(shopJson)){
  7. //3.存在,直接返回
  8. Shop shop = JSONUtil.toBean(shopJson, Shop.class);
  9. return shop;
  10. }
  11. //上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null
  12. if(shopJson != null){
  13. return null;
  14. }
  15. //4.不存在,根据id查询数据库
  16. Shop shop = getById(id);
  17. //5.不存在,返回错误
  18. if(shop==null){
  19. stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
  20. return null;
  21. }
  22. //6.存在,写入Redis
  23. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
  24. return shop;
  25. }

在ShopServiceImpl类中定义一个queryWithMutex方法:

  1. public Shop queryWithMutex(Long id){
  2. String key = CACHE_SHOP_KEY + id;
  3. //1.从Redis查询缓存
  4. String shopJson = stringRedisTemplate.opsForValue().get(key);
  5. //2.判断是否存在
  6. if(StrUtil.isNotBlank(shopJson)){
  7. //3.存在,直接返回
  8. Shop shop = JSONUtil.toBean(shopJson, Shop.class);
  9. return shop;
  10. }
  11. //上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null
  12. if(shopJson != null){
  13. return null;
  14. }
  15. //4.实现缓存重建
  16. //4.1 获取互斥锁
  17. String lockKey = LOCK_SHOP_KEY+id;
  18. Shop shop = null;
  19. try {
  20. boolean isLock = tryLock(lockKey);
  21. //4.2 判断是否获取成功
  22. if(!isLock){
  23. //4.3 失败,则休眠并重试
  24. Thread.sleep(50);
  25. return queryWithMutex(id);
  26. }
  27. //4.4 获取互斥锁成功,根据id查询数据库
  28. shop = getById(id);
  29. //模拟重建的延时
  30. Thread.sleep(200);
  31. //5.数据库查询失败,返回错误
  32. if(shop==null){
  33. stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
  34. return null;
  35. }
  36. //6.存在,写入Redis
  37. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
  38. } catch (InterruptedException e) {
  39. throw new RuntimeException(e);
  40. }finally {
  41. //7.释放互斥锁
  42. unLock(lockKey);
  43. }
  44. //8.返回
  45. return shop;
  46. }

在ShopServiceImpl类中修改queryById,调用queryWithMutex:

  1. public Object queryById(Long id) {
  2. //缓存穿透
  3. //Shop shop = queryWithPassThrough(id);
  4. //互斥锁解决缓存击穿
  5. Shop shop = queryWithMutex(id);
  6. return Result.ok(shop);
  7. }

测试:

定义1000个线程,Ramp-Up时间为5。

请求地址:localhost:8081/shop/1。

设置完毕后点击绿色箭头运行,此时会提示是否保存测试文件,选择不保存(我测试选择保存会报错)。

可以在结果树这里看请求是否发送成功:

先删掉缓存,然后点击绿色箭头发送并发请求,可以发现所有线程请求成功,控制台对数据库的查询只有1次(没有出现多个线程争抢查询数据库的情况),测试成功。

P22 利用逻辑过期解决缓存击穿问题

如何添加逻辑过期字段?答:可以在utils包下定义RedisData类(可以让Shop继承RedisData类),也可以在RedisData中设置一个Shop类的data属性:

  1. @Data
  2. public class RedisData {
  3. private LocalDateTime expireTime;
  4. private Object data;
  5. }

在ShopServiceImpl类中定义saveShop2Redis方法:

  1. public void saveShop2Redis(Long id,Long expireSeconds){
  2. //1.查询店铺数据
  3. Shop shop = getById(id);
  4. //2.封装逻辑过期时间
  5. RedisData redisData = new RedisData();
  6. redisData.setData(shop);
  7. redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
  8. //3.写入Redis
  9. stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
  10. }

单元测试,在test包下的HmDianPingApplicationTests中创建testSaveShop类写入测试代码(这里要注意的是输入alt+insert之后选择Test Method要选择Junit 5来进行测试方法的编写):

  1. @SpringBootTest
  2. class HmDianPingApplicationTests {
  3. @Resource
  4. private ShopServiceImpl shopService;
  5. @Test
  6. void testSaveShop() {
  7. shopService.saveShop2Redis(1L,10L);
  8. }
  9. }

可以看到redis中确实存入了数据:

在ShopServiceImpl中复制一份缓存穿透的代码,更改名称为queryWithLogicalExpire:

  1. private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
  2. public Shop queryWithLogicalExpire(Long id){
  3. String key = CACHE_SHOP_KEY + id;
  4. //1.从Redis查询缓存
  5. String shopJson = stringRedisTemplate.opsForValue().get(key);
  6. //2.判断是否存在
  7. if(StrUtil.isBlank(shopJson)){
  8. //3.不存在,返回空
  9. return null;
  10. }
  11. //4.命中,需要先把json反序列化为对象
  12. RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
  13. JSONObject data = (JSONObject) redisData.getData();
  14. Shop shop = JSONUtil.toBean(data, Shop.class);
  15. //5.判断是否过期
  16. //5.1 未过期直接返回店铺信息
  17. LocalDateTime expireTime = redisData.getExpireTime();
  18. if(expireTime.isAfter(LocalDateTime.now())){
  19. return shop;
  20. }
  21. //5.2 已过期重建缓存
  22. //6.缓存重建
  23. //6.1.获取互斥锁
  24. String lockKey = LOCK_SHOP_KEY + id;
  25. boolean isLock = tryLock(lockKey);
  26. //6.2.判断是否获取互斥锁成功
  27. if(isLock){
  28. //6.3.成功,开启独立线程,实现缓存重建
  29. CACHE_REBUILD_EXECUTOR.submit(()->{
  30. try {
  31. saveShop2Redis(id,20L); //实际中应该设置为30分钟
  32. } catch (Exception e) {
  33. throw new RuntimeException(e);
  34. } finally {
  35. unLock(lockKey);
  36. }
  37. });
  38. }
  39. //6.4.失败,返回过期的商铺信息
  40. return shop;
  41. }

测试:

先到数据库把102茶餐厅改为103茶餐厅(因为Redis之前插入了一条缓存为102茶餐厅,并且已经过期,此时数据库与缓存不一致),新的HTTP请求会将逻辑过期的数据删除,然后更新缓存。

线程数设置为100,Ramp-up时间设置为1

在查看结果树里面到中间某个HTTP请求会完成重建,响应数据会改变。

1.安全性问题:在高并发情况下是否会有很多线程来做重建。

2.一致性问题:在重建完成之前得到的是否是旧的数据。

P23 封装Redis工具类

在utils包下创建CacheClient类,先写入如下基础的代码:

  1. @Slf4j
  2. @Component
  3. public class CacheClient {
  4. private final StringRedisTemplate stringRedisTemplate;
  5. public CacheClient(StringRedisTemplate stringRedisTemplate) {
  6. this.stringRedisTemplate = stringRedisTemplate;
  7. }
  8. public void set(String key, Object value, Long time, TimeUnit unit){
  9. stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(value),time,unit);
  10. }
  11. public void setWithLogicalExpire(String key, Object value,Long expire,TimeUnit unit){
  12. //设置逻辑过期
  13. RedisData redisData = new RedisData();
  14. redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(expire)));
  15. redisData.setData(value);
  16. stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
  17. }
  18. }

在CacheClient类中编写缓存穿透的共性方法queryWithPassThrough: 

  1. public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type,
  2. Function<ID,R> dbFallBack,Long time,TimeUnit unit){
  3. String key = keyPrefix + id;
  4. //1.从Redis查询缓存
  5. String shopJson = stringRedisTemplate.opsForValue().get(key);
  6. //2.判断是否存在
  7. if(StrUtil.isNotBlank(shopJson)){
  8. //3.存在,直接返回
  9. return JSONUtil.toBean(shopJson, type);
  10. }
  11. //上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null
  12. if(shopJson != null){
  13. return null;
  14. }
  15. //4.不存在,根据id查询数据库
  16. R r = dbFallBack.apply(id);
  17. //5.不存在,返回错误
  18. if(r==null){
  19. stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
  20. return null;
  21. }
  22. //6.存在,写入Redis
  23. this.set(key,r,time,unit);
  24. return r;
  25. }

编写完queryWithPassThrough之后可以到ShopServiceImpl中直接调用新的方法(记得引入CacheClient类):

  1. @Resource
  2. private CacheClient cacheClient;
  3. @Override
  4. public Object queryById(Long id) {
  5. //调用工具类解决缓存击穿
  6. Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
  7. if(shop==null){
  8. return Result.fail("店铺不存在!");
  9. }
  10. return Result.ok(shop);
  11. }

进行测试:成功会对不存在的店铺空值进行缓存。

 

接下来拷贝queryWithLogicalExpire的代码到CacheClient类中进行改写:

  1. private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
  2. public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R> dbFallBack,Long time,TimeUnit unit){
  3. String key = keyPrefix + id;
  4. //1.从Redis查询缓存
  5. String shopJson = stringRedisTemplate.opsForValue().get(key);
  6. //2.判断是否存在
  7. if(StrUtil.isBlank(shopJson)){
  8. //3.不存在,返回空
  9. return null;
  10. }
  11. //4.命中,需要先把json反序列化为对象
  12. RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
  13. JSONObject data = (JSONObject) redisData.getData();
  14. R r = JSONUtil.toBean(data, type);
  15. //5.判断是否过期
  16. //5.1 未过期直接返回店铺信息
  17. LocalDateTime expireTime = redisData.getExpireTime();
  18. if(expireTime.isAfter(LocalDateTime.now())){
  19. return r;
  20. }
  21. //5.2 已过期重建缓存
  22. //6.缓存重建
  23. //6.1.获取互斥锁
  24. String lockKey = LOCK_SHOP_KEY + id;
  25. boolean isLock = tryLock(lockKey);
  26. //6.2.判断是否获取互斥锁成功
  27. if(isLock){
  28. //6.3.成功,开启独立线程,实现缓存重建
  29. CACHE_REBUILD_EXECUTOR.submit(()->{
  30. try {
  31. //查询数据库
  32. R r1 = dbFallBack.apply(id);
  33. //写入redis
  34. this.setWithLogicalExpire(key,r1,time,unit);
  35. } catch (Exception e) {
  36. throw new RuntimeException(e);
  37. } finally {
  38. unLock(lockKey);
  39. }
  40. });
  41. }
  42. //6.4.失败,返回过期的商铺信息
  43. return r;
  44. }
  45. public boolean tryLock(String key){
  46. Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
  47. return BooleanUtil.isTrue(flag);
  48. }
  49. public void unLock(String key){
  50. stringRedisTemplate.delete(key);
  51. }

 改写test下的HmDianPingApplicationTests类:

  1. @SpringBootTest
  2. class HmDianPingApplicationTests {
  3. @Resource
  4. private CacheClient cacheClient;
  5. @Resource
  6. private ShopServiceImpl shopService;
  7. @Test
  8. void testSaveShop() throws InterruptedException {
  9. Shop shop = shopService.getById(1L);
  10. cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY+1L,shop,10L,TimeUnit.SECONDS);
  11. }
  12. }

测试:首先运行HmDianPingApplicationTests类里的测试方法,10秒后逻辑过期,此时运行后台程序,修改数据库1号商铺的name字段,此时访问:localhost:8080/api/shop/1 会出现效果第1次访问为缓存旧值,然后发现缓存过期开始重建,第2次访问开始就是新值。数据库也只有1次重建。

P24 缓存总结

P25 优惠券秒杀 全局唯一ID

每个店铺都可以发布优惠券,当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID会存在一些问题。

1.id的规律性太明显。

2.受单表数据量的限制(分表之后每张表都自增长,id会出现重复)。

全局ID生成器:是一种在分布式系统下用来生成全局唯一ID的工具。

要求全局唯一ID生成器满足如下几点:1.唯一性。2.高可用。3.高性能。4.递增性。5.安全性。

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息。

符号位永远为0代表整数。

31位的时间戳是以秒为单位,定义了一个起始时间,用当前时间减起始时间,预估可以使用69年。

32位的是序列号是Redis自增的值,支持每秒产生2^32个不同ID。

P26 优惠券秒杀 Redis实现全局唯一id

在utils包下定义一个RedisWorker类,是一个基于Redis的ID生成器。

如果只使用一个key来自增记录有一个坏处,最终key的自增数量会突破容量的上限,假如自增超过32位彼时便无法再存储新的数据,解决的方案是采用拼接日期。

  1. @Component
  2. public class RedisIdWorker {
  3. private static final long BEGIN_TIMESTAMP = 1640995200L;
  4. //序列号的位数
  5. private static final int COUNT_BITS=32;
  6. private StringRedisTemplate stringRedisTemplate;
  7. public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
  8. this.stringRedisTemplate = stringRedisTemplate;
  9. }
  10. public long nextId(String keyPrefix){
  11. //1.生成时间戳
  12. LocalDateTime now = LocalDateTime.now();
  13. long timeStamp = now.toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP;
  14. //2.生成序列号
  15. //2.1获取当前日期,精确到天
  16. String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
  17. //2.2自增长
  18. long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
  19. //3.拼接并返回
  20. return timeStamp << COUNT_BITS | count;
  21. }
  22. }

在HmDianPingApplicationTests中写入如下的测试代码:

  1. @Resource
  2. private ShopServiceImpl shopService;
  3. @Resource
  4. private RedisIdWorker redisIdWorker;
  5. private ExecutorService es = Executors.newFixedThreadPool(500);
  6. @Test
  7. void testIdWorker() throws InterruptedException {
  8. CountDownLatch latch = new CountDownLatch(300);
  9. Runnable task = ()->{
  10. for(int i=0;i<100;i++){
  11. long id = redisIdWorker.nextId("order");
  12. System.out.println("id="+id);
  13. }
  14. latch.countDown();
  15. };
  16. long begin = System.currentTimeMillis();
  17. for(int i=0;i<300;i++){
  18. es.submit(task);
  19. }
  20. latch.await();
  21. long end = System.currentTimeMillis();
  22. System.out.println("Result Time = " + (end-begin));
  23. }

运行之后可以看到以十进制输出的所有编号: 

 

可以在Redis中看到自增长的结果,1次是30000: 

大概2秒可以生成3万条,速度还是可以的。

全局唯一ID生成策略:

1.UUID利用JDK自带的工具类即可生成,生成的是16进制的字符串,无单调递增的特性。

2.Redis自增(每天一个key,方便统计订单量。时间戳+计数器的格式。)

3.snowflake雪花算法(不依赖于Redis,性能更好,对于时钟依赖)

4.数据库自增

P27 优惠券秒杀 添加优惠券

每个店铺都可以发放优惠券,分为平价券和特价券。平价券可以任意抢购,特价券需要秒杀抢购。

tb_voucher:优惠券基本信息,优惠金额,使用规则等。

tb_seckill_voucher:优惠券的库存,开始抢购时间,结束抢购时间,只有特价优惠券才需要填写这些信息。

请求的信息如下可自行复制(注意beginTime和endTime需要修改):

  1. {
  2. "shopId":1,
  3. "title":"100元代金券",
  4. "subTitle":"周一至周五均可使用",
  5. "rules":"全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食",
  6. "payValue":8000,
  7. "actualValue":10000,
  8. "type":1,
  9. "stock":100,
  10. "beginTime":"2024-04-10T10:09:17",
  11. "endTime":"2024-04-11T12:09:04"
  12. }

注意要在请求头中带Authorization参数否则会报401(登录后进入“我的”页面,看网络包有Authorization的值): 

以如下格式发送请求:

首先在tb_voucher表中可以看到新增的优惠券:

在tb_seckill_voucher表中也可以看到秒杀优惠券的具体信息:

在前端也能看到新增的100元代金券,注意优惠券的时间一定要进行更改,如果不在开始和结束时间区间内优惠券会处于下架状态是看不到的。

 P28 优惠券秒杀 实现秒杀下单

首先要判断秒杀是否开始或结束,所以要先查询优惠券的信息,如果尚未开始或者已经结束无法下单。

要判断库存是否充足,如果不足则无法下单。

在VouchrOrderController类中:

  1. @RestController
  2. @RequestMapping("/voucher-order")
  3. public class VoucherOrderController {
  4. @Resource
  5. private IVoucherService voucherService;
  6. @PostMapping("seckill/{id}")
  7. public Result seckillVoucher(@PathVariable("id") Long voucherId) {
  8. return voucherService.seckillVoucher(voucherId);
  9. }
  10. }

在IVoucherOrderService中写入如下代码:

  1. public interface IVoucherOrderService extends IService<VoucherOrder> {
  2. Result seckillVoucher(Long voucherId);
  3. }

在VoucherOrderServiceImpl中写入如下代码:

  1. @Service
  2. @Transactional
  3. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  4. @Resource
  5. private ISeckillVoucherService seckillVoucherService;
  6. @Resource
  7. private RedisIdWorker redisIdWorker;
  8. @Override
  9. public Result seckillVoucher(Long voucherId) {
  10. //1.查询优惠券信息
  11. SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
  12. //2.判断秒杀是否开始
  13. //2.1秒杀尚未开始返回异常
  14. if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
  15. return Result.fail("秒杀尚未开始");
  16. }
  17. //2.2秒杀已结束返回异常
  18. if(voucher.getEndTime().isBefore(LocalDateTime.now())){
  19. return Result.fail("秒杀已经结束");
  20. }
  21. //3.判断库存是否充足
  22. if(voucher.getStock()<1){
  23. //3.1库存不足返回异常
  24. return Result.fail("库存不足!");
  25. }
  26. //3.2库存充足扣减库存
  27. boolean success = seckillVoucherService.update()
  28. .setSql("stock = stock - 1")
  29. .eq("voucher_id", voucherId).update();
  30. if(!success){
  31. return Result.fail("库存不足!");
  32. }
  33. //4.创建订单,返回订单id
  34. VoucherOrder voucherOrder = new VoucherOrder();
  35. long orderId = redisIdWorker.nextId("order");//订单id
  36. voucherOrder.setId(orderId);
  37. Long userId = UserHolder.getUser().getId();//用户id
  38. voucherOrder.setUserId(userId);
  39. voucherOrder.setVoucherId(voucherId);//代金券id
  40. save(voucherOrder);
  41. return Result.ok(orderId);
  42. }
  43. }

测试:点击限时抢购之后会提示抢购成功。

P29 优惠券秒杀 库存超卖问题分析

Jmeter的配置如下:

注意Authorization要事先登录获取:

下面是结果:

发现tb_seckill_voucher中库存为-9,在tb_voucher_order中插入了109条数据,说明出现了超卖的问题。

正常逻辑:

非正常逻辑:

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案是加锁。

悲观锁:认为线程安全问题一定会发送,因此在操作数据之前要先获取锁,确保线程串行执行。像Synchronized、Lock都属于悲观锁。

乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。

如果没有修改则认为是安全的,自己才更新数据。

如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。‘

乐观锁关键是判断之前查询得到的数据是否被修改过,常见的方法有2种:

1.版本号法:

2.CAS法(版本号法的简化版):查询的时候把库存查出来,更新的时候判断库存和之前查到的库存是否一致,如果一致则更新数据。

P30 优惠券秒杀 乐观锁解决超卖

只需加上下面这段代码即可:.eq("stock",voucher.getStock()) 。用于比较当前数据库的库存值和之前查询到的库存值是否相同,只有相同时才可以执行set语句。

  1. //3.2库存充足扣减库存
  2. boolean success = seckillVoucherService.update()
  3. .setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
  4. .eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
  5. .eq("stock",voucher.getStock()).update();

但现在出现了异常值偏高的问题,正常的请求大约只占10%。 

原理是因为:假如一次有30个线程涌入,查询到库存值为100,只有1个线程能把值改为99,其它29个线程比对库存值99发现和自己查询到的库存值100不同,所以都认为数据已经被修改过,所以都失败了。

乐观锁的问题,成功率太低。

现在只需要保证stock>0即可,只要存量大于0就可以任意扣减。

  1. boolean success = seckillVoucherService.update()
  2. .setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
  3. .eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
  4. .gt("stock",0).update();

乐观锁缺陷:

需要大量对数据库进行访问,容易导致数据库的崩溃。

总结:

 P31 优惠券秒杀 实现一人一单功能

修改秒杀业务,要求同一个优惠券,一个用户只能下一单。

首先不建议把锁加在方法上,因为任何一个用户来了都要加这把锁,而且是同一把锁,方法之间变成串行执行,性能很差。

因此可以把锁加在用户id上,只有当id相同时才会对锁形成竞争关系。但是因为toString的内部是new了一个String字符串,每调一次toString都是生成一个全新的字符串对象,锁对象会变。

所以可以调用intern()方法,intern()方法会优先去字符串常量池里查找与目标字符串值相同的引用返回(只要字符串一样能保证返回的结果一样)。

但是因为事务是在函数执行结束之后由Spring进行提交,如果把锁加在createVoucherOrder内部其实有点小——因为如果解锁之后,其它线程可以进入,而此时事务尚未提交,仍然会导致安全性问题。

因此最终方案是把synchronized加在createVoucherOrder的方法外部,锁住的是用户id。

关于代理对象事务的问题:通常情况下,当一个使用了@Transactional注解的方法被调用时,Spring会从上下文中获取一个代理对象来管理事务。

但是如果加@Transactional方法是被同一个类中的另一个方法调用时,Spring不会使用代理对象,而是直接调用该方法,导致事务注解失效。

为避免这种情况,可以使用AopContext.currentProxy方法获取当前的代理对象,然后通过代理对象调用被@Transactional注解修饰的方法,确保事务生效。

在VoucherOrderServiceImpl中写入如下代码(注意:ctrl+alt+m可以把含有return的代码段进行提取):

  1. @Service
  2. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  3. @Resource
  4. private ISeckillVoucherService seckillVoucherService;
  5. @Resource
  6. private RedisIdWorker redisIdWorker;
  7. @Override
  8. public Result seckillVoucher(Long voucherId) {
  9. //1.查询优惠券信息
  10. SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
  11. //2.判断秒杀是否开始
  12. //2.1秒杀尚未开始返回异常
  13. if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
  14. return Result.fail("秒杀尚未开始");
  15. }
  16. //2.2秒杀已结束返回异常
  17. if(voucher.getEndTime().isBefore(LocalDateTime.now())){
  18. return Result.fail("秒杀已经结束");
  19. }
  20. voucher = seckillVoucherService.getById(voucherId);
  21. //3.判断库存是否充足
  22. if(voucher.getStock()<1){
  23. //3.1库存不足返回异常
  24. return Result.fail("库存不足!");
  25. }
  26. Long userId = UserHolder.getUser().getId();
  27. synchronized (userId.toString().intern()){
  28. //获取代理对象
  29. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  30. return proxy.createVoucherOrder(voucherId);
  31. }
  32. }
  33. @Transactional
  34. public Result createVoucherOrder(Long voucherId) {
  35. //6.一人一单
  36. Long userId = UserHolder.getUser().getId();
  37. //6.1查询订单
  38. int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  39. //6.2判断是否存在
  40. if(count>0){
  41. //用户已经购买过了
  42. return Result.fail("用户已经购买过一次!");
  43. }
  44. //3.2库存充足扣减库存
  45. boolean success = seckillVoucherService.update()
  46. .setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
  47. .eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
  48. .gt("stock",0).update();
  49. if(!success){
  50. return Result.fail("库存不足!");
  51. }
  52. //4.创建订单,返回订单id
  53. VoucherOrder voucherOrder = new VoucherOrder();
  54. long orderId = redisIdWorker.nextId("order");//订单id
  55. voucherOrder.setId(orderId);
  56. voucherOrder.setUserId(userId);
  57. voucherOrder.setVoucherId(voucherId);//代金券id
  58. save(voucherOrder);
  59. return Result.ok(orderId);
  60. }
  61. }

在IVoucherOrderService接口中加入下面这个方法:

Result createVoucherOrder(Long voucherId);

在pom.xml中引入如下的依赖:

  1. <dependency>
  2. <groupId>org.aspectj</groupId>
  3. <artifactId>aspectjweaver</artifactId>
  4. </dependency>

 在启动类HmDianPingApplication上加如下注解:

@EnableAspectJAutoProxy(exposeProxy = true)

测试: 成功实现一名用户只能领取一张优惠券。

 
P32 优惠券秒杀 集群下的线程并发安全问题

本P主要是为了验证在集群下synchronized并不能保证线程的并发安全。

如下图可以设置项目启动的端口号,确保启动的项目之间端口号不同:

在nginx.conf中放开8082的这个配置:

向下面这个页面发送请求:

http://localhost:8080/api/voucher/list/1

 可以看到请求会分别被8082和8081接收,是轮询的效果:

首先到tb_voucher_order把之前的订单删除,到tb_seckill_voucher中把stock重新改回100。

准备2个相同的秒杀请求:要注意请求的地址是:http://localhost:8080/api/voucher-order/seckill/13

我这里直接用Jemeter来进行测试,模拟高并发场景:

下面是效果:可以看到并发请求能够同时进入集群的每台结点。

正常情况:

在集群模式下,每一个节点都是一个全新的JVM,每个JVM都有自己的锁。锁监视器只能在当前JVM的范围内,监视线程实现互斥。

现在就要实现让多个JVM使用的是同一把锁。跨JVM、跨进程的锁。

P33 分布式锁 基本原理和不同实现方式对比

synchronized只能保证单个JVM内部的多个线程之间的互斥,而没法让集群下多个JVM进程间的线程互斥。

 要让多个JVM进程能看到同一个锁监视器,而且同一时间只有一个线程能拿到锁监视器。

所以必须使用分布式锁,分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁要满足:多进程可见+互斥+高可用+高性能+安全性。

分布式锁可以通过MySQL或Redis或Zookeeper来实现。

MySQL:

1.互斥:是利用mysql本身的互斥锁机制。在执行写操作的时候,MySQL会自动分配一个互斥的锁。

2.可用性:好。3.性能:受限于MySQL性能。

4.安全性:事务机制,如果断开连接,会自动释放锁。

Redis:

1.互斥:利用setnx这样的互斥命令。往Redis里set数据只有不存在时才能set成功。

2.可用性:好,Redis支持主从和集群。3.性能:好。

4.安全性:如果没有执行删除key的操作,key不会自动释放。但可以利用锁的超时机制,到期自动释放。

Zookeeper:

1.利用节点的唯一性(节点不重复)和有序性(节点递增)实现互斥。利用有序性:id最小的节点获取锁成功;释放锁只需要删除id最小的节点。

2.可用性:好。3.性能:比Redis差,一般,强调强一致性,主从间同步需要时间。

4.安全性:好。因为是临时节点,断开连接会自动释放。

P34 分布式锁 Redis的分布式锁实现思路

假如获取锁后宕机,锁无法释放——>可以添加超时过期时间。

为了防止锁在SETEX和EXPIRE之间过期,可以直接用一条命令(原子操作)来实现设置过期时间(EX)和只有lock不存在时才能设置(NX)。

采用非阻塞式获取锁,如果成功返回true,失败返回false。

P35 分布式锁 实现Redis分布式锁版本1

在utils下面创建一个ILock接口:

  1. public interface ILock {
  2. //尝试获取锁
  3. boolean tryLock(long timeoutSec);
  4. //释放锁
  5. void unlock();
  6. }

在utils下面实现SimpleRedisLock类:

  1. public class SimpleRedisLock implements ILock {
  2. private String name;
  3. private StringRedisTemplate stringRedisTemplate;
  4. public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
  5. this.name = name;
  6. this.stringRedisTemplate = stringRedisTemplate;
  7. }
  8. private static final String KEY_PREFIX = "lock:";
  9. @Override
  10. public boolean tryLock(long timeoutSec) {
  11. //获取线程标示
  12. long threadId = Thread.currentThread().getId();
  13. Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);
  14. return Boolean.TRUE.equals(success);
  15. }
  16. @Override
  17. public void unlock() {
  18. //释放锁
  19. stringRedisTemplate.delete(KEY_PREFIX+name);
  20. }
  21. }

更改VoucherOrderServiceImpl类中的seckillVoucher方法的代码:

  1. @Resource
  2. private RedisIdWorker redisIdWorker;
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public Result seckillVoucher(Long voucherId) {
  7. //1.查询优惠券信息
  8. SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
  9. //2.判断秒杀是否开始
  10. //2.1秒杀尚未开始返回异常
  11. if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
  12. return Result.fail("秒杀尚未开始");
  13. }
  14. //2.2秒杀已结束返回异常
  15. if(voucher.getEndTime().isBefore(LocalDateTime.now())){
  16. return Result.fail("秒杀已经结束");
  17. }
  18. voucher = seckillVoucherService.getById(voucherId);
  19. //3.判断库存是否充足
  20. if(voucher.getStock()<1){
  21. //3.1库存不足返回异常
  22. return Result.fail("库存不足!");
  23. }
  24. Long userId = UserHolder.getUser().getId();
  25. SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
  26. boolean isLock = lock.tryLock(1200);
  27. //判断是否获取锁成功
  28. if(!isLock) {
  29. return Result.fail("不允许重复下单");
  30. }
  31. try {
  32. //获取代理对象
  33. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  34. return proxy.createVoucherOrder(voucherId);
  35. }finally {
  36. lock.unlock();
  37. }
  38. }

经测试多台节点相同用户只能获取同一张优惠券成功: 

 

P36 分布式锁 Redis分布式锁误删问题

假如某个线程(线程A)获取到锁之后,出现了业务阻塞,导致阻塞时间超过了锁自动释放的时间,锁因超时自动释放。此时其它线程(线程B)过来拿到了锁,开始执行业务。但线程A此时业务执行完毕,释放了锁,但释放的是线程B的锁。此时线程C过来看锁已被释放,趁虚而入拿到锁,此时线程B和线程C是并行执行。

要解决这个问题:线程在删除锁之前要先看锁是否是自己加的(获取锁的标示并判断是否一致)。

P37 分布式锁 解决Redis分布式锁误删问题

1.在获取锁时存入线程标示(可以用UUID表示)。

2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致(如果一致释放锁,如果不一致则不释放锁)。

首先要修改SimpleRedisLock里面的如下代码,主要是调用hutool工具包生成UUID(每次线程调用都会生成一个唯一的UUID),让Redis的前缀变成UUID+线程ID:

  1. private static final String ID_PREFIX = UUID.fastUUID().toString(true)+"-";
  2. @Override
  3. public boolean tryLock(long timeoutSec) {
  4. //获取线程标示
  5. String threadId = ID_PREFIX + Thread.currentThread().getId();
  6. Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId,timeoutSec, TimeUnit.SECONDS);
  7. return Boolean.TRUE.equals(success);
  8. }

现在要修改的是SimpleRedisLock类里面的unlock方法,主要是比较当前线程的标示和Redis中锁的标示是否一致,只有标示一致才能释放锁:

  1. @Override
  2. public void unlock() {
  3. //获取线程标示
  4. String threadId = ID_PREFIX + Thread.currentThread().getId();
  5. //获取锁中的标示
  6. String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
  7. if(threadId.equals(id)){
  8. //释放锁
  9. stringRedisTemplate.delete(KEY_PREFIX+name);
  10. }
  11. }

P38 分布式锁 分布式锁的原子性问题

现在假设出现了其它问题,比如线程1在判断完锁标示是否一致之后出现了阻塞(比如JVM垃圾回收FULL GC导致阻塞了过长时间),此时锁超时了,线程2趁虚而入获取了锁,此时线程1直接释放了线程2的锁,此时线程3趁虚而入继续给Redis加锁,此时会出现线程2和线程3并行执行。

根本的原因是:获取锁标示和释放锁的操作不是原子性的,现在要解决的问题就是将这两个操作变成原子性的。

P39 分布式锁 Lua脚本解决多条命令原子性问题

Redis提供Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。

Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html

执行脚本的方法:

执行一个写死的set命令:

在Lua语言里,数组的第一个元素下标是1。

P40 分布式锁 Java调用lua脚本改造分布式锁

 

繁琐版的Lua脚本内容如下:

  1. -- 锁的key
  2. local key = KEYS[1]
  3. -- 当前线程标示
  4. local threadId = ARGV[1]
  5. --获取锁中的线程标示
  6. local id = redis.call('get',key)
  7. --比较线程标示与锁中的标示是否一致
  8. if(id == threadId) then
  9. --释放锁 del key
  10. return redis.call('del',key)
  11. end
  12. return 0

简化版的Lua脚本内容如下:

  1. --比较线程标示与锁中的标示是否一致
  2. if(redis.call('get',KEYS[1]) == ARGV[1]) then
  3. --释放锁 del key
  4. return redis.call('del',KEYS[1])
  5. end
  6. return 0

在resources下创建unlock.lua,会提示下载一个plugins点击install,然后只需要下载一个EmmyLua即可,实测如果下载了多个Lua相关的插件会产生冲突,最终导致IDEA打不开,这真是血泪的教训!

 在SimpleRedisLock中写入如下的代码,因为我们希望的是在一开始就将Lua的脚本加载好,而不是等到要调用释放锁的时候再去加载Lua脚本,所以采用静态变量和静态代码块,这些部分在类初始化的时候就会被加载:

  1. private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
  2. static {
  3. UNLOCK_SCRIPT = new DefaultRedisScript<>();
  4. UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
  5. UNLOCK_SCRIPT.setResultType(Long.class);
  6. }

在SimpleRedisLock类的unlock方法中写入如下的代码:

  1. @Override
  2. public void unlock() {
  3. stringRedisTemplate.execute(UNLOCK_SCRIPT,
  4. Collections.singletonList(KEY_PREFIX+name),
  5. ID_PREFIX + Thread.currentThread().getId());
  6. }

在程序1和程序2的下面这个位置打上断点:

在测试API中测试访问如下的URL:

http://localhost:8080/api/voucher-order/seckill/14

分别测试秒杀优惠券1和2: 

 

在Redis中能看到程序1获取锁成功,然后直接把lock锁删掉,模拟超时释放的情况:

然后让程序2往下走一步,可以看到程序2获取到了锁

然后可以直接放行程序1,会看到结果是程序2加的锁没有被删除。

最后放行程序2,会看到程序2加的锁被删除。

总结:

基于Redis的分布式锁的实现思路:

1.利用set nx ex获取锁,并设置过期时间,保存线程标示。

2.释放锁时先判断线程标示是否与自己一致,一致则删除锁。

特性:

1.利用set nx满足互斥性。

2.利用set nx保障故障时锁依然能够释放,避免死锁,提高安全性。

3.利用Redis集群保障高可用和高并发的特性。

 P41 分布式锁 Redisson功能介绍

目前基于setnx实现的分布式锁存在以下几个问题:

1.不可重入:同一线程无法多次获取同一把锁。

2.不可重试:获取锁只尝试一次就返回false,没有重试机制。

3.超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放存在安全隐患。

4.主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主节点宕机时,如果从节点还未同步主节点中的锁数据,则会出现锁信息的不一致。

Redisson是一个在Redis的基础上实现的Java驻内存数据网格。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中包含了各种分布式锁的实现。

P42 分布式锁 Redisson快速入门

第1步,先引入依赖:

  1. <!--redisson-->
  2. <dependency>
  3. <groupId>org.redisson</groupId>
  4. <artifactId>redisson</artifactId>
  5. <version>3.13.6</version>
  6. </dependency>

第2步,在config包下创建RedissonConfig类,写入如下代码:

  1. @Configuration
  2. public class RedissonConfig{
  3. @Bean
  4. public RedissonClient redissonClient(){
  5. //配置
  6. Config config = new Config();
  7. config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("");
  8. //创建RedissonClient对象
  9. return Redisson.create(config);
  10. }
  11. }

第3步,引入RedissonClient,调用getLock获取锁对象,然后用tryLock获取锁。

 

第4步,启动服务

发送下面的请求:

在执行释放锁的语句前,可以看到Redis中有锁的记录:

用jmeter来测试,可以发现没有出现并发安全问题:

P43 分布式锁 Redisson的可重入锁原理

ReentrantLock可重入锁的原理:获取锁的时候在判断这个锁已经被占有的情况下,会检查占有锁的是否是当前线程,如果是当前线程,也会获取锁成功。会有一个计数器记录重入的次数。

会通过下面的结构来记录某个线程重入了几次锁。

每释放一次锁采用的策略是把重入次数减1。

加锁和释放锁是成对出现的,因此当方法执行到最外层结束时,重入的次数一定会减为0。

1.是否存在锁

2.存在锁,判断是否是自己的。

是,锁计数+1。

不是,获取锁失败。

3.不存在锁

获取锁,添加线程标示。

Redisson底层可重入锁加锁的逻辑:

Redisson底层可重入锁解锁的逻辑:

 P44 分布式锁 Redisson的锁重试和WatchDog机制

下面是对含有waitTime(等待时间)的tryLock的跟踪:

看门狗超时时间是30秒

subscribeFuture.await等待的是释放锁的通知,如果future在指定时间内获得,返回true,等待的是time的时间,time是锁的剩余最大等待时间。

如果超时返回false,然后会进到cancel里,调用unsubscribe方法,取消订阅。

不是无休止的忙等机制,而是只有当锁释放后获得通知后才进行加锁尝试,在没收到通知前是被阻塞状态。

下面是定时更新锁的有效期的逻辑:

相当于设置了一个定时任务每隔10秒重置一次有效期。

定时任务的结束是在解锁的逻辑当中:

获取锁机制:

1.判断ttl是否为null

        1.1 为null,获取锁成功(涉及自动更新锁过期时间),判断leaseTime是否为-1

                1.1.1 为-1自动开启看门狗机制,定时更新锁的过期时间

                        看门狗默认30秒,每隔10秒会更新一次过期时间。

                1.1.2 不为-1返回true

        1.2 不为null,获取锁失败(涉及获取锁的失败重试),判断剩余等待时间是否大于0

                1.2.1 大于0,订阅并等待释放锁的信号

                        在受到释放信号后会判断是否超时,如未超时继续尝试获取锁

                1.2.2 不大于0,获取锁失败

释放锁机制:

1.尝试释放锁,判断是否成功

        1.1 释放成功。

                发送锁释放的消息(与获取锁的失败重试关联)

                取消看门狗机制(与自动更新锁过期时间关联)

        1.2 释放失败。返回异常。

Redisson是如何解决可重入问题、获取锁的失败重试、锁超时释放问题的?

可重入问题:利用哈希表记录线程id和重入次数。

获取锁的失败重试:利用消息订阅和信号量方式实现获取锁失败时的等待、唤醒和锁的重试获取。

锁超时释放:利用看门狗机制,每隔一段时间,重置超时时间。

P45 分布式锁 Redissson的multiLock原理

主节点负责写,从节点负责读,主节点和从节点间需要同步,会存在延迟。

如果主节点宕机,会从从节点中选拔一个新的节点作为主节点。如果主从同步尚未完成,会出现锁失效的问题。

现在在所有主节点中都存放一份锁,要求一个线程必须从所有主节点中获取锁,才算真正获取锁。

假如此时有一个主节点宕机,恰好主从同步没有完成,此时有其它线程趁虚而入获取到了新主节点的锁,但因为没能获取其它主节点的锁,因此也是获取锁失败的。

这种锁叫作联锁。

P46 秒杀优化 异步秒杀思路

秒杀业务流程:

1.扣减优惠券的库存(不能超卖,判断库存是否充足)

2.将用户抢购的优惠券信息写入订单,完成订单的创建

3.一个用户对一个优惠券只能下一单

为了获取1000名用户的token,我爆肝1h写了下面的生成代码。

生成效果如下:共计1008名用户,给每位用户生成了专属的token:

并且把所有token存入了output.txt文件中,方便Jmeter读取:

下面是生成代码,把代码放入UserServiceImpl中,然后在login方法中调用即可:

  1. public void generateToken() {
  2. String[] phoneNumbers = {
  3. "13456762069", "13456789001", "13456789011", "13686869696", "13688668889", "13688668890", "13688668891", "13688668892", "13688668893", "13688668894",
  4. "13688668895", "13688668896", "13688668897", "13688668898", "13688668899", "13688668900", "13688668901", "13688668902", "13688668903", "13688668904",
  5. "13688668905", "13688668906", "13688668907", "13688668908", "13688668909", "13688668910", "13688668911", "13688668912", "13688668913", "13688668914",
  6. "13688668915", "13688668916", "13688668917", "13688668918", "13688668919", "13688668920", "13688668921", "13688668922", "13688668923", "13688668924",
  7. "13688668925", "13688668926", "13688668927", "13688668928", "13688668929", "13688668930", "13688668931", "13688668932", "13688668933", "13688668934",
  8. "13688668935", "13688668936", "13688668937", "13688668938", "13688668939", "13688668940", "13688668941", "13688668942", "13688668943", "13688668944",
  9. "13688668945", "13688668946", "13688668947", "13688668948", "13688668949", "13688668950", "13688668951", "13688668952", "13688668953", "13688668954",
  10. "13688668955", "13688668956", "13688668957", "13688668958", "13688668959", "13688668960", "13688668961", "13688668962", "13688668963", "13688668964",
  11. "13688668965", "13688668966", "13688668967", "13688668968", "13688668969", "13688668970", "13688668971", "13688668972", "13688668973", "13688668974",
  12. "13688668975", "13688668976", "13688668977", "13688668978", "13688668979", "13688668980", "13688668981", "13688668982", "13688668983", "13688668984",
  13. "13688668985", "13688668986", "13688668987", "13688668988", "13688668989", "13688668990", "13688668991", "13688668992", "13688668993", "13688668994",
  14. "13688668995", "13688668996", "13688668997", "13688668998", "13688668999", "13688669000", "13688669001", "13688669002", "13688669003", "13688669004",
  15. "13688669005", "13688669006", "13688669007", "13688669008", "13688669009", "13688669010", "13688669011", "13688669012", "13688669013", "13688669014",
  16. "13688669015", "13688669016", "13688669017", "13688669018", "13688669019", "13688669020", "13688669021", "13688669022", "13688669023", "13688669024",
  17. "13688669025", "13688669026", "13688669027", "13688669028", "13688669029", "13688669030", "13688669031", "13688669032", "13688669033", "13688669034",
  18. "13688669035", "13688669036", "13688669037", "13688669038", "13688669039", "13688669040", "13688669041", "13688669042", "13688669043", "13688669044",
  19. "13688669045", "13688669046", "13688669047", "13688669048", "13688669049", "13688669050", "13688669051", "13688669052", "13688669053", "13688669054",
  20. "13688669055", "13688669056", "13688669057", "13688669058", "13688669059", "13688669060", "13688669061", "13688669062", "13688669063", "13688669064",
  21. "13688669065", "13688669066", "13688669067", "13688669068", "13688669069", "13688669070", "13688669071", "13688669072", "13688669073", "13688669074",
  22. "13688669075", "13688669076", "13688669077", "13688669078", "13688669079", "13688669080", "13688669081", "13688669082", "13688669083", "13688669084",
  23. "13688669085", "13688669086", "13688669087", "13688669088", "13688669089", "13688669090", "13688669091", "13688669092", "13688669093", "13688669094",
  24. "13688669095", "13688669096", "13688669097", "13688669098", "13688669099", "13688669100", "13688669101", "13688669102", "13688669103", "13688669104",
  25. "13688669105", "13688669106", "13688669107", "13688669108", "13688669109", "13688669110", "13688669111", "13688669112", "13688669113", "13688669114",
  26. "13688669115", "13688669116", "13688669117", "13688669118", "13688669119", "13688669120", "13688669121", "13688669122", "13688669123", "13688669124",
  27. "13688669125", "13688669126", "13688669127", "13688669128", "13688669129", "13688669130", "13688669131", "13688669132", "13688669133", "13688669134",
  28. "13688669135", "13688669136", "13688669137", "13688669138", "13688669139", "13688669140", "13688669141", "13688669142", "13688669143", "13688669144",
  29. "13688669145", "13688669146", "13688669147", "13688669148", "13688669149", "13688669150", "13688669151", "13688669152", "13688669153", "13688669154",
  30. "13688669155", "13688669156", "13688669157", "13688669158", "13688669159", "13688669160", "13688669161", "13688669162", "13688669163", "13688669164",
  31. "13688669165", "13688669166", "13688669167", "13688669168", "13688669169", "13688669170", "13688669171", "13688669172", "13688669173", "13688669174",
  32. "13688669175", "13688669176", "13688669177", "13688669178", "13688669179", "13688669180", "13688669181", "13688669182", "13688669183", "13688669184",
  33. "13688669185", "13688669186", "13688669187", "13688669188", "13688669189", "13688669190", "13688669191", "13688669192", "13688669193", "13688669194",
  34. "13688669195", "13688669196", "13688669197", "13688669198", "13688669199", "13688669200", "13688669201", "13688669202", "13688669203","13688669204",
  35. "13688669205", "13688669206", "13688669207", "13688669208", "13688669209", "13688669210", "13688669211", "13688669212", "13688669213", "13688669214",
  36. "13688669215", "13688669216", "13688669217", "13688669218", "13688669219", "13688669220", "13688669221", "13688669222", "13688669223", "13688669224",
  37. "13688669225", "13688669226", "13688669227", "13688669228", "13688669229", "13688669230", "13688669231", "13688669232", "13688669233", "13688669234",
  38. "13688669235", "13688669236", "13688669237", "13688669238", "13688669239", "13688669240", "13688669241", "13688669242", "13688669243", "13688669244",
  39. "13688669245", "13688669246", "13688669247", "13688669248", "13688669249", "13688669250", "13688669251", "13688669252", "13688669253", "13688669254",
  40. "13688669255", "13688669256", "13688669257", "13688669258", "13688669259", "13688669260", "13688669261", "13688669262", "13688669263", "13688669264",
  41. "13688669265", "13688669266", "13688669267", "13688669268", "13688669269", "13688669270", "13688669271", "13688669272", "13688669273", "13688669274",
  42. "13688669275", "13688669276", "13688669277", "13688669278", "13688669279", "13688669280", "13688669281", "13688669282", "13688669283", "13688669284",
  43. "13688669285", "13688669286", "13688669287", "13688669288", "13688669289", "13688669290", "13688669291", "13688669292", "13688669293", "13688669294",
  44. "13688669295", "13688669296", "13688669297", "13688669298", "13688669299", "13688669300", "13688669301", "13688669302", "13688669303", "13688669304",
  45. "13688669305", "13688669306", "13688669307", "13688669308", "13688669309", "13688669310", "13688669311", "13688669312", "13688669313", "13688669314",
  46. "13688669315", "13688669316", "13688669317", "13688669318", "13688669319", "13688669320", "13688669321", "13688669322", "13688669323", "13688669324",
  47. "13688669325", "13688669326", "13688669327", "13688669328", "13688669329", "13688669330", "13688669331", "13688669332", "13688669333", "13688669334",
  48. "13688669335", "13688669336", "13688669337", "13688669338", "13688669339", "13688669340", "13688669341", "13688669342", "13688669343", "13688669344",
  49. "13688669345", "13688669346", "13688669347", "13688669348", "13688669349", "13688669350", "13688669351", "13688669352", "13688669353", "13688669354",
  50. "13688669355", "13688669356", "13688669357", "13688669358", "13688669359", "13688669360", "13688669361", "13688669362", "13688669363", "13688669364",
  51. "13688669365", "13688669366", "13688669367", "13688669368", "13688669369", "13688669370", "13688669371", "13688669372", "13688669373", "13688669374",
  52. "13688669375", "13688669376", "13688669377", "13688669378", "13688669379", "13688669380", "13688669381", "13688669382", "13688669383", "13688669384",
  53. "13688669385", "13688669386", "13688669387", "13688669388", "13688669389", "13688669390", "13688669391", "13688669392", "13688669393", "13688669394",
  54. "13688669395", "13688669396", "13688669397", "13688669398", "13688669399", "13688669400", "13688669401", "13688669402", "13688669403", "13688669404",
  55. "13688669405", "13688669406", "13688669407", "13688669408", "13688669409", "13688669410", "13688669411", "13688669412", "13688669413", "13688669414",
  56. "13688669415", "13688669416", "13688669417", "13688669418", "13688669419", "13688669420", "13688669421", "13688669422", "13688669423", "13688669424",
  57. "13688669425", "13688669426", "13688669427", "13688669428", "13688669429", "13688669430", "13688669431", "13688669432", "13688669433", "13688669434",
  58. "13688669435", "13688669436", "13688669437", "13688669438", "13688669439", "13688669440", "13688669441", "13688669442", "13688669443", "13688669444",
  59. "13688669445", "13688669446", "13688669447", "13688669448", "13688669449", "13688669450", "13688669451", "13688669452", "13688669453", "13688669454",
  60. "13688669455", "13688669456", "13688669457", "13688669458", "13688669459", "13688669460", "13688669461", "13688669462", "13688669463", "13688669464",
  61. "13688669465", "13688669466", "13688669467", "13688669468", "13688669469", "13688669470", "13688669471", "13688669472", "13688669473", "13688669474",
  62. "13688669475", "13688669476", "13688669477", "13688669478", "13688669479", "13688669480", "13688669481", "13688669482", "13688669483", "13688669484",
  63. "13688669485", "13688669486", "13688669487", "13688669488", "13688669489", "13688669490", "13688669491", "13688669492", "13688669493", "13688669494",
  64. "13688669495", "13688669496", "13688669497", "13688669498", "13688669499", "13688669500", "13688669501", "13688669502", "13688669503", "13688669504",
  65. "13688669505", "13688669506", "13688669507", "13688669508", "13688669509", "13688669510", "13688669511", "13688669512", "13688669513", "13688669514",
  66. "13688669515", "13688669516", "13688669517", "13688669518", "13688669519", "13688669520", "13688669521", "13688669522", "13688669523","13688669524",
  67. "13688669525", "13688669526", "13688669527", "13688669528", "13688669529", "13688669530", "13688669531", "13688669532", "13688669533", "13688669534",
  68. "13688669535", "13688669536", "13688669537", "13688669538", "13688669539", "13688669540", "13688669541", "13688669542", "13688669543", "13688669544",
  69. "13688669545", "13688669546", "13688669547", "13688669548", "13688669549", "13688669550", "13688669551", "13688669552", "13688669553", "13688669554",
  70. "13688669555", "13688669556", "13688669557", "13688669558", "13688669559", "13688669560", "13688669561", "13688669562", "13688669563", "13688669564",
  71. "13688669565", "13688669566", "13688669567", "13688669568", "13688669569", "13688669570", "13688669571", "13688669572", "13688669573", "13688669574",
  72. "13688669575", "13688669576", "13688669577", "13688669578", "13688669579", "13688669580", "13688669581", "13688669582", "13688669583", "13688669584",
  73. "13688669585", "13688669586", "13688669587", "13688669588", "13688669589", "13688669590", "13688669591", "13688669592", "13688669593", "13688669594",
  74. "13688669595", "13688669596", "13688669597", "13688669598", "13688669599", "13688669600", "13688669601", "13688669602", "13688669603", "13688669604",
  75. "13688669605", "13688669606", "13688669607", "13688669608", "13688669609", "13688669610", "13688669611", "13688669612", "13688669613", "13688669614",
  76. "13688669615", "13688669616", "13688669617", "13688669618", "13688669619", "13688669620", "13688669621", "13688669622", "13688669623", "13688669624",
  77. "13688669625", "13688669626", "13688669627", "13688669628", "13688669629", "13688669630", "13688669631", "13688669632", "13688669633", "13688669634",
  78. "13688669635", "13688669636", "13688669637", "13688669638", "13688669639", "13688669640", "13688669641", "13688669642", "13688669643", "13688669644",
  79. "13688669645", "13688669646", "13688669647", "13688669648", "13688669649", "13688669650", "13688669651", "13688669652", "13688669653", "13688669654",
  80. "13688669655", "13688669656", "13688669657", "13688669658", "13688669659", "13688669660", "13688669661", "13688669662", "13688669663", "13688669664",
  81. "13688669665", "13688669666", "13688669667", "13688669668", "13688669669", "13688669670", "13688669671", "13688669672", "13688669673", "13688669674",
  82. "13688669675", "13688669676", "13688669677", "13688669678", "13688669679", "13688669680", "13688669681", "13688669682", "13688669683", "13688669684",
  83. "13688669685", "13688669686", "13688669687", "13688669688", "13688669689", "13688669690", "13688669691", "13688669692", "13688669693", "13688669694",
  84. "13688669695", "13688669696", "13688669697", "13688669698", "13688669699", "13688669700", "13688669701", "13688669702", "13688669703", "13688669704",
  85. "13688669705", "13688669706", "13688669707", "13688669708", "13688669709", "13688669710", "13688669711", "13688669712", "13688669713", "13688669714",
  86. "13688669715", "13688669716", "13688669717", "13688669718", "13688669719", "13688669720", "13688669721", "13688669722", "13688669723", "13688669724",
  87. "13688669725", "13688669726", "13688669727", "13688669728", "13688669729", "13688669730", "13688669731", "13688669732", "13688669733", "13688669734",
  88. "13688669735", "13688669736", "13688669737", "13688669738", "13688669739", "13688669740", "13688669741", "13688669742", "13688669743", "13688669744",
  89. "13688669745", "13688669746", "13688669747", "13688669748", "13688669749", "13688669750", "13688669751", "13688669752", "13688669753", "13688669754",
  90. "13688669755", "13688669756", "13688669757", "13688669758", "13688669759", "13688669760", "13688669761", "13688669762", "13688669763", "13688669764",
  91. "13688669765", "13688669766", "13688669767", "13688669768", "13688669769", "13688669770", "13688669771", "13688669772", "13688669773", "13688669774",
  92. "13688669775", "13688669776", "13688669777", "13688669778", "13688669779", "13688669780", "13688669781", "13688669782", "13688669783", "13688669784",
  93. "13688669785", "13688669786", "13688669787", "13688669788", "13688669789", "13688669790", "13688669791", "13688669792", "13688669793", "13688669794",
  94. "13688669795", "13688669796", "13688669797", "13688669798", "13688669799", "13688669800", "13688669801", "13688669802", "13688669803", "13688669804",
  95. "13688669805", "13688669806", "13688669807", "13688669808", "13688669809", "13688669810", "13688669811", "13688669812", "13688669813", "13688669814",
  96. "13688669815", "13688669816", "13688669817", "13688669818", "13688669819", "13688669820", "13688669821", "13688669822", "13688669823", "13688669824",
  97. "13688669825", "13688669826", "13688669827", "13688669828", "13688669829", "13688669830", "13688669831", "13688669832", "13688669833", "13688669834",
  98. "13688669835", "13688669836", "13688669837", "13688669838", "13688669839", "13688669840", "13688669841", "13688669842", "13688669843", "13688669844",
  99. "13688669845", "13688669846", "13688669847", "13688669848","13688669849", "13688669849", "13688669850", "13688669851", "13688669852", "13688669853",
  100. "13688669854", "13688669855", "13688669856", "13688669857", "13688669858", "13688669859", "13688669860", "13688669861", "13688669862", "13688669863",
  101. "13688669864", "13688669865", "13688669866", "13688669867", "13688669868", "13688669869", "13688669870", "13688669871", "13688669872", "13688669873",
  102. "13688669874", "13688669875", "13688669876", "13688669877", "13688669878", "13688669879", "13688669880", "13688669881", "13688669882", "13688669883",
  103. "13688669884", "13688669885", "13688669886", "13688669887", "13688669888", "13838411438", "17359456898"
  104. };
  105. for(String phone : phoneNumbers){
  106. //一致根据手机号查用户
  107. User user = query().eq("phone", phone).one();
  108. //7.保存用户信息到redis----------------
  109. //7.1 随机生成Token作为登录令牌
  110. String token = UUID.randomUUID().toString(true);
  111. String filePath = "C:\\code\\output.txt";
  112. String content = token+'\n';
  113. try (FileWriter fileWriter = new FileWriter(filePath, true);
  114. BufferedWriter bufferedWriter = new BufferedWriter(fileWriter)) {
  115. // 写入内容
  116. bufferedWriter.write(content);
  117. // 确保内容都已写入文件
  118. bufferedWriter.flush();
  119. } catch (IOException e) {
  120. throw new RuntimeException(e);
  121. }
  122. System.out.println(token);
  123. //7.2 将User对象转为Hash存储
  124. UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
  125. Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
  126. CopyOptions.create()
  127. .setIgnoreNullValue(true)
  128. .setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
  129. //7.3 存储
  130. stringRedisTemplate.opsForHash().putAll("login:token:"+token,userMap);
  131. //7.4设置token有效期
  132. String tokenKey = LOGIN_USER_KEY+token;
  133. stringRedisTemplate.expire(tokenKey,999999999,TimeUnit.MINUTES);
  134. }
  135. }

更改秒杀库存为200: 

订单表清空:

Jmeter中线程数设为1000:

在HTTP信息头管理器中进行如下设置:

在CSV数据文件设置中进行如下设置:

下面是测试结果:

最小值和最大值是响应时间的最小值和最大值。平均值是平均响应时间。

优惠券被抢完,没有超领和少领的情况发生:

刚好200条订单记录:

查询优惠券、查询订单、减库存、创建订单都需要与数据库交互,导致效率低下。特别是减库存和创建订单都是对数据库的写操作,耗时较久。

异步开启一个独立的线程去完成Tomcat的操作。

库存:KEY用string类型,VALUE用数值类型。

一人一单:KEY用string类型,VALUE用set集合类型。

因为这段代码比较长要用Lua脚本来编写:

首先要执行Lua脚本,然后判断返回结果是否为0,如返回0代表成功下单优惠券,将优惠券id、用户id和订单id放入阻塞队列,直接返回订单id给用户。

如果想提高写入数据库的性能,可以多开线程,由单个线程的写,变成多个线程批量的写。

P47 秒杀优化 基于Redis完成秒杀资格判断

在VoucherServiceImpl的addSeckillVoucher方法的末尾添加下面这段代码把秒杀的库存保存到Redis中:

  1. //保存秒杀的库存到Redis
  2. stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY +voucher.getId(),voucher.getStock().toString());

发送请求,新增一份优惠券: 

可以看到在Redis中记录了优惠券的记录:

在redis中可以用sadd来往set集合中添加键值,可以用sismember来查询集合中是否有某个元素。

Lua脚本编写如下:

  1. --1.参数列表
  2. --1.1.优惠券id
  3. local voucherId = ARGV[1]
  4. --1.2.用户id
  5. local userId = ARGV[2]
  6. --2.数据key
  7. --2.1.库存key
  8. local stockKey = 'seckill:stock:' .. voucherId
  9. --2.2.订单key
  10. local orderKey = 'seckill:order:' .. voucherId
  11. --3.脚本业务
  12. --3.1.判断库存是否充足 get stockKey
  13. if(tonumber(redis.call('get',stockKey)) <= 0) then
  14. --3.1.2.库存不足,返回1
  15. return 1
  16. end
  17. --3.2.判断用户是否下单 SISMEMBER orderKey userId
  18. if(redis.call('sismember',orderKey,userId)==1) then
  19. --3.2.1.存在,说明是重复下单,返回2
  20. return 2
  21. end
  22. --3.3.扣库存 incrby stockKey -1
  23. redis.call('incrby',stockKey,-1)
  24. --3.4.下单 sadd orderKey userId
  25. redis.call('sadd',orderKey,userId)
  26. return 0

在VoucherOrderServiceImpl类中写入如下代码: 

Lua脚本的加载:

  1. private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
  2. static{
  3. SECKILL_SCRIPT = new DefaultRedisScript<>();
  4. SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
  5. SECKILL_SCRIPT.setResultType(Long.class);
  6. }

执行Lua脚本:

  1. Long result = stringRedisTemplate.execute( //调用execute方法,返回值
  2. SECKILL_SCRIPT, //加载的模板对象
  3. Collections.emptyList(), //键参数
  4. voucherId.toString(), //值参数1
  5. UserHolder.getUser().getId().toString() //值参数2
  6. );

 更改seckillVoucher方法的代码如下:

  1. @Override
  2. public Result seckillVoucher(Long voucherId) {
  3. //1.执行Lua脚本
  4. Long result = stringRedisTemplate.execute(
  5. SECKILL_SCRIPT,
  6. Collections.emptyList(),
  7. voucherId.toString(),
  8. UserHolder.getUser().getId().toString()
  9. );
  10. int r = result.intValue();
  11. if(r != 0){ //2.判断结果是否为0,不为0,代表没有购买资格
  12. return Result.fail(r==1 ? "库存不足":"不能重复下单");
  13. }
  14. //2.2.为0,有购买资格,把下单信息保存到阻塞队列
  15. long orderId = redisIdWorker.nextId("order");
  16. // TODO 保存阻塞队列
  17. //3.返回订单id
  18. return Result.ok(orderId);
  19. }

在Apifox中发送测试数据,秒杀下单,成功后返回订单id:

在Redis中库存成功扣减1,order有缓存:

如果再次发送会提示不能重复下单:

准备在Jemeter中测试,首先把缓存中的优惠券库存改为200:

测试后库存减为0,新增200条订单记录:

可以看到平均响应时间减少10倍,最快响应时间减少60倍,最大响应时间缩短:

可见这种优化对系统的性能提升非常大!

P48 秒杀优化 基于阻塞队列实现秒杀异步下单

阻塞队列:尝试从队列获取元素,如果没有元素会被阻塞,直到队列中有元素才会被唤醒,获取元素。  

只要类一启动,用户随时都有可能来抢购,因此VoucherOrderHandler这个类的初始化必须在类初始化后执行。

在VoucherOrderServiceImpl类中,首先要新增一个orderTasks阻塞队列,然后设置一个线程池和run方法。

在run方法中调用阻塞队列的take方法,orderTasks.take方法是一个阻塞方法,如果队列中有元素会获取,如果队列中无元素则阻塞等待。

这里相当于是开启了一个全新的线程来执行获取队列中订单信息和异步创建订单的任务:

  1. private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
  2. private static ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();
  3. @PostConstruct
  4. private void init(){
  5. seckill_order_executor.submit(new VoucherOrderHandler());
  6. }
  7. private class VoucherOrderHandler implements Runnable{
  8. @SneakyThrows
  9. @Override
  10. public void run() {
  11. while(true){
  12. try {
  13. //1.获取队列中的订单信息
  14. VoucherOrder voucherOrder = orderTasks.take();
  15. //2.创建订单
  16. handleVoucherOrder(voucherOrder);
  17. } catch (InterruptedException e) {
  18. log.debug("处理订单异常",e);
  19. }
  20. }
  21. }
  22. }

然后新增一个handleVoucherOrder方法,这个方法主要用来获取锁然后调用createVoucherOrder方法:

  1. public IVoucherOrderService proxy ;
  2. private void handleVoucherOrder(VoucherOrder voucherOrder) {
  3. //1.获取用户
  4. Long userId = voucherOrder.getUserId();
  5. //2.创建锁对象
  6. RLock lock = redissonClient.getLock("lock:order:"+userId);
  7. //3.获取锁
  8. boolean isLock = lock.tryLock();
  9. //4.判断是否获取锁成功
  10. if(!isLock) {
  11. log.error("不允许重复下单");
  12. return;
  13. }
  14. try {
  15. //获取代理对象
  16. proxy.createVoucherOrder(voucherOrder);
  17. }finally {
  18. lock.unlock();
  19. }
  20. }

createVoucherOrder方法主要是用来对数据库操作,比如扣减库存,然后保存订单的信息到数据库,会有额外的对一人一单和库存数量的判断,虽然这些在Redis中已经判断过,但这里是双重保险。

异步处理不需要再返回给前端任何东西。

  1. @Transactional
  2. public void createVoucherOrder(VoucherOrder voucherOrder) {
  3. //6.一人一单
  4. Long userId = voucherOrder.getUserId();
  5. //6.1查询订单
  6. int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
  7. //6.2判断是否存在
  8. if(count>0){
  9. //用户已经购买过了
  10. log.error("用户已经购买过一次!");
  11. return;
  12. }
  13. //3.2库存充足扣减库存
  14. boolean success = seckillVoucherService.update()
  15. .setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
  16. .eq("voucher_id", voucherOrder.getVoucherId()) //相当于where条件 where id = ? and stock = ?
  17. .gt("stock",0).update();
  18. if(!success){
  19. log.error("库存不足!");
  20. return;
  21. }
  22. long orderId = redisIdWorker.nextId("order");//订单id
  23. voucherOrder.setId(orderId);
  24. voucherOrder.setUserId(userId);
  25. voucherOrder.setVoucherId(voucherOrder.getVoucherId());//代金券id
  26. save(voucherOrder);
  27. }

下面是对seckillVoucher的简单修改:

  1. public Result seckillVoucher(Long voucherId) {
  2. //1.执行Lua脚本
  3. Long result = stringRedisTemplate.execute(
  4. SECKILL_SCRIPT,
  5. Collections.emptyList(),
  6. voucherId.toString(),
  7. UserHolder.getUser().getId().toString()
  8. );
  9. int r = result.intValue();
  10. if(r != 0){ //2.判断结果是否为0,不为0,代表没有购买资格
  11. return Result.fail(r==1 ? "库存不足":"不能重复下单");
  12. }
  13. //2.2.为0,有购买资格,把下单信息保存到阻塞队列
  14. long orderId = redisIdWorker.nextId("order");
  15. //封装
  16. VoucherOrder voucherOrder = new VoucherOrder();
  17. voucherOrder.setId(orderId);//订单id
  18. voucherOrder.setUserId(UserHolder.getUser().getId());//用户id
  19. voucherOrder.setVoucherId(voucherId);//代金券id
  20. //保存阻塞队列
  21. orderTasks.add(voucherOrder);
  22. //获取代理对象
  23. proxy = (IVoucherOrderService) AopContext.currentProxy();
  24. //3.返回订单id
  25. return Result.ok(orderId);
  26. }

测试:

先把tb_voucher_order内容清空。把tb_seckill_voucher的stock库存改为200。

然后把Redis中对应优惠券的库存改为200。清空之前生成的订单。检查是否有1000个用户的token。

先用Apifox进行测试,测试一人一单的情况:第2次下单显示不能重复下单。

检查数据库是否多1条订单记录,库存是否减少1,缓存中库存是否减少1。

接下来用Jemeter进行测试,会发现库存扣减为0,数据库中多200条数据,缓存中的库存也扣减到0。

看聚合报告的结果如下:

因为做了异步下单,会占用一定的CPU,所以平均值要比第2次更长。

和下面前2次的结果进行对比可以发现,响应的平均值比最初提高10倍,最快响应时间提高了80倍,最慢响应时间提高了6倍。

秒杀业务的优化思路:

1.先利用Redis完成库存量、一人一单的判断,完成抢单业务。

2.将下单业务放入阻塞队列,利用独立线程异步下单。

基于阻塞队列的异步秒杀存在哪些问题:

1.内存限制问题。使用的是jdk提供的阻塞队列,使用的是JVM的内存,在一开始写死了队列空间的大小,如果在高并发的情况下,队列很快会被占满,如果不对队列的空间加以限制,很容易造成内存的溢出。

2.数据安全问题。缺乏持久化机制,是基于内存来保存信息,如果服务突然宕机,内存中保存的信息都会丢失。如果任务被取出,但由于突然发生事故异常,导致任务没有被消费,任务丢失,会造成数据不一致问题。

P49 Redis消息队列 认识消息队列

1.消息队列是在JVM外部的独立服务,不受JVM内存的限制。

2.消息队列不仅负责数据存储,还要保证数据安全。消息队列在消费者接收到消息后要进行消息确认

Redis提供了3种不同的方式来实现消息队列:

1.list结构:基于List结构模拟消息队列。

2.PubSub(发布订阅):基本的点对点消息模型。

3.Stream:比较完善的功能强大的消息队列模型。

P50 Redis消息队列 基于List实现消息队列

Redis的list数据结构是一个双向链表,容易模拟出队列效果。

队列的入口和出口不在一边,可以利用:LPUSH结合RPOP,RPUSH结合LPOP来实现。

如果队列中没有消息时RPOP或LPOP的操作会返回null,不会像JVM的阻塞队列那样阻塞并等待消息,因此这里应该用BRPOPBLPOP来实现阻塞效果。

List消息队列优点:

1.利用Redis存储,不受限于JVM内存上限。

2.基于Redis的持久化机制,数据安全性有保证。

3.可以保证消息的有序性。

缺点:

1.无法避免消息丢失。

2.只支持单消费者。

P51 Redis消息队列 PubSub实现消息队列

PubSub(Publish Subscribe 发布订阅):Redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE 频道名称    :   订阅一个或多个频道。

PUBLISH channel msg    :   向一个频道发送消息。

PSUBSCRIBE pattern   :   订阅与pattern格式匹配的所有频道。

基于PubSub的消息队列有哪些优缺点:

优点:

1.采用发布订阅模型,支持多生产、多消费。

缺点:

1.不支持数据持久化。

2.无法避免消息丢失(如果发布的消息没有人订阅,消息直接丢失)安全性无法保障

3.消息堆积有上限(消费者缓存的空间有上限),超出时数据丢失。

P52 Redis消息队列 Stream的单消费模式

要注意,如果想使用Stream消息队列必须把Redis的版本上升到5.0之后。

需要注意的是key和*|ID中间那俩参数是可选参数,一个是用来判断是否自动创建队列,一个是用来设置队列最大消息数量。

当指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次读取的还是最新的一条消息,此时中间的几条消息会被漏读。

STREAM类型消息队列的XREAD命令特点:

1.消息可回溯。

2.一个消息可以被多个消费者读取。

3.可以阻塞读取。

4.有消息漏读风险。

 P53 Redis消息队列 Stream的消费组模式

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。消费者之间是竞争关系。

1.消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理的速度。

2.消息标示:消费者组会维护一个标示(类似于标签,记录读到哪里了),记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息,确保每一个消息都会被消费。

3.消息确认(解决消息丢失问题):消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

创建消费者组:

XGROUP CREATE key groupName ID [MKSTREAM]

key是队列名称

groupName是消费者组名称

ID是起始ID标示,$代表队列最后一个消息,0代表队列第一个消息。

MKSTREAM是队列不存在时自动创建队列。

删除指定的消费者组:

XGROUP DESTORY key groupName

给指定的消费者组添加消费者:

XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者:

XGROUP DELCONSUMER key groupname consumername

创建消费者组:

从消费者组读取消息:

group:是消费者组名称。

consumer:是消费者名称,如果消费者不存在,会自动创建。

count:本次查询最大数量。

BLOCK:阻塞时最长等待时间。

NOACK:取消消费者手动ACK,获取到消息自动确认(不建议开启)

STREAMS key:指定队列名称

ID:获取消息的起始ID。(">":从下一个未消费的消息开始读取。其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始)

可以发现在同一个消费者组里的消费者对消息不会重复读取,而是依次读取,已被读取的消息不会再次被读取。

 消费者确认消息:

XACK key group ID 

key:是队列名称。

group:是消费者组名称。

ID:是接收到的消息的ID。

查看Pending-list队列的信息:

key:是队列名称。

group:是组名称。

下面是消息确认:

对消息进行确认,确认完消息会被移除:

Pending-list队列里面存储的是已经读取,但是还没确认的消息。

假如一台节点读取完消息还没却来得及确认就宕机了,可以通过以下的方法解决:

正常情况下先用>,如果出现异常,信息会进入到Pending-list,把ID从>改为0,此时取的就是在Pending-list里的消息。

STREAM类型消息队列的XREADGROUP命令特点:

1.消息可回溯。

2.可以多消费者争抢消息,加快消费速度。

3.可以阻塞读取。

4.没有消息漏读的风险。

5.有消息确认机制,消息至少被消费一次。

6.支持消息持久化

如果公司业务比较庞大,对于消息队列的要求更加严格,还是要用RabbitMQ和RocketMQ。

P54 Redis消息队列 基于Stream消息队列实现异步秒杀

直接通过控制台创建一个stream.orders队列:

直接在Lua脚本中编写代码(主要增加一个局部变量,): 

  1. --1.参数列表
  2. --1.1.优惠券id
  3. local voucherId = ARGV[1]
  4. --1.2.用户id
  5. local userId = ARGV[2]
  6. --1.3.订单id
  7. local orderId = ARGV[3]
  8. --2.数据key
  9. --2.1.库存key
  10. local stockKey = 'seckill:stock:' .. voucherId
  11. --2.2.订单key
  12. local orderKey = 'seckill:order:' .. voucherId
  13. --3.脚本业务
  14. --3.1.判断库存是否充足 get stockKey
  15. if(tonumber(redis.call('get',stockKey)) <= 0) then
  16. --3.1.2.库存不足,返回1
  17. return 1
  18. end
  19. --3.2.判断用户是否下单 SISMEMBER orderKey userId
  20. if(redis.call('sismember',orderKey,userId)==1) then
  21. --3.2.1.存在,说明是重复下单,返回2
  22. return 2
  23. end
  24. --3.3.扣库存 incrby stockKey -1
  25. redis.call('incrby',stockKey,-1)
  26. --3.4.下单 sadd orderKey userId
  27. redis.call('sadd',orderKey,userId)
  28. --3.5.发送消息到队列中 XADD stream.orders * k1 v1 k2 v2
  29. redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'orderId',orderId)
  30. return 0

在VoucherOrderServiceImpl类中修改seckillVoucher方法:

  1. public Result seckillVoucher(Long voucherId) {
  2. //获取订单id
  3. long orderId = redisIdWorker.nextId("order");
  4. //1.执行Lua脚本(判断用户是否有购买资格,消息发出)
  5. Long result = stringRedisTemplate.execute(
  6. SECKILL_SCRIPT,
  7. Collections.emptyList(),
  8. voucherId.toString(),
  9. UserHolder.getUser().getId().toString(),
  10. String.valueOf(orderId)
  11. );
  12. int r = result.intValue();
  13. if(r != 0){ //2.判断结果是否为0,不为0,代表没有购买资格
  14. return Result.fail(r==1 ? "库存不足":"不能重复下单");
  15. }
  16. //获取代理对象
  17. proxy = (IVoucherOrderService) AopContext.currentProxy();
  18. //3.返回订单id
  19. return Result.ok(orderId);
  20. }

 在VoucherOrderServiceImpl中修改VoucherOrderHandler方法的代码:

代码思路如下:

1.从消息队列中尝试读消息。

        1.1.获取失败,继续循环。

2.获取成功,进行解析和转换。

3.调用createVoucherOrder(voucherOrder)方法完成下单。

4.ACK确认

        4.1.确认失败,调用handlePendingList()方法进行处理。

  1. private static ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();
  2. @PostConstruct
  3. private void init(){
  4. seckill_order_executor.submit(new VoucherOrderHandler());
  5. }
  6. private class VoucherOrderHandler implements Runnable{
  7. String queueName = "stream.order";
  8. @SneakyThrows
  9. @Override
  10. public void run() {
  11. while(true){
  12. try {
  13. //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
  14. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  15. Consumer.from("g1", "c1"),
  16. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
  17. StreamOffset.create(queueName, ReadOffset.lastConsumed())
  18. );
  19. //2.判断消息获取是否成功
  20. if(list==null || list.isEmpty()){
  21. //2.1.获取失败,没有消息,继续下一次循环
  22. continue;
  23. }
  24. //3.解析消息中的订单信息
  25. MapRecord<String, Object, Object> record = list.get(0);
  26. //4.获取成功,可以下单
  27. Map<Object, Object> values = record.getValue();
  28. //3.创建订单
  29. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
  30. createVoucherOrder(voucherOrder);
  31. //4.ACK确认
  32. stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
  33. } catch (Exception e) {
  34. log.debug("处理订单异常",e);
  35. handlePendingList();
  36. }
  37. }
  38. }

在VoucherOrderServiceImpl中添加handlePendingList()方法的代码:

下面有几个修改点:1.XREADGROUP语句末尾改为0,表示读Pending-list队列。2.Pending-list消息获取失败结束循环。3.如果抛异常只是暂停一下,然后会继续循环读。

  1. private void handlePendingList() {
  2. while(true){
  3. try {
  4. //1.获取Pending-List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order 0
  5. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  6. Consumer.from("g1", "c1"),
  7. StreamReadOptions.empty().count(1),
  8. StreamOffset.create(queueName, ReadOffset.from("0"))
  9. );
  10. //2.判断消息获取是否成功
  11. if(list==null || list.isEmpty()){
  12. //2.1.获取失败,说明Pending-list里没有异常消息,结束循环
  13. break;
  14. }
  15. //3.解析消息中的订单信息
  16. MapRecord<String, Object, Object> record = list.get(0);
  17. //4.获取成功,可以下单
  18. Map<Object, Object> values = record.getValue();
  19. //3.创建订单
  20. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
  21. //4.ACK确认
  22. stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
  23. } catch (Exception e) {
  24. log.debug("处理Pending-list异常",e);
  25. try {
  26. Thread.sleep(20);
  27. } catch (InterruptedException ex) {
  28. throw new RuntimeException(ex);
  29. }
  30. }
  31. }
  32. }

启动项目进行测试:

首先用Apifox进行测试,测试接口请求发送成功:

测试成功后可以看到:tb_voucher_order表多了1条记录,tb_seckill_voucher表对应优惠券的库存-1;在Redis中seckill:order下出现订单记录,在stockill:stock下的库存-1,在stream.orders下出现1条新的记录。

然后用Jmeter进行测试:可以发现相较于未做异步处理的情况性能仍有较大提升。

 P55 达人探店 发布探店笔记

一般来讲企业开发会将图片等文件上传到一个专门的文件服务器上。

但我们这个项目目前只会将文件上传到前端服务器上。

复制下面的链接,然后保存到下面这个位置:

上传图片:

在我的和首页都可以看到新发布的博文:

P56 达人探店 查看探店笔记

探店笔记要包含笔记的内容和博主的相关信息。所以选择在Blog表中添加如下2个字段,这两个字段需要后续我们手动维护(赋值)。

 

下面是接口的请求地址和说明,建议先不看视频自己写,写完后和视频比较差异。

 

在BlogController类里添加一个queryBlogById方法(虽然Controller里不应该出现业务代码,但鉴于只是简单的查询操作,就不必在意细节了):

  1. @GetMapping("/{id}")
  2. public Result queryBlogById(@PathVariable("id") Long id){
  3. Blog blog = blogService.getById(id);
  4. if(blog==null){
  5. return Result.fail("笔记不存在");
  6. }
  7. User user = userService.getById(id);
  8. blog.setIcon(user.getIcon());
  9. blog.setName(user.getNickName());
  10. return Result.ok(blog);
  11. }

P57 达人探店 点赞功能

现在的点赞逻辑是,一个人可以对同一篇笔记点赞无数次。

需求:

1.同一个用户只能点赞一次,如果再次点击则取消点赞。

2.如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

分析:

1.给Blog类中添加一个isLike字段,标示是否被当前用户点赞。

2.修改点赞功能,利用Redis的Set集合判断是否点赞过,未点赞则点赞数+1,已点赞则点赞数-1。

3.修改分页查询Blog业务和根据id查询Blog的业务,判断当前用户是否点赞过,赋值给isLike字段。

代码如下:

在BlogController类中新增likeBlog方法:

  1. @PutMapping("/like/{id}")
  2. public Result likeBlog(@PathVariable("id") Long id) {
  3. return blogService.likeBlog(id);
  4. }

在IBlogService接口中添加方法声明:

Result likeBlog(Long id);

在BlogServiceImpl类中添加下面代码:

  1. private final StringRedisTemplate stringRedisTemplate;
  2. public BlogServiceImpl(StringRedisTemplate stringRedisTemplate) {
  3. this.stringRedisTemplate = stringRedisTemplate;
  4. }
  5. @Override
  6. public Result likeBlog(Long id) {
  7. //1.获取登录用户
  8. UserDTO user = UserHolder.getUser();
  9. Long userId = user.getId();
  10. //2.判断当前用户是否已经点赞过
  11. String key = "blog:liked:" +id;
  12. Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
  13. if(BooleanUtil.isFalse(isMember)){
  14. //3.未点赞,可以点赞
  15. //3.1.数据库点赞数+1
  16. boolean isSuccess = update().setSql("liked=liked+1").eq("id", id).update();
  17. //3.2.保存用户到Redis
  18. if(isSuccess){
  19. stringRedisTemplate.opsForSet().add(key,userId.toString());
  20. }
  21. }else{
  22. //4.已点赞,取消点赞
  23. //4.1.数据库点赞数-1
  24. boolean isSuccess = update().setSql("liked=liked-1").eq("id", id).update();
  25. //4.2.把用户从Redis的set集合移除
  26. stringRedisTemplate.opsForSet().remove(key,userId.toString());
  27. }
  28. return Result.ok();
  29. }

 因为我的queryBlogById和queryHotBlog的业务代码都沿用原本的代码写在BlogController中,因此我是直接在BlogController中写入isBlogLiked代码:

  1. public Boolean isBlogLiked(Blog blog) {
  2. Long userId = null;
  3. try {
  4. //1.获取登录用户
  5. userId = UserHolder.getUser().getId();
  6. } catch (Exception e) {
  7. log.debug("用户未登录!");
  8. return false;
  9. }
  10. //2.判断当前用户是否已经点赞过
  11. String key = "blog:liked:" +blog.getId();
  12. Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
  13. try {
  14. blog.setIsLike(BooleanUtil.isTrue(isMember));
  15. } catch (Exception e) {
  16. log.debug("点赞信息为空!");
  17. return false;
  18. }
  19. return isMember;
  20. }

isBlogLiked的作用主要是给blog对象设置值(在Java中对象是引用传递),前端会根据返回的blog对象的isLike参数的true或false来给点赞标签高亮或灰暗。

测试效果:点赞一次高亮,点赞两次取消。在缓存中有相应的记录:

P58 达人探店 点赞排行榜

需求:在探店笔记详情页面,按照时间排序,把最早点赞的TOP5列举出来,形成点赞排行榜。

我们选用SortedSet来实现功能。

可以用ZADD命令添加元素,ZSCOPE来获得分数对应的元素,ZRANGE来

修改的点有如下几个:

在Redis缓存中多了一个score:

在BlogController类中添加如下方法:

  1. @GetMapping("/likes/{id}")
  2. public Result queryBlogLikes(@PathVariable("id") Long id) {
  3. return blogService.queryBlogLikes(id);
  4. }

 在IBlogService接口中添加如下方法:

Result queryBlogLikes(Long id);

 在BlogServiceImpl类中添加如下方法:

  1. @Override
  2. public Result queryBlogLikes(Long id) {
  3. String key = RedisConstants.BLOG_LIKED_KEY +id;
  4. //1.查询top5的点赞用户 zrange key 0 4
  5. Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
  6. if(top5==null || top5.isEmpty()){
  7. return Result.ok(Collections.emptyList());
  8. }
  9. //2.解析出其中的用户id
  10. List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
  11. //3.根据用户id查询用户
  12. List<UserDTO> userDTOS = userService.listByIds(ids)
  13. .stream()
  14. .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
  15. .collect(Collectors.toList());
  16. //4.返回
  17. return Result.ok(userDTOS);
  18. }

现在会出现左图问题,先点赞的反而被排到后面了: 

 

下面是修改后的queryBlogLikes:

  1. @Override
  2. public Result queryBlogLikes(Long id) {
  3. String key = RedisConstants.BLOG_LIKED_KEY +id;
  4. //1.查询top5的点赞用户 zrange key 0 4
  5. Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
  6. if(top5==null || top5.isEmpty()){
  7. return Result.ok(Collections.emptyList());
  8. }
  9. //2.解析出其中的用户id
  10. List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
  11. //3.根据用户id查询用户
  12. String idStr = StrUtil.join(",", ids);
  13. List<UserDTO> userDTOS = userService.query()
  14. .in("id",ids)
  15. .last("ORDER BY FIELD(id,"+idStr+")").list()
  16. .stream()
  17. .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
  18. .collect(Collectors.toList());
  19. //4.返回
  20. return Result.ok(userDTOS);
  21. }

能够正常展示: 

P59 好友关注 关注和取关

关注和取关请求:

查看是否已关注:

关注是给表新增记录,取关是删除表中记录。

在FollowController类中写入如下代码:

  1. @RestController
  2. @RequestMapping("/follow")
  3. public class FollowController {
  4. @Autowired
  5. IFollowService followService;
  6. @PutMapping("/{id}/{isFollow}")
  7. public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow) {
  8. return followService.follow(followUserId,isFollow);
  9. }
  10. @GetMapping("/or/not/{id}")
  11. public Result isFollow(@PathVariable("id") Long followUserId) {
  12. return followService.isFollow(followUserId);
  13. }
  14. }

在IFollowService中写入如下代码:

  1. public interface IFollowService extends IService<Follow> {
  2. Result follow(Long followUserId, Boolean isFollow);
  3. Result isFollow(Long followUserId);
  4. }

在FollowServiceImpl类中写入如下代码:

  1. @Service
  2. public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
  3. @Override
  4. public Result follow(Long followUserId, Boolean isFollow) {
  5. Long userId = UserHolder.getUser().getId();
  6. //1.判断是关注还是取关
  7. if(isFollow){
  8. //2.关注,新增数据
  9. Follow follow = new Follow();
  10. follow.setUserId(userId);
  11. follow.setFollowUserId(followUserId);
  12. save(follow);
  13. }else{
  14. //3.取关,删除记录
  15. remove(new QueryWrapper<Follow>()
  16. .eq("user_id", userId).eq("follow_user_id", followUserId));
  17. }
  18. return Result.ok();
  19. }
  20. @Override
  21. public Result isFollow(Long followUserId) {
  22. Long userId = UserHolder.getUser().getId();
  23. //1.查询是否关注
  24. Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
  25. //2.判断是否关注
  26. return Result.ok(count>0);
  27. }
  28. }

测试:点击关注显示关注成功,数据库里会有记录。

取消关注后记录消息。

P60 好友关注 共同关注

把准备好的queryUserById方法放入到UserController中

把准备好的queryBlogByUserId方法放入到BlogController中

功能实现的思路是:在关注某位用户之后,同时将被关注的用户id存入到Redis中。查看共同关注的时候,只需要求被查看的这位用户与自己关注列表的交集即可。

首先要更改FollowServiceImpl代码中的follow方法,主要在关注时把被关注用户的id放入redis,取关时从redis中移除id:

  1. @Resource
  2. private final StringRedisTemplate stringRedisTemplate;
  3. @Resource
  4. private IUserService userService;
  5. public FollowServiceImpl(StringRedisTemplate stringRedisTemplate) {
  6. this.stringRedisTemplate = stringRedisTemplate;
  7. }
  8. @Override
  9. public Result follow(Long followUserId, Boolean isFollow) {
  10. Long userId = UserHolder.getUser().getId();
  11. String key = "follows:" + userId;
  12. //1.判断是关注还是取关
  13. if(isFollow){
  14. //2.关注,新增数据
  15. Follow follow = new Follow();
  16. follow.setUserId(userId);
  17. follow.setFollowUserId(followUserId);
  18. boolean isSuccess = save(follow);
  19. if(isSuccess){
  20. //把关注用户的id放入redis的set集合
  21. stringRedisTemplate.opsForSet().add(key,followUserId.toString());
  22. }
  23. }else{
  24. //3.取关,删除记录
  25. boolean isSuccess = remove(new QueryWrapper<Follow>()
  26. .eq("user_id", userId).eq("follow_user_id", followUserId));
  27. if(isSuccess){
  28. //把关注用户的id从Redis移除
  29. stringRedisTemplate.opsForSet().remove(key,followUserId.toString());
  30. }
  31. }
  32. return Result.ok();
  33. }

下面进行简单测试,关注时数据存入redis没问题: 

 点击共同关注报错,但发出了请求,原因是还没编写方法:

可以通过SINTER命令求出交集:

在FollowController中写入下面代码:

  1. @GetMapping("/common/{id}")
  2. public Result followCommons(@PathVariable("id") Long id){
  3. return followService.followCommons(id);
  4. }

 在IFollowService中写入下面代码:

Result followCommons(Long id);

在FollowServiceImpl类中写入下面代码:

  1. @Override
  2. public Result followCommons(Long id) {
  3. //求的是目标用户和当前用户关注的交集
  4. //1.获取key
  5. Long userId = UserHolder.getUser().getId();
  6. String key1 = "follows:"+userId;
  7. String key2 = "follows:"+id;
  8. //2.求交集
  9. Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
  10. if(intersect==null||intersect.isEmpty()){
  11. return Result.ok(Collections.emptyList());
  12. }
  13. //3.解析id集合
  14. List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
  15. //4.查询用户
  16. List<UserDTO> users = userService.listByIds(ids)
  17. .stream()
  18. .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
  19. .collect(Collectors.toList());
  20. return Result.ok(users);
  21. }

像我现在关注的是小鱼同学和可可今天不吃肉。接下来换号,换成小鱼同学,电话:13686869696,让小鱼同学关注我和可可今天不吃肉。以小鱼同学的视角来查看我,可以看到我们共同关注了可可今天不吃肉。

P61 好友关注  Feed流实现方案分析

关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的消息。

Feed流产品有2种常见模式:

1.Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈。

优点:信息全面,不会有缺失。并且实现也相对简单。

缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低。

2.智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息来吸引用户。

优点:投喂用户感兴趣的信息,用户粘度高,容易沉迷。

缺点:如果算法不精确,可能起反作用。

— — — — — — — — — — — — — — — —

1.拉模式:读扩散。

信息的发送端会把信息发送到发件箱。等信息的接受端要读信息时,把发件箱的消息拉取到收件箱,然后将所有消息按照时间进行排序。

缺点:操作耗费时间多,存在延迟。

2.推模式:写扩散。

信息的发送端会直接把信息发送到所有接收方的收件箱。接收方收件箱内的消息按时间逐个进行排序。

缺点:需要保存大量的消息。

3.推拉结合模式:读写混合,兼具推和拉两种模式的优点。

当普通人发消息,因为粉丝少,直接采用推模式。

当大V发消息,因为粉丝多,对于活跃粉丝(数量少)采用推模式,对于普通粉丝僵死粉(数量多)采用拉模式。

P62 好友关注 推送到粉丝收件箱

要实现分页查询要指定page和size,计算从哪里开始到哪里结束。

List有角标,SortedSet有排名。

Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能用传统的分页模式。

用滚动分页模式:

score范围进行查询,记住最小的时间戳,下次找比这个更小的时间戳。

在BlogController里面写入如下代码:

  1. @PostMapping
  2. public Result saveBlog(@RequestBody Blog blog) {
  3. // 获取登录用户
  4. UserDTO user = UserHolder.getUser();
  5. blog.setUserId(user.getId());
  6. // 保存探店博文
  7. blogService.saveBlog(blog);
  8. // 返回id
  9. return Result.ok(blog.getId());
  10. }

在IBlogService接口里面写入如下代码:

Result saveBlog(Blog blog);

在BlogServiceImpl类中写入如下代码:

  1. @Override
  2. public Result saveBlog(Blog blog) {
  3. //1.获取登录用户
  4. UserDTO user = UserHolder.getUser();
  5. blog.setUserId(user.getId());
  6. //2.保存探店笔记
  7. boolean isSuccess = save(blog);
  8. if(!isSuccess){
  9. return Result.fail("新增笔记失败!");
  10. }
  11. //3.查询笔记作业的所有粉丝
  12. //select * from tb_follow where follow_user_id = ?
  13. List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
  14. //4.推送笔记id给粉丝
  15. for(Follow follow : follows){
  16. //4.1.获取粉丝id
  17. Long userId = follow.getUserId();
  18. //4.2.推送到粉丝收件箱是sortedSet
  19. String key = "feed::"+userId;
  20. stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
  21. }
  22. //返回id
  23. return Result.ok(blog.getId());
  24. }

P63 好友关注 滚动分页查询收件箱的思路

ZRANGE是按照角标从小到大排序:

ZREVRANGE是按照角标从大到小排序:

ZREVRANGEBYSCORE是按照分数从大到小排序:

滚动查询:每一次都记住上一次查询分数的最小值,将最小值作为下一次的最大值

分数的最大值,分数的最小值,偏移量,查的数量。

规律:分数最小值和查的数量固定不变。最大值为上一次查询的最小值、偏移量第1次给0,第1次后给在上一次的结果中,与最小值一样的元素的个数。

当分数一致出现问题:

P64 好友关注 实现滚动分页查询

在BlogController中写入下面代码:

  1. @GetMapping("/of/follow")
  2. public Result queryBlogOfFollow(@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset) {
  3. return blogService.queryBlogOfFollow(max,offset);
  4. }

在IBlogService写入如下代码:

Result queryBlogOfFollow(Long max, Integer offset);

 在BlogServiceImpl中写入如下代码:

  1. @Override
  2. public Result queryBlogOfFollow(Long max, Integer offset) {
  3. //1.获取当前用户
  4. Long userId = UserHolder.getUser().getId();
  5. //2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count
  6. String key = FEED_KEY+userId;
  7. Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
  8. .reverseRangeByScoreWithScores(key, 0, max, offset, 3);
  9. //2.1.非空判断
  10. if(typedTuples==null || typedTuples.isEmpty()){
  11. return Result.ok();
  12. }
  13. //3.解析数据:blogId、minTime(时间戳)、offset
  14. List<Long> ids = new ArrayList<>(typedTuples.size());
  15. long minTime = 0;
  16. int os = 1;
  17. for(ZSetOperations.TypedTuple<String> tuple:typedTuples){
  18. //4.1.获取id
  19. String idStr = tuple.getValue();
  20. ids.add(Long.valueOf(idStr));
  21. //4.2.获取分数
  22. long time = tuple.getScore().longValue();
  23. if(time == minTime){
  24. os++;
  25. }else{
  26. minTime = time;
  27. os=1;
  28. }
  29. }
  30. //4.根据id查询blog
  31. String idStr = StrUtil.join(",",ids);
  32. List<Blog> blogs = query().in("id",ids).last("ORDER BY FIELD(id,"+idStr+")").list();
  33. for (Blog blog : blogs) {
  34. isBlogLiked(blog);
  35. User user = userService.getById(blog.getUserId());
  36. blog.setName(user.getNickName());
  37. blog.setIcon(user.getIcon());
  38. }
  39. //5.封装并返回
  40. ScrollResult r = new ScrollResult();
  41. r.setList(blogs);
  42. r.setOffset(os);
  43. r.setMinTime(minTime);
  44. return Result.ok(r);
  45. }

效果图如下(我关注了小鱼同学,于是我可以看到小鱼同学发布的文章): 

 

P65 附近商铺 GEO数据结构的基本用法

GEO是Geolocation的简写形式,代表地理坐标,在Redis的3.2版本后加入了GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。

P66 附近商铺 导入店铺数据到GEO

在src/test/java/com/hmdp的HmDianPingApplicationTests类中写入如下的方法:

  1. @Resource
  2. StringRedisTemplate stringRedisTemplate;
  3. @Test
  4. void loadShopData(){
  5. //1.查询店铺信息
  6. List<Shop> list = shopService.list();
  7. //2.把店铺分组,按照typeId分组,typeId一致的放到一个集合
  8. Map<Long,List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
  9. //3.分批完成写入Redis
  10. for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
  11. //3.1.获取类型id
  12. Long typeid = entry.getKey();
  13. String key = "shop:geo:"+typeid;
  14. //3.2.获取同类型的店铺的集合
  15. List<Shop> value = entry.getValue();
  16. //3.3.写入redis GEOADD key 经度 纬度 member
  17. List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
  18. for(Shop shop : value){
  19. // stringRedisTemplate.opsForGeo().add(key,new Point(shop.getX(),shop.getY()),shop.getId().toString());
  20. locations.add(new RedisGeoCommands.GeoLocation<>(
  21. shop.getId().toString(),
  22. new Point(shop.getX(),shop.getY())
  23. ));
  24. }
  25. stringRedisTemplate.opsForGeo().add(key,locations);
  26. }
  27. }

点击运行之后,在Redis中能够看到导入的数据:

P67 附近商铺 实现附近商户功能

 首先排除掉spring-data-redis和lettuce-core这俩依赖,然后引入这俩依赖的新版本:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-redis</artifactId>
  5. <exclusions>
  6. <exclusion>
  7. <groupId>spring-data-redis</groupId>
  8. <artifactId>org.springframework.data</artifactId>
  9. </exclusion>
  10. <exclusion>
  11. <groupId>lettuce-core</groupId>
  12. <artifactId>io.lettuce</artifactId>
  13. </exclusion>
  14. </exclusions>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.springframework.data</groupId>
  18. <artifactId>spring-data-redis</artifactId>
  19. <version>2.6.2</version>
  20. <scope>compile</scope>
  21. </dependency>
  22. <dependency>
  23. <groupId>io.lettuce</groupId>
  24. <artifactId>lettuce-core</artifactId>
  25. <version>6.1.6.RELEASE</version>
  26. <scope>compile</scope>
  27. </dependency>
  28. </dependencies>

在ShopController中修改queryShopByType方法:

  1. @GetMapping("/of/type")
  2. public Result queryShopByType(
  3. @RequestParam("typeId") Integer typeId,
  4. @RequestParam(value = "current", defaultValue = "1") Integer current,
  5. @RequestParam(value="x",required=false) Double x,
  6. @RequestParam(value="y",required=false) Double y
  7. ) {
  8. return shopService.queryShopByType(typeId,current,x,y);
  9. }

在IShopService接口中写入如下方法:

Result queryShopByType(Integer typeId, Integer current, Double x, Double y);

在ShopServiceImpl类中写入如下方法:

  1. @Override
  2. public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
  3. //1.判断是否需要根据坐标查询
  4. if(x==null || y==null){
  5. //不需要查询坐标,按数据库查
  6. Page<Shop> page = query()
  7. .eq("type_id",typeId)
  8. .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
  9. return Result.ok(page.getRecords());
  10. }
  11. //2.计算分页参数
  12. int from = (current - 1)*SystemConstants.DEFAULT_PAGE_SIZE;
  13. int end = current*SystemConstants.DEFAULT_PAGE_SIZE;
  14. //3.查询redis,按照距离排序、分页。结果:shopId,distance
  15. String key = SHOP_GEO_KEY+typeId;
  16. GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
  17. .search(
  18. key,
  19. GeoReference.fromCoordinate(x, y),
  20. new Distance(5000),
  21. RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
  22. );
  23. //4.解析出id
  24. if(results==null){
  25. return Result.ok(Collections.emptyList());
  26. }
  27. List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
  28. //4.1.截取from-end的部分
  29. List<Long> ids = new ArrayList<>(list.size());
  30. Map<String,Distance> distanceMap = new HashMap<>(list.size());
  31. if(list.size()<=from){
  32. return Result.ok(Collections.emptyList());
  33. }
  34. list.stream().skip(from).forEach(result->{ //跳过可能把所有数据跳过了
  35. //4.2.获取店铺id
  36. String shopIdStr = result.getContent().getName();
  37. ids.add(Long.valueOf(shopIdStr));
  38. //4.3.获取距离
  39. Distance distance = result.getDistance();
  40. distanceMap.put(shopIdStr,distance);
  41. });
  42. //5.根据id查询shop
  43. String idStr = StrUtil.join(",", ids);
  44. List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD ( id," + idStr + ")").list();
  45. for(Shop shop : shops){
  46. shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
  47. }
  48. //6、返回
  49. return Result.ok(shops);
  50. }

效果:

 

P68 用户签到 BitMap功能演示

BITFIELD能一次查询多个比特位的值。

BITFIELD key GET(代表查询) u(u代表无符号,i代表有符号)截取几位作为结果 开始的位置

P69 用户签到 实现签到功能

在UserController中写入下面的方法:

  1. @PostMapping("/sign")
  2. public Result sign(){
  3. return userService.sign();
  4. }

 在IUserService中写入下面的方法:

Result sign();

 在UserServiceImpl类中写入如下代码:

  1. @Override
  2. public Result sign() {
  3. //1.获取当前登录用户
  4. Long userId = UserHolder.getUser().getId();
  5. //2.获取日期
  6. LocalDateTime now = LocalDateTime.now();
  7. //3.拼接key
  8. String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
  9. String key = USER_SIGN_KEY + userId + keySuffix;
  10. //4.获取今天是本月第几天
  11. int dayOfMonth = now.getDayOfMonth();
  12. //5.写入Redis SETBIT key offset 1
  13. stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
  14. return Result.ok();
  15. }

用Apifox进行测试:

P70 用户签到 统计连续签到

在UserController类中写入如下代码:

  1. @GetMapping("/sign/count")
  2. public Result signCount(){
  3. return userService.signCount();
  4. }

在IUserService类中写入如下代码:

 Result signCount();

在UserServiceImpl类中写入如下代码:

  1. @Override
  2. public Result signCount() {
  3. //1.获取当前登录用户
  4. Long userId = UserHolder.getUser().getId();
  5. //2.获取日期
  6. LocalDateTime now = LocalDateTime.now();
  7. //3.拼接key
  8. String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
  9. String key = USER_SIGN_KEY + userId + keySuffix;
  10. //4.获取今天是本月第几天
  11. int dayOfMonth = now.getDayOfMonth();
  12. //5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
  13. List<Long> result = stringRedisTemplate.opsForValue().bitField(
  14. key, BitFieldSubCommands.create()
  15. .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
  16. );
  17. if(result==null || result.isEmpty()){
  18. //没有任何签到结果
  19. return Result.ok(0);
  20. }
  21. Long num = result.get(0);
  22. if(num==null || num==0){
  23. return Result.ok(0);
  24. }
  25. //6.循环遍历
  26. int count=0;
  27. while(true){
  28. //6.1.让这个数字与1做与运算,得到数字的最后一个bit
  29. if((num&1)==0){//6.2.判断这个bit位是否为0
  30. //6.3.如果为0,说明未签到结束
  31. break;
  32. }else{
  33. //6.4.如果不为0,说明已签到,计数器+1
  34. count++;
  35. }
  36. //6.5.把数字右移一位,抛弃最后一个bit位,继续下一个bit
  37. num >>>= 1;
  38. }
  39. return Result.ok(count);
  40. }

P71 UV统计 HyperLogLog的用法

不论添加几次,永远只记录一次。

P72 UV统计 测试百万数据的统计

在src/test/java/com/hmdp的HmDianPingApplicationTests类中写入如下代码:

  1. @Test
  2. void testHyperLogLog(){
  3. String[] values = new String[1000];
  4. int j=0;
  5. for(int i=0;i<1000000;i++){
  6. j=j%1000;
  7. values[j] = "user_"+i;
  8. if(j == 999){
  9. stringRedisTemplate.opsForHyperLogLog().add("hl3",values);
  10. }
  11. j++;
  12. }
  13. //统计数量
  14. Long res = stringRedisTemplate.opsForHyperLogLog().size("hl3");
  15. System.out.println("hl3"+res);
  16. }

100万数据成功写入:

测试前:

测试后:

大约占用11kb,确实是小于16kb。

备忘录:

tasklist | findstr nginx

taskkill /F /PID 47096

netstat -aon | findstr :8080

在我电脑上Redis是在C:\cangqiongwaimai\Redis-x64-3.2.100\redis-server.exe里。

Jmeter是在C:\software\apache-jmeter-5.5\bin\jmeter.bat里。

生活不易,时间有限,因咨询人数众多,暂时改为收费咨询制。

帮助真正有需要的小伙伴们答疑解惑,减少改BUG的痛苦。

如需源码,可转3.88元后私信我获取(含简单运行指导)。

如有问题需单独答疑,可转6.88元后私信我,务必耐心帮您解答。

如需做定制化需求(含毕设),单独报价,费用合理,可商议。

面试八股文笔记(3合1)19.88元(原价是98元)感兴趣可咨询。

一次性支付99.86元大家交个朋友,加入铁粉交流群(100+大佬),无限次咨询,资源免费提供。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/647706
推荐阅读
相关标签
  

闽ICP备14008679号